Skip to content

Commit

Permalink
limit number of updated ProvisioningRequest in one loop
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Feb 7, 2024
1 parent 5ba9f48 commit 7ef5d0e
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
}

if err := a.processors.ProvisioningRequestProcessor.Process(nil); err != nil {
if err := a.processors.ProvisioningRequestProcessor.Process(); err != nil {
klog.Errorf("Failed to process ProvisioningRequests, err: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provReqProcessor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
provReqProcessor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningClassProcessor{checkcapacity.NewCheckCapacityProcessor()})
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (

// ProvisioningRequestProcessor process ProvisionignRequests in the cluster.
type ProvisioningRequestProcessor interface {
Process() error
CleanUp()
}

// ProvisioningClassProcessor process ProvisionignRequests in the cluster.
type ProvisioningClassProcessor interface {
Process([]*provreqwrapper.ProvisioningRequest) error
CleanUp()
}
Expand All @@ -38,7 +44,7 @@ func NewDefaultProvisioningRequestProcessor() ProvisioningRequestProcessor {
}

// Process processes lists of unschedulable and scheduled pods before scaling of the cluster.
func (p *NoOpProvisioningRequestProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) error {
func (p *NoOpProvisioningRequestProcessor) Process() error {
return nil
}

Expand All @@ -51,14 +57,14 @@ type provisioningRequestClient interface {
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

// CombinedProvReqProcessor contains ProvisioningRequestProcessor for each ProvisioningClass.
// CombinedProvReqProcessor contains ProvisioningClassProcessor for each ProvisioningClass.
type CombinedProvReqProcessor struct {
client provisioningRequestClient
processors []ProvisioningRequestProcessor
processors []ProvisioningClassProcessor
}

// NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (ProvisioningRequestProcessor, error) {
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningClassProcessor) (ProvisioningRequestProcessor, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
Expand All @@ -67,7 +73,7 @@ func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []Provision
}

// Process iterates over ProvisioningRequests and updates its conditions/state.
func (cp *CombinedProvReqProcessor) Process(_ []*provreqwrapper.ProvisioningRequest) error {
func (cp *CombinedProvReqProcessor) Process() error {
provReqs, err := cp.client.ProvisioningRequests()
if err != nil {
return err
Expand Down
40 changes: 30 additions & 10 deletions cluster-autoscaler/provisioningrequest/checkcapacity/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,63 @@ import (
const (
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
// maxFailed is a limit for Failed ProvisioningRequest in one loop.
maxFailed = 20
// defaultMaxUpdated is a limit for ProvisioningRequest to update conditions in one ClusterAutoscaler loop.
defaultMaxUpdated = 20
)

type checkCapacityProcessor struct {
now func() time.Time
now func() time.Time
maxUpdated int
}

// NewCheckCapacityProcessor return ProvisioningRequestProcessor for Check-capacity ProvisioningClass.
func NewCheckCapacityProcessor() *checkCapacityProcessor {
return &checkCapacityProcessor{now: time.Now}
return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated}
}

// Process iterates over ProvisioningRequests and apply:
// -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired.
// -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime.
// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) error {
failed := 0
expiredProvReq := []*provreqwrapper.ProvisioningRequest{}
failedProvReq := []*provreqwrapper.ProvisioningRequest{}
for _, provReq := range provReqs {
if len(expiredProvReq) >= p.maxUpdated {
break
}
conditions := provReq.Conditions()
if apimeta.IsStatusConditionTrue(conditions, v1beta1.BookingExpired) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) {
if provReq.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity ||
apimeta.IsStatusConditionTrue(conditions, v1beta1.BookingExpired) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) {
continue
}
provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned)
if provisioned != nil && provisioned.Status == metav1.ConditionTrue {
if provisioned.LastTransitionTime.Add(defaultReservationTime).Before(p.now()) {
setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
expiredProvReq = append(expiredProvReq, provReq)
}
} else if failed < maxFailed {
} else if len(failedProvReq) < p.maxUpdated-len(expiredProvReq) {
created := provReq.CreationTimestamp()
if created.Add(defaultExpirationTime).Before(p.now()) {
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now()))
failed++
failedProvReq = append(failedProvReq, provReq)
}
}
}
updated := 0
for _, provReq := range expiredProvReq {
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
updated++
}
for _, provReq := range failedProvReq {
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now()))
updated++
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,23 @@ func TestProcess(t *testing.T) {
pr := provreqclient.ProvisioningRequestWrapperTests("namespace", "name-1")
pr.V1Beta1().Status.Conditions = test.conditions
pr.V1Beta1().CreationTimestamp = metav1.NewTime(test.creationTime)
processor := checkCapacityProcessor{func() time.Time { return now }}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr})
additionalPr := provreqclient.ProvisioningRequestWrapperTests("namespace", "additional")
additionalPr.V1Beta1().CreationTimestamp = metav1.NewTime(weekAgo)
processor := checkCapacityProcessor{func() time.Time { return now }, 1}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
assert.ElementsMatch(t, test.wantConditions, pr.Conditions())
if len(test.conditions) == len(test.wantConditions) {
assert.ElementsMatch(t, []metav1.Condition{
{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
Reason: ExpiredReason,
Message: ExpiredMsg,
},
}, additionalPr.Conditions())
} else {
assert.ElementsMatch(t, []metav1.Condition{}, additionalPr.Conditions())
}
}
}

0 comments on commit 7ef5d0e

Please sign in to comment.