
Commit

Merge pull request kubernetes#6667 from kisieland/refactor-estimation
Refactor estimation
k8s-ci-robot authored Apr 5, 2024
2 parents 609fb71 + 5aa6b2c commit 425b91e
Showing 10 changed files with 441 additions and 224 deletions.
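The change running through the files below replaces the per-node-group map[string][]*apiv1.Pod of schedulable pods with map[string][]estimator.PodEquivalenceGroup, so the scale-up path keeps working with groups of equivalent pods instead of a flat pod list. As a reading aid, the following is a minimal sketch of what such a group type looks like, inferred only from its usage in this diff (estimator.PodEquivalenceGroup{Pods: eg.Pods} and podGroup.Exemplar()); treat it as an assumption, not the estimator package's actual definition.

package estimator

import apiv1 "k8s.io/api/core/v1"

// PodEquivalenceGroup bundles pods that share scheduling requirements so the
// scale-up path can reason about one representative pod per group.
// Sketch only, inferred from how the type is used in the diff below.
type PodEquivalenceGroup struct {
    Pods []*apiv1.Pod
}

// Exemplar returns a representative pod for the group (nil if the group is
// empty), matching the podGroup.Exemplar() calls in matchingSchedulablePodGroups.
func (p PodEquivalenceGroup) Exemplar() *apiv1.Pod {
    if len(p.Pods) == 0 {
        return nil
    }
    return p.Pods[0]
}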
66 changes: 34 additions & 32 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -138,15 +138,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

// Calculate expansion options
schedulablePods := map[string][]*apiv1.Pod{}
schedulablePodGroups := map[string][]estimator.PodEquivalenceGroup{}
var options []expander.Option

for _, nodeGroup := range validNodeGroups {
schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()])
schedulablePodGroups[nodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()])
}

for _, nodeGroup := range validNodeGroups {
option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, len(nodes)+len(upcomingNodes), now)
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now)
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id())

if len(option.Pods) == 0 || option.NodeCount == 0 {
@@ -195,14 +195,14 @@ func (o *ScaleUpOrchestrator) ScaleUp(
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePods, podEquivalenceGroups, daemonSets)
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
if aErr != nil {
return scaleUpStatus, aErr
}
}

// Recompute similar node groups in case they need to be updated
bestOption.SimilarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePods, now)
bestOption.SimilarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePodGroups, now)
if bestOption.SimilarNodeGroups != nil {
// if similar node groups are found, log about them
similarNodeGroupIds := make([]string, 0)
@@ -440,28 +440,28 @@ func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
// ComputeExpansionOption computes expansion option based on pending pods and cluster state.
func (o *ScaleUpOrchestrator) ComputeExpansionOption(
nodeGroup cloudprovider.NodeGroup,
schedulablePods map[string][]*apiv1.Pod,
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
currentNodeCount int,
now time.Time,
) expander.Option {
option := expander.Option{NodeGroup: nodeGroup}
pods := schedulablePods[nodeGroup.Id()]
podGroups := schedulablePodGroups[nodeGroup.Id()]
nodeInfo := nodeInfos[nodeGroup.Id()]

if len(pods) == 0 {
if len(podGroups) == 0 {
return option
}

option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now)
option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePodGroups, now)

estimateStart := time.Now()
expansionEstimator := o.estimatorBuilder(
o.autoscalingContext.PredicateChecker,
o.autoscalingContext.ClusterSnapshot,
estimator.NewEstimationContext(o.autoscalingContext.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount),
)
option.NodeCount, option.Pods = expansionEstimator.Estimate(pods, nodeInfo, nodeGroup)
option.NodeCount, option.Pods = expansionEstimator.Estimate(podGroups, nodeInfo, nodeGroup)
metrics.UpdateDurationFromStart(metrics.Estimate, estimateStart)

autoscalingOptions, err := nodeGroup.GetOptions(o.autoscalingContext.NodeGroupDefaults)
@@ -480,7 +480,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
func (o *ScaleUpOrchestrator) CreateNodeGroup(
initialOption *expander.Option,
nodeInfos map[string]*schedulerframework.NodeInfo,
schedulablePods map[string][]*apiv1.Pod,
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
podEquivalenceGroups []*equivalence.PodGroup,
daemonSets []*appsv1.DaemonSet,
) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
@@ -503,16 +503,16 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
mainCreatedNodeInfo, aErr := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, o.taintConfig)
if aErr == nil {
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
schedulablePods[createNodeGroupResult.MainCreatedNodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, createNodeGroupResult.MainCreatedNodeGroup, mainCreatedNodeInfo)
schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, createNodeGroupResult.MainCreatedNodeGroup, mainCreatedNodeInfo)
} else {
klog.Warningf("Cannot build node info for newly created main node group %v; balancing similar node groups may not work; err=%v", createNodeGroupResult.MainCreatedNodeGroup.Id(), aErr)
// Use node info based on expansion candidate but update Id which likely changed when node group was created.
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = nodeInfos[oldId]
schedulablePods[createNodeGroupResult.MainCreatedNodeGroup.Id()] = schedulablePods[oldId]
schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = schedulablePodGroups[oldId]
}
if oldId != createNodeGroupResult.MainCreatedNodeGroup.Id() {
delete(nodeInfos, oldId)
delete(schedulablePods, oldId)
delete(schedulablePodGroups, oldId)
}
for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
nodeInfo, aErr := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, o.taintConfig)
@@ -521,7 +521,7 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
continue
}
nodeInfos[nodeGroup.Id()] = nodeInfo
schedulablePods[nodeGroup.Id()] = o.SchedulablePods(podEquivalenceGroups, nodeGroup, nodeInfo)
schedulablePodGroups[nodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, nodeGroup, nodeInfo)
}

// Update ClusterStateRegistry so similar nodegroups rebalancing works.
@@ -531,13 +531,13 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
return createNodeGroupResults, nil, nil
}

// SchedulablePods returns a list of pods that could be scheduled
// SchedulablePodGroups returns a list of pods that could be scheduled
// in a given node group after a scale up.
func (o *ScaleUpOrchestrator) SchedulablePods(
func (o *ScaleUpOrchestrator) SchedulablePodGroups(
podEquivalenceGroups []*equivalence.PodGroup,
nodeGroup cloudprovider.NodeGroup,
nodeInfo *schedulerframework.NodeInfo,
) []*apiv1.Pod {
) []estimator.PodEquivalenceGroup {
o.autoscalingContext.ClusterSnapshot.Fork()
defer o.autoscalingContext.ClusterSnapshot.Revert()

@@ -548,15 +548,17 @@ func (o *ScaleUpOrchestrator) SchedulablePods(
}
if err := o.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), allPods); err != nil {
klog.Errorf("Error while adding test Node: %v", err)
return []*apiv1.Pod{}
return []estimator.PodEquivalenceGroup{}
}

var schedulablePods []*apiv1.Pod
var schedulablePodGroups []estimator.PodEquivalenceGroup
for _, eg := range podEquivalenceGroups {
samplePod := eg.Pods[0]
if err := o.autoscalingContext.PredicateChecker.CheckPredicates(o.autoscalingContext.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
// Add pods to option.
schedulablePods = append(schedulablePods, eg.Pods...)
schedulablePodGroups = append(schedulablePodGroups, estimator.PodEquivalenceGroup{
Pods: eg.Pods,
})
// Mark pod group as (theoretically) schedulable.
eg.Schedulable = true
} else {
@@ -568,7 +570,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
}
}

return schedulablePods
return schedulablePodGroups
}

// UpcomingNodes returns a list of nodes that are not ready but should be.
@@ -652,7 +654,7 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou
func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
nodeGroup cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
schedulablePods map[string][]*apiv1.Pod,
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
now time.Time,
) []cloudprovider.NodeGroup {
if !o.autoscalingContext.BalanceSimilarNodeGroups {
@@ -667,8 +669,8 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
return nil
}

groupSchedulablePods, found := schedulablePods[nodeGroup.Id()]
if !found || len(groupSchedulablePods) == 0 {
podGroups, found := schedulablePodGroups[nodeGroup.Id()]
if !found || len(podGroups) == 0 {
return nil
}

@@ -683,21 +685,21 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
// Non-existing node groups are created later so skip check for them.
if ng.Exist() && !o.clusterStateRegistry.NodeGroupScaleUpSafety(ng, now).SafeToScale {
klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id())
} else if similarSchedulablePods, found := schedulablePods[ng.Id()]; found && matchingSchedulablePods(groupSchedulablePods, similarSchedulablePods) {
} else if similarPodGroups, found := schedulablePodGroups[ng.Id()]; found && matchingSchedulablePodGroups(podGroups, similarPodGroups) {
validSimilarNodeGroups = append(validSimilarNodeGroups, ng)
}
}

return validSimilarNodeGroups
}

func matchingSchedulablePods(groupSchedulablePods []*apiv1.Pod, similarSchedulablePods []*apiv1.Pod) bool {
schedulablePods := make(map[*apiv1.Pod]bool)
for _, pod := range similarSchedulablePods {
schedulablePods[pod] = true
func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool {
schedulableSamplePods := make(map[*apiv1.Pod]bool)
for _, podGroup := range similarPodGroups {
schedulableSamplePods[podGroup.Exemplar()] = true
}
for _, pod := range groupSchedulablePods {
if _, found := schedulablePods[pod]; !found {
for _, podGroup := range podGroups {
if _, found := schedulableSamplePods[podGroup.Exemplar()]; !found {
return false
}
}
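With the grouped representation, ComputeExpansionOption above hands the estimator the pod groups directly (expansionEstimator.Estimate(podGroups, nodeInfo, nodeGroup)) instead of flattening them back into a pod list; SchedulablePodGroups, like the SchedulablePods it replaces, still checks predicates once per group's sample pod. The toy below illustrates the kind of reuse a grouped estimate allows, using simplified stand-in types; it is not the binpacking estimator's real algorithm, which works against the cluster snapshot and the scheduler's predicate checker.

package main

import "fmt"

// Simplified stand-ins for the real pod and equivalence-group types.
type pod struct {
    name     string
    cpuMilli int64
}

type podEquivalenceGroup struct{ pods []pod }

// exemplar mirrors PodEquivalenceGroup.Exemplar(): one representative pod
// stands in for the whole group.
func (g podEquivalenceGroup) exemplar() pod { return g.pods[0] }

// estimateNodes is a deliberately simplified bin-packing estimate: node fit is
// decided once per group exemplar and then applied to every pod in the group.
func estimateNodes(groups []podEquivalenceGroup, nodeCPUMilli int64) int {
    nodes, free := 0, int64(0)
    for _, g := range groups {
        sample := g.exemplar()
        if sample.cpuMilli > nodeCPUMilli {
            continue // this group can never fit the node shape
        }
        for range g.pods {
            if free < sample.cpuMilli {
                nodes++ // open a new node
                free = nodeCPUMilli
            }
            free -= sample.cpuMilli
        }
    }
    return nodes
}

func main() {
    groups := []podEquivalenceGroup{
        {pods: []pod{{"p1-a", 500}, {"p1-b", 500}, {"p1-c", 500}}},
        {pods: []pod{{"p2-a", 1500}}},
    }
    fmt.Println(estimateNodes(groups, 2000)) // prints 2 in this toy model
}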
55 changes: 28 additions & 27 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
@@ -52,6 +52,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/fake"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

@@ -1208,17 +1209,17 @@ func (p *constNodeGroupSetProcessor) BalanceScaleUpBetweenGroups(_ *context.Auto
func (p *constNodeGroupSetProcessor) CleanUp() {}

func TestComputeSimilarNodeGroups(t *testing.T) {
pod1 := BuildTestPod("p1", 100, 1000)
pod2 := BuildTestPod("p2", 100, 1000)
pod3 := BuildTestPod("p3", 100, 1000)
podGroup1 := estimator.PodEquivalenceGroup{Pods: []*v1.Pod{BuildTestPod("p1", 100, 1000)}}
podGroup2 := estimator.PodEquivalenceGroup{Pods: []*v1.Pod{BuildTestPod("p2", 100, 1000)}}
podGroup3 := estimator.PodEquivalenceGroup{Pods: []*v1.Pod{BuildTestPod("p3", 100, 1000)}}

testCases := []struct {
name string
nodeGroup string
similarNodeGroups []string
otherNodeGroups []string
balancingEnabled bool
schedulablePods map[string][]*apiv1.Pod
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup
wantSimilarNodeGroups []string
}{
{
Expand All @@ -1242,12 +1243,12 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
similarNodeGroups: []string{"ng2", "ng3"},
otherNodeGroups: []string{"pg1", "pg2"},
balancingEnabled: false,
schedulablePods: map[string][]*apiv1.Pod{
"ng1": {pod1},
"ng2": {pod1},
"ng3": {pod1},
"pg1": {pod1},
"pg2": {pod1},
schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{
"ng1": {podGroup1},
"ng2": {podGroup1},
"ng3": {podGroup1},
"pg1": {podGroup1},
"pg2": {podGroup1},
},
wantSimilarNodeGroups: []string{},
},
@@ -1257,12 +1258,12 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
similarNodeGroups: []string{"ng2", "ng3"},
otherNodeGroups: []string{"pg1", "pg2"},
balancingEnabled: true,
schedulablePods: map[string][]*apiv1.Pod{
"ng1": {pod1},
"ng2": {pod1},
"ng3": {pod1},
"pg1": {pod1},
"pg2": {pod1},
schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{
"ng1": {podGroup1},
"ng2": {podGroup1},
"ng3": {podGroup1},
"pg1": {podGroup1},
"pg2": {podGroup1},
},
wantSimilarNodeGroups: []string{"ng2", "ng3"},
},
@@ -1272,12 +1273,12 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
similarNodeGroups: []string{"ng2", "ng3"},
otherNodeGroups: []string{"pg1", "pg2"},
balancingEnabled: true,
schedulablePods: map[string][]*apiv1.Pod{
"ng1": {pod1},
"ng2": {pod1, pod2},
"ng3": {pod1, pod2, pod3},
"pg1": {pod1, pod2},
"pg2": {pod1, pod2, pod3},
schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{
"ng1": {podGroup1},
"ng2": {podGroup1, podGroup2},
"ng3": {podGroup1, podGroup2, podGroup3},
"pg1": {podGroup1, podGroup2},
"pg2": {podGroup1, podGroup2, podGroup3},
},
wantSimilarNodeGroups: []string{"ng2", "ng3"},
},
@@ -1287,10 +1288,10 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
similarNodeGroups: []string{"ng2", "ng3"},
otherNodeGroups: []string{"pg1", "pg2"},
balancingEnabled: true,
schedulablePods: map[string][]*apiv1.Pod{
"ng1": {pod1, pod2},
"ng2": {pod1},
"pg1": {pod1},
schedulablePodGroups: map[string][]estimator.PodEquivalenceGroup{
"ng1": {podGroup1, podGroup2},
"ng2": {podGroup1},
"pg1": {podGroup1},
},
wantSimilarNodeGroups: []string{},
},
@@ -1331,7 +1332,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) {

suOrchestrator := &ScaleUpOrchestrator{}
suOrchestrator.Initialize(&ctx, &processors.AutoscalingProcessors{NodeGroupSetProcessor: nodeGroupSetProcessor}, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
similarNodeGroups := suOrchestrator.ComputeSimilarNodeGroups(provider.GetNodeGroup(tc.nodeGroup), nodeInfos, tc.schedulablePods, now)
similarNodeGroups := suOrchestrator.ComputeSimilarNodeGroups(provider.GetNodeGroup(tc.nodeGroup), nodeInfos, tc.schedulablePodGroups, now)

var gotSimilarNodeGroups []string
for _, ng := range similarNodeGroups {
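One detail the test above relies on: matchingSchedulablePodGroups keys its lookup map on the exemplar pod pointer, so two node groups only match when their schedulable pod groups share the same underlying sample *apiv1.Pod, which is why the test reuses the same podGroup1/podGroup2/podGroup3 values across node-group entries rather than building fresh pods for each. A standalone sketch of that membership check, with simplified placeholder types rather than the production ones:

package main

import "fmt"

type pod struct{ name string }

type podEquivalenceGroup struct{ pods []*pod }

func (g podEquivalenceGroup) exemplar() *pod { return g.pods[0] }

// matching mirrors the shape of matchingSchedulablePodGroups: every exemplar in
// groups must also appear, by pointer identity, among similar's exemplars.
func matching(groups, similar []podEquivalenceGroup) bool {
    seen := make(map[*pod]bool)
    for _, g := range similar {
        seen[g.exemplar()] = true
    }
    for _, g := range groups {
        if !seen[g.exemplar()] {
            return false
        }
    }
    return true
}

func main() {
    p1 := &pod{name: "p1"}
    p2 := &pod{name: "p2"}
    ng1 := []podEquivalenceGroup{{pods: []*pod{p1}}}
    ng2 := []podEquivalenceGroup{{pods: []*pod{p1}}, {pods: []*pod{p2}}}
    fmt.Println(matching(ng1, ng2)) // true: every group schedulable on ng1 has a counterpart on ng2
    fmt.Println(matching(ng2, ng1)) // false: p2's group has no counterpart on ng1
}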
