diff --git a/api/context/context.go b/api/context/context.go index 46700dce0..16afffbd2 100644 --- a/api/context/context.go +++ b/api/context/context.go @@ -11,6 +11,7 @@ import ( "github.com/flanksource/commons/logger" ctemplate "github.com/flanksource/commons/template" "github.com/flanksource/duty" + dutyCtx "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" "github.com/flanksource/duty/types" "github.com/flanksource/kommons" @@ -19,6 +20,8 @@ import ( "k8s.io/client-go/kubernetes" ) +var DefaultContext dutyCtx.Context + type KubernetesContext struct { gocontext.Context Kommons *kommons.Client diff --git a/api/v1/canary_types.go b/api/v1/canary_types.go index 7528739ba..adca8d8d3 100644 --- a/api/v1/canary_types.go +++ b/api/v1/canary_types.go @@ -25,6 +25,7 @@ import ( "github.com/flanksource/canary-checker/api/external" "github.com/flanksource/commons/logger" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) type ResultMode string @@ -234,6 +235,10 @@ func (c Canary) GetDescription(check external.Check) string { return check.GetEndpoint() } +func (c Canary) GetNamespacedName() types.NamespacedName { + return types.NamespacedName{Name: c.Name, Namespace: c.Namespace} +} + func (c *Canary) SetRunnerName(name string) { c.Status.runnerName = name } diff --git a/cmd/operator.go b/cmd/operator.go index 6c9c473c8..3c47638b8 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -1,9 +1,11 @@ package cmd import ( + gocontext "context" "os" "time" + apicontext "github.com/flanksource/canary-checker/api/context" "github.com/flanksource/canary-checker/pkg/cache" "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/jobs" @@ -16,9 +18,12 @@ import ( "github.com/flanksource/canary-checker/pkg" "github.com/flanksource/canary-checker/pkg/controllers" "github.com/flanksource/canary-checker/pkg/labels" + commonsCtx "github.com/flanksource/commons/context" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/context" "github.com/go-logr/zapr" "github.com/spf13/cobra" + "go.opentelemetry.io/otel" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" @@ -73,6 +78,17 @@ func run(cmd *cobra.Command, args []string) { if err := db.Init(); err != nil { logger.Fatalf("error connecting with postgres: %v", err) } + kommonsClient, k8s, err := pkg.NewKommonsClient() + if err != nil { + logger.Warnf("failed to get kommons client, checks that read kubernetes configs will fail: %v", err) + } + + apicontext.DefaultContext = context.NewContext(gocontext.Background(), commonsCtx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))). + WithDB(db.Gorm, db.Pool). + WithKubernetes(k8s). + WithKommons(kommonsClient). + WithNamespace(runner.WatchNamespace) + cache.PostgresCache = cache.NewPostgresCache(db.Pool) if operatorExecutor { logger.Infof("Starting executors") diff --git a/go.mod b/go.mod index 5abc126f6..6c18e5811 100644 --- a/go.mod +++ b/go.mod @@ -269,3 +269,5 @@ require ( sigs.k8s.io/kustomize/kyaml v0.14.3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) + +replace github.com/flanksource/duty => ../duty diff --git a/go.sum b/go.sum index 8240c2962..4ef143d62 100644 --- a/go.sum +++ b/go.sum @@ -817,6 +817,14 @@ github.com/flanksource/commons v1.17.0 h1:rSahn6c4vyq3bPC5jsayET4y8TECRz6Q8NbooI github.com/flanksource/commons v1.17.0/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U= github.com/flanksource/duty v1.0.201 h1:c8r02bfuF47E2svK+qXCLHKaSqOCZZHKPj+v54eimqc= github.com/flanksource/duty v1.0.201/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE= +github.com/flanksource/commons v1.15.0 h1:p74hrKzIz0r3H8YN3CuB8ePJOjzPFO0BRLVmpXmeqvY= +github.com/flanksource/commons v1.15.0/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs= +github.com/flanksource/commons v1.15.1 h1:cFvxQd5SBFe+q16ciz8Q2IeBMeQ7+atdACGanbW27hg= +github.com/flanksource/commons v1.15.1/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs= +github.com/flanksource/duty v1.0.191 h1:acnvyTeQlfqmtyXxWprNFGK/vBTUlqkYwxEPLtXSPrk= +github.com/flanksource/duty v1.0.191/go.mod h1:ikyl/TcRy6Cc0R5b0wEHT7CecV7gyJvrDGq/4oIZHoc= +github.com/flanksource/duty v1.0.197 h1:KRw4EPAD2kcqNPkipnkHzlbf5wmLqg3JgtXqiPzCLhw= +github.com/flanksource/duty v1.0.197/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.20.18 h1:qYiznMxhq+Zau5iWnVzW1yDzA1deHOsmo6yldCN7JhQ= github.com/flanksource/gomplate/v3 v3.20.18/go.mod h1:2GgHZ2vWmtDspJMBfUIryOuzJSwc8jU7Kw9fDLr0TMA= @@ -1127,6 +1135,10 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI= github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= +github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= +github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= +github.com/hashicorp/hcl/v2 v2.18.1 h1:6nxnOJFku1EuSawSD81fuviYUV8DxFr3fp2dUi3ZYSo= +github.com/hashicorp/hcl/v2 v2.18.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= github.com/henvic/httpretty v0.1.2 h1:EQo556sO0xeXAjP10eB+BZARMuvkdGqtfeS4Ntjvkiw= github.com/henvic/httpretty v0.1.2/go.mod h1:ViEsly7wgdugYtymX54pYp6Vv2wqZmNHayJ6q8tlKCc= github.com/hirochachacha/go-smb2 v1.1.0 h1:b6hs9qKIql9eVXAiN0M2wSFY5xnhbHAQoCwRKbaRTZI= @@ -1507,6 +1519,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc= +github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zclconf/go-cty v1.14.1 h1:t9fyA35fwjjUMcmL5hLER+e/rEPqrbCK1/OSE4SI9KA= github.com/zclconf/go-cty v1.14.1/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= @@ -2300,6 +2314,13 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU= gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk= gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= +gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0= +gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8= +gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU= +gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk= +gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= +gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw= +gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gorm.io/plugin/prometheus v0.0.0-20230504115745-1aec2356381b h1:uHPZdwwf4+AVvAEgZ/LQR1UTub8LJ2nh0wQDW3Dt4jE= @@ -2345,6 +2366,8 @@ k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/pkg/controllers/canary_controller.go b/pkg/controllers/canary_controller.go index 98686bb52..56ec2958c 100644 --- a/pkg/controllers/canary_controller.go +++ b/pkg/controllers/canary_controller.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/pkg" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" @@ -105,7 +106,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c // Sync jobs if canary is created or updated if canary.Generation == 1 { - if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil { + if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil { logger.Error(err, "failed to sync canary job") return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err } @@ -143,7 +144,7 @@ func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary } r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration) - if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil { + if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil { return nil, err } return dbCanary, nil diff --git a/pkg/db/canary.go b/pkg/db/canary.go index fa21d4ddc..c98ce72c2 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -1,7 +1,7 @@ package db import ( - "context" + gocontext "context" "encoding/json" "errors" "fmt" @@ -15,6 +15,7 @@ import ( "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/commons/logger" "github.com/flanksource/duty" + "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" dutyTypes "github.com/flanksource/duty/types" "github.com/google/uuid" @@ -23,7 +24,7 @@ import ( "gorm.io/gorm/clause" ) -func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) { +func GetAllCanariesForSync(ctx context.Context, namespace string) ([]pkg.Canary, error) { query := ` SELECT json_agg( jsonb_set_lax(to_jsonb(canaries),'{checks}', ( @@ -49,7 +50,7 @@ func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) { args["namespace"] = namespace } - rows, err := Pool.Query(context.Background(), query, args) + rows, err := ctx.Pool().Query(ctx, query, args) if err != nil { return nil, err } @@ -125,9 +126,9 @@ func PersistCheck(check pkg.Check, canaryID uuid.UUID) (uuid.UUID, error) { return check.ID, nil } -func GetTransformedCheckIDs(canaryID string) ([]string, error) { +func GetTransformedCheckIDs(ctx context.Context, canaryID string) ([]string, error) { var ids []string - err := Gorm.Table("checks"). + err := ctx.DB().Table("checks"). Select("id"). Where("canary_id = ? AND transformed = true AND deleted_at IS NULL", canaryID). Find(&ids). @@ -135,7 +136,7 @@ func GetTransformedCheckIDs(canaryID string) ([]string, error) { return ids, err } -func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error { +func AddCheckStatuses(ctx context.Context, ids []string, status models.CheckHealthStatus) error { if len(ids) == 0 { return nil } @@ -158,12 +159,12 @@ func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error { }) } } - return Gorm.Table("check_statuses"). + return ctx.DB().Table("check_statuses"). Create(objs). Error } -func RemoveTransformedChecks(ids []string) error { +func RemoveTransformedChecks(ctx context.Context, ids []string) error { if len(ids) == 0 { return nil } @@ -171,7 +172,7 @@ func RemoveTransformedChecks(ids []string) error { "deleted_at": gorm.Expr("NOW()"), } - return Gorm.Table("checks"). + return ctx.DB().Table("checks"). Where("id in (?)", ids). Where("transformed = true"). Updates(updates). @@ -278,7 +279,7 @@ func FindCheck(canary pkg.Canary, name string) (*pkg.Check, error) { return &model, nil } -func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, error) { +func FindDeletedChecksSince(ctx gocontext.Context, since time.Time) ([]string, error) { var ids []string err := Gorm.Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error return ids, err diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 5fb8fd60a..04767d649 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/flanksource/canary-checker/api/context" + canarycontext "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/checks" "github.com/flanksource/canary-checker/pkg" @@ -17,6 +17,8 @@ import ( "github.com/flanksource/canary-checker/pkg/runner" "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/context" + dutyjob "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" "github.com/flanksource/kommons" "github.com/robfig/cron/v3" @@ -51,6 +53,7 @@ func StartScanCanaryConfigs(dataFile string, configFiles []string) { type CanaryJob struct { *kommons.Client + dutyjob.JobRuntime Kubernetes kubernetes.Interface Canary v1.Canary DBCanary pkg.Canary @@ -65,21 +68,35 @@ func (job CanaryJob) GetNamespacedName() types.NamespacedName { var minimumTimeBetweenCanaryRuns = 10 * time.Second var canaryLastRuntimes = sync.Map{} -func (job CanaryJob) Run() { - if runner.IsCanaryIgnored(&job.Canary.ObjectMeta) { - return +func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error { + if len(args) != 2 { + return fmt.Errorf("wrong arg count for SyncCanary: %d", len(args)) + } + dbCanary, ok := args[0].(pkg.Canary) + if !ok { + return fmt.Errorf("wrong arg type for dbCanary: %T", args[0]) + } + + canary, ok := args[1].(v1.Canary) + if !ok { + return fmt.Errorf("wrong arg type for canary: %T", args[1]) + } + + if runner.IsCanaryIgnored(&canary.ObjectMeta) { + return nil } - canaryID := job.DBCanary.ID.String() + + canaryID := dbCanary.ID.String() val, _ := concurrentJobLocks.LoadOrStore(canaryID, &sync.Mutex{}) lock, ok := val.(*sync.Mutex) if !ok { logger.Warnf("expected mutex but got %T for canary(id=%s)", lock, canaryID) - return + return nil } if !lock.TryLock() { logger.Debugf("canary (id=%s) is already running. skipping this run ...", canaryID) - return + return nil } defer lock.Unlock() @@ -93,25 +110,28 @@ func (job CanaryJob) Run() { // Skip run if job ran too recently if lastRunDelta < minimumTimeBetweenCanaryRuns { - logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, job.GetNamespacedName(), lastRunDelta.Seconds()) - return + logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, canary.GetNamespacedName(), lastRunDelta.Seconds()) + return nil } // Get transformed checks before and after, and then delete the olds ones that are not in new set - existingTransformedChecks, _ := db.GetTransformedCheckIDs(canaryID) + existingTransformedChecks, _ := db.GetTransformedCheckIDs(ctx.Context, canaryID) var transformedChecksCreated []string // Transformed checks have a delete strategy // On deletion they can either be marked healthy, unhealthy or left as is checkIDDeleteStrategyMap := make(map[string]string) - results, err := checks.RunChecks(job.NewContext()) + canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), canary) + results, err := checks.RunChecks(canaryCtx) if err != nil { - logger.Errorf("error running checks for canary %s: %v", job.Canary.GetPersistedID(), err) - job.Errorf("error running checks for canary %s: %v", job.Canary.GetPersistedID(), err) - return + logger.Errorf("error running checks for canary %s: %v", canaryID, err) + return nil } + // TODO: Use ctx with object here + logPass := canary.IsTrace() || canary.IsDebug() || LogPass + logFail := canary.IsTrace() || canary.IsDebug() || LogFail for _, result := range results { - if job.LogPass && result.Pass || job.LogFail && !result.Pass { + if logPass && result.Pass || logFail && !result.Pass { logger.Infof(result.String()) } transformedChecksAdded := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.FromResult(*result)) @@ -120,7 +140,7 @@ func (job CanaryJob) Run() { checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy() } } - job.updateStatusAndEvent(results) + updateCanaryStatusAndEvent(canary, results) checkDeleteStrategyGroup := make(map[string][]string) checksToRemove := utils.SetDifference(existingTransformedChecks, transformedChecksCreated) @@ -137,10 +157,10 @@ func (job CanaryJob) Run() { checkDeleteStrategyGroup[status] = append(checkDeleteStrategyGroup[status], checkID) } for status, checkIDs := range checkDeleteStrategyGroup { - if err := db.AddCheckStatuses(checkIDs, models.CheckHealthStatus(status)); err != nil { + if err := db.AddCheckStatuses(ctx.Context, checkIDs, models.CheckHealthStatus(status)); err != nil { logger.Errorf("error adding statuses for transformed checks: %v", err) } - if err := db.RemoveTransformedChecks(checkIDs); err != nil { + if err := db.RemoveTransformedChecks(ctx.Context, checkIDs); err != nil { logger.Errorf("error deleting transformed checks for canary %s: %v", canaryID, err) } } @@ -148,13 +168,14 @@ func (job CanaryJob) Run() { // Update last runtime map canaryLastRuntimes.Store(canaryID, time.Now()) + return nil } -func (job *CanaryJob) NewContext() *context.Context { - return context.New(job.Client, job.Kubernetes, db.Gorm, db.Pool, job.Canary) +func (job *CanaryJob) NewContext() *canarycontext.Context { + return canarycontext.New(job.Client, job.Kubernetes, db.Gorm, db.Pool, job.Canary) } -func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { +func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) { if CanaryStatusChannel == nil { return } @@ -175,8 +196,8 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { duration += result.Duration // Set uptime and latency - uptime, latency := metrics.Record(job.Canary, result) - checkKey := job.Canary.GetKey(result.Check) + uptime, latency := metrics.Record(canary, result) + checkKey := canary.GetKey(result.Check) checkStatus[checkKey] = &v1.CheckStatus{} checkStatus[checkKey].Uptime1H = uptime.String() checkStatus[checkKey].Latency1H = latency.String() @@ -192,8 +213,8 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { // Transition q := cache.QueryParams{Check: checkKey, StatusCount: 1} - if job.Canary.Status.LastTransitionedTime != nil { - q.Start = job.Canary.Status.LastTransitionedTime.Format(time.RFC3339) + if canary.Status.LastTransitionedTime != nil { + q.Start = canary.Status.LastTransitionedTime.Format(time.RFC3339) } lastStatus, err := cache.PostgresCache.Query(q) if err != nil || len(lastStatus) == 0 || len(lastStatus[0].Statuses) == 0 { @@ -206,7 +227,7 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { lastTransitionedTime = &metav1.Time{Time: time.Now()} } - push.Queue(pkg.FromV1(job.Canary, result.Check), pkg.FromResult(*result)) + push.Queue(pkg.FromV1(canary, result.Check), pkg.FromResult(*result)) // Update status message if len(messages) == 1 { @@ -235,7 +256,7 @@ func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) { ErrorMessage: errorMsg, Uptime: uptimeAgg.String(), Latency: utils.Age(time.Duration(highestLatency) * time.Millisecond), - NamespacedName: job.GetNamespacedName(), + NamespacedName: canary.GetNamespacedName(), } CanaryStatusChannel <- payload @@ -255,7 +276,7 @@ type CanaryStatusPayload struct { func findCronEntry(id string) *cron.Entry { for _, entry := range CanaryScheduler.Entries() { - if entry.Job.(CanaryJob).DBCanary.ID.String() == id { + if entry.Job.(*dutyjob.Job).ID == id { return &entry } } @@ -265,7 +286,7 @@ func findCronEntry(id string) *cron.Entry { func getAllCanaryIDsInCron() []string { var ids []string for _, entry := range CanaryScheduler.Entries() { - ids = append(ids, entry.Job.(CanaryJob).DBCanary.ID.String()) + ids = append(ids, entry.Job.(*dutyjob.Job).ID) } return ids } @@ -295,7 +316,7 @@ func ScanCanaryConfigs() { var canaryUpdateTimeCache = sync.Map{} // TODO: Refactor to use database object instead of kubernetes -func SyncCanaryJob(dbCanary pkg.Canary) error { +func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error { canary, err := dbCanary.ToV1() if err != nil { return err @@ -313,21 +334,14 @@ func SyncCanaryJob(dbCanary pkg.Canary) error { if Kommons == nil { var err error Kommons, Kubernetes, err = pkg.NewKommonsClient() + ctx = ctx.WithKommons(Kommons).WithKubernetes(Kubernetes) if err != nil { logger.Warnf("Failed to get kommons client, features that read kubernetes config will fail: %v", err) } } - job := CanaryJob{ - Client: Kommons, - Kubernetes: Kubernetes, - Canary: *canary, - DBCanary: dbCanary, - LogPass: canary.IsTrace() || canary.IsDebug() || LogPass, - LogFail: canary.IsTrace() || canary.IsDebug() || LogFail, - } - updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String()) + newJob := dutyjob.NewJob(ctx, "SyncCanaryJob", canary.Spec.GetSchedule(), SyncCanary, dbCanary, canary).SetID(dbCanary.ID.String()) entry := findCronEntry(dbCanary.ID.String()) if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil { // Remove entry if it exists @@ -336,11 +350,11 @@ func SyncCanaryJob(dbCanary pkg.Canary) error { } // Schedule canary for the first time - entryID, err := CanaryScheduler.AddJob(canary.Spec.GetSchedule(), job) - if err != nil { + if err := newJob.AddToScheduler(CanaryScheduler); err != nil { return fmt.Errorf("failed to schedule canary %s/%s: %v", canary.Namespace, canary.Name, err) } - entry = utils.Ptr(CanaryScheduler.Entry(entryID)) + + entry = newJob.GetEntry(CanaryScheduler) logger.Infof("Scheduled %s: %s", canary, canary.Spec.GetSchedule()) canaryUpdateTimeCache.Store(dbCanary.ID.String(), dbCanary.UpdatedAt) @@ -354,17 +368,17 @@ func SyncCanaryJob(dbCanary pkg.Canary) error { return nil } -func SyncCanaryJobs() { - logger.Debugf("Syncing canary jobs") +func SyncCanaryJobs(ctx dutyjob.JobRuntime, _ ...any) error { + ctx.Debugf("Syncing canary jobs") - canaries, err := db.GetAllCanariesForSync(runner.WatchNamespace) + canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace) if err != nil { logger.Errorf("Failed to get canaries: %v", err) jobHistory := models.NewJobHistory("SyncCanaries", "canary", "").Start() logIfError(db.PersistJobHistory(jobHistory.AddError(err.Error()).End()), "failed to persist job history [SyncCanaries]") - return + return err } existingIDsInCron := getAllCanaryIDsInCron() @@ -373,7 +387,7 @@ func SyncCanaryJobs() { jobHistory := models.NewJobHistory("CanarySync", "canary", c.ID.String()).Start() idsInNewFetch = append(idsInNewFetch, c.ID.String()) - if err := SyncCanaryJob(c); err != nil { + if err := SyncCanaryJob(ctx.Context, c); err != nil { logger.Errorf("Error syncing canary[%s]: %v", c.ID, err.Error()) logIfError(db.PersistJobHistory(jobHistory.AddError(err.Error()).End()), "failed to persist job history [CanarySync]") continue @@ -386,6 +400,7 @@ func SyncCanaryJobs() { } logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries())) + return nil } func DeleteCanaryJob(id string) { diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index 7a31b448a..e16d95f11 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -37,12 +37,12 @@ const ( // that are missing on the upstream. func ReconcileChecks() { jobHistory := models.NewJobHistory("PushChecksToUpstream", "Canary", "") + _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - ctx := dutyContext.NewContext(gocontext.TODO()).WithDB(db.Gorm, db.Pool) reconciler := upstream.NewUpstreamReconciler(UpstreamConf, 5) - if err := reconciler.SyncAfter(ctx, "checks", ReconcileMaxAge); err != nil { + if err := reconciler.SyncAfter(context.DefaultContext, "checks", ReconcileMaxAge); err != nil { jobHistory.AddError(err.Error()) logger.Errorf("failed to sync table 'checks': %v", err) } else { diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index bcc06ebde..a4cd258e8 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -3,6 +3,7 @@ package jobs import ( "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" + "github.com/flanksource/canary-checker/pkg/db" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" systemJobs "github.com/flanksource/canary-checker/pkg/jobs/system" @@ -10,6 +11,7 @@ import ( "github.com/flanksource/canary-checker/pkg/topology/checks" "github.com/flanksource/canary-checker/pkg/topology/configs" "github.com/flanksource/commons/logger" + dutyjob "github.com/flanksource/duty/job" "github.com/robfig/cron/v3" ) @@ -67,9 +69,11 @@ func Start() { } } - if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs); err != nil { - logger.Errorf("Failed to schedule sync jobs for canary: %v", err) + if err := dutyjob.NewJob(context.DefaultContext, "SyncCanaryJobs", SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs). + RunOnStart().AddToScheduler(FuncScheduler); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.SyncCanaryJobs]: %v", err) } + if _, err := ScheduleFunc(SyncSystemsJobsSchedule, systemJobs.SyncTopologyJobs); err != nil { logger.Errorf("Failed to schedule sync jobs for systems: %v", err) } @@ -114,7 +118,6 @@ func Start() { } canaryJobs.CleanupMetricsGauges() - canaryJobs.SyncCanaryJobs() systemJobs.SyncTopologyJobs() } diff --git a/pkg/topology/checks/component_check.go b/pkg/topology/checks/component_check.go index 553e671b3..e1d8e8cb1 100644 --- a/pkg/topology/checks/component_check.go +++ b/pkg/topology/checks/component_check.go @@ -3,6 +3,7 @@ package checks import ( "time" + "github.com/flanksource/canary-checker/api/context" "github.com/flanksource/canary-checker/pkg" "github.com/flanksource/canary-checker/pkg/db" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" @@ -70,7 +71,7 @@ func GetCheckComponentRelationshipsForComponent(component *pkg.Component) (relat logger.Debugf("error creating canary from inline: %v", err) } - if err := canaryJobs.SyncCanaryJob(*canary); err != nil { + if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *canary); err != nil { logger.Debugf("error creating canary job: %v", err) }