Skip to content

Commit

Permalink
refact: schedules jobs with time.AfterFunc() (#99)
Browse files Browse the repository at this point in the history
refact: schedules jobs with time.AfterFunc()

Co-authored-by: John Roesler <[email protected]>
  • Loading branch information
Streppel and JohnRoesler authored Dec 30, 2020
1 parent 23eab11 commit 7eacf6d
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 125 deletions.
15 changes: 14 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Job struct {
tags []string // allow the user to tag Jobs with certain labels
runConfig runConfig // configuration for how many times to run the job
runCount int // number of times the job ran
timer *time.Timer
}

type runConfig struct {
Expand All @@ -51,8 +52,8 @@ func NewJob(interval uint64) *Job {
func (j *Job) run() {
j.Lock()
defer j.Unlock()
callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
j.runCount++
go callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc])
}

func (j *Job) neverRan() bool {
Expand All @@ -73,6 +74,18 @@ func (j *Job) setStartsImmediately(b bool) {
j.startsImmediately = b
}

func (j *Job) getTimer() *time.Timer {
j.RLock()
defer j.RUnlock()
return j.timer
}

func (j *Job) setTimer(t *time.Timer) {
j.Lock()
defer j.Unlock()
j.timer = t
}

func (j *Job) getAtTime() time.Duration {
j.RLock()
defer j.RUnlock()
Expand Down
126 changes: 54 additions & 72 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,50 @@ func NewScheduler(loc *time.Location) *Scheduler {
jobs: make([]*Job, 0),
location: loc,
running: false,
stopChan: make(chan struct{}),
stopChan: make(chan struct{}, 1),
time: &trueTime{},
}
}

// StartBlocking starts all the pending jobs using a second-long ticker and blocks the current thread
// StartBlocking starts all jobs and blocks the current thread
func (s *Scheduler) StartBlocking() {
<-s.StartAsync()
}

// StartAsync starts a goroutine that runs all the pending using a second-long ticker
// StartAsync starts all jobs without blocking the current thread
func (s *Scheduler) StartAsync() chan struct{} {
if s.IsRunning() {
return s.stopChan
}
s.start()
go func() {
<-s.stopChan
s.setRunning(false)
return
}()
return s.stopChan
}

//start starts the scheduler, scheduling and running jobs
func (s *Scheduler) start() {
s.setRunning(true)
s.runJobs(s.Jobs())
}

s.scheduleAllJobs()
ticker := s.time.NewTicker(1 * time.Second)
go func() {
for {
select {
case <-ticker.C:
s.RunPending()
case <-s.stopChan:
ticker.Stop()
s.setRunning(false)
return
func (s *Scheduler) runJobs(jobs []*Job) {
for _, j := range jobs {
if j.getStartsImmediately() {
s.run(j)
j.setStartsImmediately(false)
}
if !j.shouldRun() {
if j.getRemoveAfterLastRun() { // TODO: this method seems unnecessary as we could always remove after the run cout has expired. Maybe remove this in the future?
s.RemoveByReference(j)
}
continue
}
}()

return s.stopChan
s.scheduleNextRun(j)
}
}

func (s *Scheduler) setRunning(b bool) {
Expand Down Expand Up @@ -126,41 +137,40 @@ func (s *Scheduler) Location() *time.Location {

// scheduleNextRun Compute the instant when this Job should run next
func (s *Scheduler) scheduleNextRun(job *Job) {
now := s.time.Now(s.Location())
now := s.now()
lastRun := job.LastRun()

// job can be scheduled with .StartAt()
if job.neverRan() {
if !job.NextRun().IsZero() {
return // scheduled for future run and should skip scheduling
}
// default is for jobs to start immediately unless scheduled at a specific time or day
if job.getStartsImmediately() {
job.setNextRun(now)
return
}
lastRun = now
}

job.setLastRun(now)

durationToNextRun := s.durationToNextRun(job)
job.setNextRun(job.LastRun().Add(durationToNextRun))
durationToNextRun := s.durationToNextRun(lastRun, job)
job.setNextRun(lastRun.Add(durationToNextRun))
job.setTimer(time.AfterFunc(durationToNextRun, func() {
s.run(job)
s.scheduleNextRun(job)
}))
}

func (s *Scheduler) durationToNextRun(job *Job) time.Duration {
lastRun := job.LastRun()
func (s *Scheduler) durationToNextRun(t time.Time, job *Job) time.Duration {
var duration time.Duration
switch job.unit {
case seconds, minutes, hours:
duration = s.calculateDuration(job)
case days:
duration = s.calculateDays(job, lastRun)
duration = s.calculateDays(job, t)
case weeks:
if job.scheduledWeekday != nil { // weekday selected, Every().Monday(), for example
duration = s.calculateWeekday(job, lastRun)
duration = s.calculateWeekday(job, t)
} else {
duration = s.calculateWeeks(job, lastRun)
duration = s.calculateWeeks(job, t)
}
case months:
duration = s.calculateMonths(job, lastRun)
duration = s.calculateMonths(job, t)
}
return duration
}
Expand Down Expand Up @@ -280,18 +290,6 @@ func (s *Scheduler) roundToMidnight(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, s.Location())
}

// Get the current runnable Jobs, which shouldRun is True
func (s *Scheduler) runnableJobs() []*Job {
var runnableJobs []*Job
sort.Sort(s)
for _, job := range s.Jobs() {
if s.shouldRun(job) {
runnableJobs = append(runnableJobs, job)
}
}
return runnableJobs
}

// NextRun datetime when the next Job should run.
func (s *Scheduler) NextRun() (*Job, time.Time) {
if len(s.Jobs()) <= 0 {
Expand All @@ -310,25 +308,13 @@ func (s *Scheduler) Every(interval uint64) *Scheduler {
return s
}

// RunPending runs all the Jobs that are scheduled to run.
func (s *Scheduler) RunPending() {
for _, job := range s.runnableJobs() {
s.runAndReschedule(job) // we should handle this error somehow
func (s *Scheduler) run(job *Job) {
if !s.IsRunning() {
return
}
}

func (s *Scheduler) runAndReschedule(job *Job) error {
if err := s.run(job); err != nil {
return err
}
s.scheduleNextRun(job)
return nil
}

func (s *Scheduler) run(job *Job) error {
job.setLastRun(s.time.Now(s.Location()))
go job.run()
return nil
job.setLastRun(s.now())
job.run()
}

// RunAll run all Jobs regardless if they are scheduled to run or not
Expand All @@ -339,10 +325,7 @@ func (s *Scheduler) RunAll() {
// RunAllWithDelay runs all Jobs with delay seconds
func (s *Scheduler) RunAllWithDelay(d int) {
for _, job := range s.Jobs() {
err := s.run(job)
if err != nil {
continue
}
s.run(job)
s.time.Sleep(time.Duration(d) * time.Second)
}
}
Expand Down Expand Up @@ -418,11 +401,12 @@ func (s *Scheduler) Clear() {
// Stop stops the scheduler. This is a no-op if the scheduler is already stopped .
func (s *Scheduler) Stop() {
if s.IsRunning() {
s.stopScheduler()
s.stop()
}
}

func (s *Scheduler) stopScheduler() {
func (s *Scheduler) stop() {
s.setRunning(false)
s.stopChan <- struct{}{}
}

Expand Down Expand Up @@ -626,8 +610,6 @@ func (s *Scheduler) getCurrentJob() *Job {
return s.Jobs()[len(s.jobs)-1]
}

func (s *Scheduler) scheduleAllJobs() {
for _, j := range s.Jobs() {
s.scheduleNextRun(j)
}
func (s *Scheduler) now() time.Time {
return s.time.Now(s.Location())
}
Loading

0 comments on commit 7eacf6d

Please sign in to comment.