Skip to content

Commit

Permalink
refactor: canary job scheduling (#1760)
Browse files Browse the repository at this point in the history
* refactor: canary job scheduling

* fix: ctx for canary job test

* chore: run canary jobs when scheduled

* chore: update test delay in canary_job
  • Loading branch information
yashmehrotra authored Apr 1, 2024
1 parent ef348d4 commit ce480c5
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 44 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/fergusstrange/embedded-postgres v1.25.0
github.com/flanksource/artifacts v1.0.4
github.com/flanksource/commons v1.22.1
github.com/flanksource/duty v1.0.403
github.com/flanksource/duty v1.0.407
github.com/flanksource/gomplate/v3 v3.24.2
github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7
github.com/flanksource/kommons v0.31.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,8 @@ github.com/flanksource/artifacts v1.0.4 h1:KjQTwsvQ73uHqTK7o4Jwt/RW8fyxJOTJ6JLgz
github.com/flanksource/artifacts v1.0.4/go.mod h1:wkbdseaTkDo4Q6k6T86vXd4Uy47M6NPCmexgHvCTDl0=
github.com/flanksource/commons v1.22.1 h1:Ycg8r26bx537UTdAEFgngDW1r2j5bX6Lr3NGxLICpiw=
github.com/flanksource/commons v1.22.1/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY=
github.com/flanksource/duty v1.0.403 h1:/vOyN5/F3uIos+H4/zNBKDnvVphNWClXyw3xn238rdg=
github.com/flanksource/duty v1.0.403/go.mod h1:qQm7wt7TlqV6TFn+PU98bQIYCCau5/3baepXGTQRtV4=
github.com/flanksource/duty v1.0.407 h1:j8tjpZV0IZsmA36GupH/gA+NurAErtBX8V37GXf5SFQ=
github.com/flanksource/duty v1.0.407/go.mod h1:qQm7wt7TlqV6TFn+PU98bQIYCCau5/3baepXGTQRtV4=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.24.2 h1:WZSriw1MaBhzrDV1IOP9eNsupIPxIHy0yTaMOVhCvsk=
github.com/flanksource/gomplate/v3 v3.24.2/go.mod h1:94BxYobZqouGdVezuz6LNto5C+yLMG0LnNnM9CUPyoo=
Expand Down
2 changes: 1 addition & 1 deletion hack/generate-schemas/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/evanphx/json-patch v5.7.0+incompatible // indirect
github.com/exaring/otelpgx v0.5.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flanksource/duty v1.0.403 // indirect
github.com/flanksource/duty v1.0.407 // indirect
github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7 // indirect
github.com/flanksource/kommons v0.31.4 // indirect
github.com/flanksource/kubectl-neat v1.0.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions hack/generate-schemas/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flanksource/commons v1.22.1 h1:Ycg8r26bx537UTdAEFgngDW1r2j5bX6Lr3NGxLICpiw=
github.com/flanksource/commons v1.22.1/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY=
github.com/flanksource/duty v1.0.403 h1:/vOyN5/F3uIos+H4/zNBKDnvVphNWClXyw3xn238rdg=
github.com/flanksource/duty v1.0.403/go.mod h1:qQm7wt7TlqV6TFn+PU98bQIYCCau5/3baepXGTQRtV4=
github.com/flanksource/duty v1.0.407 h1:j8tjpZV0IZsmA36GupH/gA+NurAErtBX8V37GXf5SFQ=
github.com/flanksource/duty v1.0.407/go.mod h1:qQm7wt7TlqV6TFn+PU98bQIYCCau5/3baepXGTQRtV4=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.24.2 h1:WZSriw1MaBhzrDV1IOP9eNsupIPxIHy0yTaMOVhCvsk=
github.com/flanksource/gomplate/v3 v3.24.2/go.mod h1:94BxYobZqouGdVezuz6LNto5C+yLMG0LnNnM9CUPyoo=
Expand Down
4 changes: 3 additions & 1 deletion pkg/jobs/canary/canary_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"time"

canaryCtx "github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/duty/models"
Expand Down Expand Up @@ -57,13 +58,14 @@ var _ = ginkgo.Describe("Canary Job sync", ginkgo.Ordered, func() {
ginkgo.It("schedule the canary job", func() {
MinimumTimeBetweenCanaryRuns = 0 // reset this for now so it doesn't hinder test with small schedules
SyncCanaryJobs.Context = DefaultContext
canaryCtx.DefaultContext = DefaultContext
SyncCanaryJobs.Run()
setup.ExpectJobToPass(SyncCanaryJobs)
})

ginkgo.It("should verify that the endpoint wasn't called more than once after 3 seconds", func() {
time.Sleep(time.Second * 3)
CanaryScheduler.Stop()
// The job will be called on first schedule and all concurrent jobs would be aborted
Expect(requestCount).To(Equal(1))
})
})
94 changes: 59 additions & 35 deletions pkg/jobs/canary/sync.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package canary

import (
gocontext "context"
"fmt"
"path"
"reflect"
"sync"
"time"

canaryCtx "github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
"github.com/flanksource/canary-checker/pkg/cache"
"github.com/flanksource/canary-checker/pkg/db"
Expand All @@ -18,12 +21,14 @@ import (
"github.com/robfig/cron/v3"
)

var cronJobs = make(map[string]*job.Job)
var canaryJobs sync.Map

const DefaultCanarySchedule = "@every 5m"

func Unschedule(id string) {
if job := cronJobs[id]; job != nil {
job.Unschedule()
delete(cronJobs, id)
if j, exists := canaryJobs.Load(id); exists {
j.(*job.Job).Unschedule()
canaryJobs.Delete(id)
}
}

Expand All @@ -48,11 +53,16 @@ func TriggerAt(ctx context.Context, dbCanary pkg.Canary, runAt time.Time) error
}

func findJob(dbCanary pkg.Canary) *job.Job {
return cronJobs[dbCanary.ID.String()]
j, exists := canaryJobs.Load(dbCanary.ID.String())
if !exists {
return nil
}
return j.(*job.Job)
}

// TODO: Refactor to use database object instead of kubernetes
func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error {
id := dbCanary.ID.String()

if disabled := ctx.Properties()["check.*.disabled"]; disabled == "true" {
return nil
}
Expand All @@ -71,21 +81,18 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error {
_ = cache.PostgresCache.Add(pkg.FromV1(*canary, canary.Spec.Webhook), pkg.CheckStatusFromResult(*result))
}

var schedule = canary.Spec.GetSchedule()

j := cronJobs[canary.GetPersistedID()]
var existingJob *job.Job
if j, ok := canaryJobs.Load(id); ok {
existingJob = j.(*job.Job)
}

if schedule == "@never" {
if j != nil {
Unschedule(canary.GetPersistedID())
}
if canary.Spec.GetSchedule() == "@never" || dbCanary.DeletedAt != nil {
Unschedule(id)
return nil
}

if runner.IsCanaryIgnored(&canary.ObjectMeta) {
if j != nil {
Unschedule(canary.GetPersistedID())
}
Unschedule(id)
return nil
}

Expand All @@ -94,30 +101,47 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error {
DBCanary: dbCanary,
}

if j == nil {
// Create new job context from empty context to create root spans for cronJobs
jobCtx := ctx.Wrap(gocontext.Background()).WithObject(canary.ObjectMeta)
jobCtx.WithAnyValue("canaryJob", canaryJob)
j = job.NewJob(jobCtx, "Canary", schedule, canaryJob.Run).
SetID(fmt.Sprintf("%s/%s", canary.Namespace, canary.Name))
j.Singleton = true
j.Retention = job.RetentionDay
cronJobs[canary.GetPersistedID()] = j
if err := j.AddToScheduler(CanaryScheduler); err != nil {
return err
}
} else {
j.Context = j.Context.WithAnyValue("canaryJob", canaryJob)
if existingJob == nil {
newCanaryJob(canaryJob)
return nil
}

if j.Schedule != schedule {
if err := j.Reschedule(schedule, CanaryScheduler); err != nil {
return err
}
existingCanary := existingJob.Context.Value("canary")
if existingCanary != nil && !reflect.DeepEqual(existingCanary.(v1.Canary).Spec, canary.Spec) {
ctx.Debugf("Rescheduling %s canary with updated specs", canary)
Unschedule(id)
newCanaryJob(canaryJob)
}

return nil
}

func newCanaryJob(c CanaryJob) {
schedule := c.Canary.Spec.Schedule
if schedule == "" {
schedule = DefaultCanarySchedule
}

j := &job.Job{
Name: "Canary",
Context: canaryCtx.DefaultContext.WithObject(c.Canary.ObjectMeta).WithAnyValue("canary", c.Canary),
Schedule: schedule,
RunNow: true,
Singleton: true,
JobHistory: true,
Retention: job.RetentionDay,
ResourceID: c.DBCanary.ID.String(),
ResourceType: "canary",
ID: fmt.Sprintf("%s/%s", c.Canary.Namespace, c.Canary.Name),
Fn: c.Run,
}

canaryJobs.Store(c.DBCanary.ID.String(), j)
if err := j.AddToScheduler(CanaryScheduler); err != nil {
logger.Errorf("[%s] failed to schedule %v", j.Name, err)
}
}

var SyncCanaryJobs = &job.Job{
Name: "SyncCanaryJobs",
JobHistory: true,
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/topology/topology_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ var SyncTopology = &job.Job{
}

func SyncTopologyJob(ctx context.Context, t pkg.Topology) error {
id := t.ID
id := t.ID.String()
var existingJob *job.Job
if j, ok := topologyJobs.Load(id.String()); ok {
if j, ok := topologyJobs.Load(id); ok {
existingJob = j.(*job.Job)
}

Expand Down

0 comments on commit ce480c5

Please sign in to comment.