Add testing for consolidation budgets e2e #5525

Merged · 3 commits · Jan 26, 2024
99 changes: 89 additions & 10 deletions test/pkg/environment/common/expectations.go
@@ -19,6 +19,7 @@ import (
"encoding/base64"
"fmt"
"io"
"math"
"strings"
"time"

@@ -468,25 +469,44 @@ func (env *Environment) ExpectCreatedNodeCount(comparator string, count int) []*
return createdNodes
}

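// ExpectNodeCount asserts that the number of nodes carrying the test discovery label satisfies the comparator against count.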
func (env *Environment) ExpectNodeCount(comparator string, count int) {
GinkgoHelper()

nodeList := &v1.NodeList{}
Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
Expect(len(nodeList.Items)).To(BeNumerically(comparator, count))
}

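// ExpectNodeClaimCount asserts that the number of NodeClaims carrying the test discovery label satisfies the comparator against count.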
func (env *Environment) ExpectNodeClaimCount(comparator string, count int) {
GinkgoHelper()

nodeClaimList := &corev1beta1.NodeClaimList{}
Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
Expect(len(nodeClaimList.Items)).To(BeNumerically(comparator, count))
}

func NodeNames(nodes []*v1.Node) []string {
return lo.Map(nodes, func(n *v1.Node, index int) string {
return n.Name
})
}

func (env *Environment) ConsistentlyExpectNodeCount(comparator string, count int, duration string) []*v1.Node {
func (env *Environment) ConsistentlyExpectNodeCount(comparator string, count int, duration time.Duration) []*v1.Node {
GinkgoHelper()
By(fmt.Sprintf("expecting nodes to be %s to %d for %s", comparator, count, duration))
nodeList := &v1.NodeList{}
Consistently(func(g Gomega) {
g.Expect(env.Client.List(env, nodeList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count),
fmt.Sprintf("expected %d nodes, had %d (%v) for %s", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items)), duration))
}, duration).Should(Succeed())
}, duration.String()).Should(Succeed())
return lo.ToSlicePtr(nodeList.Items)
}

func (env *Environment) ConsistentlyExpectNoDisruptions(nodeCount int, duration string) {
// ConsistentlyExpectNoDisruptions ensures that the state of the cluster does not change for the passed duration.
// Specifically, we check that the node count stays equal to the passed-in count and that no disrupting
// taints are added to any node throughout the window.
func (env *Environment) ConsistentlyExpectNoDisruptions(nodeCount int, duration time.Duration) {
GinkgoHelper()
Consistently(func(g Gomega) {
// Ensure we don't change our NodeClaims
@@ -504,7 +524,20 @@ func (env *Environment) ConsistentlyExpectNoDisruptions(nodeCount int, duration
})
g.Expect(ok).To(BeFalse())
}
}, duration).Should(Succeed())
}, duration.String()).Should(Succeed())
}

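// ConsistentlyExpectTaintedNodeCount asserts that the number of nodes tainted for disruption satisfies the comparator against count for the entire duration.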
func (env *Environment) ConsistentlyExpectTaintedNodeCount(comparator string, count int, duration time.Duration) []*v1.Node {
GinkgoHelper()

By(fmt.Sprintf("checking for tainted nodes to be %s to %d for %s", comparator, count, duration))
nodeList := &v1.NodeList{}
Consistently(func(g Gomega) {
g.Expect(env.Client.List(env, nodeList, client.MatchingFields{"spec.taints[*].karpenter.sh/disruption": "disrupting"})).To(Succeed())
g.Expect(len(nodeList.Items)).To(BeNumerically(comparator, count),
fmt.Sprintf("expected %d tainted nodes, had %d (%v)", count, len(nodeList.Items), NodeNames(lo.ToSlicePtr(nodeList.Items))))
}, duration.String()).Should(Succeed())
return lo.ToSlicePtr(nodeList.Items)
}

func (env *Environment) EventuallyExpectTaintedNodeCount(comparator string, count int) []*v1.Node {
@@ -751,17 +784,63 @@ func (env *Environment) ExpectDaemonSetEnvironmentVariableUpdated(obj client.Obj
Expect(env.Client.Patch(env.Context, ds, patch)).To(Succeed())
}

func (env *Environment) ExpectHealthyPodsForNode(nodeName string) []*v1.Pod {
// ForcePodsToSpread ensures that currently scheduled pods get spread evenly across all of the passed nodes by deleting pods off of existing
// nodes and waiting for them to reschedule. This is useful for scenarios where you want to force the nodes to be underutilized
// but you want to keep a consistent count of nodes rather than leaving around empty ones.
func (env *Environment) ForcePodsToSpread(nodes ...*v1.Node) {
GinkgoHelper()

// Get the total count of pods across all of the passed nodes
podCount := 0
for _, n := range nodes {
podCount += len(env.ExpectActivePodsForNode(n.Name))
}
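// Cap each node at the ceiling of the average pod count, e.g. 5 pods across 3 nodes allows at most ceil(5/3) = 2 pods per node.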
maxPodsPerNode := int(math.Ceil(float64(podCount) / float64(len(nodes))))

By(fmt.Sprintf("forcing %d pods to spread across %d nodes", podCount, len(nodes)))
start := time.Now()
for {
var nodePods []*v1.Pod
node, found := lo.Find(nodes, func(n *v1.Node) bool {
nodePods = env.ExpectActivePodsForNode(n.Name)
return len(nodePods) > maxPodsPerNode
})
if !found {
break
}
// Set the node to unschedulable so that the deleted pods won't reschedule back onto it.
Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed())
stored := node.DeepCopy()
node.Spec.Unschedulable = true
Expect(env.Client.Patch(env.Context, node, client.MergeFrom(stored))).To(Succeed())
for _, pod := range nodePods[maxPodsPerNode:] {
env.ExpectDeleted(pod)
}
Eventually(func(g Gomega) {
g.Expect(len(env.ExpectActivePodsForNode(node.Name))).To(Or(Equal(maxPodsPerNode), Equal(maxPodsPerNode-1)))
}).WithTimeout(5 * time.Second).Should(Succeed())

// TODO: Consider moving this time check to an Eventually poll. This gets a little trickier with helper functions
// since you need to make sure that your Expectation helper functions are scoped to your "g Gomega" scope
// so that you don't fail the whole test the first time an expectation fails
if time.Since(start) > time.Minute*15 {
Fail("forcing pods to spread failed due to a timeout")
}
}
for _, n := range nodes {
stored := n.DeepCopy()
n.Spec.Unschedulable = false
Expect(env.Client.Patch(env.Context, n, client.MergeFrom(stored))).To(Succeed())
}
}

func (env *Environment) ExpectActivePodsForNode(nodeName string) []*v1.Pod {
GinkgoHelper()
podList := &v1.PodList{}
Expect(env.Client.List(env, podList, client.MatchingFields{"spec.nodeName": nodeName}, client.HasLabels{test.DiscoveryLabel})).To(Succeed())

// Return the healthy pods
return lo.Filter(lo.ToSlicePtr(podList.Items), func(p *v1.Pod, _ int) bool {
_, found := lo.Find(p.Status.Conditions, func(cond v1.PodCondition) bool {
return cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue
})
return found
return p.DeletionTimestamp.IsZero()
})
}

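For context on the ConsistentlyExpectTaintedNodeCount helper added above: its client.MatchingFields lookup on "spec.taints[*].karpenter.sh/disruption" only works if a custom field index by that key has been registered with the controller-runtime client. The following is a minimal sketch of such a registration, assuming the index is set up once at environment construction; the helper name and wiring are illustrative and not taken from this diff.

package common

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// registerDisruptionTaintIndex maps each Node to the values of its karpenter.sh/disruption taints,
// so that List calls filtered with client.MatchingFields on that key return only tainted nodes.
func registerDisruptionTaintIndex(ctx context.Context, indexer client.FieldIndexer) error {
	return indexer.IndexField(ctx, &v1.Node{}, "spec.taints[*].karpenter.sh/disruption", func(o client.Object) []string {
		node := o.(*v1.Node)
		var values []string
		for _, taint := range node.Spec.Taints {
			if taint.Key == "karpenter.sh/disruption" {
				values = append(values, taint.Value)
			}
		}
		return values
	})
}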
220 changes: 220 additions & 0 deletions test/suites/consolidation/suite_test.go
@@ -22,10 +22,12 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"

corev1beta1 "sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/test"
@@ -63,6 +65,224 @@ var _ = AfterEach(func() { env.Cleanup() })
var _ = AfterEach(func() { env.AfterEach() })

var _ = Describe("Consolidation", func() {
Context("Budgets", func() {
var nodePool *corev1beta1.NodePool
var dep *appsv1.Deployment
var selector labels.Selector
var numPods int32
BeforeEach(func() {
nodePool = env.DefaultNodePool(nodeClass)
nodePool.Spec.Disruption.ConsolidateAfter = nil
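// A nil ConsolidateAfter leaves consolidation enabled from the start; the non-empty delete test below
// instead sets it to an empty NillableDuration to hold consolidation until the test is ready.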

numPods = 5
dep = test.Deployment(test.DeploymentOptions{
Replicas: numPods,
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "regular-app"},
},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
},
},
})
selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)
})
It("should respect budgets for empty delete consolidation", func() {
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{
{
Nodes: "40%",
},
}
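// With the 5 nodes created below, a 40% budget allows at most 2 nodes to be disrupted at a time,
// which is what the tainted-node assertions later in this test rely on.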

// Hostname anti-affinity to require one pod on each node
dep.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: dep.Spec.Selector,
TopologyKey: v1.LabelHostname,
},
},
},
}
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 5)
nodes := env.EventuallyExpectCreatedNodeCount("==", 5)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

By("adding finalizers to the nodes to prevent termination")
for _, node := range nodes {
Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed())
node.Finalizers = append(node.Finalizers, common.TestingFinalizer)
env.ExpectUpdated(node)
}

dep.Spec.Replicas = lo.ToPtr[int32](1)
By("making the nodes empty")
// Update the deployment to only contain 1 replica.
env.ExpectUpdated(dep)

// Ensure that we get two nodes tainted, and that they overlap during the disruption
env.EventuallyExpectTaintedNodeCount("==", 2)
nodes = env.ConsistentlyExpectTaintedNodeCount("==", 2, time.Second*5)

// Remove the finalizer from each node so that we can terminate
for _, node := range nodes {
Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed())
}

// After the deletion timestamp is set and all pods are drained
// the node should be gone
env.EventuallyExpectNotFound(nodes[0], nodes[1])

// This check ensures that we are still consolidating multiple nodes at the same time (two in parallel, as the budget allows)
env.EventuallyExpectTaintedNodeCount("==", 2)
nodes = env.ConsistentlyExpectTaintedNodeCount("==", 2, time.Second*5)

for _, node := range nodes {
Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed())
}
env.EventuallyExpectNotFound(nodes[0], nodes[1])

// Expect there to only be one node remaining for the last replica
env.ExpectNodeCount("==", 1)
})
It("should respect budgets for non-empty delete consolidation", func() {
// This test will hold consolidation until we are ready to execute it
nodePool.Spec.Disruption.ConsolidateAfter = &corev1beta1.NillableDuration{}

nodePool = test.ReplaceRequirements(nodePool,
v1.NodeSelectorRequirement{
Key: v1beta1.LabelInstanceSize,
Operator: v1.NodeSelectorOpIn,
Values: []string{"2xlarge"},
},
)
// We're expecting to create 3 nodes, so we'll expect to see at most 2 nodes deleting at one time.
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{{
Nodes: "50%",
}}
numPods = 9
dep = test.Deployment(test.DeploymentOptions{
Replicas: numPods,
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "large-app"},
},
// Each 2xlarge has 8 cpu, so each node should fit no more than 3 pods.
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2100m"),
},
},
},
})
selector = labels.SelectorFromSet(dep.Spec.Selector.MatchLabels)
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 3)
nodes := env.EventuallyExpectCreatedNodeCount("==", 3)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

By("scaling down the deployment")
// Update the deployment to a third of the replicas.
dep.Spec.Replicas = lo.ToPtr[int32](3)
env.ExpectUpdated(dep)

env.ForcePodsToSpread(nodes...)
env.EventuallyExpectHealthyPodCount(selector, 3)

By("cordoning and adding finalizer to the nodes")
// Add a finalizer to each node so that we can stop termination disruptions
for _, node := range nodes {
Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed())
node.Finalizers = append(node.Finalizers, common.TestingFinalizer)
env.ExpectUpdated(node)
}

By("enabling consolidation")
nodePool.Spec.Disruption.ConsolidateAfter = nil
env.ExpectUpdated(nodePool)

// Ensure that we get two nodes tainted, and that they overlap during the disruption
env.EventuallyExpectTaintedNodeCount("==", 2)
nodes = env.ConsistentlyExpectTaintedNodeCount("==", 2, time.Second*5)

for _, node := range nodes {
Expect(env.ExpectTestingFinalizerRemoved(node)).To(Succeed())
}
env.EventuallyExpectNotFound(nodes[0], nodes[1])
env.ExpectNodeCount("==", 1)
})
It("should not allow consolidation if the budget is fully blocking", func() {
// We're going to define a budget that doesn't allow any consolidation to happen
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{{
Nodes: "0",
}}

// Hostname anti-affinity to require one pod on each node
dep.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: dep.Spec.Selector,
TopologyKey: v1.LabelHostname,
},
},
},
}
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 5)
env.EventuallyExpectCreatedNodeCount("==", 5)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

dep.Spec.Replicas = lo.ToPtr[int32](1)
By("making the nodes empty")
// Update the deployment to only contain 1 replica.
env.ExpectUpdated(dep)

env.ConsistentlyExpectNoDisruptions(5, time.Minute)
})
It("should not allow consolidation if the budget is fully blocking during a scheduled time", func() {
// We're going to define a budget that doesn't allow any consolidation to happen
// This is going to be on a schedule that only lasts 30 minutes, whose window starts 15 minutes before
// the current time and extends 15 minutes past the current time
// Times need to be in UTC since the karpenter containers were built in UTC time
windowStart := time.Now().Add(-time.Minute * 15).UTC()
nodePool.Spec.Disruption.Budgets = []corev1beta1.Budget{{
Nodes: "0",
Schedule: lo.ToPtr(fmt.Sprintf("%d %d * * *", windowStart.Minute(), windowStart.Hour())),
Duration: &metav1.Duration{Duration: time.Minute * 30},
}}
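// For example, if the current time is 10:30 UTC, windowStart is 10:15 UTC, the schedule renders as "15 10 * * *",
// and the 30-minute duration makes the blocking window 10:15-10:45 UTC, which covers the present moment.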

// Hostname anti-affinity to require one pod on each node
dep.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: dep.Spec.Selector,
TopologyKey: v1.LabelHostname,
},
},
},
}
env.ExpectCreated(nodeClass, nodePool, dep)

env.EventuallyExpectCreatedNodeClaimCount("==", 5)
env.EventuallyExpectCreatedNodeCount("==", 5)
env.EventuallyExpectHealthyPodCount(selector, int(numPods))

dep.Spec.Replicas = lo.ToPtr[int32](1)
By("making the nodes empty")
// Update the deployment to only contain 1 replica.
env.ExpectUpdated(dep)

env.ConsistentlyExpectNoDisruptions(5, time.Minute)
})
})
DescribeTable("should consolidate nodes (delete)", Label(debug.NoWatch), Label(debug.NoEvents),
func(spotToSpot bool) {
nodePool := test.NodePool(corev1beta1.NodePool{