
Merge pull request #11767 from k8s-infra-cherrypick-robot/cherry-pick-11757-to-release-1.9

[release-1.9] 🐛 ClusterCache: Increase timeout for informer List+Watch calls from 10s to 11m
k8s-ci-robot authored Jan 28, 2025
2 parents dc0cc60 + 2404d2b commit 4506c52
Showing 16 changed files with 88 additions and 16 deletions.
11 changes: 8 additions & 3 deletions controllers/clustercache/cluster_accessor.go
@@ -54,6 +54,10 @@ type clusterAccessor struct {
 	// and health checking information (e.g. lastProbeSuccessTimestamp, consecutiveFailures).
 	// lockedStateLock must be *always* held (via lock or rLock) before accessing this field.
 	lockedState clusterAccessorLockedState
+
+	// cacheCtx is the ctx used when starting the cache.
+	// This ctx can be used by the ClusterCache to stop the cache.
+	cacheCtx context.Context //nolint:containedctx
 }
 
 // clusterAccessorConfig is the config of the clusterAccessor.
@@ -211,10 +215,11 @@ type clusterAccessorLockedHealthCheckingState struct {
 }
 
 // newClusterAccessor creates a new clusterAccessor.
-func newClusterAccessor(cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor {
+func newClusterAccessor(cacheCtx context.Context, cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor {
 	return &clusterAccessor{
-		cluster: cluster,
-		config:  clusterAccessorConfig,
+		cacheCtx: cacheCtx,
+		cluster:  cluster,
+		config:   clusterAccessorConfig,
 	}
 }
 
24 changes: 19 additions & 5 deletions controllers/clustercache/cluster_accessor_client.go
@@ -100,7 +100,7 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
 	}
 
 	log.V(6).Info("Creating cached client and cache")
-	cachedClient, cache, err := createCachedClient(ctx, ca.config, restConfig, httpClient, mapper)
+	cachedClient, cache, err := createCachedClient(ctx, ca.cacheCtx, ca.config, restConfig, httpClient, mapper)
 	if err != nil {
 		return nil, err
 	}
@@ -212,23 +212,37 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien
 }
 
 // createCachedClient creates a cached client for the given cluster, based on the rest.Config.
-func createCachedClient(ctx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) {
+func createCachedClient(ctx, cacheCtx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) {
+	// This config will only be used for List and Watch calls of informers
+	// because we don't want these requests to time out after the regular timeout
+	// of Options.Client.Timeout (default 10s).
+	// Lists of informers have no timeouts set.
+	// Watches of informers are timing out per default after [5m, 2*5m].
+	// https://github.com/kubernetes/client-go/blob/v0.32.0/tools/cache/reflector.go#L53-L55
+	// We are setting 11m to set a timeout for List calls without influencing Watch calls.
+	configWith11mTimeout := rest.CopyConfig(config)
+	configWith11mTimeout.Timeout = 11 * time.Minute
+	httpClientWith11mTimeout, err := rest.HTTPClientFor(configWith11mTimeout)
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating cache: error creating HTTP client")
+	}
+
 	// Create the cache for the cluster.
 	cacheOptions := cache.Options{
-		HTTPClient: httpClient,
+		HTTPClient: httpClientWith11mTimeout,
 		Scheme:     clusterAccessorConfig.Scheme,
 		Mapper:     mapper,
 		SyncPeriod: clusterAccessorConfig.Cache.SyncPeriod,
 		ByObject:   clusterAccessorConfig.Cache.ByObject,
 	}
-	remoteCache, err := cache.New(config, cacheOptions)
+	remoteCache, err := cache.New(configWith11mTimeout, cacheOptions)
 	if err != nil {
 		return nil, nil, errors.Wrapf(err, "error creating cache")
 	}
 
 	// Use a context that is independent of the passed in context, so the cache doesn't get stopped
 	// when the passed in context is canceled.
-	cacheCtx, cacheCtxCancel := context.WithCancel(context.Background())
+	cacheCtx, cacheCtxCancel := context.WithCancel(cacheCtx)
 
 	// We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache.
 	cache := &stoppableCache{
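For readers skimming the diff, the timeout change above boils down to the following pattern: give the cache its own copy of the rest.Config with an 11-minute timeout so informer List calls have an upper bound, without cutting Watch calls short. This is a minimal sketch mirroring the createCachedClient change; the package and function names are illustrative, not part of the PR.

package example

import (
    "time"

    "k8s.io/client-go/rest"
    "sigs.k8s.io/controller-runtime/pkg/cache"
)

// newCacheWithLongListTimeout builds a cache whose informers use a dedicated
// rest.Config copy with an 11m timeout. List calls get a bound instead of the
// regular 10s client timeout, while Watch calls (which client-go already
// restarts roughly every 5-10 minutes) are not affected.
func newCacheWithLongListTimeout(cfg *rest.Config, opts cache.Options) (cache.Cache, error) {
    // Copy the config so the longer timeout only applies to the cache's HTTP client.
    cfgWith11mTimeout := rest.CopyConfig(cfg)
    cfgWith11mTimeout.Timeout = 11 * time.Minute

    httpClient, err := rest.HTTPClientFor(cfgWith11mTimeout)
    if err != nil {
        return nil, err
    }

    opts.HTTPClient = httpClient
    return cache.New(cfgWith11mTimeout, opts)
}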
8 changes: 4 additions & 4 deletions controllers/clustercache/cluster_accessor_test.go
@@ -66,7 +66,7 @@ func TestConnect(t *testing.T) {
 			Indexes: []CacheOptionsIndex{NodeProviderIDIndex},
 		},
 	}, nil)
-	accessor := newClusterAccessor(clusterKey, config)
+	accessor := newClusterAccessor(context.Background(), clusterKey, config)
 
 	// Connect when kubeconfig Secret doesn't exist (should fail)
 	err := accessor.Connect(ctx)
@@ -164,7 +164,7 @@ func TestDisconnect(t *testing.T) {
 			Timeout: 10 * time.Second,
 		},
 	}, nil)
-	accessor := newClusterAccessor(clusterKey, config)
+	accessor := newClusterAccessor(context.Background(), clusterKey, config)
 
 	// Connect (so we can disconnect afterward)
 	g.Expect(accessor.Connect(ctx)).To(Succeed())
@@ -271,7 +271,7 @@ func TestHealthCheck(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			g := NewWithT(t)
 
-			accessor := newClusterAccessor(clusterKey, &clusterAccessorConfig{
+			accessor := newClusterAccessor(context.Background(), clusterKey, &clusterAccessorConfig{
 				HealthProbe: &clusterAccessorHealthProbeConfig{
 					Timeout:          5 * time.Second,
 					FailureThreshold: 5,
@@ -324,7 +324,7 @@ func TestWatch(t *testing.T) {
 			Timeout: 10 * time.Second,
 		},
 	}, nil)
-	accessor := newClusterAccessor(clusterKey, config)
+	accessor := newClusterAccessor(context.Background(), clusterKey, config)
 
 	tw := &testWatcher{}
 	wi := WatcherOptions{
19 changes: 18 additions & 1 deletion controllers/clustercache/cluster_cache.go
@@ -295,10 +295,14 @@ func SetupWithManager(ctx context.Context, mgr manager.Manager, options Options,
 		log.Info("Couldn't find controller Pod metadata, the ClusterCache will always access the cluster it is running on using the regular apiserver endpoint")
 	}
 
+	cacheCtx, cacheCtxCancel := context.WithCancelCause(context.Background())
+
 	cc := &clusterCache{
 		client:                mgr.GetClient(),
 		clusterAccessorConfig: buildClusterAccessorConfig(mgr.GetScheme(), options, controllerPodMetadata),
 		clusterAccessors:      make(map[client.ObjectKey]*clusterAccessor),
+		cacheCtx:              cacheCtx,
+		cacheCtxCancel:        cacheCtxCancel,
 	}
 
 	err := ctrl.NewControllerManagedBy(mgr).
@@ -331,6 +335,12 @@ type clusterCache struct {
 	// This information is necessary so we can enqueue reconcile.Requests for reconcilers that
 	// got a cluster source via GetClusterSource.
 	clusterSources []clusterSource
+
+	// cacheCtx is passed to clusterAccessors to be used when starting caches.
+	cacheCtx context.Context //nolint:containedctx
+
+	// cacheCtxCancel is used during Shutdown to stop caches.
+	cacheCtxCancel context.CancelCauseFunc
 }
 
 // clusterSource stores the necessary information so we can enqueue reconcile.Requests for reconcilers that
@@ -523,7 +533,7 @@ func (cc *clusterCache) getOrCreateClusterAccessor(cluster client.ObjectKey) *cl
 
 	accessor, ok := cc.clusterAccessors[cluster]
 	if !ok {
-		accessor = newClusterAccessor(cluster, cc.clusterAccessorConfig)
+		accessor = newClusterAccessor(cc.cacheCtx, cluster, cc.clusterAccessorConfig)
 		cc.clusterAccessors[cluster] = accessor
 	}
 
@@ -656,6 +666,13 @@ func (cc *clusterCache) SetConnectionCreationRetryInterval(interval time.Duratio
 	cc.clusterAccessorConfig.ConnectionCreationRetryInterval = interval
 }
 
+// Shutdown can be used to shut down the ClusterCache in unit tests.
+// This method should only be used for tests because it hasn't been designed for production usage
+// in a manager (race conditions with manager shutdown etc.).
+func (cc *clusterCache) Shutdown() {
+	cc.cacheCtxCancel(errors.New("ClusterCache is shutdown"))
+}
+
 func validateAndDefaultOptions(opts *Options) error {
 	if opts.SecretClient == nil {
 		return errors.New("options.SecretClient must be set")
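Shutdown is test-only and is not part of the exported ClusterCache interface, so the suite tests below reach it through a type assertion. A minimal sketch of the pattern they add to their TestMain functions (package and function names here are illustrative):

package example

import "context"

// shutdownOnDone stops the ClusterCache's caches once ctx is canceled, mirroring
// the goroutine added to the TestMain functions further down. cc can be any value
// that exposes Shutdown, e.g. the ClusterCache returned by SetupWithManager,
// obtained via the type assertion cc.(interface{ Shutdown() }).
func shutdownOnDone(ctx context.Context, cc interface{ Shutdown() }) {
    go func() {
        <-ctx.Done()
        cc.Shutdown()
    }()
}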
2 changes: 2 additions & 0 deletions controllers/clustercache/cluster_cache_test.go
@@ -77,6 +77,7 @@ func TestReconcile(t *testing.T) {
 		client:                env.Manager.GetAPIReader(),
 		clusterAccessorConfig: accessorConfig,
 		clusterAccessors:      make(map[client.ObjectKey]*clusterAccessor),
+		cacheCtx:              context.Background(),
 	}
 
 	// Add a Cluster source and start it (queue will be later used to verify the source works correctly)
@@ -537,6 +538,7 @@ func TestClusterCacheConcurrency(t *testing.T) {
 	g.Expect(err).ToNot(HaveOccurred())
 	internalClusterCache, ok := cc.(*clusterCache)
 	g.Expect(ok).To(BeTrue())
+	defer internalClusterCache.Shutdown()
 
 	// Generate test clusters.
 	testClusters := generateTestClusters(clusterCount, brokenClusterPercentage)
1 change: 1 addition & 0 deletions controlplane/kubeadm/internal/cluster_test.go
@@ -225,6 +225,7 @@ func TestGetWorkloadCluster(t *testing.T) {
 		},
 	}, controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)})
 	g.Expect(err).ToNot(HaveOccurred())
+	defer clusterCache.(interface{ Shutdown() }).Shutdown()
 
 	m := Management{
 		Client: env.GetClient(),
4 changes: 4 additions & 0 deletions exp/addons/internal/controllers/suite_test.go
@@ -97,6 +97,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	reconciler := ClusterResourceSetReconciler{
 		Client: mgr.GetClient(),
4 changes: 4 additions & 0 deletions exp/internal/controllers/suite_test.go
@@ -65,6 +65,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&MachinePoolReconciler{
 		Client: mgr.GetClient(),
4 changes: 4 additions & 0 deletions internal/controllers/cluster/suite_test.go
@@ -83,6 +83,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
 	// only retried every 30s. If we get unlucky tests are then failing with timeout.
@@ -341,6 +341,7 @@ func TestGetNode(t *testing.T) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	defer clusterCache.(interface{ Shutdown() }).Shutdown()
 
 	r := &Reconciler{
 		ClusterCache: clusterCache,
4 changes: 4 additions & 0 deletions internal/controllers/machine/suite_test.go
@@ -83,6 +83,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
 	// only retried every 30s. If we get unlucky tests are then failing with timeout.
4 changes: 4 additions & 0 deletions internal/controllers/machinedeployment/suite_test.go
@@ -89,6 +89,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&machinecontroller.Reconciler{
 		Client: mgr.GetClient(),
4 changes: 4 additions & 0 deletions internal/controllers/machinehealthcheck/suite_test.go
@@ -75,6 +75,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
 	// only retried every 30s. If we get unlucky tests are then failing with timeout.
4 changes: 4 additions & 0 deletions internal/controllers/machineset/suite_test.go
@@ -88,6 +88,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&Reconciler{
 		Client: mgr.GetClient(),
4 changes: 4 additions & 0 deletions internal/controllers/topology/cluster/suite_test.go
@@ -86,6 +86,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&Reconciler{
 		Client: mgr.GetClient(),
6 changes: 3 additions & 3 deletions internal/test/envtest/environment.go
@@ -146,6 +146,9 @@ func Run(ctx context.Context, input RunInput) int {
 	// Bootstrapping test environment
 	env := newEnvironment(input.ManagerUncachedObjs...)
 
+	ctx, cancel := context.WithCancel(ctx)
+	env.cancelManager = cancel
+
 	if input.SetupIndexes != nil {
 		input.SetupIndexes(ctx, env.Manager)
 	}
@@ -376,9 +379,6 @@ func newEnvironment(uncachedObjs ...client.Object) *Environment {
 
 // start starts the manager.
 func (e *Environment) start(ctx context.Context) {
-	ctx, cancel := context.WithCancel(ctx)
-	e.cancelManager = cancel
-
 	go func() {
 		fmt.Println("Starting the test environment manager")
 		if err := e.Manager.Start(ctx); err != nil {
