Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
evlekht committed Oct 15, 2024
1 parent 7cd1a0d commit 90156dc
Showing 1 changed file with 39 additions and 53 deletions.
92 changes: 39 additions & 53 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ func TestScheduler_Start(t *testing.T) {
ctrl := gomock.NewController(t)
storage := NewMockStorage(ctrl)
epsilon := time.Millisecond
timeout := 100 * time.Millisecond
timeout := 10000 * time.Millisecond

earlyJobExecuted := make(chan string)
nowJobExecuted := make(chan string)
lateJobExecuted := make(chan string)

earlyJob := &Job{
earlyJob := Job{
Name: "early_job",
ExecuteAt: clock.Now().Add(-1),
Period: 13,
Period: 113,
}
nowJob := &Job{
nowJob := Job{
Name: "now_job",
ExecuteAt: clock.Now(),
Period: 17,
Period: 117,
}
lateJob := &Job{
lateJob := Job{
Name: "late_job",
ExecuteAt: clock.Now().Add(1),
Period: 19,
Period: 119,
}
jobs := []*Job{earlyJob, nowJob, lateJob}
jobs := []*Job{&earlyJob, &nowJob, &lateJob}
jobsExecChansMap := map[string]chan string{
earlyJob.Name: earlyJobExecuted,
nowJob.Name: nowJobExecuted,
Expand All @@ -67,7 +67,7 @@ func TestScheduler_Start(t *testing.T) {

type executionStep struct {
time time.Time
jobs []*Job
jobs []Job
}
executionSequence := []executionStep{}

Expand All @@ -85,7 +85,7 @@ func TestScheduler_Start(t *testing.T) {

for i := 0; i < numberOfFullCycles; i++ {
for _, originalJob := range jobs {
currentJob := &Job{
currentJob := Job{
Name: originalJob.Name,
ExecuteAt: originalJob.ExecuteAt.Add(originalJob.Period * time.Duration(i)),
Period: originalJob.Period,
Expand All @@ -100,7 +100,7 @@ func TestScheduler_Start(t *testing.T) {
if len(executionSequence) == 0 || executionSequence[len(executionSequence)-1].time != currentJob.ExecuteAt {
executionSequence = append(executionSequence, executionStep{
time: currentJob.ExecuteAt,
jobs: []*Job{currentJob},
jobs: []Job{currentJob},
})
} else {
executionSequence[len(executionSequence)-1].jobs = append(executionSequence[len(executionSequence)-1].jobs, currentJob)
Expand All @@ -111,11 +111,7 @@ func TestScheduler_Start(t *testing.T) {

storageSession := &dummySession{}
storage.EXPECT().NewSession(ctx).Return(storageSession, nil)
storage.EXPECT().GetJobByName(ctx, storageSession, currentJob.Name).Return(&Job{ // we copy it, because execution will modify it
Name: currentJob.Name,
ExecuteAt: currentJob.ExecuteAt,
Period: currentJob.Period,
}, nil)
storage.EXPECT().GetJobByName(ctx, storageSession, currentJob.Name).Return(&currentJob, nil)
storage.EXPECT().UpsertJob(ctx, storageSession, newJob).Return(nil)
storage.EXPECT().Commit(storageSession).Return(nil)
storage.EXPECT().Abort(storageSession)
Expand Down Expand Up @@ -152,26 +148,44 @@ func TestScheduler_Start(t *testing.T) {
fmt.Printf(", now is %d\n", clock.Now().UnixNano())
require.Equal(step.time, clock.Now())

jobsExecChans := []chan string{}
for _, job := range step.jobs {
jobsExecChans = append(jobsExecChans, jobsExecChansMap[job.Name])
notExecutedJobNames := make([]string, len(step.jobs))
for i, job := range step.jobs {
notExecutedJobNames[i] = job.Name
}
// TODO@ sometimes fails, unexpected mock calls. Why?

requireJobsToBeExecuted(t, clock.Now(), step.jobs, jobsExecChans, timeout)
// setting up select cases for jobs execution
jobExecuteSelectCases := make([]reflect.SelectCase, len(step.jobs)+1)
jobExecuteSelectCases[0] = reflect.SelectCase{Dir: reflect.SelectRecv} // no chan is set yet
for i, job := range step.jobs {
jobExecuteSelectCases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(jobsExecChansMap[job.Name])}
}

for jobIndex := 0; jobIndex < len(step.jobs); jobIndex++ {
// select over jobs execution channels and timeout
jobExecuteSelectCases[0].Chan = reflect.ValueOf(time.After(timeout))
selectIndex, _, _ := reflect.Select(jobExecuteSelectCases)
// TODO@ sometimes fails, job not executed within timeout, might be related to timer test failing issue
require.NotZerof(selectIndex, "some jobs wasn't executed within timeout (%v)", notExecutedJobNames)

jobIndex := selectIndex - 1

require.Equalf(clock.Now(), step.jobs[jobIndex].ExecuteAt,
"wrong job execution time: expected %s (%d) to be executed at %d, but got at %d",
step.jobs[jobIndex].Name, step.jobs[jobIndex].Period, step.jobs[jobIndex].ExecuteAt.UnixNano(), clock.Now().UnixNano())

for _, job := range step.jobs {
job.ExecuteAt = clock.Now().Add(job.Period)
notExecutedJobNames = slices.DeleteFunc(notExecutedJobNames, func(jobName string) bool {
return step.jobs[jobIndex].Name == jobName
})
}

require.Equal(step.time, clock.Now())
}

require.NoError(sch.Stop())
time.Sleep(0)

for _, timer := range sch.timers {
for key, timer := range sch.timers {
require.True(timer.IsStopped())
fmt.Printf("Timer %s is stopped\n", key)
}
}

Expand Down Expand Up @@ -213,34 +227,6 @@ func TestScheduler_RegisterJobHandler(t *testing.T) {
checkJobHandlerRegistered(sch, jobName2)
}

func requireJobsToBeExecuted(t *testing.T, execTime time.Time, jobs []*Job, executed []chan string, timeout time.Duration) { //nolint:unparam
t.Helper()

require.Len(t, jobs, len(executed))

notExecutedJobNames := make([]string, len(jobs))
for i, job := range jobs {
notExecutedJobNames[i] = job.Name
}

for i := 0; i < len(jobs); i++ {
cases := make([]reflect.SelectCase, len(executed)+1)
for i, ch := range executed {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
cases[len(executed)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.After(timeout))}

if i, _, _ := reflect.Select(cases); i == len(executed) {
require.FailNowf(t, "some jobs wasn't executed within timeout", "some of %v", notExecutedJobNames)
}
require.Equal(t, execTime, jobs[i].ExecuteAt, "expected %s (%d) to be executed at %d, but got at %d",
jobs[i].Name, jobs[i].Period, jobs[i].ExecuteAt.UnixNano(), execTime.UnixNano())
notExecutedJobNames = slices.DeleteFunc(notExecutedJobNames, func(jobName string) bool {
return jobs[i].Name == jobName
})
}
}

type dummySession struct{}

func (d *dummySession) Commit() error {
Expand Down

0 comments on commit 90156dc

Please sign in to comment.