diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 25f71cf4eff3..cdb524739a6f 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -30,6 +30,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander/factory" + "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" @@ -53,6 +54,7 @@ type AutoscalerOptions struct { ExpanderStrategy expander.Strategy EstimatorBuilder estimator.EstimatorBuilder Processors *ca_processors.AutoscalingProcessors + LoopStartNotifier *loopstart.ObserversList Backoff backoff.Backoff DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter RemainingPdbTracker pdb.RemainingPdbTracker @@ -84,6 +86,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor opts.ClusterSnapshot, opts.AutoscalingKubeClients, opts.Processors, + opts.LoopStartNotifier, opts.CloudProvider, opts.ExpanderStrategy, opts.EstimatorBuilder, @@ -101,6 +104,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers if opts.Processors == nil { opts.Processors = ca_processors.DefaultProcessors(opts.AutoscalingOptions) } + if opts.LoopStartNotifier == nil { + opts.LoopStartNotifier = loopstart.NewObserversList(nil) + } if opts.AutoscalingKubeClients == nil { opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory) } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 471a341077f2..262723e1db64 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -47,6 +47,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/simulator" @@ -90,6 +91,7 @@ type StaticAutoscaler struct { scaleDownActuator scaledown.Actuator scaleUpOrchestrator scaleup.Orchestrator processors *ca_processors.AutoscalingProcessors + loopStartNotifier *loopstart.ObserversList processorCallbacks *staticAutoscalerProcessorCallbacks initialized bool taintConfig taints.TaintConfig @@ -136,6 +138,7 @@ func NewStaticAutoscaler( clusterSnapshot clustersnapshot.ClusterSnapshot, autoscalingKubeClients *context.AutoscalingKubeClients, processors *ca_processors.AutoscalingProcessors, + loopStartNotifier *loopstart.ObserversList, cloudProvider cloudprovider.CloudProvider, expanderStrategy expander.Strategy, estimatorBuilder estimator.EstimatorBuilder, @@ -205,6 +208,7 @@ func NewStaticAutoscaler( scaleDownActuator: scaleDownActuator, scaleUpOrchestrator: scaleUpOrchestrator, processors: processors, + loopStartNotifier: loopStartNotifier, processorCallbacks: processorCallbacks, clusterStateRegistry: clusterStateRegistry, taintConfig: taintConfig, @@ -337,6 +341,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr klog.Errorf("Failed to refresh cloud provider config: %v", err) 
return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err) } + a.loopStartNotifier.Refresh() // Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh maxNodesCount := 0 diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 2dfb88047603..6e31f5803930 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -42,6 +42,7 @@ import ( . "k8s.io/autoscaler/cluster-autoscaler/core/test" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" + "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" @@ -281,6 +282,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { scaleDownActuator: sdActuator, scaleUpOrchestrator: suOrchestrator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, } @@ -374,6 +376,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { scaleDownActuator: sdActuator, scaleUpOrchestrator: suOrchestrator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, initialized: true, } @@ -573,6 +576,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { scaleDownActuator: sdActuator, scaleUpOrchestrator: suOrchestrator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, initialized: true, } @@ -798,6 +802,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { scaleDownActuator: sdActuator, scaleUpOrchestrator: suOrchestrator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, initialized: true, } @@ -948,6 +953,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { scaleDownActuator: sdActuator, scaleUpOrchestrator: suOrchestrator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, } @@ -1096,6 +1102,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { scaleDownActuator: sdActuator, scaleUpOrchestrator: suOrchestrator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, } @@ -1224,6 +1231,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) scaleDownPlanner: sdPlanner, scaleDownActuator: sdActuator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, } @@ -1322,6 +1330,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * scaleDownPlanner: sdPlanner, scaleDownActuator: sdActuator, processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, } @@ -1427,6 +1436,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) { scaleDownPlanner: sdPlanner, scaleDownActuator: sdActuator, processors: NewTestProcessors(&context), + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, } @@ -2023,6 +2033,7 @@ func 
TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) { scaleDownActuator: actuator, scaleDownPlanner: planner, processors: NewTestProcessors(&ctx), + loopStartNotifier: loopstart.NewObserversList(nil), processorCallbacks: processorCallbacks, } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index d0480eb1f99a..d7aaf65b7416 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -31,6 +31,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -49,6 +50,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" @@ -469,15 +471,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) drainabilityRules := rules.Default(deleteOptions) - scaleUpOrchestrator := orchestrator.New() - if *provisioningRequestsEnabled { - kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) - scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient) - if err != nil { - return nil, err - } - } - opts := core.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(), @@ -487,14 +480,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter PredicateChecker: predicateChecker, DeleteOptions: deleteOptions, DrainabilityRules: drainabilityRules, - ScaleUpOrchestrator: scaleUpOrchestrator, + ScaleUpOrchestrator: orchestrator.New(), } opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions) opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets) podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker) + if autoscalingOptions.ProvisioningRequestEnabled { podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager())) + + restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) + scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig) + if err != nil { + return nil, err + } + opts.ScaleUpOrchestrator = scaleUpOrchestrator + provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()}) + if err != nil { + return nil, err + } + opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor}) } opts.Processors.PodListProcessor = podListProcessor scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{} diff --git a/cluster-autoscaler/observers/loopstart/loopstart.go b/cluster-autoscaler/observers/loopstart/loopstart.go new file mode 100644 index 000000000000..3db99d181433 --- /dev/null +++ b/cluster-autoscaler/observers/loopstart/loopstart.go 
@@ -0,0 +1,40 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loopstart + +// Observer is an interface for objects that need to be refreshed at the start of each CA loop. +// Refresh is called once at the beginning of every loop iteration. +type Observer interface { + Refresh() +} + +// ObserversList holds observers that need to be refreshed at the start of each CA loop. +type ObserversList struct { + observers []Observer +} + +// Refresh refreshes all registered observers once per CA loop. +func (l *ObserversList) Refresh() { + for _, observer := range l.observers { + observer.Refresh() + } +} + +// NewObserversList returns a new ObserversList. +func NewObserversList(observers []Observer) *ObserversList { + return &ObserversList{observers} +} diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processors.go b/cluster-autoscaler/processors/provreq/provisioning_request_pods_filter.go similarity index 100% rename from cluster-autoscaler/processors/provreq/provisioning_request_processors.go rename to cluster-autoscaler/processors/provreq/provisioning_request_pods_filter.go diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go b/cluster-autoscaler/processors/provreq/provisioning_request_pods_filter_test.go similarity index 100% rename from cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go rename to cluster-autoscaler/processors/provreq/provisioning_request_pods_filter_test.go diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go new file mode 100644 index 000000000000..0085c026e365 --- /dev/null +++ b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go @@ -0,0 +1,67 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provreq + +import ( + "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" +) + +// ProvisioningRequestProcessor processes ProvisioningRequests in the cluster.
+type ProvisioningRequestProcessor interface { + Process([]*provreqwrapper.ProvisioningRequest) + CleanUp() +} + +type provisioningRequestClient interface { + ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) + ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) +} + +// CombinedProvReqProcessor is responsible for processing ProvisioningRequests for each ProvisioningClass +// in every CA loop and for updating conditions of expired ProvisioningRequests. +type CombinedProvReqProcessor struct { + client provisioningRequestClient + processors []ProvisioningRequestProcessor +} + +// NewCombinedProvReqProcessor returns a new CombinedProvReqProcessor. +func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) { + client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) + if err != nil { + return nil, err + } + return &CombinedProvReqProcessor{client: client, processors: processors}, nil +} + +// Refresh iterates over ProvisioningRequests and updates their conditions/state. +func (cp *CombinedProvReqProcessor) Refresh() { + provReqs, err := cp.client.ProvisioningRequests() + if err != nil { + klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err) + return + } + for _, p := range cp.processors { + p.Process(provReqs) + } +} + +// CleanUp cleans up internal state. +func (cp *CombinedProvReqProcessor) CleanUp() {} diff --git a/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go b/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go index 881b9ca29b4b..ee6b93e7d31e 100644 --- a/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go +++ b/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go @@ -175,11 +175,8 @@ type Detail string // The following constants list all currently available Conditions Type values. // See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition const ( - // CapacityFound indicates that all of the requested resources were - // fount in the cluster. - CapacityFound string = "CapacityFound" - // Expired indicates that the ProvisioningRequest had CapacityFound condition before - // and the reservation time is expired. + // BookingExpired indicates that the ProvisioningRequest had the Provisioned condition before + // and the capacity reservation time has expired. BookingExpired string = "BookingExpired" // Provisioned indicates that all of the requested resources were created // and are available in the cluster. CA will set this condition when the diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go b/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go index 896428371095..1638bdfb91e4 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go @@ -17,19 +17,12 @@ limitations under the License. package checkcapacity import ( - "time" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" "k8s.io/klog/v2" ) -const ( - defaultReservationTime = 10 * time.Minute - defaultExpirationTime = 7 * 24 * time.Hour // 7 days -) - const ( //CapacityIsNotFoundReason is added when capacity was not found in the cluster.
CapacityIsNotFoundReason = "CapacityIsNotFound" @@ -37,6 +30,14 @@ const ( CapacityIsFoundReason = "CapacityIsFound" //FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster. + //CapacityReservationTimeExpiredReason is added when capacity reservation time is expired. + CapacityReservationTimeExpiredReason = "CapacityReservationTimeExpired" + //CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired. + CapacityReservationTimeExpiredMsg = "Capacity reservation time is expired" + //ExpiredReason is added if ProvisioningRequest is expired. + ExpiredReason = "Expired" + //ExpiredMsg is added if ProvisioningRequest is expired. + ExpiredMsg = "ProvisioningRequest is expired" ) func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool { @@ -50,26 +51,26 @@ func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool { for _, condition := range pr.Conditions() { if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) { return false - } else if checkConditionType(condition, v1beta1.CapacityFound) { + } else if checkConditionType(condition, v1beta1.Provisioned) { book = true } } return book } -func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) { +func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string, now v1.Time) { var newConditions []v1.Condition newCondition := v1.Condition{ Type: conditionType, Status: conditionStatus, ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(), - LastTransitionTime: v1.Now(), + LastTransitionTime: now, Reason: reason, Message: message, } prevConditions := pr.Conditions() switch conditionType { - case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed: + case v1beta1.Provisioned, v1beta1.BookingExpired, v1beta1.Failed: conditionFound := false for _, condition := range prevConditions { if condition.Type == conditionType { diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go index 5a8efe78a770..f95b410637f5 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go @@ -34,7 +34,7 @@ func TestBookCapacity(t *testing.T) { name: "BookingExpired", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -48,7 +48,7 @@ func TestBookCapacity(t *testing.T) { name: "Failed", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -66,7 +66,7 @@ func TestBookCapacity(t *testing.T) { name: "Capacity found and provisioned", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -80,7 +80,7 @@ func TestBookCapacity(t *testing.T) { name: "Capacity is not found", prConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionFalse, }, }, @@ -115,29 +115,29 @@ func TestSetCondition(t *testing.T) { want []v1.Condition }{ { - name: "CapacityFound added, empty conditions before", - newType: v1beta1.CapacityFound, + name: "Provisioned added,
empty conditions before", + newType: v1beta1.Provisioned, newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, }, { - name: "CapacityFound updated", + name: "Provisioned updated", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionFalse, }, }, - newType: v1beta1.CapacityFound, + newType: v1beta1.Provisioned, newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -146,7 +146,7 @@ func TestSetCondition(t *testing.T) { name: "Failed added, non-empty conditions before", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -154,7 +154,7 @@ func TestSetCondition(t *testing.T) { newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -163,28 +163,11 @@ func TestSetCondition(t *testing.T) { }, }, }, - { - name: "Provisioned condition type, conditions are not updated", - oldConditions: []v1.Condition{ - { - Type: v1beta1.CapacityFound, - Status: v1.ConditionTrue, - }, - }, - newType: v1beta1.Provisioned, - newStatus: v1.ConditionFalse, - want: []v1.Condition{ - { - Type: v1beta1.CapacityFound, - Status: v1.ConditionTrue, - }, - }, - }, { name: "Unknown condition status, conditions are updated", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -192,7 +175,7 @@ func TestSetCondition(t *testing.T) { newStatus: v1.ConditionUnknown, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, { @@ -205,7 +188,7 @@ func TestSetCondition(t *testing.T) { name: "Unknown condition type, conditions are not updated", oldConditions: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -213,7 +196,7 @@ func TestSetCondition(t *testing.T) { newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -233,19 +216,19 @@ func TestSetCondition(t *testing.T) { name: "Capacity found with unknown condition before", oldConditions: []v1.Condition{ { - Type: v1beta1.Provisioned, + Type: "unknown", Status: v1.ConditionTrue, }, }, - newType: v1beta1.CapacityFound, + newType: v1beta1.Provisioned, newStatus: v1.ConditionTrue, want: []v1.Condition{ { - Type: v1beta1.Provisioned, + Type: "unknown", Status: v1.ConditionTrue, }, { - Type: v1beta1.CapacityFound, + Type: v1beta1.Provisioned, Status: v1.ConditionTrue, }, }, @@ -259,7 +242,7 @@ func TestSetCondition(t *testing.T) { Conditions: test.oldConditions, }, }, nil) - setCondition(pr, test.newType, test.newStatus, "", "") + setCondition(pr, test.newType, test.newStatus, "", "", v1.Now()) got := pr.Conditions() if len(got) > 2 || len(got) != len(test.want) || got[0].Type != test.want[0].Type || got[0].Status != test.want[0].Status { t.Errorf("want %v, got: %v", test.want, got) diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go index c5709f8a07b6..b4766fb03085 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go +++ 
b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go @@ -127,7 +127,7 @@ func (o *provReqOrchestrator) bookCapacity() error { // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. // If there is an error, mark PR as invalid, because we won't be able to book capacity // for it anyway. - setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err)) + setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) continue } podsToCreate = append(podsToCreate, pods...) @@ -151,10 +151,10 @@ func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, err } st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true) if len(st) < len(unschedulablePods) || err != nil { - setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.") + setCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now()) return false, err } - setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.") + setCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.", metav1.Now()) return true, nil } diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go index 4fe874b96c2e..62ad7a9d017a 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go @@ -56,7 +56,7 @@ func TestScaleUp(t *testing.T) { newCpuProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newCpuProvReq", "5m", "5", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) newMemProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newMemProvReq", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) bookedCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) - bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.CapacityFound, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) + bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.Provisioned, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) expiredProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) expiredProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.BookingExpired, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) differentProvReqClass := provreqwrapper.BuildTestProvisioningRequest("ns", "differentProvReqClass", "1", "1", "", int32(5), false, time.Now(), v1beta1.ProvisioningClassAtomicScaleUp) diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go new file mode 100644 index 000000000000..95d29c7b867c --- /dev/null +++ 
b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go @@ -0,0 +1,91 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + "time" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" +) + +const ( + defaultReservationTime = 10 * time.Minute + defaultExpirationTime = 7 * 24 * time.Hour // 7 days + // defaultMaxUpdated is the maximum number of ProvisioningRequests whose conditions can be updated in one ClusterAutoscaler loop. + defaultMaxUpdated = 20 +) + +type checkCapacityProcessor struct { + now func() time.Time + maxUpdated int +} + +// NewCheckCapacityProcessor returns a ProvisioningRequestProcessor for the check-capacity ProvisioningClass. +func NewCheckCapacityProcessor() *checkCapacityProcessor { + return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated} +} + +// Process iterates over ProvisioningRequests and applies: +// -BookingExpired condition to Provisioned ProvisioningRequests whose capacity reservation time has expired. +// -Failed condition to ProvisioningRequests that were not provisioned within defaultExpirationTime. +// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest +func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) { + expiredProvReq := []*provreqwrapper.ProvisioningRequest{} + failedProvReq := []*provreqwrapper.ProvisioningRequest{} + for _, provReq := range provReqs { + if len(expiredProvReq) >= p.maxUpdated { + break + } + conditions := provReq.Conditions() + if provReq.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity || + apimeta.IsStatusConditionTrue(conditions, v1beta1.BookingExpired) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) { + continue + } + provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned) + if provisioned != nil && provisioned.Status == metav1.ConditionTrue { + if provisioned.LastTransitionTime.Add(defaultReservationTime).Before(p.now()) { + expiredProvReq = append(expiredProvReq, provReq) + } + } else if len(failedProvReq) < p.maxUpdated-len(expiredProvReq) { + created := provReq.CreationTimestamp() + if created.Add(defaultExpirationTime).Before(p.now()) { + failedProvReq = append(failedProvReq, provReq) + } + } + } + updated := 0 + for _, provReq := range expiredProvReq { + if updated >= p.maxUpdated { + break + } + setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now())) + updated++ + } + for _, provReq := range failedProvReq { + if updated >= p.maxUpdated { + break + } + setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now())) + updated++ + } +} + +// CleanUp cleans up internal state.
+func (p *checkCapacityProcessor) CleanUp() {} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go new file mode 100644 index 000000000000..a225a3fa49d7 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go @@ -0,0 +1,165 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" +) + +func TestProcess(t *testing.T) { + now := time.Now() + dayAgo := now.Add(-1 * 24 * time.Hour) + weekAgo := now.Add(-1 * defaultExpirationTime).Add(-1 * 5 * time.Minute) + + testCases := []struct { + name string + creationTime time.Time + conditions []metav1.Condition + wantConditions []metav1.Condition + }{ + { + name: "New ProvisioningRequest, empty conditions", + creationTime: now, + }, + { + name: "ProvisioningRequest with empty conditions, expired", + creationTime: weekAgo, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + }, + { + name: "ProvisioningRequest wasn't provisioned, expired", + creationTime: weekAgo, + conditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + }, + { + name: "BookingCapacity time is expired ", + creationTime: dayAgo, + conditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Provisioned, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + { + Type: v1beta1.BookingExpired, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: CapacityReservationTimeExpiredReason, + Message: CapacityReservationTimeExpiredMsg, + }, + }, + }, + { + name: "Failed ProvisioningRequest", + creationTime: dayAgo, + conditions: []metav1.Condition{ + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, 
+ LastTransitionTime: metav1.NewTime(dayAgo), + Reason: "Failed", + Message: "Failed", + }, + }, + wantConditions: []metav1.Condition{ + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(dayAgo), + Reason: "Failed", + Message: "Failed", + }, + }, + }, + } + for _, test := range testCases { + pr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "name-1") + pr.V1Beta1().Status.Conditions = test.conditions + pr.V1Beta1().CreationTimestamp = metav1.NewTime(test.creationTime) + pr.V1Beta1().Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity + additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional") + additionalPr.V1Beta1().CreationTimestamp = metav1.NewTime(weekAgo) + additionalPr.V1Beta1().Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity + processor := checkCapacityProcessor{func() time.Time { return now }, 1} + processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr}) + assert.ElementsMatch(t, test.wantConditions, pr.Conditions()) + if len(test.conditions) == len(test.wantConditions) { + assert.ElementsMatch(t, []metav1.Condition{ + { + Type: v1beta1.Failed, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now), + Reason: ExpiredReason, + Message: ExpiredMsg, + }, + }, additionalPr.Conditions()) + } else { + assert.ElementsMatch(t, []metav1.Condition{}, additionalPr.Conditions()) + } + } +} diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client.go b/cluster-autoscaler/provisioningrequest/provreqclient/client.go index f11c5cd42555..74d9d958b0b8 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client.go @@ -41,15 +41,15 @@ const ( provisioningRequestClientCallTimeout = 4 * time.Second ) -// ProvisioningRequestClientV1beta1 represents client for v1beta1 ProvReq CRD. -type ProvisioningRequestClientV1beta1 struct { +// ProvisioningRequestClient represents client for v1beta1 ProvReq CRD. +type ProvisioningRequestClient struct { client versioned.Interface provReqLister listers.ProvisioningRequestLister podTemplLister v1.PodTemplateLister } // NewProvisioningRequestClient configures and returns a provisioningRequestClient. -func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClientV1beta1, error) { +func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClient, error) { prClient, err := newPRClient(kubeConfig) if err != nil { return nil, fmt.Errorf("Failed to create Provisioning Request client: %v", err) @@ -70,7 +70,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest return nil, err } - return &ProvisioningRequestClientV1beta1{ + return &ProvisioningRequestClient{ client: prClient, provReqLister: provReqLister, podTemplLister: podTemplLister, @@ -78,7 +78,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest } // ProvisioningRequest gets a specific ProvisioningRequest CR. 
-func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) { +func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) { v1Beta1PR, err := c.provReqLister.ProvisioningRequests(namespace).Get(name) if err != nil { return nil, err @@ -91,7 +91,7 @@ func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name s } // ProvisioningRequests gets all ProvisioningRequest CRs. -func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) { +func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) { v1Beta1PRs, err := c.provReqLister.List(labels.Everything()) if err != nil { return nil, fmt.Errorf("error fetching provisioningRequests: %w", err) @@ -108,7 +108,7 @@ func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwra } // FetchPodTemplates fetches PodTemplates referenced by the Provisioning Request. -func (c *ProvisioningRequestClientV1beta1) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) { +func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) { podTemplates := make([]*apiv1.PodTemplate, 0, len(pr.Spec.PodSets)) for _, podSpec := range pr.Spec.PodSets { podTemplate, err := c.podTemplLister.PodTemplates(pr.Namespace).Get(podSpec.PodTemplateRef.Name) diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go index 333673ec51bb..3c6e72a23775 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go @@ -25,8 +25,8 @@ import ( ) func TestFetchPodTemplates(t *testing.T) { - pr1 := provisioningRequestBetaForTests("namespace", "name-1") - pr2 := provisioningRequestBetaForTests("namespace", "name-2") + pr1 := ProvisioningRequestWrapperForTesting("namespace", "name-1") + pr2 := ProvisioningRequestWrapperForTesting("namespace", "name-2") mockProvisioningRequests := []*provreqwrapper.ProvisioningRequest{pr1, pr2} ctx := context.Background() diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go index b2c4a78a8380..7dea1e315447 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go @@ -35,7 +35,7 @@ import ( ) // NewFakeProvisioningRequestClient mock ProvisioningRequestClient for tests. -func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClientV1beta1 { +func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClient { t.Helper() provReqClient := fake.NewSimpleClientset() podTemplClient := fake_kubernetes.NewSimpleClientset() @@ -60,7 +60,7 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ... if err != nil { t.Fatalf("Failed to create Provisioning Request lister. 
Error was: %v", err) } - return &ProvisioningRequestClientV1beta1{ + return &ProvisioningRequestClient{ client: provReqClient, provReqLister: provReqLister, podTemplLister: podTemplLister, @@ -83,7 +83,8 @@ func newFakePodTemplatesLister(t *testing.T, client kubernetes.Interface, channe return podTemplLister, nil } -func provisioningRequestBetaForTests(namespace, name string) *provreqwrapper.ProvisioningRequest { +// ProvisioningRequestWrapperForTesting mock ProvisioningRequest for tests. +func ProvisioningRequestWrapperForTesting(namespace, name string) *provreqwrapper.ProvisioningRequest { if namespace == "" { namespace = "default" }