Skip to content

Commit

Permalink
Issue 778 (#779)
Browse files Browse the repository at this point in the history
  • Loading branch information
rbroggi authored Sep 19, 2024
1 parent 40b8570 commit 838bd51
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
12 changes: 12 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,8 @@ type oneTimeJobDefinition struct {
func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error {
sortedTimes := o.startAt(j)
slices.SortStableFunc(sortedTimes, ascendingTime)
// deduplicate the times
sortedTimes = removeSliceDuplicatesTimeOnSortedSlice(sortedTimes)
// keep only schedules that are in the future
idx, found := slices.BinarySearchFunc(sortedTimes, now, ascendingTime)
if found {
Expand All @@ -472,6 +474,16 @@ func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.T
return nil
}

func removeSliceDuplicatesTimeOnSortedSlice(times []time.Time) []time.Time {
ret := make([]time.Time, 0, len(times))
for i, t := range times {
if i == 0 || t != times[i-1] {
ret = append(ret, t)
}
}
return ret
}

// OneTimeJobStartAtOption defines when the one time job is run
type OneTimeJobStartAtOption func(*internalJob) []time.Time

Expand Down
53 changes: 53 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2325,6 +2325,59 @@ func TestScheduler_AtTimesJob(t *testing.T) {
},
},
},

{
name: "two runs in the future - order is maintained even if times are provided out of order - deduplication",
atTimes: []time.Time{n.Add(3 * time.Millisecond), n.Add(1 * time.Millisecond), n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)

// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(3*time.Millisecond), nextRunAt)
},

func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(2), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond)

nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
}

for _, tt := range tests {
Expand Down

0 comments on commit 838bd51

Please sign in to comment.