Skip to content

Commit

Permalink
Merge pull request #23 from go-co-op/scheduler-bug
Browse files Browse the repository at this point in the history
fix bug in scheduler, rescheduling jobs immediately
  • Loading branch information
Streppel authored Apr 10, 2020
2 parents ced6391 + 2874920 commit 602934f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 36 deletions.
10 changes: 1 addition & 9 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Job struct {
fparams map[string][]interface{} // Map for function and params of function
lock bool // lock the Job from running at same time form multiple instances
tags []string // allow the user to tag Jobs with certain labels
time timeHelper // an instance of timeHelper to interact with the time package
}

// NewJob creates a new Job with the provided interval
Expand All @@ -34,19 +33,12 @@ func NewJob(interval uint64) *Job {
funcs: make(map[string]interface{}),
fparams: make(map[string][]interface{}),
tags: []string{},
time: th,
}
}

// shouldRun returns true if the Job should be run now
func (j *Job) shouldRun() bool {
return j.time.Now().Unix() >= j.nextRun.Unix()
}

// Run the Job and immediately reschedule it
func (j *Job) run() {
j.lastRun = j.time.Now()
go callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
}

// Err returns an error if one ocurred while creating the Job
Expand Down
60 changes: 40 additions & 20 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ func (s *Scheduler) SetLocation(newLocation *time.Location) {

// scheduleNextRun Compute the instant when this Job should run next
func (s *Scheduler) scheduleNextRun(j *Job) error {
now := s.time.Now().In(s.loc)
if j.lastRun == s.time.Unix(0, 0) {
j.lastRun = now
}

if j.nextRun.After(now) {
return nil
}
now := s.time.Now(s.loc)

periodDuration, err := j.periodDuration()
if err != nil {
Expand All @@ -89,7 +82,7 @@ func (s *Scheduler) scheduleNextRun(j *Job) error {
j.nextRun = j.lastRun.Add(periodDuration)
case days:
j.nextRun = s.roundToMidnight(j.lastRun)
j.nextRun = j.nextRun.Add(j.atTime)
j.nextRun = j.nextRun.Add(j.atTime).Add(periodDuration)
case weeks:
j.nextRun = s.roundToMidnight(j.lastRun)
dayDiff := int(j.startDay)
Expand Down Expand Up @@ -118,7 +111,7 @@ func (s *Scheduler) getRunnableJobs() []*Job {
var runnableJobs []*Job
sort.Sort(s)
for _, job := range s.jobs {
if job.shouldRun() {
if s.shouldRun(job) {
runnableJobs = append(runnableJobs, job)
} else {
break
Expand All @@ -130,7 +123,7 @@ func (s *Scheduler) getRunnableJobs() []*Job {
// NextRun datetime when the next Job should run.
func (s *Scheduler) NextRun() (*Job, time.Time) {
if len(s.jobs) <= 0 {
return nil, s.time.Now()
return nil, s.time.Now(s.loc)
}
sort.Sort(s)
return s.jobs[0], s.jobs[0].nextRun
Expand All @@ -147,11 +140,21 @@ func (s *Scheduler) Every(interval uint64) *Scheduler {
func (s *Scheduler) RunPending() {
runnableJobs := s.getRunnableJobs()
for _, job := range runnableJobs {
s.runJob(job)
s.runAndReschedule(job) // we should handle this error somehow
}
}

func (s *Scheduler) runJob(job *Job) error {
func (s *Scheduler) runAndReschedule(job *Job) error {
if err := s.run(job); err != nil {
return err
}
if err := s.scheduleNextRun(job); err != nil {
return err
}
return nil
}

func (s *Scheduler) run(job *Job) error {
if job.lock {
if locker == nil {
return fmt.Errorf("trying to lock %s with nil locker", job.jobFunc)
Expand All @@ -161,11 +164,10 @@ func (s *Scheduler) runJob(job *Job) error {
locker.Lock(key)
defer locker.Unlock(key)
}
job.run()
err := s.scheduleNextRun(job)
if err != nil {
return err
}

job.lastRun = s.time.Now(s.loc)
go job.run()

return nil
}

Expand All @@ -177,7 +179,7 @@ func (s *Scheduler) RunAll() {
// RunAllWithDelay runs all Jobs with delay seconds
func (s *Scheduler) RunAllWithDelay(d int) {
for _, job := range s.jobs {
err := s.runJob(job)
err := s.run(job)
if err != nil {
continue
}
Expand Down Expand Up @@ -253,6 +255,19 @@ func (s *Scheduler) Do(jobFun interface{}, params ...interface{}) (*Job, error)
j.jobFunc = fname

if !j.startsImmediately {
periodDuration, err := j.periodDuration()
if err != nil {
return nil, err
}

if j.lastRun == s.time.Unix(0, 0) {
j.lastRun = s.time.Now(s.loc)

if j.atTime != 0 {
j.lastRun = j.lastRun.Add(-periodDuration)
}
}

if err := s.scheduleNextRun(j); err != nil {
return nil, err
}
Expand Down Expand Up @@ -283,11 +298,16 @@ func (s *Scheduler) StartAt(t time.Time) *Scheduler {
// StartImmediately sets the Jobs next run as soon as the scheduler starts
func (s *Scheduler) StartImmediately() *Scheduler {
job := s.getCurrentJob()
job.nextRun = s.time.Now().In(s.loc)
job.nextRun = s.time.Now(s.loc)
job.startsImmediately = true
return s
}

// shouldRun returns true if the Job should be run now
func (s *Scheduler) shouldRun(j *Job) bool {
return s.time.Now(s.loc).Unix() >= j.nextRun.Unix()
}

// setUnit sets the unit type
func (s *Scheduler) setUnit(unit timeUnit) {
currentJob := s.getCurrentJob()
Expand Down
8 changes: 4 additions & 4 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestExecutionSecond(t *testing.T) {
func TestExecutionSeconds(t *testing.T) {
sched := NewScheduler(time.UTC)
jobDone := make(chan bool)
executionTimes := make([]int64, 0, 2)
var executionTimes []int64
numberOfIterations := 2

sched.Every(2).Seconds().Do(func() {
Expand Down Expand Up @@ -75,18 +75,18 @@ func TestStartImmediately(t *testing.T) {
func TestAt(t *testing.T) {
s := NewScheduler(time.UTC)

// Schedule to run in next minute
// Schedule to run in next 2 seconds
now := time.Now().UTC()
dayJobDone := make(chan bool, 1)

// Schedule every day At
startAt := fmt.Sprintf("%02d:%02d", now.Hour(), now.Add(time.Minute).Minute())
startAt := fmt.Sprintf("%02d:%02d:%02d", now.Hour(), now.Minute(), now.Add(time.Second*2).Second())
dayJob, _ := s.Every(1).Day().At(startAt).Do(func() {
dayJobDone <- true
})

// Expected start time
expectedStartTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Add(time.Minute).Minute(), 0, 0, time.UTC)
expectedStartTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Add(time.Second*2).Second(), 0, time.UTC)
nextRun := dayJob.NextScheduledTime()
assert.Equal(t, expectedStartTime, nextRun)

Expand Down
7 changes: 4 additions & 3 deletions timeHelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gocron
import "time"

type timeHelper interface {
Now() time.Time
Now(*time.Location) time.Time
Unix(int64, int64) time.Time
Sleep(time.Duration)
Date(int, time.Month, int, int, int, int, int, *time.Location) time.Time
Expand All @@ -16,8 +16,9 @@ func newTimeHelper() timeHelper {

type trueTime struct{}

func (t *trueTime) Now() time.Time {
return time.Now()
func (t *trueTime) Now(location *time.Location) time.Time {
n := time.Now().In(location)
return t.Date(n.Year(), n.Month(), n.Day(), n.Hour(), n.Minute(), n.Second(), 0, location)
}

func (t *trueTime) Unix(sec int64, nsec int64) time.Time {
Expand Down

0 comments on commit 602934f

Please sign in to comment.