From 950aaf83c7f3e88fe31a9f67003b227162004f26 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 10 Apr 2020 00:15:51 -0500 Subject: [PATCH 1/5] fix bug in scheduler, rescheduling jobs immediately --- job.go | 8 -------- scheduler.go | 34 +++++++++++++++++++++++++--------- scheduler_test.go | 6 +++--- timeHelper.go | 12 +++++++++--- 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/job.go b/job.go index 4427bbe7..7e782758 100644 --- a/job.go +++ b/job.go @@ -20,7 +20,6 @@ type Job struct { fparams map[string][]interface{} // Map for function and params of function lock bool // lock the Job from running at same time form multiple instances tags []string // allow the user to tag Jobs with certain labels - time timeHelper // an instance of timeHelper to interact with the time package } // NewJob creates a new Job with the provided interval @@ -34,18 +33,11 @@ func NewJob(interval uint64) *Job { funcs: make(map[string]interface{}), fparams: make(map[string][]interface{}), tags: []string{}, - time: th, } } -// shouldRun returns true if the Job should be run now -func (j *Job) shouldRun() bool { - return j.time.Now().Unix() >= j.nextRun.Unix() -} - // Run the Job and immediately reschedule it func (j *Job) run() { - j.lastRun = j.time.Now() go callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc]) } diff --git a/scheduler.go b/scheduler.go index 4c93c30a..d455e96a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -70,12 +70,9 @@ func (s *Scheduler) SetLocation(newLocation *time.Location) { // scheduleNextRun Compute the instant when this Job should run next func (s *Scheduler) scheduleNextRun(j *Job) error { - now := s.time.Now().In(s.loc) - if j.lastRun == s.time.Unix(0, 0) { - j.lastRun = now - } + now := s.time.NowRoundedDownToSeconds(s.loc) - if j.nextRun.After(now) { + if j.nextRun.Second() > now.Second() { return nil } @@ -89,7 +86,7 @@ func (s *Scheduler) scheduleNextRun(j *Job) error { j.nextRun = j.lastRun.Add(periodDuration) case days: j.nextRun = s.roundToMidnight(j.lastRun) - j.nextRun = j.nextRun.Add(j.atTime) + j.nextRun = j.nextRun.Add(j.atTime).Add(periodDuration) case weeks: j.nextRun = s.roundToMidnight(j.lastRun) dayDiff := int(j.startDay) @@ -118,7 +115,7 @@ func (s *Scheduler) getRunnableJobs() []*Job { var runnableJobs []*Job sort.Sort(s) for _, job := range s.jobs { - if job.shouldRun() { + if s.shouldRun(job) { runnableJobs = append(runnableJobs, job) } else { break @@ -130,7 +127,7 @@ func (s *Scheduler) getRunnableJobs() []*Job { // NextRun datetime when the next Job should run. func (s *Scheduler) NextRun() (*Job, time.Time) { if len(s.jobs) <= 0 { - return nil, s.time.Now() + return nil, s.time.NowRoundedDownToSeconds(s.loc) } sort.Sort(s) return s.jobs[0], s.jobs[0].nextRun @@ -161,6 +158,7 @@ func (s *Scheduler) runJob(job *Job) error { locker.Lock(key) defer locker.Unlock(key) } + job.lastRun = s.time.NowRoundedDownToSeconds(s.loc) job.run() err := s.scheduleNextRun(job) if err != nil { @@ -253,6 +251,19 @@ func (s *Scheduler) Do(jobFun interface{}, params ...interface{}) (*Job, error) j.jobFunc = fname if !j.startsImmediately { + periodDuration, err := j.periodDuration() + if err != nil { + return nil, err + } + + if j.lastRun == s.time.Unix(0, 0) { + j.lastRun = s.time.NowRoundedDownToSeconds(s.loc) + + if j.atTime != 0 { + j.lastRun = j.lastRun.Add(-periodDuration) + } + } + if err := s.scheduleNextRun(j); err != nil { return nil, err } @@ -283,11 +294,16 @@ func (s *Scheduler) StartAt(t time.Time) *Scheduler { // StartImmediately sets the Jobs next run as soon as the scheduler starts func (s *Scheduler) StartImmediately() *Scheduler { job := s.getCurrentJob() - job.nextRun = s.time.Now().In(s.loc) + job.nextRun = s.time.NowRoundedDownToSeconds(s.loc) job.startsImmediately = true return s } +// shouldRun returns true if the Job should be run now +func (s *Scheduler) shouldRun(j *Job) bool { + return s.time.Now(s.loc).Unix() >= j.nextRun.Unix() +} + // setUnit sets the unit type func (s *Scheduler) setUnit(unit timeUnit) { currentJob := s.getCurrentJob() diff --git a/scheduler_test.go b/scheduler_test.go index 011306e3..cb4d31c9 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -75,18 +75,18 @@ func TestStartImmediately(t *testing.T) { func TestAt(t *testing.T) { s := NewScheduler(time.UTC) - // Schedule to run in next minute + // Schedule to run in next 2 seconds now := time.Now().UTC() dayJobDone := make(chan bool, 1) // Schedule every day At - startAt := fmt.Sprintf("%02d:%02d", now.Hour(), now.Add(time.Minute).Minute()) + startAt := fmt.Sprintf("%02d:%02d:%02d", now.Hour(), now.Minute(), now.Add(time.Second*2).Second()) dayJob, _ := s.Every(1).Day().At(startAt).Do(func() { dayJobDone <- true }) // Expected start time - expectedStartTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Add(time.Minute).Minute(), 0, 0, time.UTC) + expectedStartTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Add(time.Second*2).Second(), 0, time.UTC) nextRun := dayJob.NextScheduledTime() assert.Equal(t, expectedStartTime, nextRun) diff --git a/timeHelper.go b/timeHelper.go index 5acc315b..1404b186 100644 --- a/timeHelper.go +++ b/timeHelper.go @@ -3,7 +3,8 @@ package gocron import "time" type timeHelper interface { - Now() time.Time + Now(*time.Location) time.Time + NowRoundedDownToSeconds(*time.Location) time.Time Unix(int64, int64) time.Time Sleep(time.Duration) Date(int, time.Month, int, int, int, int, int, *time.Location) time.Time @@ -16,8 +17,13 @@ func newTimeHelper() timeHelper { type trueTime struct{} -func (t *trueTime) Now() time.Time { - return time.Now() +func (t *trueTime) Now(location *time.Location) time.Time { + return time.Now().In(location) +} + +func (t *trueTime) NowRoundedDownToSeconds(location *time.Location) time.Time { + n := t.Now(location) + return t.Date(n.Year(), n.Month(), n.Day(), n.Hour(), n.Minute(), n.Second(), 0, location) } func (t *trueTime) Unix(sec int64, nsec int64) time.Time { From 94e1e962299c4b62fec9025062f716cf54ca0e3b Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 10 Apr 2020 09:41:51 -0500 Subject: [PATCH 2/5] scheduleNextRun - nextRun check use .Unix() --- scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler.go b/scheduler.go index d455e96a..c57e5a06 100644 --- a/scheduler.go +++ b/scheduler.go @@ -72,7 +72,7 @@ func (s *Scheduler) SetLocation(newLocation *time.Location) { func (s *Scheduler) scheduleNextRun(j *Job) error { now := s.time.NowRoundedDownToSeconds(s.loc) - if j.nextRun.Second() > now.Second() { + if j.nextRun.Unix() > now.Unix() { return nil } From 46d7545b76cfab41dc5616b015e08fa91e43df01 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 10 Apr 2020 10:04:58 -0500 Subject: [PATCH 3/5] timehelper Now rounds to seconds by default --- scheduler.go | 10 +++++----- timeHelper.go | 7 +------ 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/scheduler.go b/scheduler.go index c57e5a06..350ded00 100644 --- a/scheduler.go +++ b/scheduler.go @@ -70,7 +70,7 @@ func (s *Scheduler) SetLocation(newLocation *time.Location) { // scheduleNextRun Compute the instant when this Job should run next func (s *Scheduler) scheduleNextRun(j *Job) error { - now := s.time.NowRoundedDownToSeconds(s.loc) + now := s.time.Now(s.loc) if j.nextRun.Unix() > now.Unix() { return nil @@ -127,7 +127,7 @@ func (s *Scheduler) getRunnableJobs() []*Job { // NextRun datetime when the next Job should run. func (s *Scheduler) NextRun() (*Job, time.Time) { if len(s.jobs) <= 0 { - return nil, s.time.NowRoundedDownToSeconds(s.loc) + return nil, s.time.Now(s.loc) } sort.Sort(s) return s.jobs[0], s.jobs[0].nextRun @@ -158,7 +158,7 @@ func (s *Scheduler) runJob(job *Job) error { locker.Lock(key) defer locker.Unlock(key) } - job.lastRun = s.time.NowRoundedDownToSeconds(s.loc) + job.lastRun = s.time.Now(s.loc) job.run() err := s.scheduleNextRun(job) if err != nil { @@ -257,7 +257,7 @@ func (s *Scheduler) Do(jobFun interface{}, params ...interface{}) (*Job, error) } if j.lastRun == s.time.Unix(0, 0) { - j.lastRun = s.time.NowRoundedDownToSeconds(s.loc) + j.lastRun = s.time.Now(s.loc) if j.atTime != 0 { j.lastRun = j.lastRun.Add(-periodDuration) @@ -294,7 +294,7 @@ func (s *Scheduler) StartAt(t time.Time) *Scheduler { // StartImmediately sets the Jobs next run as soon as the scheduler starts func (s *Scheduler) StartImmediately() *Scheduler { job := s.getCurrentJob() - job.nextRun = s.time.NowRoundedDownToSeconds(s.loc) + job.nextRun = s.time.Now(s.loc) job.startsImmediately = true return s } diff --git a/timeHelper.go b/timeHelper.go index 1404b186..02ff240d 100644 --- a/timeHelper.go +++ b/timeHelper.go @@ -4,7 +4,6 @@ import "time" type timeHelper interface { Now(*time.Location) time.Time - NowRoundedDownToSeconds(*time.Location) time.Time Unix(int64, int64) time.Time Sleep(time.Duration) Date(int, time.Month, int, int, int, int, int, *time.Location) time.Time @@ -18,11 +17,7 @@ func newTimeHelper() timeHelper { type trueTime struct{} func (t *trueTime) Now(location *time.Location) time.Time { - return time.Now().In(location) -} - -func (t *trueTime) NowRoundedDownToSeconds(location *time.Location) time.Time { - n := t.Now(location) + n := time.Now().In(location) return t.Date(n.Year(), n.Month(), n.Day(), n.Hour(), n.Minute(), n.Second(), 0, location) } From 10d5840d029c084467572824e53ae23d48f93411 Mon Sep 17 00:00:00 2001 From: streppel Date: Fri, 10 Apr 2020 13:44:26 -0300 Subject: [PATCH 4/5] removing non-reschedule pathway on scheduling function; small refactors --- job.go | 2 +- scheduler.go | 28 ++++++++++++++++------------ scheduler_test.go | 2 +- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/job.go b/job.go index 7e782758..3a262d09 100644 --- a/job.go +++ b/job.go @@ -38,7 +38,7 @@ func NewJob(interval uint64) *Job { // Run the Job and immediately reschedule it func (j *Job) run() { - go callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc]) + callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc]) } // Err returns an error if one ocurred while creating the Job diff --git a/scheduler.go b/scheduler.go index 350ded00..aec7db15 100644 --- a/scheduler.go +++ b/scheduler.go @@ -72,10 +72,6 @@ func (s *Scheduler) SetLocation(newLocation *time.Location) { func (s *Scheduler) scheduleNextRun(j *Job) error { now := s.time.Now(s.loc) - if j.nextRun.Unix() > now.Unix() { - return nil - } - periodDuration, err := j.periodDuration() if err != nil { return err @@ -144,11 +140,21 @@ func (s *Scheduler) Every(interval uint64) *Scheduler { func (s *Scheduler) RunPending() { runnableJobs := s.getRunnableJobs() for _, job := range runnableJobs { - s.runJob(job) + s.runAndReschedule(job) // we should handle this error somehow + } +} + +func (s *Scheduler) runAndReschedule(job *Job) error { + if err := s.run(job); err != nil { + return err + } + if err := s.scheduleNextRun(job); err != nil { + return err } + return nil } -func (s *Scheduler) runJob(job *Job) error { +func (s *Scheduler) run(job *Job) error { if job.lock { if locker == nil { return fmt.Errorf("trying to lock %s with nil locker", job.jobFunc) @@ -158,12 +164,10 @@ func (s *Scheduler) runJob(job *Job) error { locker.Lock(key) defer locker.Unlock(key) } + job.lastRun = s.time.Now(s.loc) - job.run() - err := s.scheduleNextRun(job) - if err != nil { - return err - } + go job.run() + return nil } @@ -175,7 +179,7 @@ func (s *Scheduler) RunAll() { // RunAllWithDelay runs all Jobs with delay seconds func (s *Scheduler) RunAllWithDelay(d int) { for _, job := range s.jobs { - err := s.runJob(job) + err := s.run(job) if err != nil { continue } diff --git a/scheduler_test.go b/scheduler_test.go index cb4d31c9..72e10ab9 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -30,7 +30,7 @@ func TestExecutionSeconds(t *testing.T) { sched := NewScheduler(time.UTC) jobDone := make(chan bool) executionTimes := make([]int64, 0, 2) - numberOfIterations := 2 + numberOfIterations := 20 sched.Every(2).Seconds().Do(func() { executionTimes = append(executionTimes, time.Now().Unix()) From 2874920b6f2a4b1e6e77ddd5093a62ba46e532a9 Mon Sep 17 00:00:00 2001 From: streppel Date: Fri, 10 Apr 2020 13:45:29 -0300 Subject: [PATCH 5/5] refactoring TestExecutionSeconds --- scheduler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler_test.go b/scheduler_test.go index 72e10ab9..ca57e28d 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -29,8 +29,8 @@ func TestExecutionSecond(t *testing.T) { func TestExecutionSeconds(t *testing.T) { sched := NewScheduler(time.UTC) jobDone := make(chan bool) - executionTimes := make([]int64, 0, 2) - numberOfIterations := 20 + var executionTimes []int64 + numberOfIterations := 2 sched.Every(2).Seconds().Do(func() { executionTimes = append(executionTimes, time.Now().Unix())