test: Add e2e tests that ensure no 6+ minute delays for disrupted stateful workloads
AndrewSirenko committed Jul 31, 2024
1 parent ec4d0e1 commit 807b441
Showing 2 changed files with 140 additions and 0 deletions.
1 change: 1 addition & 0 deletions test/pkg/environment/common/setup.go
@@ -48,6 +48,7 @@ var (
CleanableObjects = []client.Object{
&corev1.Pod{},
&appsv1.Deployment{},
&appsv1.StatefulSet{},
&appsv1.DaemonSet{},
&policyv1.PodDisruptionBudget{},
&corev1.PersistentVolumeClaim{},
139 changes: 139 additions & 0 deletions test/suites/storage/suite_test.go
@@ -18,6 +18,9 @@ import (
"fmt"
"strings"
"testing"
"time"

"k8s.io/apimachinery/pkg/labels"

awssdk "github.com/aws/aws-sdk-go/aws"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -236,6 +239,142 @@ var _ = Describe("Persistent Volumes", func() {
})
})

var _ = Describe("Stateful workloads", func() {
var numPods int
var persistentVolumeClaim *corev1.PersistentVolumeClaim
var storageClass *storagev1.StorageClass
var statefulSet *appsv1.StatefulSet
var selector labels.Selector
BeforeEach(func() {
// Ensure that the EBS driver is installed, or we can't run the test.
var ds appsv1.DaemonSet
if err := env.Client.Get(env.Context, client.ObjectKey{
Namespace: "kube-system",
Name: "ebs-csi-node",
}, &ds); err != nil {
if errors.IsNotFound(err) {
Skip(fmt.Sprintf("skipping StatefulSet test due to missing EBS driver, %s", err))
} else {
Fail(fmt.Sprintf("determining EBS driver status, %s", err))
}
}

numPods = 1
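// Pick a random AZ from the cluster's discovered subnets; the StorageClass below is restricted to it.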
subnets := env.GetSubnets(map[string]string{"karpenter.sh/discovery": env.ClusterName})
shuffledAZs := lo.Shuffle(lo.Keys(subnets))

storageClass = test.StorageClass(test.StorageClassOptions{
ObjectMeta: metav1.ObjectMeta{
Name: "test-storage-class",
},
Provisioner: awssdk.String("ebs.csi.aws.com"),
VolumeBindingMode: lo.ToPtr(storagev1.VolumeBindingWaitForFirstConsumer),
})

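// EBS volumes are zonal, so pin the StorageClass to the chosen AZ. This ensures any
// replacement node must be provisioned in the same zone as the existing volume.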
storageClass.AllowedTopologies = []corev1.TopologySelectorTerm{{
MatchLabelExpressions: []corev1.TopologySelectorLabelRequirement{{
Key: "topology.ebs.csi.aws.com/zone",
Values: []string{shuffledAZs[0]},
}},
}}

persistentVolumeClaim = test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
StorageClassName: &storageClass.Name,
})
statefulSet = test.StatefulSet(test.StatefulSetOptions{
Replicas: int32(numPods),
PodOptions: test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "my-app",
}},
},
})
// Ensure the same volume is used across replica restarts.
statefulSet.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{*persistentVolumeClaim}
// Ensure the volume is mounted in the pod so that the test verifies the 6+ minute force-detach delay is avoided.
vm := corev1.VolumeMount{
Name: persistentVolumeClaim.Name,
MountPath: "/usr/share",
}
statefulSet.Spec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{vm}
selector = labels.SelectorFromSet(statefulSet.Spec.Selector.MatchLabels)
})

It("should run on a new node without 6+ minute delays when disrupted", func() {
// EBS volume detach + attach should usually take ~20s. The extra time prevents flakes from slow EC2 API calls.
forceDetachTimeout := 2 * time.Minute

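// Provision the initial node and wait for the stateful pod to become healthy on it.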
env.ExpectCreated(nodeClass, nodePool, storageClass, statefulSet)
nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0]
node := env.EventuallyExpectCreatedNodeCount("==", 1)[0]
env.EventuallyExpectHealthyPodCount(selector, numPods)

env.Monitor.Reset() // Reset the monitor so that we can expect a single node to be spun up after disruption

// Delete original nodeClaim to get the original node deleted
env.ExpectDeleted(nodeClaim)

// Eventually the node will be tainted, which means it is actively being disrupted
Eventually(func(g Gomega) {
g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).Should(Succeed())
_, ok := lo.Find(node.Spec.Taints, func(t corev1.Taint) bool {
return karpv1.IsDisruptingTaint(t)
})
g.Expect(ok).To(BeTrue())
}).Should(Succeed())

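// Karpenter should provision a replacement node for the displaced stateful pod.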
env.EventuallyExpectCreatedNodeCount(">=", 1)

// Set the limit to 0 to make sure we don't continue to create nodeClaims.
// This is CRITICAL since it prevents leaking node resources into subsequent tests
nodePool.Spec.Limits = karpv1.Limits{
corev1.ResourceCPU: resource.MustParse("0"),
}
env.ExpectUpdated(nodePool)

// After the deletion timestamp is set and all pods are drained, the node should be gone.
env.EventuallyExpectNotFound(nodeClaim, node)

// We expect the stateful workload to become healthy on the new node before the 6+ minute force-detach timeout.
// We start the timer after the pod binds to the node because volume attachment happens during ContainerCreating
env.EventuallyExpectCreatedNodeClaimCount("==", 1)
env.EventuallyExpectCreatedNodeCount(">=", 1)
env.EventuallyExpectBoundPodCount(selector, numPods)
env.EventuallyExpectHealthyPodCountWithTimeout(forceDetachTimeout, selector, numPods)
})
It("should not block node deletion if stateful workload cannot be drained", func() {
// Make the pod un-drainable by tolerating the disruption taint.
statefulSet.Spec.Template.Spec.Tolerations = []corev1.Toleration{{
Key: "karpenter.sh/disruption",
Operator: corev1.TolerationOpEqual,
Value: "disrupting",
Effect: corev1.TaintEffectNoExecute,
}}

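// Provision the initial node; because the pod tolerates the disruption taint, it will never be drained from it.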
env.ExpectCreated(nodeClass, nodePool, storageClass, statefulSet)
nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0]
node := env.EventuallyExpectCreatedNodeCount("==", 1)[0]
env.EventuallyExpectHealthyPodCount(selector, numPods)

// Delete original nodeClaim to get the original node deleted
env.ExpectDeleted(nodeClaim)

// Eventually the node will be tainted, which means it is actively being disrupted
Eventually(func(g Gomega) {
g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).Should(Succeed())
_, ok := lo.Find(node.Spec.Taints, func(t corev1.Taint) bool {
return karpv1.IsDisruptingTaint(t)
})
g.Expect(ok).To(BeTrue())
}).Should(Succeed())

// After the deletion timestamp is set and all pods are drained,
// the node should be gone regardless of orphaned VolumeAttachment objects.
env.EventuallyExpectNotFound(nodeClaim, node)
})
})

var _ = Describe("Ephemeral Storage", func() {
It("should run a pod with instance-store ephemeral storage that exceeds EBS root block device mappings", func() {
nodeClass.Spec.InstanceStorePolicy = lo.ToPtr(v1.InstanceStorePolicyRAID0)
