From 0bcb64a31c4cf4557eaa033bc00245e4eacf87bd Mon Sep 17 00:00:00 2001 From: Raj Date: Fri, 20 Dec 2024 23:33:33 +0100 Subject: [PATCH 1/4] refactor: Use Shared Rate Limiter Code --- cmd/main.go | 15 +++------------ internal/rate_limiter.go | 2 +- tests/integration/controller/kyma/suite_test.go | 7 ++++++- .../manifest/custom_resource_check/suite_test.go | 2 +- .../integration/controller/manifest/suite_test.go | 2 +- 5 files changed, 12 insertions(+), 16 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 0bfe5aeb4a..1ddb637ba5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -31,14 +31,12 @@ import ( "github.com/go-co-op/gocron" "github.com/go-logr/logr" "go.uber.org/zap/zapcore" - "golang.org/x/time/rate" istioclientapiv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" machineryruntime "k8s.io/apimachinery/pkg/runtime" machineryutilruntime "k8s.io/apimachinery/pkg/util/runtime" k8sclientscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" - "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" ctrlruntime "sigs.k8s.io/controller-runtime/pkg/controller" @@ -275,14 +273,8 @@ func scheduleMetricsCleanup(kymaMetrics *metrics.KymaMetrics, cleanupIntervalInM func controllerOptionsFromFlagVar(flagVar *flags.FlagVar) ctrlruntime.Options { return ctrlruntime.Options{ - RateLimiter: workqueue.NewTypedMaxOfRateLimiter( - workqueue.NewTypedItemExponentialFailureRateLimiter[ctrl.Request](flagVar.FailureBaseDelay, - flagVar.FailureMaxDelay), - &workqueue.TypedBucketRateLimiter[ctrl.Request]{ - Limiter: rate.NewLimiter(rate.Limit(flagVar.RateLimiterFrequency), flagVar.RateLimiterBurst), - }, - ), - + RateLimiter: internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst), CacheSyncTimeout: flagVar.CacheSyncTimeout, } } @@ -293,6 +285,7 @@ func setupKymaReconciler(mgr ctrl.Manager, descriptorProvider *provider.CachedDe moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger, ) { options.MaxConcurrentReconciles = flagVar.MaxConcurrentKymaReconciles + if err := (&kyma.Reconciler{ Client: mgr.GetClient(), SkrContextFactory: skrContextFactory, @@ -388,8 +381,6 @@ func setupManifestReconciler(mgr ctrl.Manager, flagVar *flags.FlagVar, options c moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger, event event.Event, ) { options.MaxConcurrentReconciles = flagVar.MaxConcurrentManifestReconciles - options.RateLimiter = internal.ManifestRateLimiter(flagVar.FailureBaseDelay, - flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) manifestClient := manifestclient.NewManifestClient(event, mgr.GetClient()) diff --git a/internal/rate_limiter.go b/internal/rate_limiter.go index aaa072a765..67b0112a16 100644 --- a/internal/rate_limiter.go +++ b/internal/rate_limiter.go @@ -8,7 +8,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -func ManifestRateLimiter( +func RateLimiter( failureBaseDelay time.Duration, failureMaxDelay time.Duration, frequency int, burst int, ) workqueue.TypedRateLimiter[ctrl.Request] { diff --git a/tests/integration/controller/kyma/suite_test.go b/tests/integration/controller/kyma/suite_test.go index a45d5303a6..cbcf1910ab 100644 --- a/tests/integration/controller/kyma/suite_test.go +++ b/tests/integration/controller/kyma/suite_test.go @@ -145,7 +145,12 @@ var _ = BeforeSuite(func() { RemoteSyncNamespace: flags.DefaultRemoteSyncNamespace, Metrics: metrics.NewKymaMetrics(metrics.NewSharedMetrics()), ModuleMetrics: metrics.NewModuleMetrics(), - }).SetupWithManager(mgr, ctrlruntime.Options{}, + }).SetupWithManager(mgr, ctrlruntime.Options{ + RateLimiter: internal.RateLimiter( + 1*time.Second, 5*time.Second, + 30, 200, + ), + }, kyma.SetupOptions{ListenerAddr: randomPort}) Expect(err).ToNot(HaveOccurred()) Eventually(CreateNamespace, Timeout, Interval). diff --git a/tests/integration/controller/manifest/custom_resource_check/suite_test.go b/tests/integration/controller/manifest/custom_resource_check/suite_test.go index 3d3a741b7c..e4acc0cc3d 100644 --- a/tests/integration/controller/manifest/custom_resource_check/suite_test.go +++ b/tests/integration/controller/manifest/custom_resource_check/suite_test.go @@ -166,7 +166,7 @@ var _ = BeforeSuite(func() { Watches(&apicorev1.Secret{}, handler.Funcs{}). WithOptions( ctrlruntime.Options{ - RateLimiter: internal.ManifestRateLimiter( + RateLimiter: internal.RateLimiter( 1*time.Second, 5*time.Second, 30, 200, ), diff --git a/tests/integration/controller/manifest/suite_test.go b/tests/integration/controller/manifest/suite_test.go index 45f5a06f7a..6cbae1104e 100644 --- a/tests/integration/controller/manifest/suite_test.go +++ b/tests/integration/controller/manifest/suite_test.go @@ -162,7 +162,7 @@ var _ = BeforeSuite(func() { Watches(&apicorev1.Secret{}, handler.Funcs{}). WithOptions( ctrlruntime.Options{ - RateLimiter: internal.ManifestRateLimiter( + RateLimiter: internal.RateLimiter( 1*time.Second, 5*time.Second, 30, 200, ), From 9d2348e4cd88c072240c5b28223ec1183fe04bf8 Mon Sep 17 00:00:00 2001 From: Raj Date: Sun, 29 Dec 2024 23:34:21 +0100 Subject: [PATCH 2/4] fix: Introduce Check for IntendedRequeueErrors --- cmd/main.go | 29 ++++++++++++++++++--------- internal/declarative/v2/reconciler.go | 11 +++++++++- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index bc35f12e19..01f2cbff5a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -166,7 +166,7 @@ func setupManager(flagVar *flags.FlagVar, cacheOptions cache.Options, scheme *ma eventRecorder := event.NewRecorderWrapper(mgr.GetEventRecorderFor(shared.OperatorName)) skrContextProvider := remote.NewKymaSkrContextProvider(kcpClient, remoteClientCache, eventRecorder) var skrWebhookManager *watcher.SKRWebhookManifestManager - options := controllerOptionsFromFlagVar(flagVar) + var options ctrlruntime.Options if flagVar.EnableKcpWatcher { if skrWebhookManager, err = createSkrWebhookManager(mgr, skrContextProvider, flagVar); err != nil { setupLog.Error(err, "failed to create skr webhook manager") @@ -264,19 +264,14 @@ func scheduleMetricsCleanup(kymaMetrics *metrics.KymaMetrics, cleanupIntervalInM setupLog.V(log.DebugLevel).Info("scheduled job for cleaning up metrics") } -func controllerOptionsFromFlagVar(flagVar *flags.FlagVar) ctrlruntime.Options { - return ctrlruntime.Options{ - RateLimiter: internal.RateLimiter(flagVar.FailureBaseDelay, - flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst), - CacheSyncTimeout: flagVar.CacheSyncTimeout, - } -} - func setupKymaReconciler(mgr ctrl.Manager, descriptorProvider *provider.CachedDescriptorProvider, skrContextFactory remote.SkrContextProvider, event event.Event, flagVar *flags.FlagVar, options ctrlruntime.Options, skrWebhookManager *watcher.SKRWebhookManifestManager, kymaMetrics *metrics.KymaMetrics, moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentKymaReconciles if err := (&kyma.Reconciler{ @@ -354,6 +349,10 @@ func setupPurgeReconciler(mgr ctrl.Manager, options ctrlruntime.Options, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout + if err := (&purge.Reconciler{ Client: mgr.GetClient(), SkrContextFactory: skrContextProvider, @@ -374,6 +373,9 @@ func setupManifestReconciler(mgr ctrl.Manager, flagVar *flags.FlagVar, options c sharedMetrics *metrics.SharedMetrics, mandatoryModulesMetrics *metrics.MandatoryModulesMetrics, moduleMetrics *metrics.ModuleMetrics, setupLog logr.Logger, event event.Event, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentManifestReconciles manifestClient := manifestclient.NewManifestClient(event, mgr.GetClient()) @@ -400,6 +402,9 @@ func setupManifestReconciler(mgr ctrl.Manager, flagVar *flags.FlagVar, options c func setupKcpWatcherReconciler(mgr ctrl.Manager, options ctrlruntime.Options, event event.Event, flagVar *flags.FlagVar, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentWatcherReconciles if err := (&watcherctrl.Reconciler{ @@ -427,6 +432,9 @@ func setupMandatoryModuleReconciler(mgr ctrl.Manager, metrics *metrics.MandatoryModulesMetrics, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentMandatoryModuleReconciles if err := (&mandatorymodule.InstallationReconciler{ @@ -454,6 +462,9 @@ func setupMandatoryModuleDeletionReconciler(mgr ctrl.Manager, options ctrlruntime.Options, setupLog logr.Logger, ) { + options.RateLimiter = internal.RateLimiter(flagVar.FailureBaseDelay, + flagVar.FailureMaxDelay, flagVar.RateLimiterFrequency, flagVar.RateLimiterBurst) + options.CacheSyncTimeout = flagVar.CacheSyncTimeout options.MaxConcurrentReconciles = flagVar.MaxConcurrentMandatoryModuleDeletionReconciles if err := (&mandatorymodule.DeletionReconciler{ diff --git a/internal/declarative/v2/reconciler.go b/internal/declarative/v2/reconciler.go index aa771d8149..0e15a9fd20 100644 --- a/internal/declarative/v2/reconciler.go +++ b/internal/declarative/v2/reconciler.go @@ -35,6 +35,10 @@ var ( ErrResourceSyncDiffInSameOCILayer = errors.New("resource syncTarget diff detected but in " + "same oci layer, prevent sync resource to be deleted") errStateRequireUpdate = errors.New("manifest state requires update") + + intendedRequeueErrors = map[error]struct{}{ + skrresources.ErrWarningResourceSyncStateDiff: {}, + } ) const ( @@ -551,7 +555,7 @@ func (r *Reconciler) finishReconcile(ctx context.Context, manifest *v1beta2.Mani if err := r.manifestClient.PatchStatusIfDiffExist(ctx, manifest, previousStatus); err != nil { return ctrl.Result{}, err } - if originalErr != nil { + if originalErr != nil && !isIntendedRequeueError(originalErr) { r.ManifestMetrics.RecordRequeueReason(requeueReason, queue.UnexpectedRequeue) return ctrl.Result{}, originalErr } @@ -560,6 +564,11 @@ func (r *Reconciler) finishReconcile(ctx context.Context, manifest *v1beta2.Mani return ctrl.Result{RequeueAfter: requeueAfter}, nil } +func isIntendedRequeueError(err error) bool { + _, ok := intendedRequeueErrors[err] + return ok +} + func (r *Reconciler) ssaSpec(ctx context.Context, manifest *v1beta2.Manifest, requeueReason metrics.ManifestRequeueReason, ) (ctrl.Result, error) { From dffabb9221f4a0f84357ecaabea072fd50ad4470 Mon Sep 17 00:00:00 2001 From: Raj Date: Sun, 29 Dec 2024 23:40:42 +0100 Subject: [PATCH 3/4] fix: Remove Global Variable --- internal/declarative/v2/reconciler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/declarative/v2/reconciler.go b/internal/declarative/v2/reconciler.go index 0e15a9fd20..6237cb8057 100644 --- a/internal/declarative/v2/reconciler.go +++ b/internal/declarative/v2/reconciler.go @@ -35,10 +35,6 @@ var ( ErrResourceSyncDiffInSameOCILayer = errors.New("resource syncTarget diff detected but in " + "same oci layer, prevent sync resource to be deleted") errStateRequireUpdate = errors.New("manifest state requires update") - - intendedRequeueErrors = map[error]struct{}{ - skrresources.ErrWarningResourceSyncStateDiff: {}, - } ) const ( @@ -565,6 +561,10 @@ func (r *Reconciler) finishReconcile(ctx context.Context, manifest *v1beta2.Mani } func isIntendedRequeueError(err error) bool { + intendedRequeueErrors := map[error]struct{}{ + skrresources.ErrWarningResourceSyncStateDiff: {}, + } + _, ok := intendedRequeueErrors[err] return ok } From 4e20ea4e19d9b50fe614fd01977b0a1d2ea98505 Mon Sep 17 00:00:00 2001 From: Raj Date: Mon, 30 Dec 2024 11:16:12 +0100 Subject: [PATCH 4/4] chore: Remove Intended Requeue Error Check --- internal/declarative/v2/reconciler.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/internal/declarative/v2/reconciler.go b/internal/declarative/v2/reconciler.go index 6237cb8057..aa771d8149 100644 --- a/internal/declarative/v2/reconciler.go +++ b/internal/declarative/v2/reconciler.go @@ -551,7 +551,7 @@ func (r *Reconciler) finishReconcile(ctx context.Context, manifest *v1beta2.Mani if err := r.manifestClient.PatchStatusIfDiffExist(ctx, manifest, previousStatus); err != nil { return ctrl.Result{}, err } - if originalErr != nil && !isIntendedRequeueError(originalErr) { + if originalErr != nil { r.ManifestMetrics.RecordRequeueReason(requeueReason, queue.UnexpectedRequeue) return ctrl.Result{}, originalErr } @@ -560,15 +560,6 @@ func (r *Reconciler) finishReconcile(ctx context.Context, manifest *v1beta2.Mani return ctrl.Result{RequeueAfter: requeueAfter}, nil } -func isIntendedRequeueError(err error) bool { - intendedRequeueErrors := map[error]struct{}{ - skrresources.ErrWarningResourceSyncStateDiff: {}, - } - - _, ok := intendedRequeueErrors[err] - return ok -} - func (r *Reconciler) ssaSpec(ctx context.Context, manifest *v1beta2.Manifest, requeueReason metrics.ManifestRequeueReason, ) (ctrl.Result, error) {