Skip to content

Commit

Permalink
Add backoff mechanism for ProvReq retry
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Sep 9, 2024
1 parent cecb34c commit 03ff085
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
42 changes: 32 additions & 10 deletions cluster-autoscaler/processors/provreq/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@ import (
)

const (
defaultRetryTime = 10 * time.Minute
defaultRetryTime = 1 * time.Minute
maxBackoffTime = 10 * time.Minute
// TODO: replace with timeout for element rather than max size of cache.
maxCacheSize = 1000
)

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
client *provreqclient.ProvisioningRequestClient
clock clock.PassiveClock
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
backoffDuration map[string]time.Duration
}

// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
Expand All @@ -53,7 +57,15 @@ func (p *ProvisioningRequestPodsInjector) IsAvailableForProvisioning(pr *provreq
}
provisioned := apimeta.FindStatusCondition(conditions, v1.Provisioned)
if provisioned != nil {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
if provisioned.Status == metav1.ConditionFalse {
return true
}
retryTime, found := p.backoffDuration[key(pr)]
if !found {
retryTime = defaultRetryTime
}
if provisioned.LastTransitionTime.Add(retryTime).Before(p.clock.Now()) {
p.backoffDuration[key(pr)] = max(2*retryTime, maxBackoffTime)
return true
}
return false
Expand Down Expand Up @@ -83,33 +95,39 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
isSupportedClass func(*provreqwrapper.ProvisioningRequest) bool,
) ([]*apiv1.Pod, error) {
if len(p.backoffDuration) >= maxCacheSize {
p.backoffDuration = make(map[string]time.Duration)
}
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return nil, err
}

for _, pr := range provReqs {
if !isSupportedClass(pr) {
klog.Warningf("Provisioning Class %s is not supported", pr.Spec.ProvisioningClassName)
continue
}
conditions := pr.Status.Conditions
if apimeta.IsStatusConditionTrue(conditions, v1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1.Provisioned) {
delete(p.backoffDuration, key(pr))
continue
}

//TODO(yaroslava): support exponential backoff
// Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime
if !p.IsAvailableForProvisioning(pr) {
continue
}

podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
continue
}

if err := p.MarkAsAccepted(pr); err != nil {
continue
}
return podsFromProvReq, nil
return provreqpods, nil
}
return nil, nil
}
Expand Down Expand Up @@ -143,3 +161,7 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListPr
}
return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
}

func key(pr *provreqwrapper.ProvisioningRequest) string {
return string(pr.UID)
}
5 changes: 3 additions & 2 deletions cluster-autoscaler/processors/provreq/injector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

func TestProvisioningRequestPodsInjector(t *testing.T) {
now := time.Now()
minAgo := now.Add(-1 * time.Minute)
minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second)
hourAgo := now.Add(-1 * time.Hour)

accepted := metav1.Condition{
Expand Down Expand Up @@ -124,7 +124,8 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
}
for _, tc := range testCases {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
backoffTime := map[string]time.Duration{key(notProvisionedRecentlyProvReqB): 2 * time.Minute}
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now), client, backoffTime}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
Expand Down

0 comments on commit 03ff085

Please sign in to comment.