From 2404d2bf3a8fe10ee6a954a5090b88e5eff1b554 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Fri, 24 Jan 2025 17:06:33 +0100 Subject: [PATCH] ClusterCache: Increase timeout for informer List+Watch calls from 10s to 11m --- controllers/clustercache/cluster_accessor.go | 11 ++++++--- .../clustercache/cluster_accessor_client.go | 24 +++++++++++++++---- .../clustercache/cluster_accessor_test.go | 8 +++---- controllers/clustercache/cluster_cache.go | 19 ++++++++++++++- .../clustercache/cluster_cache_test.go | 2 ++ controlplane/kubeadm/internal/cluster_test.go | 1 + exp/addons/internal/controllers/suite_test.go | 4 ++++ exp/internal/controllers/suite_test.go | 4 ++++ internal/controllers/cluster/suite_test.go | 4 ++++ .../machine_controller_noderef_test.go | 1 + internal/controllers/machine/suite_test.go | 4 ++++ .../machinedeployment/suite_test.go | 4 ++++ .../machinehealthcheck/suite_test.go | 4 ++++ internal/controllers/machineset/suite_test.go | 4 ++++ .../topology/cluster/suite_test.go | 4 ++++ internal/test/envtest/environment.go | 6 ++--- 16 files changed, 88 insertions(+), 16 deletions(-) diff --git a/controllers/clustercache/cluster_accessor.go b/controllers/clustercache/cluster_accessor.go index ff24b0d6e1e2..9c7e5944f7fa 100644 --- a/controllers/clustercache/cluster_accessor.go +++ b/controllers/clustercache/cluster_accessor.go @@ -54,6 +54,10 @@ type clusterAccessor struct { // and health checking information (e.g. lastProbeSuccessTimestamp, consecutiveFailures). // lockedStateLock must be *always* held (via lock or rLock) before accessing this field. lockedState clusterAccessorLockedState + + // cacheCtx is the ctx used when starting the cache. + // This ctx can be used by the ClusterCache to stop the cache. + cacheCtx context.Context //nolint:containedctx } // clusterAccessorConfig is the config of the clusterAccessor. @@ -211,10 +215,11 @@ type clusterAccessorLockedHealthCheckingState struct { } // newClusterAccessor creates a new clusterAccessor. -func newClusterAccessor(cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor { +func newClusterAccessor(cacheCtx context.Context, cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor { return &clusterAccessor{ - cluster: cluster, - config: clusterAccessorConfig, + cacheCtx: cacheCtx, + cluster: cluster, + config: clusterAccessorConfig, } } diff --git a/controllers/clustercache/cluster_accessor_client.go b/controllers/clustercache/cluster_accessor_client.go index 55fe61ca6024..ad5ac8e5fc3b 100644 --- a/controllers/clustercache/cluster_accessor_client.go +++ b/controllers/clustercache/cluster_accessor_client.go @@ -100,7 +100,7 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect } log.V(6).Info("Creating cached client and cache") - cachedClient, cache, err := createCachedClient(ctx, ca.config, restConfig, httpClient, mapper) + cachedClient, cache, err := createCachedClient(ctx, ca.cacheCtx, ca.config, restConfig, httpClient, mapper) if err != nil { return nil, err } @@ -212,23 +212,37 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien } // createCachedClient creates a cached client for the given cluster, based on the rest.Config. 
-func createCachedClient(ctx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) { +func createCachedClient(ctx, cacheCtx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) { + // This config is only used for the List and Watch calls of informers, + // because we don't want those requests to be subject to the regular timeout + // of Options.Client.Timeout (default 10s). + // List calls of informers have no timeout set. + // Watch calls of informers time out by default after a duration in [5m, 2*5m]. + // https://github.com/kubernetes/client-go/blob/v0.32.0/tools/cache/reflector.go#L53-L55 + // We use 11m to enforce a timeout for List calls without affecting Watch calls. + configWith11mTimeout := rest.CopyConfig(config) + configWith11mTimeout.Timeout = 11 * time.Minute + httpClientWith11mTimeout, err := rest.HTTPClientFor(configWith11mTimeout) + if err != nil { + return nil, nil, errors.Wrapf(err, "error creating cache: error creating HTTP client") + } + // Create the cache for the cluster. cacheOptions := cache.Options{ - HTTPClient: httpClient, + HTTPClient: httpClientWith11mTimeout, Scheme: clusterAccessorConfig.Scheme, Mapper: mapper, SyncPeriod: clusterAccessorConfig.Cache.SyncPeriod, ByObject: clusterAccessorConfig.Cache.ByObject, } - remoteCache, err := cache.New(config, cacheOptions) + remoteCache, err := cache.New(configWith11mTimeout, cacheOptions) if err != nil { return nil, nil, errors.Wrapf(err, "error creating cache") } // Use a context that is independent of the passed in context, so the cache doesn't get stopped // when the passed in context is canceled. - cacheCtx, cacheCtxCancel := context.WithCancel(context.Background()) + cacheCtx, cacheCtxCancel := context.WithCancel(cacheCtx) // We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache. 
cache := &stoppableCache{ diff --git a/controllers/clustercache/cluster_accessor_test.go b/controllers/clustercache/cluster_accessor_test.go index 4e48df37cd0d..03a929e77562 100644 --- a/controllers/clustercache/cluster_accessor_test.go +++ b/controllers/clustercache/cluster_accessor_test.go @@ -66,7 +66,7 @@ func TestConnect(t *testing.T) { Indexes: []CacheOptionsIndex{NodeProviderIDIndex}, }, }, nil) - accessor := newClusterAccessor(clusterKey, config) + accessor := newClusterAccessor(context.Background(), clusterKey, config) // Connect when kubeconfig Secret doesn't exist (should fail) err := accessor.Connect(ctx) @@ -164,7 +164,7 @@ func TestDisconnect(t *testing.T) { Timeout: 10 * time.Second, }, }, nil) - accessor := newClusterAccessor(clusterKey, config) + accessor := newClusterAccessor(context.Background(), clusterKey, config) // Connect (so we can disconnect afterward) g.Expect(accessor.Connect(ctx)).To(Succeed()) @@ -271,7 +271,7 @@ func TestHealthCheck(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - accessor := newClusterAccessor(clusterKey, &clusterAccessorConfig{ + accessor := newClusterAccessor(context.Background(), clusterKey, &clusterAccessorConfig{ HealthProbe: &clusterAccessorHealthProbeConfig{ Timeout: 5 * time.Second, FailureThreshold: 5, @@ -324,7 +324,7 @@ func TestWatch(t *testing.T) { Timeout: 10 * time.Second, }, }, nil) - accessor := newClusterAccessor(clusterKey, config) + accessor := newClusterAccessor(context.Background(), clusterKey, config) tw := &testWatcher{} wi := WatcherOptions{ diff --git a/controllers/clustercache/cluster_cache.go b/controllers/clustercache/cluster_cache.go index f907e252569d..eee8dcdc7c0a 100644 --- a/controllers/clustercache/cluster_cache.go +++ b/controllers/clustercache/cluster_cache.go @@ -295,10 +295,14 @@ func SetupWithManager(ctx context.Context, mgr manager.Manager, options Options, log.Info("Couldn't find controller Pod metadata, the ClusterCache will always access the cluster it is running on using the regular apiserver endpoint") } + cacheCtx, cacheCtxCancel := context.WithCancelCause(context.Background()) + cc := &clusterCache{ client: mgr.GetClient(), clusterAccessorConfig: buildClusterAccessorConfig(mgr.GetScheme(), options, controllerPodMetadata), clusterAccessors: make(map[client.ObjectKey]*clusterAccessor), + cacheCtx: cacheCtx, + cacheCtxCancel: cacheCtxCancel, } err := ctrl.NewControllerManagedBy(mgr). @@ -331,6 +335,12 @@ type clusterCache struct { // This information is necessary so we can enqueue reconcile.Requests for reconcilers that // got a cluster source via GetClusterSource. clusterSources []clusterSource + + // cacheCtx is passed to clusterAccessors to be used when starting caches. + cacheCtx context.Context //nolint:containedctx + + // cacheCtxCancel is used during Shutdown to stop caches. 
+ cacheCtxCancel context.CancelCauseFunc } // clusterSource stores the necessary information so we can enqueue reconcile.Requests for reconcilers that @@ -523,7 +533,7 @@ func (cc *clusterCache) getOrCreateClusterAccessor(cluster client.ObjectKey) *cl accessor, ok := cc.clusterAccessors[cluster] if !ok { - accessor = newClusterAccessor(cluster, cc.clusterAccessorConfig) + accessor = newClusterAccessor(cc.cacheCtx, cluster, cc.clusterAccessorConfig) cc.clusterAccessors[cluster] = accessor } @@ -656,6 +666,13 @@ func (cc *clusterCache) SetConnectionCreationRetryInterval(interval time.Duratio cc.clusterAccessorConfig.ConnectionCreationRetryInterval = interval } +// Shutdown can be used to shut down the ClusterCache in unit tests. +// This method should only be used for tests because it hasn't been designed for production usage +// in a manager (race conditions with manager shutdown etc.). +func (cc *clusterCache) Shutdown() { + cc.cacheCtxCancel(errors.New("ClusterCache is shutdown")) +} + func validateAndDefaultOptions(opts *Options) error { if opts.SecretClient == nil { return errors.New("options.SecretClient must be set") diff --git a/controllers/clustercache/cluster_cache_test.go b/controllers/clustercache/cluster_cache_test.go index 5cff083f2eb9..12ba37145270 100644 --- a/controllers/clustercache/cluster_cache_test.go +++ b/controllers/clustercache/cluster_cache_test.go @@ -77,6 +77,7 @@ func TestReconcile(t *testing.T) { client: env.Manager.GetAPIReader(), clusterAccessorConfig: accessorConfig, clusterAccessors: make(map[client.ObjectKey]*clusterAccessor), + cacheCtx: context.Background(), } // Add a Cluster source and start it (queue will be later used to verify the source works correctly) @@ -537,6 +538,7 @@ func TestClusterCacheConcurrency(t *testing.T) { g.Expect(err).ToNot(HaveOccurred()) internalClusterCache, ok := cc.(*clusterCache) g.Expect(ok).To(BeTrue()) + defer internalClusterCache.Shutdown() // Generate test clusters. 
testClusters := generateTestClusters(clusterCount, brokenClusterPercentage) diff --git a/controlplane/kubeadm/internal/cluster_test.go b/controlplane/kubeadm/internal/cluster_test.go index 77e807f78663..eaf740f9a0c6 100644 --- a/controlplane/kubeadm/internal/cluster_test.go +++ b/controlplane/kubeadm/internal/cluster_test.go @@ -225,6 +225,7 @@ func TestGetWorkloadCluster(t *testing.T) { }, }, controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)}) g.Expect(err).ToNot(HaveOccurred()) + defer clusterCache.(interface{ Shutdown() }).Shutdown() m := Management{ Client: env.GetClient(), diff --git a/exp/addons/internal/controllers/suite_test.go b/exp/addons/internal/controllers/suite_test.go index 1e560ae533d5..ea7c6d8513f9 100644 --- a/exp/addons/internal/controllers/suite_test.go +++ b/exp/addons/internal/controllers/suite_test.go @@ -97,6 +97,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() reconciler := ClusterResourceSetReconciler{ Client: mgr.GetClient(), diff --git a/exp/internal/controllers/suite_test.go b/exp/internal/controllers/suite_test.go index 3aaff11385ca..a064fe61dc1e 100644 --- a/exp/internal/controllers/suite_test.go +++ b/exp/internal/controllers/suite_test.go @@ -65,6 +65,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() if err := (&MachinePoolReconciler{ Client: mgr.GetClient(), diff --git a/internal/controllers/cluster/suite_test.go b/internal/controllers/cluster/suite_test.go index 122a405cad73..681c9945ce65 100644 --- a/internal/controllers/cluster/suite_test.go +++ b/internal/controllers/cluster/suite_test.go @@ -83,6 +83,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() // Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is // only retried every 30s. If we get unlucky tests are then failing with timeout. diff --git a/internal/controllers/machine/machine_controller_noderef_test.go b/internal/controllers/machine/machine_controller_noderef_test.go index 4b58ead1e75a..4d2ee8195bfa 100644 --- a/internal/controllers/machine/machine_controller_noderef_test.go +++ b/internal/controllers/machine/machine_controller_noderef_test.go @@ -341,6 +341,7 @@ func TestGetNode(t *testing.T) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + defer clusterCache.(interface{ Shutdown() }).Shutdown() r := &Reconciler{ ClusterCache: clusterCache, diff --git a/internal/controllers/machine/suite_test.go b/internal/controllers/machine/suite_test.go index 2d66ba0641b0..bbfe8c989bdc 100644 --- a/internal/controllers/machine/suite_test.go +++ b/internal/controllers/machine/suite_test.go @@ -83,6 +83,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() // Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is // only retried every 30s. If we get unlucky tests are then failing with timeout. 
diff --git a/internal/controllers/machinedeployment/suite_test.go b/internal/controllers/machinedeployment/suite_test.go index ff041b191bbd..15926c0c94e9 100644 --- a/internal/controllers/machinedeployment/suite_test.go +++ b/internal/controllers/machinedeployment/suite_test.go @@ -89,6 +89,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() if err := (&machinecontroller.Reconciler{ Client: mgr.GetClient(), diff --git a/internal/controllers/machinehealthcheck/suite_test.go b/internal/controllers/machinehealthcheck/suite_test.go index d68122bc76eb..a798e0744e08 100644 --- a/internal/controllers/machinehealthcheck/suite_test.go +++ b/internal/controllers/machinehealthcheck/suite_test.go @@ -75,6 +75,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() // Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is // only retried every 30s. If we get unlucky tests are then failing with timeout. diff --git a/internal/controllers/machineset/suite_test.go b/internal/controllers/machineset/suite_test.go index ec9aa076bfb2..4fda194a24b5 100644 --- a/internal/controllers/machineset/suite_test.go +++ b/internal/controllers/machineset/suite_test.go @@ -88,6 +88,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() if err := (&Reconciler{ Client: mgr.GetClient(), diff --git a/internal/controllers/topology/cluster/suite_test.go b/internal/controllers/topology/cluster/suite_test.go index 715f4a52bd47..6184c8d18555 100644 --- a/internal/controllers/topology/cluster/suite_test.go +++ b/internal/controllers/topology/cluster/suite_test.go @@ -86,6 +86,10 @@ func TestMain(m *testing.M) { if err != nil { panic(fmt.Sprintf("Failed to create ClusterCache: %v", err)) } + go func() { + <-ctx.Done() + clusterCache.(interface{ Shutdown() }).Shutdown() + }() if err := (&Reconciler{ Client: mgr.GetClient(), diff --git a/internal/test/envtest/environment.go b/internal/test/envtest/environment.go index 6f73e038d748..9916300eba1e 100644 --- a/internal/test/envtest/environment.go +++ b/internal/test/envtest/environment.go @@ -146,6 +146,9 @@ func Run(ctx context.Context, input RunInput) int { // Bootstrapping test environment env := newEnvironment(input.ManagerUncachedObjs...) + ctx, cancel := context.WithCancel(ctx) + env.cancelManager = cancel + if input.SetupIndexes != nil { input.SetupIndexes(ctx, env.Manager) } @@ -376,9 +379,6 @@ func newEnvironment(uncachedObjs ...client.Object) *Environment { // start starts the manager. func (e *Environment) start(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - e.cancelManager = cancel - go func() { fmt.Println("Starting the test environment manager") if err := e.Manager.Start(ctx); err != nil {