diff --git a/cmd/operator.go b/cmd/operator.go
index d6b2704c8..0d3d142b2 100644
--- a/cmd/operator.go
+++ b/cmd/operator.go
@@ -27,20 +27,21 @@ import (
 	ctrlMetrics "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 )
 
-var webhookPort int
-var k8sLogLevel int
-var enableLeaderElection bool
-var operatorExecutor bool
-var Operator = &cobra.Command{
-	Use:   "operator",
-	Short: "Start the kubernetes operator",
-	Run:   run,
-}
+var (
+	webhookPort          int
+	k8sLogLevel          int
+	enableLeaderElection bool
+	Operator             = &cobra.Command{
+		Use:   "operator",
+		Short: "Start the kubernetes operator",
+		Run:   run,
+	}
+)
 
 func init() {
 	ServerFlags(Operator.Flags())
 	Operator.Flags().StringVarP(&runner.WatchNamespace, "namespace", "n", "", "Watch only specified namespace, otherwise watch all")
-	Operator.Flags().BoolVar(&operatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
+	Operator.Flags().BoolVar(&runner.OperatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
 	Operator.Flags().IntVar(&webhookPort, "webhookPort", 8082, "Port for webhooks ")
 	Operator.Flags().IntVar(&k8sLogLevel, "k8s-log-level", -1, "Kubernetes controller log level")
 	Operator.Flags().BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enabling this will ensure there is only one active controller manager")
@@ -77,7 +78,7 @@ func run(cmd *cobra.Command, args []string) {
 
 	cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext)
 
-	if operatorExecutor {
+	if runner.OperatorExecutor {
 		logger.Infof("Starting executors")
 
 		// Some synchronous jobs can take time
@@ -85,6 +86,7 @@ func run(cmd *cobra.Command, args []string) {
 		// to prevent health check from failing
 		go jobs.Start()
 	}
+	go serve()
 
 	ctrl.SetLogger(logr.FromSlogHandler(logger.Handler()))
 
diff --git a/cmd/serve.go b/cmd/serve.go
index c32c63e25..b3167910e 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -32,9 +32,11 @@ import (
 	"github.com/spf13/cobra"
 )
 
-var schedule, configFile string
-var executor bool
-var propertiesFile = "canary-checker.properties"
+var (
+	schedule, configFile string
+	executor             bool
+	propertiesFile       = "canary-checker.properties"
+)
 
 var Serve = &cobra.Command{
 	Use: "serve config.yaml",
diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go
index 7cf2d47e5..28ca70c30 100644
--- a/pkg/jobs/canary/canary_jobs.go
+++ b/pkg/jobs/canary/canary_jobs.go
@@ -1,6 +1,7 @@
 package canary
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"time"
@@ -18,10 +19,13 @@ import (
 	dutyEcho "github.com/flanksource/duty/echo"
 	dutyjob "github.com/flanksource/duty/job"
 	"github.com/flanksource/duty/models"
+	"github.com/google/uuid"
 	"go.opentelemetry.io/otel/trace"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/robfig/cron/v3"
 	"go.opentelemetry.io/otel/attribute"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 )
 
@@ -194,6 +198,56 @@ func logIfError(err error, description string) {
 	}
 }
 
+var CleanupCRDDeletedCanaries = &dutyjob.Job{
+	Name:       "CleanupCRDDeletedCanaries",
+	Schedule:   "@every 1d",
+	RunNow:     true,
+	Singleton:  true,
+	JobHistory: true,
+	Retention:  dutyjob.RetentionBalanced,
+	Fn: func(ctx dutyjob.JobRuntime) error {
+		var crdCanaries []models.Canary
+		if err := ctx.DB().Select("id", "name", "namespace").
+			Where("deleted_at IS NULL").
+			Where("agent_id = ?", uuid.Nil.String()).
+ Where("source LIKE 'kubernetes/%'").Find(&crdCanaries).Error; err != nil { + return fmt.Errorf("failed to list all canaries with source=CRD: %w", err) + } + + if len(crdCanaries) == 0 { + return nil + } + + canaryClient, err := ctx.KubernetesDynamicClient().GetClientByGroupVersionKind(v1.GroupVersion.Group, v1.GroupVersion.Version, "Canary") + if err != nil { + return fmt.Errorf("failed to get kubernetes client for canaries: %w", err) + } + + for _, canary := range crdCanaries { + if _, err := canaryClient.Namespace(canary.Namespace).Get(ctx, canary.Name, metav1.GetOptions{}); err != nil { + var statusErr *apierrors.StatusError + if errors.As(err, &statusErr) { + if statusErr.ErrStatus.Reason == metav1.StatusReasonNotFound { + if err := db.DeleteCanary(ctx.Context, canary.ID.String()); err != nil { + ctx.History.AddErrorf("error deleting canary[%s]: %v", canary.ID, err) + } else { + ctx.History.IncrSuccess() + } + + Unschedule(canary.ID.String()) + + continue + } + } + + return fmt.Errorf("failed to delete canary %s/%s from kubernetes: %w", canary.Namespace, canary.Name, err) + } + } + + return nil + }, +} + var CleanupDeletedCanaryChecks = &dutyjob.Job{ Name: "CleanupDeletedCanaryChecks", Schedule: "@every 1h", @@ -217,7 +271,7 @@ var CleanupDeletedCanaryChecks = &dutyjob.Job{ for _, r := range rows { if err := db.DeleteCanary(ctx.Context, r.ID); err != nil { - ctx.History.AddError(fmt.Sprintf("Error deleting components for topology[%s]: %v", r.ID, err)) + ctx.History.AddErrorf("error deleting canary[%s]: %v", r.ID, err) } else { ctx.History.IncrSuccess() } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 7074e28bb..860c08d71 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -6,6 +6,7 @@ import ( "github.com/flanksource/canary-checker/pkg/db" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" topologyJobs "github.com/flanksource/canary-checker/pkg/jobs/topology" + "github.com/flanksource/canary-checker/pkg/runner" "github.com/flanksource/canary-checker/pkg/topology" "github.com/flanksource/commons/logger" dutyEcho "github.com/flanksource/duty/echo" @@ -19,9 +20,10 @@ var FuncScheduler = cron.New() func Start() { logger.Infof("Starting jobs ...") dutyEcho.RegisterCron(FuncScheduler) + if canaryJobs.UpstreamConf.Valid() { for _, j := range canaryJobs.UpstreamJobs { - var job = j + job := j job.Context = context.DefaultContext if err := job.AddToScheduler(FuncScheduler); err != nil { logger.Errorf(err.Error()) @@ -30,7 +32,7 @@ func Start() { } for _, j := range db.CheckStatusJobs { - var job = j + job := j job.Context = context.DefaultContext if err := job.AddToScheduler(FuncScheduler); err != nil { logger.Errorf(err.Error()) @@ -38,7 +40,7 @@ func Start() { } for _, j := range topology.Jobs { - var job = j + job := j job.Context = context.DefaultContext if err := job.AddToScheduler(FuncScheduler); err != nil { logger.Errorf(err.Error()) @@ -46,7 +48,15 @@ func Start() { } for _, j := range []*job.Job{topologyJobs.CleanupDeletedTopologyComponents, topologyJobs.SyncTopology, canaryJobs.SyncCanaryJobs, canaryJobs.CleanupDeletedCanaryChecks, dutyQuery.SyncComponentCacheJob} { - var job = j + job := j + job.Context = context.DefaultContext + if err := job.AddToScheduler(FuncScheduler); err != nil { + logger.Errorf(err.Error()) + } + } + + if runner.OperatorExecutor { + job := canaryJobs.CleanupCRDDeleteCanaries job.Context = context.DefaultContext if err := job.AddToScheduler(FuncScheduler); err != nil { logger.Errorf(err.Error()) diff --git a/pkg/runner/runner.go 
diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index cd4552927..090aed16f 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -7,6 +7,9 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// OperatorExecutor, when true, means the application is running as the k8s operator
+var OperatorExecutor bool
+
 var RunnerName string
 var Version string
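
Note on the runner.OperatorExecutor move: the flag was previously a variable private to the cmd package, so pkg/jobs could not see it; promoting it to pkg/runner lets jobs.Start register the cleanup job only in operator mode without importing cmd. A minimal sketch of the binding pattern, using a hypothetical standalone command (only cobra is assumed):

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// OperatorExecutor mirrors pkg/runner: a package-level flag that other
// packages can import, instead of a variable private to the cmd package.
var OperatorExecutor bool

func main() {
	cmd := &cobra.Command{
		Use: "operator",
		Run: func(cmd *cobra.Command, args []string) {
			if OperatorExecutor {
				fmt.Println("starting executors")
			}
		},
	}
	// BoolVar writes the parsed flag straight into the shared variable,
	// the same way the patch binds --executor to runner.OperatorExecutor.
	cmd.Flags().BoolVar(&OperatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```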