Introduce ProvisioningRequestProcessor
yaroslava-serdiuk committed Dec 14, 2023
1 parent 8f75e9c commit 4f1ae0b
Showing 6 changed files with 69 additions and 13 deletions.
@@ -83,6 +83,7 @@ func (o *ScaleUpOrchestrator) Initialize(
 // and in sync with instance groups.
 func (o *ScaleUpOrchestrator) ScaleUp(
 	unschedulablePods []*apiv1.Pod,
+	provreqPods []*apiv1.Pod,
 	nodes []*apiv1.Node,
 	daemonSets []*appsv1.DaemonSet,
 	nodeInfos map[string]*schedulerframework.NodeInfo,
@@ -979,7 +979,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
 	context.ExpanderStrategy = expander
 
 	// scale up
-	scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nil, nodes, []*appsv1.DaemonSet{}, nodeInfos)
 	processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
 
 	// aggregate group size changes
@@ -1078,7 +1078,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
 	processors := NewTestProcessors(&context)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
-	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nil, nodes, []*appsv1.DaemonSet{}, nodeInfos)
 
 	assert.NoError(t, err)
 	// Node group is unhealthy.
@@ -1132,7 +1132,7 @@ func TestBinpackingLimiter(t *testing.T) {
 	expander := NewMockRepotingStrategy(t, nil)
 	context.ExpanderStrategy = expander
 
-	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nil, nodes, []*appsv1.DaemonSet{}, nodeInfos)
 	processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
 	assert.NoError(t, err)
 	assert.True(t, scaleUpStatus.WasSuccessful())
@@ -1178,7 +1178,7 @@ func TestScaleUpNoHelp(t *testing.T) {
 	processors := NewTestProcessors(&context)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
-	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nil, nodes, []*appsv1.DaemonSet{}, nodeInfos)
 	processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
 
 	assert.NoError(t, err)
@@ -1400,7 +1400,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
 	processors := NewTestProcessors(&context)
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
-	scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nil, nodes, []*appsv1.DaemonSet{}, nodeInfos)
 
 	assert.NoError(t, typedErr)
 	assert.True(t, scaleUpStatus.WasSuccessful())
@@ -1462,7 +1462,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
 
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
-	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nil, nodes, []*appsv1.DaemonSet{}, nodeInfos)
 	assert.NoError(t, err)
 	assert.True(t, scaleUpStatus.WasSuccessful())
 	assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
@@ -1517,7 +1517,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
 
 	suOrchestrator := New()
 	suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
-	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
+	scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nil, nodes, []*appsv1.DaemonSet{}, nodeInfos)
 	assert.NoError(t, err)
 	assert.True(t, scaleUpStatus.WasSuccessful())
 	assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scaleup/scaleup.go
@@ -42,7 +42,7 @@ type Orchestrator interface {
 	// an unexpected error occurred. Assumes that all nodes in the cluster are ready
 	// and in sync with instance groups.
 	ScaleUp(
-		unschedulablePods []*apiv1.Pod,
+		unschedulablePods, provreqPods []*apiv1.Pod,
 		nodes []*apiv1.Node,
 		daemonSets []*appsv1.DaemonSet,
 		nodeInfos map[string]*schedulerframework.NodeInfo,
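As the test hunks above show, call sites that have no ProvisioningRequest-backed pods simply pass nil for the new provreqPods argument. A minimal caller-side sketch, with identifiers borrowed from the tests:

	// Without ProvisioningRequest pods, the second argument is nil.
	scaleUpStatus, err := suOrchestrator.ScaleUp(
		unschedulablePods,     // ordinary pending pods
		nil,                   // no ProvisioningRequest pods on this path
		nodes,
		[]*appsv1.DaemonSet{},
		nodeInfos,
	)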
6 changes: 4 additions & 2 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -512,6 +512,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		klog.Warningf("Failed to process unschedulable pods: %v", err)
 	}
 
+	unschedulablePodsToHelp, provreqPodsToHelp, _ := a.processors.ProvisioningRequestProcessor.Process(a.AutoscalingContext, unschedulablePodsToHelp)
+
 	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
 	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
 
@@ -542,7 +544,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		return false, nil
 	}
 
-	if len(unschedulablePodsToHelp) == 0 {
+	if len(unschedulablePodsToHelp)+len(provreqPodsToHelp) == 0 {
 		scaleUpStatus.Result = status.ScaleUpNotNeeded
 		klog.V(1).Info("No unschedulable pods")
 	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
@@ -558,7 +560,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
 	} else {
 		scaleUpStart := preScaleUp()
-		scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
+		scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, provreqPodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
 		if exit, err := postScaleUp(scaleUpStart); exit {
 			return err
 		}
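Taken together, the three hunks above change the scale-up gate in RunOnce so that it considers both pod lists. A condensed sketch of the resulting flow (not the literal file contents; note the processor error is discarded with _, exactly as in the diff):

	// Split pending pods into ordinary pods and ProvisioningRequest pods.
	unschedulablePodsToHelp, provreqPodsToHelp, _ :=
		a.processors.ProvisioningRequestProcessor.Process(a.AutoscalingContext, unschedulablePodsToHelp)

	// Skip scale-up only when *both* lists are empty.
	if len(unschedulablePodsToHelp)+len(provreqPodsToHelp) == 0 {
		scaleUpStatus.Result = status.ScaleUpNotNeeded
	} else {
		scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(
			unschedulablePodsToHelp, provreqPodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
	}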
10 changes: 7 additions & 3 deletions cluster-autoscaler/processors/processors.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 )
@@ -37,6 +38,8 @@ import (
 type AutoscalingProcessors struct {
 	// PodListProcessor is used to process list of unschedulable pods before autoscaling.
 	PodListProcessor pods.PodListProcessor
+	// ProvisioningRequestProcessor is used to process list of unschedulable pods and provisioning requests before autoscaling.
+	ProvisioningRequestProcessor provreq.ProvisioningRequestProcessor
 	// NodeGroupListProcessor is used to process list of NodeGroups that can be used in scale-up.
 	NodeGroupListProcessor nodegroups.NodeGroupListProcessor
 	// BinpackingLimiter processes expansion options to stop binpacking early.
@@ -72,9 +75,10 @@ type AutoscalingProcessors struct {
 // DefaultProcessors returns default set of processors.
 func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors {
 	return &AutoscalingProcessors{
-		PodListProcessor:       pods.NewDefaultPodListProcessor(),
-		NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor(),
-		BinpackingLimiter:      binpacking.NewDefaultBinpackingLimiter(),
+		PodListProcessor:             pods.NewDefaultPodListProcessor(),
+		ProvisioningRequestProcessor: provreq.NewDefaultProvisioningRequestProcessor(),
+		NodeGroupListProcessor:       nodegroups.NewDefaultNodeGroupListProcessor(),
+		BinpackingLimiter:            binpacking.NewDefaultBinpackingLimiter(),
 		NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{
 			MaxAllocatableDifferenceRatio:    config.DefaultMaxAllocatableDifferenceRatio,
 			MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio,
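Because ProvisioningRequestProcessor is a plain field on AutoscalingProcessors, a custom build can swap the default no-op for its own implementation after constructing the defaults. A hedged sketch, where myProcessor is a hypothetical value implementing provreq.ProvisioningRequestProcessor:

	// Start from the defaults, then replace the provisioning-request processor.
	p := processors.DefaultProcessors(options)
	p.ProvisioningRequestProcessor = myProcessor // hypothetical custom implementation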
@@ -0,0 +1,49 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// ProvisioningRequestProcessor splits unschedulable pods into ordinary pods and pods that consume ProvisioningRequests.
type ProvisioningRequestProcessor interface {
	// Process processes lists of unschedulable pods and injects pods that consume provisioning requests.
	Process(*context.AutoscalingContext, []*apiv1.Pod) ([]*apiv1.Pod, []*apiv1.Pod, error)
	// CleanUp cleans up the processor's internal structures.
	CleanUp()
}

// NoOpProvisioningRequestProcessor returns pod lists without processing them.
type NoOpProvisioningRequestProcessor struct {
}

// NewDefaultProvisioningRequestProcessor creates an instance of ProvisioningRequestProcessor.
func NewDefaultProvisioningRequestProcessor() ProvisioningRequestProcessor {
	return &NoOpProvisioningRequestProcessor{}
}

// Process returns the list of unschedulable pods unchanged and no provisioning-request pods.
func (p *NoOpProvisioningRequestProcessor) Process(
	context *context.AutoscalingContext,
	unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, []*apiv1.Pod, error) {
	return unschedulablePods, nil, nil
}

// CleanUp cleans up the processor's internal structures.
func (p *NoOpProvisioningRequestProcessor) CleanUp() {
}
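The default processor is deliberately a no-op: every pod comes back in the ordinary list and the provisioning-request list stays empty. To make the contract concrete, here is a hypothetical non-noop sketch that partitions pods on a placeholder annotation; the real selection criteria are not part of this commit, and the annotation key below is invented purely for illustration:

package provreq

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// consumeProvReqAnnotation is a placeholder key, not a real cluster-autoscaler API constant.
const consumeProvReqAnnotation = "example.com/consume-provisioning-request"

// annotationProcessor routes annotated pods into the provisioning-request list.
type annotationProcessor struct{}

// Process partitions the incoming pods: pods carrying the placeholder annotation
// are returned as provisioning-request pods, the rest as ordinary unschedulable pods.
func (p *annotationProcessor) Process(
	ctx *context.AutoscalingContext,
	unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, []*apiv1.Pod, error) {
	var ordinary, provreqPods []*apiv1.Pod
	for _, pod := range unschedulablePods {
		if _, ok := pod.Annotations[consumeProvReqAnnotation]; ok {
			provreqPods = append(provreqPods, pod)
		} else {
			ordinary = append(ordinary, pod)
		}
	}
	return ordinary, provreqPods, nil
}

// CleanUp has no internal state to release in this sketch.
func (p *annotationProcessor) CleanUp() {}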
