From 03ff085fd849b6c5d71017671f3b62363ffb1bf0 Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Mon, 19 Aug 2024 11:05:33 +0000 Subject: [PATCH] Add backoff mechanism for ProvReq retry --- .../processors/provreq/injector.go | 42 ++++++++++++++----- .../processors/provreq/injector_test.go | 5 ++- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go index 538563d24cbe..f4e047a094c6 100644 --- a/cluster-autoscaler/processors/provreq/injector.go +++ b/cluster-autoscaler/processors/provreq/injector.go @@ -36,13 +36,17 @@ import ( ) const ( - defaultRetryTime = 10 * time.Minute + defaultRetryTime = 1 * time.Minute + maxBackoffTime = 10 * time.Minute + // TODO: replace with timeout for element rather than max size of cache. + maxCacheSize = 1000 ) // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list. type ProvisioningRequestPodsInjector struct { - client *provreqclient.ProvisioningRequestClient - clock clock.PassiveClock + clock clock.PassiveClock + client *provreqclient.ProvisioningRequestClient + backoffDuration map[string]time.Duration } // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently. @@ -53,7 +57,15 @@ func (p *ProvisioningRequestPodsInjector) IsAvailableForProvisioning(pr *provreq } provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned) if provisioned != nil { - if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) { + if provisioned.Status == metav1.ConditionFalse { + return true + } + retryTime, found := p.backoffDuration[key(pr)] + if !found { + retryTime = defaultRetryTime + } + if provisioned.LastTransitionTime.Add(retryTime).Before(p.clock.Now()) { + p.backoffDuration[key(pr)] = max(2*retryTime, maxBackoffTime) return true } return false @@ -83,33 +95,39 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest( isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool, ) ([]*apiv1.Pod, error) { + if len(p.backoffDuration) >= maxCacheSize { + p.backoffDuration = make(map[string]time.Duration) + } provReqs, err := p.client.ProvisioningRequests() if err != nil { return nil, err } - for _, pr := range provReqs { if !isSupportedClass(pr) { + klog.Warningf("Provisioning Class %s is not supported", pr.Spec.ProvisioningClassName) + continue + } + conditions := pr.Status.Conditions + if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) { + delete(p.backoffDuration, key(pr)) continue } - //TODO(yaroslava): support exponential backoff // Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime if !p.IsAvailableForProvisioning(pr) { continue } - - podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr) + + provreqpods, err := provreqpods.PodsForProvisioningRequest(pr) if err != nil { klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name) p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error()) continue } - if err := p.MarkAsAccepted(pr); err != nil { continue } - return podsFromProvReq, nil + return provreqpods, nil } return nil, nil } @@ -143,3 +161,7 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListPr } return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil } + +func key(pr *provreqwrapper.ProvisioningRequest) string { + return string(pr.UID) +} diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go index 533c2b979bdf..dbcdc7ec56ac 100644 --- a/cluster-autoscaler/processors/provreq/injector_test.go +++ b/cluster-autoscaler/processors/provreq/injector_test.go @@ -31,7 +31,7 @@ import ( func TestProvisioningRequestPodsInjector(t *testing.T) { now := time.Now() - minAgo := now.Add(-1 * time.Minute) + minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second) hourAgo := now.Add(-1 * time.Hour) accepted := metav1.Condition{ @@ -124,7 +124,8 @@ func TestProvisioningRequestPodsInjector(t *testing.T) { } for _, tc := range testCases { client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...) - injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)} + backoffTime := map[string]time.Duration{key(notProvisionedRecentlyProvReqB): 2 * time.Minute} + injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now), client, backoffTime} getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount)) if err != nil { t.Errorf("%s failed: injector.Process return error %v", tc.name, err)