Skip to content

Commit

Permalink
Add ProvisioningRequestProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Feb 12, 2024
1 parent ed6ebbe commit 9492b23
Show file tree
Hide file tree
Showing 19 changed files with 474 additions and 92 deletions.
6 changes: 6 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
Expand All @@ -53,6 +54,7 @@ type AutoscalerOptions struct {
ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder
Processors *ca_processors.AutoscalingProcessors
LoopStartNotifier *loopstart.ObserversList
Backoff backoff.Backoff
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
RemainingPdbTracker pdb.RemainingPdbTracker
Expand Down Expand Up @@ -84,6 +86,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
opts.LoopStartNotifier,
opts.CloudProvider,
opts.ExpanderStrategy,
opts.EstimatorBuilder,
Expand All @@ -101,6 +104,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.Processors == nil {
opts.Processors = ca_processors.DefaultProcessors(opts.AutoscalingOptions)
}
if opts.LoopStartNotifier == nil {
opts.LoopStartNotifier = loopstart.NewObserversList(nil)
}
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
Expand Down
13 changes: 13 additions & 0 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
Expand Down Expand Up @@ -90,6 +91,7 @@ type StaticAutoscaler struct {
scaleDownActuator scaledown.Actuator
scaleUpOrchestrator scaleup.Orchestrator
processors *ca_processors.AutoscalingProcessors
loopStartNotifier *loopstart.ObserversList
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
taintConfig taints.TaintConfig
Expand Down Expand Up @@ -136,6 +138,7 @@ func NewStaticAutoscaler(
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *context.AutoscalingKubeClients,
processors *ca_processors.AutoscalingProcessors,
loopStartNotifier *loopstart.ObserversList,
cloudProvider cloudprovider.CloudProvider,
expanderStrategy expander.Strategy,
estimatorBuilder estimator.EstimatorBuilder,
Expand Down Expand Up @@ -206,6 +209,7 @@ func NewStaticAutoscaler(
scaleDownActuator: scaleDownActuator,
scaleUpOrchestrator: scaleUpOrchestrator,
processors: processors,
loopStartNotifier: loopStartNotifier,
processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry,
taintConfig: taintConfig,
Expand Down Expand Up @@ -336,6 +340,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.Errorf("Failed to refresh cloud provider config: %v", err)
return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}
if a.loopStartNotifier != nil {
stop, err := a.loopStartNotifier.Refresh()
if stop {
return caerrors.ToAutoscalerError(caerrors.TransientError, err)
}
if err != nil {
klog.Errorf("Failed to refresh loopstart observers: %v", err)
}
}

// Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh
maxNodesCount := 0
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module k8s.io/autoscaler/cluster-autoscaler

go 1.21
go 1.21.3

require (
cloud.google.com/go/compute/metadata v0.2.3
Expand Down
26 changes: 16 additions & 10 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

Expand All @@ -49,6 +50,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
Expand Down Expand Up @@ -469,15 +471,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
}

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
Expand All @@ -487,14 +480,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
ScaleUpOrchestrator: orchestrator.New(),
}

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)

if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig)
if err != nil {
return nil, err
}
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
if err != nil {
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
51 changes: 51 additions & 0 deletions cluster-autoscaler/observers/loopstart/loopstart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package loopstart

import "errors"

// Observer interface is used to store object that needed to be refreshed in each CA loop.
// It returns error and a bool value whether the loop should be skipped.
type Observer interface {
Refresh() (bool, error)
}

// ObserversList interface is used to store objects that needed to be refreshed in each CA loop.
type ObserversList struct {
observers []Observer
}

// Refresh refresh observers each CA loop.
func (l *ObserversList) Refresh() (stopLoop bool, err error) {
var resultErr error
if l.observers == nil {
return false, nil
}
for _, observer := range l.observers {
stopLoop, err := observer.Refresh()
if stopLoop {
return true, nil
}
resultErr = errors.Join(resultErr, err)
}
return false, resultErr
}

// NewObserversList return new ObserversList.
func NewObserversList(observers []Observer) *ObserversList {
return &ObserversList{observers}
}
20 changes: 10 additions & 10 deletions cluster-autoscaler/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
nodes.NewAtomicResizeFilteringProcessor(),
},
),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
)

// ProvisioningRequestProcessor process ProvisionignRequests in the cluster.
type ProvisioningRequestProcessor interface {
Process([]*provreqwrapper.ProvisioningRequest) error
CleanUp()
}

type provisioningRequestClient interface {
ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

// CombinedProvReqProcessor contains ProvisioningClassProcessor for each ProvisioningClass.
type CombinedProvReqProcessor struct {
client provisioningRequestClient
processors []ProvisioningRequestProcessor
}

// NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &CombinedProvReqProcessor{client: client, processors: processors}, nil
}

// Refresh iterates over ProvisioningRequests and updates its conditions/state.
func (cp *CombinedProvReqProcessor) Refresh() (bool, error) {
provReqs, err := cp.client.ProvisioningRequests()
if err != nil {
return true, err
}
for _, p := range cp.processors {
if err := p.Process(provReqs); err != nil {
return false, err
}
}
return false, nil
}

// CleanUp cleans up internal state
func (cp *CombinedProvReqProcessor) CleanUp() {}
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,8 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// CapacityFound indicates that all of the requested resources were
// fount in the cluster.
CapacityFound string = "CapacityFound"
// Expired indicates that the ProvisioningRequest had CapacityFound condition before
// and the reservation time is expired.
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
// Provisioned indicates that all of the requested resources were created
// and are available in the cluster. CA will set this condition when the
Expand Down
23 changes: 12 additions & 11 deletions cluster-autoscaler/provisioningrequest/checkcapacity/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,27 @@ limitations under the License.
package checkcapacity

import (
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/klog/v2"
)

const (
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
)

const (
//CapacityIsNotFoundReason is added when capacity was not found in the cluster.
CapacityIsNotFoundReason = "CapacityIsNotFound"
//CapacityIsFoundReason is added when capacity was found in the cluster.
CapacityIsFoundReason = "CapacityIsFound"
//FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster.
FailedToBookCapacityReason = "FailedToBookCapacity"
//CapacityReservationTimeExpiredReason is added whed capacity reservation time is expired.
CapacityReservationTimeExpiredReason = "CapacityReservationTimeExpired"
//CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired.
CapacityReservationTimeExpiredMsg = "Capacity reservation time is expired"
//ExpiredReason is added if ProvisioningRequest is expired.
ExpiredReason = "Expired"
//ExpiredMsg is added if ProvisioningRequest is expired.
ExpiredMsg = "ProvisioningRequest is expired"
)

func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
Expand All @@ -50,26 +51,26 @@ func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
for _, condition := range pr.Conditions() {
if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) {
return false
} else if checkConditionType(condition, v1beta1.CapacityFound) {
} else if checkConditionType(condition, v1beta1.Provisioned) {
book = true
}
}
return book
}

func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) {
func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string, now v1.Time) {
var newConditions []v1.Condition
newCondition := v1.Condition{
Type: conditionType,
Status: conditionStatus,
ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(),
LastTransitionTime: v1.Now(),
LastTransitionTime: now,
Reason: reason,
Message: message,
}
prevConditions := pr.Conditions()
switch conditionType {
case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed:
case v1beta1.Provisioned, v1beta1.BookingExpired, v1beta1.Failed:
conditionFound := false
for _, condition := range prevConditions {
if condition.Type == conditionType {
Expand Down
Loading

0 comments on commit 9492b23

Please sign in to comment.