Skip to content

Commit

Permalink
Get job run API (#281)
Browse files Browse the repository at this point in the history
* fix: get job run from Optimus DB (#278)

* fix: get job run from Optimus DB

* fix: add tests

* fix: lint

* fix: typo (#280)
  • Loading branch information
Mryashbhardwaj authored Oct 17, 2024
1 parent 51a43a2 commit 541691f
Show file tree
Hide file tree
Showing 12 changed files with 1,112 additions and 234 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "ca87ce40849dfc2a7a73ba9d2e0e9084e33dadeb"
PROTON_COMMIT := "7bfbb3b40583db73a7f891e2b7db61931ff2c6a5"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
49 changes: 47 additions & 2 deletions core/scheduler/handler/v1beta1/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/lib/interval"
"github.com/goto/optimus/internal/utils/filter"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

type JobRunService interface {
JobRunInput(context.Context, tenant.ProjectName, scheduler.JobName, scheduler.RunConfig) (*scheduler.ExecutorInput, error)
UpdateJobState(context.Context, *scheduler.Event) error
GetJobRunsByFilter(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, filters ...filter.FilterOpt) ([]*scheduler.JobRun, error)
GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error)
UploadToScheduler(ctx context.Context, projectName tenant.ProjectName) error
GetInterval(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, referenceTime time.Time) (interval.Interval, error)
Expand Down Expand Up @@ -80,8 +82,51 @@ func (h JobRunHandler) JobRunInput(ctx context.Context, req *pb.JobRunInputReque
}, nil
}

// JobRun currently gets the job runs from scheduler based on the criteria
// TODO: later should collect the job runs from optimus
// GetJobRuns gets job runs from optimus DB based on the criteria
func (h JobRunHandler) GetJobRuns(ctx context.Context, req *pb.GetJobRunsRequest) (*pb.GetJobRunsResponse, error) {
projectName, err := tenant.ProjectNameFrom(req.GetProjectName())
if err != nil {
h.l.Error("error adapting project name [%s]: %s", req.GetProjectName(), err)
return nil, errors.GRPCErr(err, "unable to get job run for "+req.GetJobName())
}

jobName, err := scheduler.JobNameFrom(req.GetJobName())
if err != nil {
h.l.Error("error adapting job name [%s]: %s", req.GetJobName(), err)
return nil, errors.GRPCErr(err, "unable to get job run for "+req.GetJobName())
}

if len(req.GetState()) > 0 {
_, err := scheduler.StateFromString(req.GetState())
if err != nil {
h.l.Error("error adapting job run state [%s]: %s", req.GetState(), err)
return nil, errors.GRPCErr(err, "invalid job run state: "+req.GetState())
}
}

var jobRuns []*scheduler.JobRun
jobRuns, err = h.service.GetJobRunsByFilter(ctx, projectName, jobName,
filter.WithString(filter.RunState, req.GetState()),
filter.WithTime(filter.StartDate, req.GetSince().AsTime()),
filter.WithTime(filter.EndDate, req.GetUntil().AsTime()),
)
if err != nil {
h.l.Error("error getting job runs: %s", err)
return nil, errors.GRPCErr(err, "unable to get job run for "+req.GetJobName())
}

var runs []*pb.JobRun
for _, run := range jobRuns {
ts := timestamppb.New(run.ScheduledAt)
runs = append(runs, &pb.JobRun{
State: run.State.String(),
ScheduledAt: ts,
})
}
return &pb.GetJobRunsResponse{JobRuns: runs}, nil
}

// JobRun gets the job runs from scheduler based on the criteria
func (h JobRunHandler) JobRun(ctx context.Context, req *pb.JobRunRequest) (*pb.JobRunResponse, error) {
projectName, err := tenant.ProjectNameFrom(req.GetProjectName())
if err != nil {
Expand Down
112 changes: 112 additions & 0 deletions core/scheduler/handler/v1beta1/job_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/lib/interval"
"github.com/goto/optimus/internal/lib/window"
"github.com/goto/optimus/internal/utils/filter"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

Expand All @@ -31,6 +32,112 @@ func TestJobRunHandler(t *testing.T) {
projectName := "a-data-proj"
jobName := "a-job-name"

t.Run("GetJobRun", func(t *testing.T) {
t.Run("should return error if project name is invalid", func(t *testing.T) {
jobRunHandler := v1beta1.NewJobRunHandler(logger, nil, nil)
req := &pb.GetJobRunsRequest{
ProjectName: "",
JobName: "job1",
Since: timestamppb.Now(),
Until: timestamppb.Now(),
State: "success",
}
resp, err := jobRunHandler.GetJobRuns(ctx, req)
assert.NotNil(t, err)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = invalid argument for entity project: project name is empty: unable to get job run for job1")
assert.Nil(t, resp)
})

t.Run("should return error if job name is invalid", func(t *testing.T) {
jobRunHandler := v1beta1.NewJobRunHandler(logger, nil, nil)
req := &pb.GetJobRunsRequest{
ProjectName: "proj",
JobName: "",
Since: timestamppb.Now(),
Until: timestamppb.Now(),
State: "success",
}
resp, err := jobRunHandler.GetJobRuns(ctx, req)
assert.NotNil(t, err)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = invalid argument for entity jobRun: job name is empty: unable to get job run for ")
assert.Nil(t, resp)
})

t.Run("should return error if state is invalid", func(t *testing.T) {
jobRunHandler := v1beta1.NewJobRunHandler(logger, nil, nil)
req := &pb.GetJobRunsRequest{
ProjectName: "proj",
JobName: "job1",
Since: timestamppb.Now(),
Until: timestamppb.Now(),
State: "invalid_state",
}
resp, err := jobRunHandler.GetJobRuns(ctx, req)
assert.NotNil(t, err)
assert.ErrorContains(t, err, "invalid job run state: invalid_state")
assert.Nil(t, resp)
})

t.Run("should return job runs successfully", func(t *testing.T) {
jobRuns := []*scheduler.JobRun{
{
State: scheduler.StateSuccess,
ScheduledAt: time.Now(),
},
}
jobRunService := new(mockJobRunService)
jobRunService.On("GetJobRunsByFilter", ctx, tenant.ProjectName("proj"), scheduler.JobName("job1"), mock.Anything).
Return(jobRuns, nil)
defer jobRunService.AssertExpectations(t)

jobRunHandler := v1beta1.NewJobRunHandler(logger, jobRunService, nil)
req := &pb.GetJobRunsRequest{
ProjectName: "proj",
JobName: "job1",
Since: timestamppb.Now(),
Until: timestamppb.Now(),
State: "success",
}
resp, err := jobRunHandler.GetJobRuns(ctx, req)
assert.Nil(t, err)
assert.Equal(t, len(jobRuns), len(resp.JobRuns))
for _, expectedRun := range jobRuns {
var found bool
for _, respRun := range resp.JobRuns {
if expectedRun.ScheduledAt.Equal(respRun.ScheduledAt.AsTime()) &&
expectedRun.State.String() == respRun.State {
found = true
break
}
}
if !found {
assert.Fail(t, fmt.Sprintf("failed to find expected job run %v", expectedRun))
}
}
})

t.Run("should return error if service returns error", func(t *testing.T) {
jobRunService := new(mockJobRunService)
var jobRuns []*scheduler.JobRun
jobRunService.On("GetJobRunsByFilter", ctx, tenant.ProjectName("proj"), scheduler.JobName("job1"), mock.Anything).
Return(jobRuns, fmt.Errorf("service error"))
defer jobRunService.AssertExpectations(t)

jobRunHandler := v1beta1.NewJobRunHandler(logger, jobRunService, nil)
req := &pb.GetJobRunsRequest{
ProjectName: "proj",
JobName: "job1",
Since: timestamppb.Now(),
Until: timestamppb.Now(),
State: "success",
}
resp, err := jobRunHandler.GetJobRuns(ctx, req)
assert.NotNil(t, err)
assert.EqualError(t, err, "rpc error: code = Internal desc = service error: unable to get job run for job1")
assert.Nil(t, resp)
})
})

t.Run("JobRunInput", func(t *testing.T) {
t.Run("returns error when project name is invalid", func(t *testing.T) {
service := new(mockJobRunService)
Expand Down Expand Up @@ -716,6 +823,11 @@ func (m *mockJobRunService) UploadToScheduler(ctx context.Context, projectName t
return args.Error(0)
}

func (m *mockJobRunService) GetJobRunsByFilter(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, filters ...filter.FilterOpt) ([]*scheduler.JobRun, error) {
args := m.Called(ctx, projectName, jobName, filters)
return args.Get(0).([]*scheduler.JobRun), args.Error(1)
}

func (m *mockJobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error) {
args := m.Called(ctx, projectName, jobName, criteria)
if args.Get(0) == nil {
Expand Down
24 changes: 24 additions & 0 deletions core/scheduler/service/job_run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/goto/optimus/internal/lib/interval"
"github.com/goto/optimus/internal/lib/window"
"github.com/goto/optimus/internal/models"
"github.com/goto/optimus/internal/utils/filter"
)

type metricType string
Expand Down Expand Up @@ -54,6 +55,8 @@ type JobRepository interface {
type JobRunRepository interface {
GetByID(ctx context.Context, id scheduler.JobRunID) (*scheduler.JobRun, error)
GetByScheduledAt(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time) (*scheduler.JobRun, error)
GetLatestRun(ctx context.Context, project tenant.ProjectName, name scheduler.JobName, status *scheduler.State) (*scheduler.JobRun, error)
GetRunsByTimeRange(ctx context.Context, project tenant.ProjectName, jobName scheduler.JobName, status *scheduler.State, since, until time.Time) ([]*scheduler.JobRun, error)
GetByScheduledTimes(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, scheduledTimes []time.Time) ([]*scheduler.JobRun, error)
Create(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error
Update(ctx context.Context, jobRunID uuid.UUID, endTime time.Time, jobRunStatus scheduler.State) error
Expand Down Expand Up @@ -146,6 +149,27 @@ func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.Proj
return s.compiler.Compile(ctx, details, config, executedAt)
}

func (s *JobRunService) GetJobRunsByFilter(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, filters ...filter.FilterOpt) ([]*scheduler.JobRun, error) {
f := filter.NewFilter(filters...)
var runState *scheduler.State
state, err := scheduler.StateFromString(f.GetStringValue(filter.RunState))
if err == nil {
runState = &state
}

if f.Contains(filter.StartDate) && f.Contains(filter.EndDate) {
// get job run by scheduled at between start date and end date, filter by runState if applicable
return s.repo.GetRunsByTimeRange(ctx, projectName, jobName, runState,
f.GetTimeValue(filter.StartDate), f.GetTimeValue(filter.EndDate))
}

jobRun, err := s.repo.GetLatestRun(ctx, projectName, jobName, runState)
if err != nil {
return nil, err
}
return []*scheduler.JobRun{jobRun}, nil
}

func (s *JobRunService) GetJobRuns(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, criteria *scheduler.JobRunsCriteria) ([]*scheduler.JobRunStatus, error) {
jobWithDetails, err := s.jobRepo.GetJobDetails(ctx, projectName, jobName)
if err != nil {
Expand Down
Loading

0 comments on commit 541691f

Please sign in to comment.