Skip to content

Commit

Permalink
Add ProvisioningRequestProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Feb 7, 2024
1 parent ed6ebbe commit 5ba9f48
Show file tree
Hide file tree
Showing 17 changed files with 404 additions and 102 deletions.
4 changes: 4 additions & 0 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
}

if err := a.processors.ProvisioningRequestProcessor.Process(nil); err != nil {
klog.Errorf("Failed to process ProvisioningRequests, err: %v", err)
}

unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)

if err != nil {
Expand Down
24 changes: 13 additions & 11 deletions cluster-autoscaler/core/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfos"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
Expand Down Expand Up @@ -184,17 +185,18 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
nodes.NewAtomicResizeFilteringProcessor(),
}),
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
ProvisioningRequestProcessor: provreq.NewDefaultProvisioningRequestProcessor(),
}
}

Expand Down
25 changes: 15 additions & 10 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

Expand Down Expand Up @@ -469,15 +470,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
}

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
Expand All @@ -487,14 +479,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
ScaleUpOrchestrator: orchestrator.New(),
}

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)

if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig)
if err != nil {
return nil, err
}
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provReqProcessor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
if err != nil {
return nil, err
}
opts.Processors.ProvisioningRequestProcessor = provReqProcessor
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
25 changes: 15 additions & 10 deletions cluster-autoscaler/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)
Expand Down Expand Up @@ -74,6 +75,8 @@ type AutoscalingProcessors struct {
// * scale-up failures per nodegroup
// * scale-down failures per nodegroup
ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
//ProvisioningRequestProcessor is used to update conditions/state of ProvisioningRequests.
ProvisioningRequestProcessor provreq.ProvisioningRequestProcessor
}

// DefaultProcessors returns default set of processors.
Expand All @@ -95,16 +98,17 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
nodes.NewAtomicResizeFilteringProcessor(),
},
),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
ProvisioningRequestProcessor: provreq.NewDefaultProvisioningRequestProcessor(),
}
}

Expand All @@ -124,4 +128,5 @@ func (ap *AutoscalingProcessors) CleanUp() {
ap.CustomResourcesProcessor.CleanUp()
ap.TemplateNodeInfoProvider.CleanUp()
ap.ActionableClusterProcessor.CleanUp()
ap.ProvisioningRequestProcessor.CleanUp()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
)

// ProvisioningRequestProcessor process ProvisionignRequests in the cluster.
type ProvisioningRequestProcessor interface {
Process([]*provreqwrapper.ProvisioningRequest) error
CleanUp()
}

// NoOpProvisioningRequestProcessor do nothing.
type NoOpProvisioningRequestProcessor struct {
}

// NewDefaultProvisioningRequestProcessor creates an instance of PodListProcessor.
func NewDefaultProvisioningRequestProcessor() ProvisioningRequestProcessor {
return &NoOpProvisioningRequestProcessor{}
}

// Process processes lists of unschedulable and scheduled pods before scaling of the cluster.
func (p *NoOpProvisioningRequestProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) error {
return nil
}

// CleanUp cleans up the processor's internal structures.
func (p *NoOpProvisioningRequestProcessor) CleanUp() {
}

type provisioningRequestClient interface {
ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

// CombinedProvReqProcessor contains ProvisioningRequestProcessor for each ProvisioningClass.
type CombinedProvReqProcessor struct {
client provisioningRequestClient
processors []ProvisioningRequestProcessor
}

// NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (ProvisioningRequestProcessor, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &CombinedProvReqProcessor{client: client, processors: processors}, nil
}

// Process iterates over ProvisioningRequests and updates its conditions/state.
func (cp *CombinedProvReqProcessor) Process(_ []*provreqwrapper.ProvisioningRequest) error {
provReqs, err := cp.client.ProvisioningRequests()
if err != nil {
return err
}
for _, p := range cp.processors {
if err := p.Process(provReqs); err != nil {
return err
}
}
return nil
}

// CleanUp cleans up internal state
func (cp *CombinedProvReqProcessor) CleanUp() {}
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,8 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// CapacityFound indicates that all of the requested resources were
// fount in the cluster.
CapacityFound string = "CapacityFound"
// Expired indicates that the ProvisioningRequest had CapacityFound condition before
// and the reservation time is expired.
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
// Provisioned indicates that all of the requested resources were created
// and are available in the cluster. CA will set this condition when the
Expand Down
23 changes: 12 additions & 11 deletions cluster-autoscaler/provisioningrequest/checkcapacity/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,27 @@ limitations under the License.
package checkcapacity

import (
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/klog/v2"
)

const (
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
)

const (
//CapacityIsNotFoundReason is added when capacity was not found in the cluster.
CapacityIsNotFoundReason = "CapacityIsNotFound"
//CapacityIsFoundReason is added when capacity was found in the cluster.
CapacityIsFoundReason = "CapacityIsFound"
//FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster.
FailedToBookCapacityReason = "FailedToBookCapacity"
//CapacityReservationTimeExpiredReason is added whed capacity reservation time is expired.
CapacityReservationTimeExpiredReason = "CapacityReservationTimeExpired"
//CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired.
CapacityReservationTimeExpiredMsg = "Capacity reservation time is expired"
//ExpiredReason is added if ProvisioningRequest is expired.
ExpiredReason = "Expired"
//ExpiredMsg is added if ProvisioningRequest is expired.
ExpiredMsg = "ProvisioningRequest is expired"
)

func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
Expand All @@ -50,26 +51,26 @@ func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
for _, condition := range pr.Conditions() {
if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) {
return false
} else if checkConditionType(condition, v1beta1.CapacityFound) {
} else if checkConditionType(condition, v1beta1.Provisioned) {
book = true
}
}
return book
}

func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) {
func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string, now v1.Time) {
var newConditions []v1.Condition
newCondition := v1.Condition{
Type: conditionType,
Status: conditionStatus,
ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(),
LastTransitionTime: v1.Now(),
LastTransitionTime: now,
Reason: reason,
Message: message,
}
prevConditions := pr.Conditions()
switch conditionType {
case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed:
case v1beta1.Provisioned, v1beta1.BookingExpired, v1beta1.Failed:
conditionFound := false
for _, condition := range prevConditions {
if condition.Type == conditionType {
Expand Down
Loading

0 comments on commit 5ba9f48

Please sign in to comment.