diff --git a/cmd/main.go b/cmd/main.go index 608dfd08b6..01f2cbff5a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -31,13 +31,11 @@ import ( "github.com/go-co-op/gocron" "github.com/go-logr/logr" "go.uber.org/zap/zapcore" - "golang.org/x/time/rate" istioclientapiv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" machineryruntime "k8s.io/apimachinery/pkg/runtime" machineryutilruntime "k8s.io/apimachinery/pkg/util/runtime" k8sclientscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" ctrlruntime "sigs.k8s.io/controller-runtime/pkg/controller" @@ -168,7 +166,7 @@ func setupManager(flagVar *flags.FlagVar, cacheOptions cache.Options, scheme *ma eventRecorder := event.NewRecorderWrapper(mgr.GetEventRecorderFor(shared.OperatorName)) skrContextProvider := remote.NewKymaSkrContextProvider(kcpClient, remoteClientCache, eventRecorder) var skrWebhookManager *watcher.SKRWebhookManifestManager - options := controllerOptionsFromFlagVar(flagVar) + var options ctrlruntime.Options if flagVar.EnableKcpWatcher { if skrWebhookManager, err = createSkrWebhookManager(mgr, skrContextProvider, flagVar); err != nil { setupLog.Error(err, "failed to create skr webhook manager") @@ -266,26 +264,16 @@ func scheduleMetricsCleanup(kymaMetrics *metrics.KymaMetrics, cleanupIntervalInM setupLog.V(log.DebugLevel).Info("scheduled job for cleaning up metrics") } -func controllerOptionsFromFlagVar(flagVar *flags.FlagVar) ctrlruntime.Options { - return ctrlruntime.Options{ - RateLimiter: workqueue.NewTypedMaxOfRateLimiter( - workqueue.NewTypedItemExponentialFailureRateLimiter[ctrl.Request](flagVar.FailureBaseDelay, - flagVar.FailureMaxDelay), - &workqueue.TypedBucketRateLimiter[ctrl.Request]{ - Limiter: rate.NewLimiter(rate.Limit(flagVar.RateLimiterFrequency), flagVar.RateLimiterBurst), - }, - ), - - CacheSyncTimeout: flagVar.CacheSyncTimeout, - } -} - func setupKymaReconciler(mgr ctrl.Manager, descriptorProvider *provider.CachedDescriptorProvider, skrContextFactory remote.SkrContextProvider, event event.Event, flagVar *flags.FlagVar, options ctrlruntime.Options, skrWebhookManager *watcher.SKRWebhookManifestManager, kymaMetrics *metrics.KymaMetrics, moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentKymaReconciles + if err := (&kyma.Reconciler{ Client: mgr.GetClient(), SkrContextFactory: skrContextFactory, @@ -361,6 +349,10 @@ func setupPurgeReconciler(mgr ctrl.Manager, options ctrlruntime.Options, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout + if err := (&purge.Reconciler{ Client: mgr.GetClient(), SkrContextFactory: skrContextProvider, @@ -381,9 +373,10 @@ func setupManifestReconciler(mgr ctrl.Manager, flagVar *flags.FlagVar, options c sharedMetrics *metrics.SharedMetrics, mandatoryModulesMetrics *metrics.MandatoryModulesMetrics, moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger, event event.Event, ) { - options.MaxConcurrentReconciles = flagVar.MaxConcurrentManifestReconciles - options.RateLimiter = internal.ManifestRateLimiter(flagVar.FailureBaseDelay, + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout + options.MaxConcurrentReconciles = flagVar.MaxConcurrentManifestReconciles manifestClient := manifestclient.NewManifestClient(event, mgr.GetClient()) @@ -409,6 +402,9 @@ func setupManifestReconciler(mgr ctrl.Manager, flagVar *flags.FlagVar, options c func setupKcpWatcherReconciler(mgr ctrl.Manager, options ctrlruntime.Options, event event.Event, flagVar *flags.FlagVar, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentWatcherReconciles if err := (&watcherctrl.Reconciler{ @@ -436,6 +432,9 @@ func setupMandatoryModuleReconciler(mgr ctrl.Manager, metrics *metrics.MandatoryModulesMetrics, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentMandatoryModuleReconciles if err := (&mandatorymodule.InstallationReconciler{ @@ -463,6 +462,9 @@ func setupMandatoryModuleDeletionReconciler(mgr ctrl.Manager, options ctrlruntime.Options, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentMandatoryModuleDeletionReconciles if err := (&mandatorymodule.DeletionReconciler{ diff --git a/internal/rate_limiter.go b/internal/rate_limiter.go index aaa072a765..67b0112a16 100644 --- a/internal/rate_limiter.go +++ b/internal/rate_limiter.go @@ -8,7 +8,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -func ManifestRateLimiter( +func RateLimiter( failureBaseDelay time.Duration, failureMaxDelay time.Duration, frequency int, burst int, ) workqueue.TypedRateLimiter[ctrl.Request] { diff --git a/tests/integration/controller/kyma/suite_test.go b/tests/integration/controller/kyma/suite_test.go index a45d5303a6..cbcf1910ab 100644 --- a/tests/integration/controller/kyma/suite_test.go +++ b/tests/integration/controller/kyma/suite_test.go @@ -145,7 +145,12 @@ var _ = BeforeSuite(func() { RemoteSyncNamespace: flags.DefaultRemoteSyncNamespace, Metrics: metrics.NewKymaMetrics(metrics.NewSharedMetrics()), ModuleMetrics: metrics.NewModuleMetrics(), - }).SetupWithManager(mgr, ctrlruntime.Options{}, + }).SetupWithManager(mgr, ctrlruntime.Options{ + RateLimiter: internal.RateLimiter( + 1*time.Second, 5*time.Second, + 30, 200, + ), + }, kyma.SetupOptions{ListenerAddr: randomPort}) Expect(err).ToNot(HaveOccurred()) Eventually(CreateNamespace, Timeout, Interval). diff --git a/tests/integration/controller/manifest/custom_resource_check/suite_test.go b/tests/integration/controller/manifest/custom_resource_check/suite_test.go index 3d3a741b7c..e4acc0cc3d 100644 --- a/tests/integration/controller/manifest/custom_resource_check/suite_test.go +++ b/tests/integration/controller/manifest/custom_resource_check/suite_test.go @@ -166,7 +166,7 @@ var _ = BeforeSuite(func() { Watches(&apicorev1.Secret{}, handler.Funcs{}). WithOptions( ctrlruntime.Options{ - RateLimiter: internal.ManifestRateLimiter( + RateLimiter: internal.RateLimiter( 1*time.Second, 5*time.Second, 30, 200, ), diff --git a/tests/integration/controller/manifest/suite_test.go b/tests/integration/controller/manifest/suite_test.go index 45f5a06f7a..6cbae1104e 100644 --- a/tests/integration/controller/manifest/suite_test.go +++ b/tests/integration/controller/manifest/suite_test.go @@ -162,7 +162,7 @@ var _ = BeforeSuite(func() { Watches(&apicorev1.Secret{}, handler.Funcs{}). WithOptions( ctrlruntime.Options{ - RateLimiter: internal.ManifestRateLimiter( + RateLimiter: internal.RateLimiter( 1*time.Second, 5*time.Second, 30, 200, ),