From 4487754b625445c0b61db3d8beb44fef7e14af57 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 21 Oct 2024 21:03:11 +0545 Subject: [PATCH] feat: leader election [skip ci] --- cmd/operator.go | 40 ++++++++++++++++++++++++++++------------ cmd/root.go | 10 ++++++---- cmd/run.go | 2 +- cmd/topology.go | 2 +- go.mod | 2 +- go.sum | 2 -- 6 files changed, 37 insertions(+), 21 deletions(-) diff --git a/cmd/operator.go b/cmd/operator.go index 0d0f54640..e9091f9a2 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -18,7 +18,9 @@ import ( "github.com/flanksource/canary-checker/pkg/controllers" "github.com/flanksource/canary-checker/pkg/labels" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/job" dutyKubernetes "github.com/flanksource/duty/kubernetes" + "github.com/flanksource/duty/leader" "github.com/flanksource/duty/shutdown" "github.com/spf13/cobra" "go.opentelemetry.io/otel" @@ -37,10 +39,10 @@ var ( Use: "operator", Short: "Start the kubernetes operator", Run: func(cmd *cobra.Command, args []string) { - if err := run(cmd, args); err != nil { + if err := run(); err != nil { shutdown.ShutdownAndExit(1, err.Error()) } else { - shutdown.ShutdownAndExit(0, err.Error()) + shutdown.ShutdownAndExit(0, "") } }, } @@ -56,15 +58,7 @@ func init() { // +kubebuilder:scaffold:scheme } -func run(cmd *cobra.Command, args []string) error { - logger := logger.GetLogger("operator") - logger.SetLogLevel(k8sLogLevel) - - scheme := runtime.NewScheme() - - _ = clientgoscheme.AddToScheme(scheme) - _ = canaryv1.AddToScheme(scheme) - +func run() error { ctx, err := InitContext() if err != nil { return err @@ -78,12 +72,23 @@ func run(cmd *cobra.Command, args []string) error { return errors.New("operator requires a kubernetes connection") } - ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker")) + ctx.WithTracer(otel.GetTracerProvider().Tracer(app)) apicontext.DefaultContext = ctx.WithNamespace(runner.WatchNamespace) cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext) + if enableLeaderElection { + job.DisableCronStartOnSchedule() + + go func() { + err := leader.Register(ctx, app, runner.WatchNamespace, nil, nil, nil) + if err != nil { + shutdown.ShutdownAndExit(1, fmt.Sprintf("leader election failed: %v", err)) + } + }() + } + if runner.OperatorExecutor { logger.Infof("Starting executors") @@ -94,6 +99,16 @@ func run(cmd *cobra.Command, args []string) error { } go serve() + return startControllers() +} + +func startControllers() error { + scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + _ = canaryv1.AddToScheme(scheme) + + logger := logger.GetLogger("operator") + logger.SetLogLevel(k8sLogLevel) ctrl.SetLogger(logr.FromSlogHandler(logger.Handler())) setupLog := ctrl.Log.WithName("setup") @@ -102,6 +117,7 @@ func run(cmd *cobra.Command, args []string) error { Scheme: scheme, LeaderElection: enableLeaderElection, LeaderElectionNamespace: runner.WatchNamespace, + LeaderElectionID: "fa62cd4d.flanksource.com", Metrics: ctrlMetrics.Options{ BindAddress: ":0", }, diff --git a/cmd/root.go b/cmd/root.go index 3e5132e87..9770f0189 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -24,10 +24,12 @@ import ( "go.opentelemetry.io/otel" ) +const app = "canary-checker" + func InitContext() (context.Context, error) { - ctx, closer, err := duty.Start("canary-checker", duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode) + ctx, closer, err := duty.Start(app, duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode) if err != nil { - return ctx, fmt.Errorf("Failed to initialize db: %v", err.Error()) + return ctx, fmt.Errorf("failed to initialize db: %v", err.Error()) } shutdown.AddHook(closer) @@ -35,13 +37,13 @@ func InitContext() (context.Context, error) { return ctx, fmt.Errorf("failed to load properties: %v", err) } - ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker")) + ctx.WithTracer(otel.GetTracerProvider().Tracer(app)) return ctx, nil } var Root = &cobra.Command{ - Use: "canary-checker", + Use: app, PersistentPreRun: func(cmd *cobra.Command, args []string) { logger.UseSlog() shutdown.WaitForSignal() diff --git a/cmd/run.go b/cmd/run.go index 531e63804..8daa55091 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -33,7 +33,7 @@ var Run = &cobra.Command{ log.Fatalln("Must specify at least one canary") } - ctx, closer, err := duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode) + ctx, closer, err := duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode) if err != nil { logger.Fatalf("Failed to initialize db: %v", err.Error()) } diff --git a/cmd/topology.go b/cmd/topology.go index 72388d5ed..9c4968f36 100644 --- a/cmd/topology.go +++ b/cmd/topology.go @@ -74,7 +74,7 @@ var RunTopology = &cobra.Command{ defer shutdown.Shutdown() var err error - apicontext.DefaultContext, _, err = duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode) + apicontext.DefaultContext, _, err = duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode) if err != nil { logger.Errorf(err.Error()) return diff --git a/go.mod b/go.mod index 14a94ea55..266e53603 100644 --- a/go.mod +++ b/go.mod @@ -327,7 +327,7 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) -// replace github.com/flanksource/duty => ../duty +replace github.com/flanksource/duty => ../duty // replace github.com/flanksource/artifacts => ../artifacts diff --git a/go.sum b/go.sum index 9f2fb6bb1..4660f281d 100644 --- a/go.sum +++ b/go.sum @@ -861,8 +861,6 @@ github.com/flanksource/artifacts v1.0.15 h1:3ImJr2y0ZCXw/QrMhfJJktAT7pYD3sMZR5ix github.com/flanksource/artifacts v1.0.15/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70= github.com/flanksource/commons v1.30.5 h1:p8PXGiNt7SurBBh9K3ea8/ZrDvacXSYHJSs/cqJLDK8= github.com/flanksource/commons v1.30.5/go.mod h1:26zdVkmMPsGpvfcsvst5WgsqcyRL8KqFNxkumagBN+A= -github.com/flanksource/duty v1.0.719 h1:Gqs9p0YI6YT+fMpvGrlgGZ2FX8qXbXeFJebx5LPO34s= -github.com/flanksource/duty v1.0.719/go.mod h1:1+h6/KDFTtkmqitfV8cuQJWhLakxTMvKqBjgqW0y2Ps= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.24.36 h1:aJx4MEzPyzZyJ0VQI11SUFU7QaTUVcusCskwNsxexzM= github.com/flanksource/gomplate/v3 v3.24.36/go.mod h1:7m2vIaBstYGIGoanua6Q3s0GbXs7KbBEVRHrEcdHxW4=