From 51a6cef1059f1d2f105a9b51a94a6705e83cbe5e Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 21 Oct 2024 21:03:11 +0545 Subject: [PATCH] feat: leader election --- cmd/operator.go | 40 ++++++++++++++++++++++++++++------------ cmd/root.go | 10 ++++++---- cmd/run.go | 2 +- cmd/topology.go | 2 +- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/cmd/operator.go b/cmd/operator.go index 0d0f54640..e9091f9a2 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -18,7 +18,9 @@ import ( "github.com/flanksource/canary-checker/pkg/controllers" "github.com/flanksource/canary-checker/pkg/labels" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/job" dutyKubernetes "github.com/flanksource/duty/kubernetes" + "github.com/flanksource/duty/leader" "github.com/flanksource/duty/shutdown" "github.com/spf13/cobra" "go.opentelemetry.io/otel" @@ -37,10 +39,10 @@ var ( Use: "operator", Short: "Start the kubernetes operator", Run: func(cmd *cobra.Command, args []string) { - if err := run(cmd, args); err != nil { + if err := run(); err != nil { shutdown.ShutdownAndExit(1, err.Error()) } else { - shutdown.ShutdownAndExit(0, err.Error()) + shutdown.ShutdownAndExit(0, "") } }, } @@ -56,15 +58,7 @@ func init() { // +kubebuilder:scaffold:scheme } -func run(cmd *cobra.Command, args []string) error { - logger := logger.GetLogger("operator") - logger.SetLogLevel(k8sLogLevel) - - scheme := runtime.NewScheme() - - _ = clientgoscheme.AddToScheme(scheme) - _ = canaryv1.AddToScheme(scheme) - +func run() error { ctx, err := InitContext() if err != nil { return err @@ -78,12 +72,23 @@ func run(cmd *cobra.Command, args []string) error { return errors.New("operator requires a kubernetes connection") } - ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker")) + ctx.WithTracer(otel.GetTracerProvider().Tracer(app)) apicontext.DefaultContext = ctx.WithNamespace(runner.WatchNamespace) cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext) + if enableLeaderElection { + job.DisableCronStartOnSchedule() + + go func() { + err := leader.Register(ctx, app, runner.WatchNamespace, nil, nil, nil) + if err != nil { + shutdown.ShutdownAndExit(1, fmt.Sprintf("leader election failed: %v", err)) + } + }() + } + if runner.OperatorExecutor { logger.Infof("Starting executors") @@ -94,6 +99,16 @@ func run(cmd *cobra.Command, args []string) error { } go serve() + return startControllers() +} + +func startControllers() error { + scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + _ = canaryv1.AddToScheme(scheme) + + logger := logger.GetLogger("operator") + logger.SetLogLevel(k8sLogLevel) ctrl.SetLogger(logr.FromSlogHandler(logger.Handler())) setupLog := ctrl.Log.WithName("setup") @@ -102,6 +117,7 @@ func run(cmd *cobra.Command, args []string) error { Scheme: scheme, LeaderElection: enableLeaderElection, LeaderElectionNamespace: runner.WatchNamespace, + LeaderElectionID: "fa62cd4d.flanksource.com", Metrics: ctrlMetrics.Options{ BindAddress: ":0", }, diff --git a/cmd/root.go b/cmd/root.go index 3e5132e87..9770f0189 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -24,10 +24,12 @@ import ( "go.opentelemetry.io/otel" ) +const app = "canary-checker" + func InitContext() (context.Context, error) { - ctx, closer, err := duty.Start("canary-checker", duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode) + ctx, closer, err := duty.Start(app, duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode) if err != nil { - return ctx, fmt.Errorf("Failed to initialize db: %v", err.Error()) + return ctx, fmt.Errorf("failed to initialize db: %v", err.Error()) } shutdown.AddHook(closer) @@ -35,13 +37,13 @@ func InitContext() (context.Context, error) { return ctx, fmt.Errorf("failed to load properties: %v", err) } - ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker")) + ctx.WithTracer(otel.GetTracerProvider().Tracer(app)) return ctx, nil } var Root = &cobra.Command{ - Use: "canary-checker", + Use: app, PersistentPreRun: func(cmd *cobra.Command, args []string) { logger.UseSlog() shutdown.WaitForSignal() diff --git a/cmd/run.go b/cmd/run.go index 531e63804..8daa55091 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -33,7 +33,7 @@ var Run = &cobra.Command{ log.Fatalln("Must specify at least one canary") } - ctx, closer, err := duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode) + ctx, closer, err := duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode) if err != nil { logger.Fatalf("Failed to initialize db: %v", err.Error()) } diff --git a/cmd/topology.go b/cmd/topology.go index 72388d5ed..9c4968f36 100644 --- a/cmd/topology.go +++ b/cmd/topology.go @@ -74,7 +74,7 @@ var RunTopology = &cobra.Command{ defer shutdown.Shutdown() var err error - apicontext.DefaultContext, _, err = duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode) + apicontext.DefaultContext, _, err = duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode) if err != nil { logger.Errorf(err.Error()) return