diff --git a/test/pkg/environment/common/setup.go b/test/pkg/environment/common/setup.go index acc4a2610dbf..badee033ead6 100644 --- a/test/pkg/environment/common/setup.go +++ b/test/pkg/environment/common/setup.go @@ -48,6 +48,7 @@ var ( CleanableObjects = []client.Object{ &corev1.Pod{}, &appsv1.Deployment{}, + &appsv1.StatefulSet{}, &appsv1.DaemonSet{}, &policyv1.PodDisruptionBudget{}, &corev1.PersistentVolumeClaim{}, diff --git a/test/suites/storage/suite_test.go b/test/suites/storage/suite_test.go index ccdb15a2d96d..7fc9d91973f0 100644 --- a/test/suites/storage/suite_test.go +++ b/test/suites/storage/suite_test.go @@ -18,6 +18,9 @@ import ( "fmt" "strings" "testing" + "time" + + "k8s.io/apimachinery/pkg/labels" awssdk "github.com/aws/aws-sdk-go/aws" karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" @@ -236,6 +239,142 @@ var _ = Describe("Persistent Volumes", func() { }) }) +var _ = Describe("Stateful workloads", func() { + var numPods int + var persistentVolumeClaim *corev1.PersistentVolumeClaim + var storageClass *storagev1.StorageClass + var statefulSet *appsv1.StatefulSet + var selector labels.Selector + BeforeEach(func() { + // Ensure that the EBS driver is installed, or we can't run the test. + var ds appsv1.DaemonSet + if err := env.Client.Get(env.Context, client.ObjectKey{ + Namespace: "kube-system", + Name: "ebs-csi-node", + }, &ds); err != nil { + if errors.IsNotFound(err) { + Skip(fmt.Sprintf("skipping StatefulSet test due to missing EBS driver %s", err)) + } else { + Fail(fmt.Sprintf("determining EBS driver status, %s", err)) + } + } + + numPods = 1 + subnets := env.GetSubnets(map[string]string{"karpenter.sh/discovery": env.ClusterName}) + shuffledAZs := lo.Shuffle(lo.Keys(subnets)) + + storageClass = test.StorageClass(test.StorageClassOptions{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-storage-class", + }, + Provisioner: awssdk.String("ebs.csi.aws.com"), + VolumeBindingMode: lo.ToPtr(storagev1.VolumeBindingWaitForFirstConsumer), + }) + + storageClass.AllowedTopologies = []corev1.TopologySelectorTerm{{ + MatchLabelExpressions: []corev1.TopologySelectorLabelRequirement{{ + Key: "topology.ebs.csi.aws.com/zone", + Values: []string{shuffledAZs[0]}, + }}, + }} + + persistentVolumeClaim = test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{ + StorageClassName: &storageClass.Name, + }) + statefulSet = test.StatefulSet(test.StatefulSetOptions{ + Replicas: int32(numPods), + PodOptions: test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "my-app", + }}, + }, + }) + // Ensure same volume is used across replica restarts. + statefulSet.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{*persistentVolumeClaim} + // Ensure volume mounts to pod, so that we test that we avoid the 6+ minute force detach delay. + vm := corev1.VolumeMount{ + Name: persistentVolumeClaim.Name, + MountPath: "/usr/share", + } + statefulSet.Spec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{vm} + selector = labels.SelectorFromSet(statefulSet.Spec.Selector.MatchLabels) + }) + + It("should run on a new node without 6+ minute delays when disrupted", func() { + // EBS volume detach + attach should usually take ~20s. Extra time is to prevent flakes due to EC2 APIs. + forceDetachTimeout := 2 * time.Minute + + env.ExpectCreated(nodeClass, nodePool, storageClass, statefulSet) + nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0] + node := env.EventuallyExpectCreatedNodeCount("==", 1)[0] + env.EventuallyExpectHealthyPodCount(selector, numPods) + + env.Monitor.Reset() // Reset the monitor so that we can expect a single node to be spun up after expiration + + // Delete original nodeClaim to get the original node deleted + env.ExpectDeleted(nodeClaim) + + // Eventually the node will be tainted, which means its actively being disrupted + Eventually(func(g Gomega) { + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).Should(Succeed()) + _, ok := lo.Find(node.Spec.Taints, func(t corev1.Taint) bool { + return karpv1.IsDisruptingTaint(t) + }) + g.Expect(ok).To(BeTrue()) + }).Should(Succeed()) + + env.EventuallyExpectCreatedNodeCount(">=", 1) + + // Set the limit to 0 to make sure we don't continue to create nodeClaims. + // This is CRITICAL since it prevents leaking node resources into subsequent tests + nodePool.Spec.Limits = karpv1.Limits{ + corev1.ResourceCPU: resource.MustParse("0"), + } + env.ExpectUpdated(nodePool) + + // After the deletion timestamp is set and all pods are drained the node should be gone. + env.EventuallyExpectNotFound(nodeClaim, node) + + // We expect the stateful workload to become healthy on new node before the 6-minute force detach timeout. + // We start timer after pod binds to node because volume attachment happens during ContainerCreating + env.EventuallyExpectCreatedNodeClaimCount("==", 1) + env.EventuallyExpectCreatedNodeCount(">=", 1) + env.EventuallyExpectBoundPodCount(selector, numPods) + env.EventuallyExpectHealthyPodCountWithTimeout(forceDetachTimeout, selector, numPods) + }) + It("should not block node deletion if stateful workload cannot be drained", func() { + // Make pod un-drain-able by tolerating disruption taint. + statefulSet.Spec.Template.Spec.Tolerations = []corev1.Toleration{{ + Key: "karpenter.sh/disruption", + Operator: corev1.TolerationOpEqual, + Value: "disrupting", + Effect: corev1.TaintEffectNoExecute, + }} + + env.ExpectCreated(nodeClass, nodePool, storageClass, statefulSet) + nodeClaim := env.EventuallyExpectCreatedNodeClaimCount("==", 1)[0] + node := env.EventuallyExpectCreatedNodeCount("==", 1)[0] + env.EventuallyExpectHealthyPodCount(selector, numPods) + + // Delete original nodeClaim to get the original node deleted + env.ExpectDeleted(nodeClaim) + + // Eventually the node will be tainted, which means its actively being disrupted + Eventually(func(g Gomega) { + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).Should(Succeed()) + _, ok := lo.Find(node.Spec.Taints, func(t corev1.Taint) bool { + return karpv1.IsDisruptingTaint(t) + }) + g.Expect(ok).To(BeTrue()) + }).Should(Succeed()) + + // After the deletion timestamp is set and all pods are drained + // the node should be gone regardless of orphaned volume attachment objects. + env.EventuallyExpectNotFound(nodeClaim, node) + }) +}) + var _ = Describe("Ephemeral Storage", func() { It("should run a pod with instance-store ephemeral storage that exceeds EBS root block device mappings", func() { nodeClass.Spec.InstanceStorePolicy = lo.ToPtr(v1.InstanceStorePolicyRAID0)