Skip to content

Commit

Permalink
feat!: introduce JobDetail and JobKey for job scheduling (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 5, 2024
1 parent 891d53e commit 19d3ae3
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 199 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ type Scheduler interface {
IsStarted() bool

// ScheduleJob schedules a job using a specified trigger.
ScheduleJob(ctx context.Context, job Job, trigger Trigger) error
ScheduleJob(ctx context.Context, jobDetail *JobDetail, trigger Trigger) error

// GetJobKeys returns the keys of all of the scheduled jobs.
GetJobKeys() []int
GetJobKeys() []*JobKey

// GetScheduledJob returns the scheduled job with the specified key.
GetScheduledJob(key int) (ScheduledJob, error)
GetScheduledJob(jobKey *JobKey) (ScheduledJob, error)

// DeleteJob removes the job with the specified key from the
// scheduler's execution queue.
DeleteJob(ctx context.Context, key int) error
DeleteJob(ctx context.Context, jobKey *JobKey) error

// Clear removes all of the scheduled jobs.
Clear() error
Expand Down Expand Up @@ -85,9 +85,6 @@ type Job interface {

// Description returns the description of the Job.
Description() string

// Key returns the unique key for the Job.
Key() int
}
```

Expand Down Expand Up @@ -159,9 +156,12 @@ func main() {
functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil })

// register jobs to scheduler
sched.ScheduleJob(ctx, shellJob, cronTrigger)
sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*5))
sched.ScheduleJob(ctx, quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")),
cronTrigger)
sched.ScheduleJob(ctx, quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")),
quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(ctx, quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob")),
quartz.NewSimpleTrigger(time.Second*5))

// stop scheduler
sched.Stop()
Expand Down
29 changes: 18 additions & 11 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,30 @@ func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) {
return
}

cronJob := PrintJob{"Cron job"}
cronJob := quartz.NewJobDetail(&PrintJob{"Cron job"}, quartz.NewJobKey("cronJob"))
sched.Start(ctx)

_ = sched.ScheduleJob(ctx, &PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5))
_ = sched.ScheduleJob(ctx, &PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12))
_ = sched.ScheduleJob(ctx, &PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6))
_ = sched.ScheduleJob(ctx, &PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3))
_ = sched.ScheduleJob(ctx, &cronJob, cronTrigger)
runOnceJobDetail := quartz.NewJobDetail(&PrintJob{"Ad hoc Job"}, quartz.NewJobKey("runOnceJob"))
jobDetail1 := quartz.NewJobDetail(&PrintJob{"First job"}, quartz.NewJobKey("job1"))
jobDetail2 := quartz.NewJobDetail(&PrintJob{"Second job"}, quartz.NewJobKey("job2"))
jobDetail3 := quartz.NewJobDetail(&PrintJob{"Third job"}, quartz.NewJobKey("job3"))
_ = sched.ScheduleJob(ctx, runOnceJobDetail, quartz.NewRunOnceTrigger(time.Second*5))
_ = sched.ScheduleJob(ctx, jobDetail1, quartz.NewSimpleTrigger(time.Second*12))
_ = sched.ScheduleJob(ctx, jobDetail2, quartz.NewSimpleTrigger(time.Second*6))
_ = sched.ScheduleJob(ctx, jobDetail3, quartz.NewSimpleTrigger(time.Second*3))
_ = sched.ScheduleJob(ctx, cronJob, cronTrigger)

time.Sleep(time.Second * 10)

scheduledJob, err := sched.GetScheduledJob(cronJob.Key())
scheduledJob, err := sched.GetScheduledJob(cronJob.JobKey())
if err != nil {
fmt.Println(err)
return
}

fmt.Println(scheduledJob.Trigger().Description())
fmt.Println("Before delete: ", sched.GetJobKeys())
_ = sched.DeleteJob(ctx, cronJob.Key())
_ = sched.DeleteJob(ctx, cronJob.JobKey())
fmt.Println("After delete: ", sched.GetJobKeys())

time.Sleep(time.Second * 2)
Expand Down Expand Up @@ -89,9 +93,12 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
curlJob := quartz.NewCurlJob(request)
functionJob := quartz.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil })

_ = sched.ScheduleJob(ctx, shellJob, cronTrigger)
_ = sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7))
_ = sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*3))
shellJobDetail := quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob"))
curlJobDetail := quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob"))
functionJobDetail := quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob"))
_ = sched.ScheduleJob(ctx, shellJobDetail, cronTrigger)
_ = sched.ScheduleJob(ctx, curlJobDetail, quartz.NewSimpleTrigger(time.Second*7))
_ = sched.ScheduleJob(ctx, functionJobDetail, quartz.NewSimpleTrigger(time.Second*3))

time.Sleep(time.Second * 10)

Expand Down
10 changes: 4 additions & 6 deletions examples/print_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ type PrintJob struct {
desc string
}

var _ quartz.Job = (*PrintJob)(nil)

// Description returns the description of the PrintJob.
func (pj *PrintJob) Description() string {
return pj.desc
}

// Key returns the unique PrintJob key.
func (pj *PrintJob) Key() int {
return quartz.HashCode(pj.Description())
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (pj *PrintJob) Execute(_ context.Context) {
func (pj *PrintJob) Execute(_ context.Context) error {
fmt.Println("Executing " + pj.Description())
return nil
}
43 changes: 43 additions & 0 deletions examples/readme/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"net/http"
"time"

"github.com/reugn/go-quartz/quartz"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create scheduler
sched := quartz.NewStdScheduler()

// async start scheduler
sched.Start(ctx)

// create jobs
cronTrigger, _ := quartz.NewCronTrigger("1/5 * * * * *")
shellJob := quartz.NewShellJob("ls -la")

request, _ := http.NewRequest(http.MethodGet, "https://worldtimeapi.org/api/timezone/utc", nil)
curlJob := quartz.NewCurlJob(request)

functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil })

// register jobs to scheduler
sched.ScheduleJob(ctx, quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")),
cronTrigger)
sched.ScheduleJob(ctx, quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")),
quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(ctx, quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob")),
quartz.NewSimpleTrigger(time.Second*5))

// stop scheduler
sched.Stop()

// wait for all workers to exit
sched.Wait(ctx)
}
10 changes: 4 additions & 6 deletions quartz/function_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type FunctionJob[R any] struct {
jobStatus JobStatus
}

var _ Job = (*FunctionJob[any])(nil)

// NewFunctionJob returns a new FunctionJob without an explicit description.
func NewFunctionJob[R any](function Function[R]) *FunctionJob[R] {
return &FunctionJob[R]{
Expand All @@ -44,14 +46,9 @@ func (f *FunctionJob[R]) Description() string {
return f.desc
}

// Key returns the unique FunctionJob key.
func (f *FunctionJob[R]) Key() int {
return HashCode(fmt.Sprintf("%s:%p", f.desc, f.function))
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
// It invokes the held function, setting the results in Result and Error members.
func (f *FunctionJob[R]) Execute(ctx context.Context) {
func (f *FunctionJob[R]) Execute(ctx context.Context) error {
result, err := (*f.function)(ctx)
f.Lock()
if err != nil {
Expand All @@ -64,6 +61,7 @@ func (f *FunctionJob[R]) Execute(ctx context.Context) {
f.err = nil
}
f.Unlock()
return err
}

// Result returns the result of the FunctionJob.
Expand Down
11 changes: 6 additions & 5 deletions quartz/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ func TestFunctionJob(t *testing.T) {

sched := quartz.NewStdScheduler()
sched.Start(ctx)
sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800))
sched.ScheduleJob(ctx, quartz.NewJobDetail(funcJob1, quartz.NewJobKey("funcJob1")),
quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(ctx, quartz.NewJobDetail(funcJob2, quartz.NewJobKey("funcJob2")),
quartz.NewRunOnceTrigger(time.Millisecond*800))
time.Sleep(time.Second)
_ = sched.Clear()
sched.Stop()
Expand All @@ -44,7 +46,7 @@ func TestFunctionJob(t *testing.T) {
assertEqual(t, int(atomic.LoadInt32(&n)), 6)
}

func TestNewFunctionJobWithDescAndKey(t *testing.T) {
func TestNewFunctionJobWithDesc(t *testing.T) {
jobDesc := "test job"

funcJob1 := quartz.NewFunctionJobWithDesc(jobDesc, func(_ context.Context) (string, error) {
Expand All @@ -56,8 +58,7 @@ func TestNewFunctionJobWithDescAndKey(t *testing.T) {
})

assertEqual(t, funcJob1.Description(), jobDesc)
assertEqual(t, funcJob1.Key(), funcJob1.Key())
assertNotEqual(t, funcJob1.Key(), funcJob2.Key())
assertEqual(t, funcJob2.Description(), jobDesc)
}

func TestFunctionJobRespectsContext(t *testing.T) {
Expand Down
37 changes: 15 additions & 22 deletions quartz/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quartz
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -11,22 +12,17 @@ import (
"strings"
"sync"
"sync/atomic"

"github.com/reugn/go-quartz/quartz/logger"
)

// Job represents an interface to be implemented by structs which
// represent a 'job' to be performed.
type Job interface {
// Execute is called by a Scheduler when the Trigger associated
// with this job fires.
Execute(context.Context)
Execute(context.Context) error

// Description returns the description of the Job.
Description() string

// Key returns the unique key for the Job.
Key() int
}

// JobStatus represents a Job status.
Expand Down Expand Up @@ -55,6 +51,8 @@ type ShellJob struct {
callback func(context.Context, *ShellJob)
}

var _ Job = (*ShellJob)(nil)

// NewShellJob returns a new ShellJob for the given command.
func NewShellJob(cmd string) *ShellJob {
return &ShellJob{
Expand All @@ -77,11 +75,6 @@ func (sh *ShellJob) Description() string {
return fmt.Sprintf("ShellJob: %s", sh.cmd)
}

// Key returns the unique ShellJob key.
func (sh *ShellJob) Key() int {
return HashCode(sh.Description())
}

var (
shellOnce = sync.Once{}
shellPath = "bash"
Expand All @@ -99,7 +92,7 @@ func getShell() string {
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (sh *ShellJob) Execute(ctx context.Context) {
func (sh *ShellJob) Execute(ctx context.Context) error {
shell := getShell()

var stdout, stderr bytes.Buffer
Expand All @@ -124,6 +117,7 @@ func (sh *ShellJob) Execute(ctx context.Context) {
if sh.callback != nil {
sh.callback(ctx, sh)
}
return nil
}

// ExitCode returns the exit code of the ShellJob.
Expand Down Expand Up @@ -167,6 +161,8 @@ type CurlJob struct {
callback func(context.Context, *CurlJob)
}

var _ Job = (*CurlJob)(nil)

// HTTPHandler sends an HTTP request and returns an HTTP response,
// following policy (such as redirects, cookies, auth) as configured
// on the implementing HTTP client.
Expand Down Expand Up @@ -204,11 +200,6 @@ func (cu *CurlJob) Description() string {
return fmt.Sprintf("CurlJob:\n%s", cu.description)
}

// Key returns the unique CurlJob key.
func (cu *CurlJob) Key() int {
return HashCode(cu.description)
}

// DumpResponse returns the response of the job in its HTTP/1.x wire
// representation.
// If body is true, DumpResponse also returns the body.
Expand Down Expand Up @@ -241,7 +232,7 @@ func formatRequest(r *http.Request) string {
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (cu *CurlJob) Execute(ctx context.Context) {
func (cu *CurlJob) Execute(ctx context.Context) error {
cu.Lock()
cu.request = cu.request.WithContext(ctx)
var err error
Expand All @@ -257,6 +248,7 @@ func (cu *CurlJob) Execute(ctx context.Context) {
if cu.callback != nil {
cu.callback(ctx, cu)
}
return nil
}

type isolatedJob struct {
Expand All @@ -265,15 +257,16 @@ type isolatedJob struct {
isRunning *atomic.Value
}

var _ Job = (*isolatedJob)(nil)

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (j *isolatedJob) Execute(ctx context.Context) {
func (j *isolatedJob) Execute(ctx context.Context) error {
if wasRunning := j.isRunning.Swap(true); wasRunning != nil && wasRunning.(bool) {
logger.Debugf("Executed job %d is running.", j.Job.Key())
return
return errors.New("job is running")
}
defer j.isRunning.Store(false)

j.Job.Execute(ctx)
return j.Job.Execute(ctx)
}

// NewIsolatedJob wraps a job object and ensures that only one
Expand Down
Loading

0 comments on commit 19d3ae3

Please sign in to comment.