diff --git a/Makefile b/Makefile
index debc306bc..f7d7f9fa0 100644
--- a/Makefile
+++ b/Makefile
@@ -63,6 +63,9 @@ gen-schemas:
 # Generate manifests e.g. CRD, RBAC etc.
 manifests: .bin/controller-gen
+	# For debugging
+	yq -V
+
 	schemaPath=.spec.versions[0].schema.openAPIV3Schema.properties.spec.properties .bin/controller-gen crd paths="./api/..." output:stdout | yq ea -P '[.] | sort_by(.metadata.name) | .[] | splitDoc' - > config/deploy/crd.yaml
 	$(MAKE) gen-schemas
diff --git a/api/v1/canary_types.go b/api/v1/canary_types.go
index b3a5feec0..6283ede46 100644
--- a/api/v1/canary_types.go
+++ b/api/v1/canary_types.go
@@ -36,6 +36,10 @@ const (
 // CanarySpec defines the desired state of Canary
 type CanarySpec struct {
+	//+kubebuilder:default=1
+	//+optional
+	Replicas int `yaml:"replicas,omitempty" json:"replicas,omitempty"`
+
 	Env  map[string]VarSource `yaml:"env,omitempty" json:"env,omitempty"`
 	HTTP []HTTPCheck          `yaml:"http,omitempty" json:"http,omitempty"`
 	DNS  []DNSCheck           `yaml:"dns,omitempty" json:"dns,omitempty"`
@@ -281,6 +285,8 @@ type CanaryStatus struct {
 	Latency1H string `json:"latency1h,omitempty"`
 	// used for keeping history of the checks
 	runnerName string `json:"-"`
+	// Replicas keep track of the number of replicas
+	Replicas int `json:"replicas,omitempty"`
 }

 func (c Canary) GetCheckID(checkName string) string {
@@ -305,6 +311,7 @@ type CheckStatus struct {
 // +kubebuilder:object:root=true
 // Canary is the Schema for the canaries API
+// +kubebuilder:printcolumn:name="Replicas",type=integer,priority=1,JSONPath=`.spec.replicas`
 // +kubebuilder:printcolumn:name="Interval",type=string,JSONPath=`.spec.interval`
 // +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.status`
 // +kubebuilder:printcolumn:name="Last Check",type=date,JSONPath=`.status.lastCheck`
@@ -314,6 +321,7 @@ type CheckStatus struct {
 // +kubebuilder:printcolumn:name="Message",type=string,priority=1,JSONPath=`.status.message`
 // +kubebuilder:printcolumn:name="Error",type=string,priority=1,JSONPath=`.status.errorMessage`
 // +kubebuilder:subresource:status
+// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
type Canary struct {
 	metav1.TypeMeta   `json:",inline"`
 	metav1.ObjectMeta `json:"metadata,omitempty"`
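With the printcolumn and scale markers above, `kubectl get canaries` can show the replica count and `kubectl scale` can drive `.spec.replicas` through the standard scale subresource. The reconciler later in this diff treats the field as a suspend switch rather than a fan-out count: 0 suspends the canary, any non-zero value (including the kubebuilder default of 1) keeps it scheduled. A minimal runnable sketch of that semantics; the `suspended` helper is illustrative, not part of the codebase:

```go
package main

import "fmt"

// suspended mirrors the reconciler logic introduced later in this diff:
// scaling a canary to 0 replicas suspends it, anything else keeps it running.
func suspended(specReplicas int) bool {
	return specReplicas == 0
}

func main() {
	// e.g. after `kubectl scale canary/my-canary --replicas=0`
	for _, replicas := range []int{0, 1} {
		fmt.Printf("replicas=%d suspended=%v\n", replicas, suspended(replicas))
	}
}
```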
diff --git a/config/deploy/crd.yaml b/config/deploy/crd.yaml
index 667926214..6029530c4 100644
--- a/config/deploy/crd.yaml
+++ b/config/deploy/crd.yaml
@@ -1,3 +1,4 @@
+---
 apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
@@ -15,6 +16,10 @@ spec:
   scope: Namespaced
   versions:
   - additionalPrinterColumns:
+    - jsonPath: .spec.replicas
+      name: Replicas
+      priority: 1
+      type: integer
     - jsonPath: .spec.interval
       name: Interval
       type: string
@@ -5191,6 +5196,9 @@ spec:
                 - name
                 type: object
               type: array
+            replicas:
+              default: 1
+              type: integer
             restic:
               items:
                 properties:
@@ -5729,6 +5737,9 @@ spec:
               type: integer
             persistedID:
               type: string
+            replicas:
+              description: Replicas keep track of the number of replicas
+              type: integer
             status:
               type: string
             uptime1h:
@@ -5739,6 +5750,9 @@ spec:
     served: true
     storage: true
     subresources:
+      scale:
+        specReplicasPath: .spec.replicas
+        statusReplicasPath: .status.replicas
       status: {}
 ---
 apiVersion: apiextensions.k8s.io/v1
diff --git a/config/deploy/manifests.yaml b/config/deploy/manifests.yaml
index 64a9329a8..b15615d77 100644
--- a/config/deploy/manifests.yaml
+++ b/config/deploy/manifests.yaml
@@ -15,6 +15,10 @@ spec:
   scope: Namespaced
   versions:
   - additionalPrinterColumns:
+    - jsonPath: .spec.replicas
+      name: Replicas
+      priority: 1
+      type: integer
     - jsonPath: .spec.interval
       name: Interval
       type: string
@@ -5191,6 +5195,9 @@ spec:
                 - name
                 type: object
               type: array
+            replicas:
+              default: 1
+              type: integer
             restic:
               items:
                 properties:
@@ -5729,6 +5736,9 @@ spec:
               type: integer
             persistedID:
               type: string
+            replicas:
+              description: Replicas keep track of the number of replicas
+              type: integer
             status:
               type: string
             uptime1h:
@@ -5739,6 +5749,9 @@ spec:
     served: true
     storage: true
     subresources:
+      scale:
+        specReplicasPath: .spec.replicas
+        statusReplicasPath: .status.replicas
       status: {}
 ---
 apiVersion: v1
diff --git a/config/schemas/canary.schema.json b/config/schemas/canary.schema.json
index c317261e4..8497671a8 100644
--- a/config/schemas/canary.schema.json
+++ b/config/schemas/canary.schema.json
@@ -427,6 +427,9 @@
     },
     "CanarySpec": {
       "properties": {
+        "replicas": {
+          "type": "integer"
+        },
         "env": {
           "patternProperties": {
             ".*": {
@@ -726,6 +729,9 @@
         },
         "latency1h": {
           "type": "string"
+        },
+        "replicas": {
+          "type": "integer"
         }
       },
       "additionalProperties": false,
diff --git a/config/schemas/component.schema.json b/config/schemas/component.schema.json
index 5728c0490..1d9139a31 100644
--- a/config/schemas/component.schema.json
+++ b/config/schemas/component.schema.json
@@ -406,6 +406,9 @@
     },
     "CanarySpec": {
       "properties": {
+        "replicas": {
+          "type": "integer"
+        },
         "env": {
           "patternProperties": {
             ".*": {
diff --git a/config/schemas/topology.schema.json b/config/schemas/topology.schema.json
index e9ad2d276..c3cb4e0aa 100644
--- a/config/schemas/topology.schema.json
+++ b/config/schemas/topology.schema.json
@@ -406,6 +406,9 @@
     },
     "CanarySpec": {
       "properties": {
+        "replicas": {
+          "type": "integer"
+        },
         "env": {
           "patternProperties": {
             ".*": {
diff --git a/pkg/api.go b/pkg/api.go
index 24ea0e38c..265b67954 100644
--- a/pkg/api.go
+++ b/pkg/api.go
@@ -12,6 +12,7 @@ import (
 	v1 "github.com/flanksource/canary-checker/api/v1"
 	"github.com/flanksource/canary-checker/pkg/labels"
 	"github.com/flanksource/canary-checker/pkg/utils"
+	"github.com/flanksource/commons/collections"
 	"github.com/flanksource/commons/console"
 	"github.com/flanksource/commons/logger"
 	cUtils "github.com/flanksource/commons/utils"
@@ -118,17 +119,18 @@ type Timeseries struct {
 }

 type Canary struct {
-	ID        uuid.UUID `gorm:"default:generate_ulid()"`
-	AgentID   uuid.UUID
-	Spec      types.JSON          `json:"spec"`
-	Labels    types.JSONStringMap `json:"labels"`
-	Source    string
-	Name      string
-	Namespace string
-	Checks    types.JSONStringMap `gorm:"-"`
-	CreatedAt time.Time
-	UpdatedAt time.Time  `json:"updated_at"`
-	DeletedAt *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"`
+	ID          uuid.UUID `gorm:"default:generate_ulid()"`
+	AgentID     uuid.UUID
+	Spec        types.JSON          `json:"spec"`
+	Labels      types.JSONStringMap `json:"labels"`
+	Source      string
+	Name        string
+	Namespace   string
+	Checks      types.JSONStringMap `gorm:"-"`
+	Annotations types.JSONStringMap `json:"annotations,omitempty"`
+	CreatedAt   time.Time
+	UpdatedAt   time.Time  `json:"updated_at"`
+	DeletedAt   *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"`
 }

 func (c Canary) GetCheckID(checkName string) string {
@@ -156,7 +158,10 @@ func (c Canary) ToV1() (*v1.Canary, error) {
 		logger.Debugf("Failed to unmarshal canary spec: %s", err)
 		return nil, err
 	}
+
 	canary.Status.Checks = c.Checks
+	canary.ObjectMeta.Annotations = collections.MergeMap(canary.ObjectMeta.Annotations, c.Annotations)
+
 	return &canary, nil
 }
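`ToV1` now overlays the database annotations onto whatever annotations the spec already carries, so flags like `suspend` (written by `SuspendCanary` further down) survive the round trip. A runnable sketch of the merge semantics assumed for `collections.MergeMap` from flanksource/commons, namely that keys from later maps win; `mergeMap` here is a hypothetical stand-in, not the real helper:

```go
package main

import "fmt"

// mergeMap is a hypothetical stand-in for collections.MergeMap:
// maps are merged left to right, with later keys overriding earlier ones.
func mergeMap(maps ...map[string]string) map[string]string {
	out := map[string]string{}
	for _, m := range maps {
		for k, v := range m {
			out[k] = v
		}
	}
	return out
}

func main() {
	specAnnotations := map[string]string{"team": "platform"}
	dbAnnotations := map[string]string{"suspend": "true"} // e.g. written by SuspendCanary
	fmt.Println(mergeMap(specAnnotations, dbAnnotations))
	// Output: map[suspend:true team:platform]
}
```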
index 56ec2958c..6815674b1 100644
--- a/pkg/controllers/canary_controller.go
+++ b/pkg/controllers/canary_controller.go
@@ -18,6 +18,7 @@ package controllers
 import (
 	gocontext "context"
+	"fmt"
 	"time"

 	"github.com/flanksource/canary-checker/pkg/db"
@@ -25,10 +26,12 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

 	"github.com/flanksource/canary-checker/api/context"
+	v1 "github.com/flanksource/canary-checker/api/v1"
 	"github.com/flanksource/canary-checker/pkg"
 	canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
 	"github.com/flanksource/canary-checker/pkg/runner"
+	dutyContext "github.com/flanksource/duty/context"
 	"github.com/flanksource/kommons"
 	"github.com/go-logr/logr"
 	jsontime "github.com/liamylian/jsontime/v2/v2"
@@ -67,7 +70,9 @@ const FinalizerName = "canary.canaries.flanksource.com"
 // +kubebuilder:rbac:groups=canaries.flanksource.com,resources=canaries/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups="",resources=pods/exec,verbs=*
 // +kubebuilder:rbac:groups="",resources=pods/logs,verbs=*
-func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *CanaryReconciler) Reconcile(parentCtx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
+	ctx := context.DefaultContext.Wrap(parentCtx)
+
 	logger := r.Log.WithValues("canary", req.NamespacedName)
 	canary := &v1.Canary{}
 	err := r.Get(ctx, req.NamespacedName, canary)
@@ -82,6 +87,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
 	}
 	canary.SetRunnerName(r.RunnerName)
+
 	// Add finalizer first if it doesn't exist, to avoid a race between init and delete
 	if !controllerutil.ContainsFinalizer(canary, FinalizerName) {
 		controllerutil.AddFinalizer(canary, FinalizerName)
@@ -99,14 +105,14 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
 		return ctrl.Result{}, r.Update(ctx, canary)
 	}

-	dbCanary, err := r.updateCanaryInDB(canary)
+	dbCanary, err := r.updateCanaryInDB(ctx, canary)
 	if err != nil {
 		return ctrl.Result{Requeue: true}, err
 	}

 	// Sync jobs if canary is created or updated
 	if canary.Generation == 1 {
-		if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
+		if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary); err != nil {
 			logger.Error(err, "failed to sync canary job")
 			return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
 		}
@@ -121,12 +127,55 @@
 	}

 	patch := client.MergeFrom(canaryForStatus.DeepCopy())
+
+	if val, ok := canary.Annotations["next-runtime"]; ok {
+		runAt, err := time.Parse(time.RFC3339, val)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
+
+		var syncCanaryJobOptions = []canaryJobs.SyncCanaryJobOption{}
+		syncCanaryJobOptions = append(syncCanaryJobOptions, canaryJobs.WithSchedule(
+			fmt.Sprintf("%d %d %d %d *", runAt.Minute(), runAt.Hour(), runAt.Day(), runAt.Month()),
+		))
+		if !runAt.After(time.Now()) {
+			syncCanaryJobOptions = append(syncCanaryJobOptions, canaryJobs.WithRunNow(true))
+		}
+
+		if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary, syncCanaryJobOptions...); err != nil {
+			return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
+		}
+
+		delete(canary.Annotations, "next-runtime")
+		if err := r.Update(ctx, canary); err != nil {
+			return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
+		}
+	}
+
+	if canaryForStatus.Status.Replicas != canary.Spec.Replicas {
+		if canary.Spec.Replicas == 0 {
+			canaryJobs.DeleteCanaryJob(canary.GetPersistedID())
+			if err := db.SuspendCanary(ctx, canary.GetPersistedID(), true); err != nil {
+				return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
+			}
+		} else {
+			if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary); err != nil {
+				return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
+			}
+			if err := db.SuspendCanary(ctx, canary.GetPersistedID(), false); err != nil {
+				logger.Error(err, "failed to unsuspend canary")
+			}
+		}
+
+		canaryForStatus.Status.Replicas = canary.Spec.Replicas
+	}
+
 	canaryForStatus.Status.Checks = dbCanary.Checks
 	canaryForStatus.Status.ObservedGeneration = canary.Generation
 	if err = r.Status().Patch(ctx, &canaryForStatus, patch); err != nil {
 		logger.Error(err, "failed to update status for canary")
 		return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
 	}
+
 	return ctrl.Result{}, nil
 }

@@ -137,26 +186,26 @@ func (r *CanaryReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		Complete(r)
 }

-func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary, error) {
+func (r *CanaryReconciler) persistAndCacheCanary(ctx dutyContext.Context, canary *v1.Canary) (*pkg.Canary, error) {
 	dbCanary, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID())
 	if err != nil {
 		return nil, err
 	}
 	r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration)

-	if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
+	if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary); err != nil {
 		return nil, err
 	}
 	return dbCanary, nil
 }

-func (r *CanaryReconciler) updateCanaryInDB(canary *v1.Canary) (*pkg.Canary, error) {
+func (r *CanaryReconciler) updateCanaryInDB(ctx dutyContext.Context, canary *v1.Canary) (*pkg.Canary, error) {
 	var dbCanary *pkg.Canary
 	var err error

 	// Get DBCanary from cache if exists else persist in database and update cache
 	if cacheObj, exists := r.CanaryCache.Get(canary.GetPersistedID()); !exists {
-		dbCanary, err = r.persistAndCacheCanary(canary)
+		dbCanary, err = r.persistAndCacheCanary(ctx, canary)
 		if err != nil {
 			return nil, err
 		}
@@ -172,7 +221,7 @@
 	}
 	opts := jsondiff.DefaultJSONOptions()
 	if diff, _ := jsondiff.Compare(canarySpecJSON, dbCanary.Spec, &opts); diff != jsondiff.FullMatch {
-		dbCanary, err = r.persistAndCacheCanary(canary)
+		dbCanary, err = r.persistAndCacheCanary(ctx, canary)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/db/canary.go b/pkg/db/canary.go
index a781a5ddd..a1c16993d 100644
--- a/pkg/db/canary.go
+++ b/pkg/db/canary.go
@@ -511,3 +511,27 @@ func CleanupCanaries() {
 		jobHistory.IncrSuccess()
 	}
 }
+
+// SuspendCanary sets or clears the suspend annotation on the canaries table.
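The `next-runtime` annotation handled above is an RFC3339 timestamp that the reconciler converts into a five-field cron expression (minute, hour, day of month, month, any weekday); timestamps already in the past are instead run immediately via `WithRunNow(true)`, and the annotation is deleted afterwards so the yearly-recurring cron expression effectively fires only once. A self-contained sketch of that conversion, extracted from the reconciler; `nextRuntimeToCron` is an illustrative name:

```go
package main

import (
	"fmt"
	"time"
)

// nextRuntimeToCron converts the value of the "next-runtime" annotation into
// the cron expression the reconciler passes to WithSchedule, and reports
// whether the run should instead be triggered immediately.
func nextRuntimeToCron(value string) (schedule string, runNow bool, err error) {
	runAt, err := time.Parse(time.RFC3339, value)
	if err != nil {
		return "", false, err
	}
	schedule = fmt.Sprintf("%d %d %d %d *", runAt.Minute(), runAt.Hour(), runAt.Day(), runAt.Month())
	return schedule, !runAt.After(time.Now()), nil
}

func main() {
	schedule, runNow, err := nextRuntimeToCron("2024-03-01T10:30:00Z")
	if err != nil {
		panic(err)
	}
	fmt.Println(schedule, runNow) // "30 10 1 3 *", and true once that instant has passed
}
```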
+func SuspendCanary(ctx context.Context, id string, suspend bool) error {
+	query := `
+		UPDATE canaries
+		SET annotations =
+			CASE
+				WHEN annotations IS NULL THEN '{"suspend": "true"}'::jsonb
+				ELSE jsonb_set(annotations, '{suspend}', '"true"')
+			END
+		WHERE id = ?;
+	`
+
+	if !suspend {
+		query = `
+			UPDATE canaries
+			SET annotations =
+				CASE WHEN annotations IS NOT NULL THEN annotations - 'suspend' END
+			WHERE id = ?;
+		`
+	}
+
+	return ctx.DB().Exec(query, id).Error
+}
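`SuspendCanary` keeps the flag in the `annotations` jsonb column: suspending writes the `suspend` key as the string `"true"` (creating the object when the column is NULL), while unsuspending drops the key with the jsonb `-` operator. The same logic expressed over a plain Go map, for reasoning about the two SQL branches; `setSuspend` is illustrative only:

```go
package main

import "fmt"

// setSuspend mirrors what the SQL above does to the jsonb annotations column.
func setSuspend(annotations map[string]string, suspend bool) map[string]string {
	if suspend {
		if annotations == nil { // WHEN annotations IS NULL THEN '{"suspend": "true"}'::jsonb
			annotations = map[string]string{}
		}
		annotations["suspend"] = "true" // ELSE jsonb_set(annotations, '{suspend}', '"true"')
		return annotations
	}
	delete(annotations, "suspend") // annotations - 'suspend'; delete on a nil map is a no-op
	return annotations
}

func main() {
	fmt.Println(setSuspend(nil, true))                                   // map[suspend:true]
	fmt.Println(setSuspend(map[string]string{"suspend": "true"}, false)) // map[]
}
```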
Next run: %v", canary, schedule, entry.Next) canaryUpdateTimeCache.Store(dbCanary.ID.String(), dbCanary.UpdatedAt) } // Run all regularly scheduled canaries on startup (<1h) and not daily/weekly schedules - if entry != nil && time.Until(entry.Next) < 1*time.Hour && !exists { + if (entry != nil && time.Until(entry.Next) < 1*time.Hour && !exists) || syncOption.RunNow { go entry.Job.Run() }