[release-1.9] 🐛 ClusterCache: Increase timeout for informer List+Watch calls from 10s to 11m #11767

Merged
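
The core of this change is to stop reusing the regular client configuration (Options.Client.Timeout, default 10s) for the informer cache and to build the cache from a copy of the rest.Config with an 11 minute timeout instead. A condensed, hedged sketch of that approach, assuming client-go's rest package and controller-runtime's cache package as used in the diff below (the package name and the buildInformerCache helper are illustrative, not the PR's exact code):

```go
package clustercachesketch

import (
	"time"

	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// buildInformerCache builds a controller-runtime cache whose informer
// List/Watch requests use an 11m HTTP timeout, independent of the regular
// client timeout (default 10s).
func buildInformerCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
	// Copy the config so the longer timeout only affects the cache's informers.
	longTimeoutConfig := rest.CopyConfig(config)
	longTimeoutConfig.Timeout = 11 * time.Minute

	httpClient, err := rest.HTTPClientFor(longTimeoutConfig)
	if err != nil {
		return nil, err
	}

	opts.HTTPClient = httpClient
	return cache.New(longTimeoutConfig, opts)
}
```
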
11 changes: 8 additions & 3 deletions controllers/clustercache/cluster_accessor.go
@@ -54,6 +54,10 @@ type clusterAccessor struct {
// and health checking information (e.g. lastProbeSuccessTimestamp, consecutiveFailures).
// lockedStateLock must be *always* held (via lock or rLock) before accessing this field.
lockedState clusterAccessorLockedState

// cacheCtx is the ctx used when starting the cache.
// This ctx can be used by the ClusterCache to stop the cache.
cacheCtx context.Context //nolint:containedctx
}

// clusterAccessorConfig is the config of the clusterAccessor.
@@ -211,10 +215,11 @@ type clusterAccessorLockedHealthCheckingState struct {
}

// newClusterAccessor creates a new clusterAccessor.
-func newClusterAccessor(cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor {
+func newClusterAccessor(cacheCtx context.Context, cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor {
return &clusterAccessor{
-cluster: cluster,
-config: clusterAccessorConfig,
+cacheCtx: cacheCtx,
+cluster: cluster,
+config: clusterAccessorConfig,
}
}

24 changes: 19 additions & 5 deletions controllers/clustercache/cluster_accessor_client.go
@@ -100,7 +100,7 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
}

log.V(6).Info("Creating cached client and cache")
-cachedClient, cache, err := createCachedClient(ctx, ca.config, restConfig, httpClient, mapper)
+cachedClient, cache, err := createCachedClient(ctx, ca.cacheCtx, ca.config, restConfig, httpClient, mapper)
if err != nil {
return nil, err
}
@@ -212,23 +212,37 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien
}

// createCachedClient creates a cached client for the given cluster, based on the rest.Config.
-func createCachedClient(ctx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) {
+func createCachedClient(ctx, cacheCtx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) {
// This config will only be used for List and Watch calls of informers
// because we don't want these requests to time out after the regular timeout
// of Options.Client.Timeout (default 10s).
// Lists of informers have no timeouts set.
// Watches of informers are timing out per default after [5m, 2*5m].
// https://github.com/kubernetes/client-go/blob/v0.32.0/tools/cache/reflector.go#L53-L55
// We are setting 11m to set a timeout for List calls without influencing Watch calls.
configWith11mTimeout := rest.CopyConfig(config)
configWith11mTimeout.Timeout = 11 * time.Minute
httpClientWith11mTimeout, err := rest.HTTPClientFor(configWith11mTimeout)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating cache: error creating HTTP client")
}

// Create the cache for the cluster.
cacheOptions := cache.Options{
-HTTPClient: httpClient,
+HTTPClient: httpClientWith11mTimeout,
Scheme: clusterAccessorConfig.Scheme,
Mapper: mapper,
SyncPeriod: clusterAccessorConfig.Cache.SyncPeriod,
ByObject: clusterAccessorConfig.Cache.ByObject,
}
-remoteCache, err := cache.New(config, cacheOptions)
+remoteCache, err := cache.New(configWith11mTimeout, cacheOptions)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating cache")
}

-// Use a context that is independent of the passed in context, so the cache doesn't get stopped
-// when the passed in context is canceled.
-cacheCtx, cacheCtxCancel := context.WithCancel(context.Background())
+cacheCtx, cacheCtxCancel := context.WithCancel(cacheCtx)

// We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache.
cache := &stoppableCache{
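
The 11m value is picked to sit above the reflector's maximum watch timeout of 2*5m, so it bounds List calls without ever cutting a healthy Watch short. A small standalone sketch of that bound, mirroring the client-go reflector behavior linked in the comment above (the exact timeout expression is an assumption based on client-go v0.32.0):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// client-go's reflector defaults the minimum watch timeout to 5 minutes and
	// picks each watch's server-side timeout randomly in [min, 2*min).
	minWatchTimeout := 5 * time.Minute
	watchTimeout := time.Duration(minWatchTimeout.Seconds()*(rand.Float64()+1.0)) * time.Second

	clientTimeout := 11 * time.Minute

	fmt.Printf("sampled watch timeout: %s (always below %s)\n", watchTimeout, 2*minWatchTimeout)
	// Because 11m > 2*5m, the HTTP client timeout never preempts a normal watch,
	// while List calls (which otherwise have no timeout at all) now get an upper bound.
	fmt.Printf("client timeout exceeds max watch timeout: %v\n", clientTimeout > 2*minWatchTimeout)
}
```
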
8 changes: 4 additions & 4 deletions controllers/clustercache/cluster_accessor_test.go
@@ -66,7 +66,7 @@ func TestConnect(t *testing.T) {
Indexes: []CacheOptionsIndex{NodeProviderIDIndex},
},
}, nil)
-accessor := newClusterAccessor(clusterKey, config)
+accessor := newClusterAccessor(context.Background(), clusterKey, config)

// Connect when kubeconfig Secret doesn't exist (should fail)
err := accessor.Connect(ctx)
@@ -164,7 +164,7 @@ func TestDisconnect(t *testing.T) {
Timeout: 10 * time.Second,
},
}, nil)
-accessor := newClusterAccessor(clusterKey, config)
+accessor := newClusterAccessor(context.Background(), clusterKey, config)

// Connect (so we can disconnect afterward)
g.Expect(accessor.Connect(ctx)).To(Succeed())
@@ -271,7 +271,7 @@ func TestHealthCheck(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

-accessor := newClusterAccessor(clusterKey, &clusterAccessorConfig{
+accessor := newClusterAccessor(context.Background(), clusterKey, &clusterAccessorConfig{
HealthProbe: &clusterAccessorHealthProbeConfig{
Timeout: 5 * time.Second,
FailureThreshold: 5,
@@ -324,7 +324,7 @@ func TestWatch(t *testing.T) {
Timeout: 10 * time.Second,
},
}, nil)
-accessor := newClusterAccessor(clusterKey, config)
+accessor := newClusterAccessor(context.Background(), clusterKey, config)

tw := &testWatcher{}
wi := WatcherOptions{
19 changes: 18 additions & 1 deletion controllers/clustercache/cluster_cache.go
@@ -295,10 +295,14 @@ func SetupWithManager(ctx context.Context, mgr manager.Manager, options Options,
log.Info("Couldn't find controller Pod metadata, the ClusterCache will always access the cluster it is running on using the regular apiserver endpoint")
}

cacheCtx, cacheCtxCancel := context.WithCancelCause(context.Background())

cc := &clusterCache{
client: mgr.GetClient(),
clusterAccessorConfig: buildClusterAccessorConfig(mgr.GetScheme(), options, controllerPodMetadata),
clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
cacheCtx: cacheCtx,
cacheCtxCancel: cacheCtxCancel,
}

err := ctrl.NewControllerManagedBy(mgr).
@@ -331,6 +335,12 @@ type clusterCache struct {
// This information is necessary so we can enqueue reconcile.Requests for reconcilers that
// got a cluster source via GetClusterSource.
clusterSources []clusterSource

// cacheCtx is passed to clusterAccessors to be used when starting caches.
cacheCtx context.Context //nolint:containedctx

// cacheCtxCancel is used during Shutdown to stop caches.
cacheCtxCancel context.CancelCauseFunc
}

// clusterSource stores the necessary information so we can enqueue reconcile.Requests for reconcilers that
@@ -523,7 +533,7 @@ func (cc *clusterCache) getOrCreateClusterAccessor(cluster client.ObjectKey) *cl

accessor, ok := cc.clusterAccessors[cluster]
if !ok {
-accessor = newClusterAccessor(cluster, cc.clusterAccessorConfig)
+accessor = newClusterAccessor(cc.cacheCtx, cluster, cc.clusterAccessorConfig)
cc.clusterAccessors[cluster] = accessor
}

@@ -656,6 +666,13 @@ func (cc *clusterCache) SetConnectionCreationRetryInterval(interval time.Duratio
cc.clusterAccessorConfig.ConnectionCreationRetryInterval = interval
}

// Shutdown can be used to shut down the ClusterCache in unit tests.
// This method should only be used for tests because it hasn't been designed for production usage
// in a manager (race conditions with manager shutdown etc.).
func (cc *clusterCache) Shutdown() {
cc.cacheCtxCancel(errors.New("ClusterCache is shutdown"))
}

func validateAndDefaultOptions(opts *Options) error {
if opts.SecretClient == nil {
return errors.New("options.SecretClient must be set")
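
Shutdown works by cancelling the cacheCtx created with context.WithCancelCause in SetupWithManager, which records why the caches were stopped. A minimal standalone sketch of that stdlib mechanism (Go 1.20+), independent of ClusterCache:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// The long-lived cacheCtx outlives any per-reconcile context; cancelling it
	// with a cause is what Shutdown does to stop all caches started from it.
	cacheCtx, cancel := context.WithCancelCause(context.Background())

	cancel(errors.New("ClusterCache is shutdown"))

	<-cacheCtx.Done()
	fmt.Println(cacheCtx.Err())          // prints: context canceled
	fmt.Println(context.Cause(cacheCtx)) // prints: ClusterCache is shutdown
}
```
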
2 changes: 2 additions & 0 deletions controllers/clustercache/cluster_cache_test.go
@@ -77,6 +77,7 @@ func TestReconcile(t *testing.T) {
client: env.Manager.GetAPIReader(),
clusterAccessorConfig: accessorConfig,
clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
cacheCtx: context.Background(),
}

// Add a Cluster source and start it (queue will be later used to verify the source works correctly)
@@ -537,6 +538,7 @@ func TestClusterCacheConcurrency(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
internalClusterCache, ok := cc.(*clusterCache)
g.Expect(ok).To(BeTrue())
defer internalClusterCache.Shutdown()

// Generate test clusters.
testClusters := generateTestClusters(clusterCount, brokenClusterPercentage)
1 change: 1 addition & 0 deletions controlplane/kubeadm/internal/cluster_test.go
@@ -225,6 +225,7 @@ func TestGetWorkloadCluster(t *testing.T) {
},
}, controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)})
g.Expect(err).ToNot(HaveOccurred())
defer clusterCache.(interface{ Shutdown() }).Shutdown()

m := Management{
Client: env.GetClient(),
4 changes: 4 additions & 0 deletions exp/addons/internal/controllers/suite_test.go
@@ -97,6 +97,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

reconciler := ClusterResourceSetReconciler{
Client: mgr.GetClient(),
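
The test suites reach Shutdown via a type assertion to an anonymous interface, since the method lives on the unexported clusterCache type and is documented above as test-only rather than being part of the exported ClusterCache interface. A minimal standalone sketch of that Go pattern (Greeter and impl are made-up names for illustration):

```go
package main

import "fmt"

// Greeter stands in for the exported interface; Shutdown is intentionally not part of it.
type Greeter interface{ Greet() }

type impl struct{}

func (impl) Greet()    { fmt.Println("hello") }
func (impl) Shutdown() { fmt.Println("shutting down") }

func main() {
	var g Greeter = impl{}

	// Callers that know the concrete value also implements Shutdown can assert to
	// an anonymous interface instead of widening the exported Greeter interface.
	if s, ok := g.(interface{ Shutdown() }); ok {
		s.Shutdown()
	}
}
```
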
4 changes: 4 additions & 0 deletions exp/internal/controllers/suite_test.go
@@ -65,6 +65,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

if err := (&MachinePoolReconciler{
Client: mgr.GetClient(),
4 changes: 4 additions & 0 deletions internal/controllers/cluster/suite_test.go
@@ -83,6 +83,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
// only retried every 30s. If we get unlucky tests are then failing with timeout.
@@ -341,6 +341,7 @@ func TestGetNode(t *testing.T) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
defer clusterCache.(interface{ Shutdown() }).Shutdown()

r := &Reconciler{
ClusterCache: clusterCache,
4 changes: 4 additions & 0 deletions internal/controllers/machine/suite_test.go
@@ -83,6 +83,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
// only retried every 30s. If we get unlucky tests are then failing with timeout.
4 changes: 4 additions & 0 deletions internal/controllers/machinedeployment/suite_test.go
@@ -89,6 +89,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

if err := (&machinecontroller.Reconciler{
Client: mgr.GetClient(),
4 changes: 4 additions & 0 deletions internal/controllers/machinehealthcheck/suite_test.go
@@ -75,6 +75,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
// only retried every 30s. If we get unlucky tests are then failing with timeout.
4 changes: 4 additions & 0 deletions internal/controllers/machineset/suite_test.go
@@ -88,6 +88,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

if err := (&Reconciler{
Client: mgr.GetClient(),
4 changes: 4 additions & 0 deletions internal/controllers/topology/cluster/suite_test.go
@@ -86,6 +86,10 @@ func TestMain(m *testing.M) {
if err != nil {
panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
}
go func() {
<-ctx.Done()
clusterCache.(interface{ Shutdown() }).Shutdown()
}()

if err := (&Reconciler{
Client: mgr.GetClient(),
6 changes: 3 additions & 3 deletions internal/test/envtest/environment.go
@@ -146,6 +146,9 @@ func Run(ctx context.Context, input RunInput) int {
// Bootstrapping test environment
env := newEnvironment(input.ManagerUncachedObjs...)

+ctx, cancel := context.WithCancel(ctx)
+env.cancelManager = cancel

if input.SetupIndexes != nil {
input.SetupIndexes(ctx, env.Manager)
}
@@ -376,9 +379,6 @@ func newEnvironment(uncachedObjs ...client.Object) *Environment {

// start starts the manager.
func (e *Environment) start(ctx context.Context) {
-ctx, cancel := context.WithCancel(ctx)
-e.cancelManager = cancel

go func() {
fmt.Println("Starting the test environment manager")
if err := e.Manager.Start(ctx); err != nil {