From ea4524e6172c42a3812ca38729b03bf235cb8b42 Mon Sep 17 00:00:00 2001
From: Neelesh Thakur
Date: Wed, 29 May 2024 06:26:45 -0700
Subject: [PATCH] PWX-37335: honor operator's unschedulable annotation (#1778) (#1787)

If the operator has added the "unschedulable" annotation on a node,
exclude that node from the filter response.

Signed-off-by: Neelesh Thakur
---
 pkg/extender/extender.go      |  33 +++++++++-
 pkg/extender/extender_test.go | 111 +++++++++++++++++++++++++++++++++
 2 files changed, 140 insertions(+), 4 deletions(-)

diff --git a/pkg/extender/extender.go b/pkg/extender/extender.go
index 7147b36b2a..6a9021251d 100644
--- a/pkg/extender/extender.go
+++ b/pkg/extender/extender.go
@@ -63,6 +63,8 @@ const (
 	skipScoringLabel = "stork.libopenstorage.org/skipSchedulerScoring"
 	// annotation to disable hyperconvergence for a pod
 	disableHyperconvergenceAnnotation = "stork.libopenstorage.org/disableHyperconvergence"
+	// annotation used by the operator to mark a node as unschedulable
+	unschedulableAnnotation = "operator.libopenstorage.org/unschedulable"
 )
 
 var (
@@ -234,6 +236,15 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
 		}
 		// Do driver check even if we only have pending WaitForFirstConsumer volumes
 	} else if len(driverVolumes) > 0 || len(WFFCVolumes) > 0 {
+		schedulableNodes := []v1.Node{}
+		for _, node := range args.Nodes.Items {
+			if nodeMarkedUnschedulable(&node) {
+				storklog.PodLog(pod).Debugf("Filtering out node %s because of annotation %s",
+					node.Name, unschedulableAnnotation)
+				continue
+			}
+			schedulableNodes = append(schedulableNodes, node)
+		}
 		driverNodes, err := e.Driver.GetNodes()
 		if err != nil {
 			storklog.PodLog(pod).Errorf("Error getting list of driver nodes, returning all nodes, err: %v", err)
@@ -241,11 +252,11 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
 		for _, volumeInfo := range driverVolumes {
 			// Pod is using a volume that is labeled for Windows
 			// This Pod needs to run only on Windows node
-			// Stork will return all nodes in the filter request
+			// Stork will return all schedulable nodes in the filter request
 			if volumeInfo.WindowsVolume {
 				e.encodeFilterResponse(encoder,
 					pod,
-					args.Nodes.Items)
+					schedulableNodes)
 				return
 			}
 			onlineNodeFound := false
@@ -302,7 +313,7 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
 			}
 		}
 
-		for _, node := range args.Nodes.Items {
+		for _, node := range schedulableNodes {
 			for _, driverNode := range driverNodes {
 				storklog.PodLog(pod).Debugf("nodeInfo: %v", driverNode)
 				if (driverNode.Status == volume.NodeOnline || driverNode.Status == volume.NodeStorageDown) &&
@@ -1125,3 +1136,19 @@ func (e *Extender) updateVirtLauncherPodPrioritizeScores(
 		storklog.PodLog(pod).Errorf("Failed to encode response: %v", err)
 	}
 }
+
+func nodeMarkedUnschedulable(node *v1.Node) bool {
+	return getBoolVal(node.Annotations, unschedulableAnnotation, false)
+}
+
+func getBoolVal(m map[string]string, key string, defaultVal bool) bool {
+	value, exists := m[key]
+	if !exists {
+		return defaultVal
+	}
+	boolval, err := strconv.ParseBool(value)
+	if err != nil {
+		return defaultVal
+	}
+	return boolval
+}
diff --git a/pkg/extender/extender_test.go b/pkg/extender/extender_test.go
index d1aad3c180..6b7bc3e096 100644
--- a/pkg/extender/extender_test.go
+++ b/pkg/extender/extender_test.go
@@ -370,6 +370,8 @@ func TestExtender(t *testing.T) {
 	t.Run("kubevirtPodScheduling", kubevirtPodScheduling)
 	t.Run("kubevirtPodSchedulingAttachedOnMismatch", kubevirtPodSchedulingAttachedOnMismatch)
 	t.Run("kubevirtPodSchedulingNonHyperconvergence", kubevirtPodSchedulingNonHyperconvergence)
+	t.Run("nodeMarkedUnschedulableTest", nodeMarkedUnschedulableTest)
+	t.Run("nodeMarkedUnschedulableWFFCVolTest", nodeMarkedUnschedulableWFFCVolTest)
 	t.Run("teardown", teardown)
 }
 
@@ -610,7 +612,7 @@ func noDriverVolumeTest(t *testing.T) {
 		prioritizeResponse)
 }
 
-// Create a pod with a PVC which uses the mocked WaitForFirstConusmer storage class
+// Create a pod with a PVC which uses the mocked WaitForFirstConsumer storage class
 // The filter response should return all the input nodes
 // The prioritize response should return all nodes with equal priority
 func WFFCVolumeTest(t *testing.T) {
@@ -2268,6 +2270,113 @@ func multiVolumeWithStorageDownNodesAntiHyperConvergenceTest(t *testing.T) {
 		prioritizeResponse)
 }
 
+// Mark one of the nodes as unschedulable using the operator annotation.
+// Verify that the unschedulable node is not returned in the filter response.
+func nodeMarkedUnschedulableTest(t *testing.T) {
+	nodes := &v1.NodeList{}
+	nodes.Items = append(nodes.Items, *newNode("node1", "node1", "192.168.0.1", "rack1", "", ""))
+	nodes.Items = append(nodes.Items, *newNode("node2", "node2", "192.168.0.2", "rack2", "", ""))
+	nodes.Items = append(nodes.Items, *newNode("node3", "node3", "192.168.0.3", "rack3", "", ""))
+	nodes.Items = append(nodes.Items, *newNode("node4", "node4", "192.168.0.4", "rack1", "", ""))
+	nodes.Items = append(nodes.Items, *newNode("node5", "node5", "192.168.0.5", "rack2", "", ""))
+
+	// node at index 1 is marked unschedulable by the operator
+	markNodeUnschedulable(&nodes.Items[1])
+
+	if err := driver.CreateCluster(5, nodes); err != nil {
+		t.Fatalf("Error creating cluster: %v", err)
+	}
+
+	pod := newPod("unschedTestPod", defaultNamespace, map[string]bool{"unschedVol": false})
+
+	// Normal volume
+	vol := v1.Volume{}
+	pvc := &v1.PersistentVolumeClaim{}
+	pvc.Name = "unschedPVC"
+	pvc.Spec.VolumeName = "unschedVol"
+	mockSC := driver.GetStorageClassName()
+	pvc.Spec.StorageClassName = &mockSC
+	pvcSpec := &v1.PersistentVolumeClaimVolumeSource{
+		ClaimName: pvc.Name,
+	}
+	_, err := core.Instance().CreatePersistentVolumeClaim(pvc)
+	require.NoError(t, err)
+	vol.PersistentVolumeClaim = pvcSpec
+	pod.Spec.Volumes = append(pod.Spec.Volumes, vol)
+	driver.AddPVC(pvc)
+	provNodes := []int{1}
+	if err := driver.ProvisionVolume("unschedVol", provNodes, 1, nil, false, false, ""); err != nil {
+		t.Fatalf("Error provisioning volume: %v", err)
+	}
+
+	filterResponse, err := sendFilterRequest(pod, nodes)
+	if err != nil {
+		t.Fatalf("Error sending filter request: %v", err)
+	}
+	verifyFilterResponse(t, nodes, []int{0, 2, 3, 4}, filterResponse)
+}
+
+// Create a pod with a PVC which uses the mocked WaitForFirstConsumer storage class.
+// Mark some of the nodes as unschedulable using the operator annotation.
+// The filter response should not return the unschedulable nodes.
+func nodeMarkedUnschedulableWFFCVolTest(t *testing.T) {
+	// Reset the event recorder
+	recorder := record.NewFakeRecorder(100)
+	extender.Recorder = recorder
+
+	pod := newPod("unschedWFFCVolTestPod", defaultNamespace, nil)
+	nodes := &v1.NodeList{}
+	nodes.Items = append(nodes.Items, *newNode("node1", "node1", "192.168.0.1", "rack1", "a", "us-east-1"))
+	nodes.Items = append(nodes.Items, *newNode("node2", "node2", "192.168.0.2", "rack1", "a", "us-east-1"))
+	nodes.Items = append(nodes.Items, *newNode("node3", "node3", "192.168.0.3", "rack1", "a", "us-east-1"))
+	nodes.Items = append(nodes.Items, *newNode("node4", "node4", "192.168.0.4", "rack1", "a", "us-east-1"))
+	nodes.Items = append(nodes.Items, *newNode("node5", "node5", "192.168.0.5", "rack2", "a", "us-east-1"))
+
+	// nodes at indices 2 and 3 are marked unschedulable by the operator
+	for i := 2; i < 4; i++ {
+		markNodeUnschedulable(&nodes.Items[i])
+	}
+
+	if err := driver.CreateCluster(5, nodes); err != nil {
+		t.Fatalf("Error creating cluster: %v", err)
+	}
+
+	podVolume := v1.Volume{}
+	pvcClaim := &v1.PersistentVolumeClaim{}
+	pvcClaim.Name = "unschedWFFCPVC"
+	pvcClaim.Spec.VolumeName = "unschedWFFCVol"
+	mockSC := mock.MockStorageClassNameWFFC
+	pvcClaim.Spec.StorageClassName = &mockSC
+	pvcSpec := &v1.PersistentVolumeClaimVolumeSource{
+		ClaimName: pvcClaim.Name,
+	}
+	_, err := core.Instance().CreatePersistentVolumeClaim(pvcClaim)
+	require.NoError(t, err)
+	podVolume.PersistentVolumeClaim = pvcSpec
+	pod.Spec.Volumes = append(pod.Spec.Volumes, podVolume)
+	driver.AddPVC(pvcClaim)
+	provNodes := []int{}
+	if err := driver.ProvisionVolume("unschedWFFCVol", provNodes, 1, nil, false, false, ""); err != nil {
+		t.Fatalf("Error provisioning volume: %v", err)
+	}
+
+	filterResponse, err := sendFilterRequest(pod, nodes)
+	if err != nil {
+		t.Fatalf("Error sending filter request: %v", err)
+	}
+	verifyFilterResponse(t, nodes, []int{0, 1, 4}, filterResponse)
+
+	// No events should be raised
+	require.Len(t, recorder.Events, 0)
+}
+
+func markNodeUnschedulable(node *v1.Node) {
+	if node.Annotations == nil {
+		node.Annotations = make(map[string]string)
+	}
+	node.Annotations[unschedulableAnnotation] = "true"
+}
+
 // Verify disableHyperconvergenceAnnotation is honored
 func disableHyperConvergenceTest(t *testing.T) {
 	nodes := &v1.NodeList{}
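
A minimal standalone Go sketch, not part of the patch above, illustrating how the unschedulable annotation is interpreted by the helper the patch adds: only a value that strconv.ParseBool accepts as true (e.g. "true", "1", "T") excludes a node, while a missing or malformed annotation leaves the node schedulable. The package, main function, and sample annotation maps below are illustrative only.

package main

import (
	"fmt"
	"strconv"
)

// Same annotation key and parsing logic as the patch introduces in extender.go.
const unschedulableAnnotation = "operator.libopenstorage.org/unschedulable"

func getBoolVal(m map[string]string, key string, defaultVal bool) bool {
	value, exists := m[key]
	if !exists {
		return defaultVal
	}
	boolval, err := strconv.ParseBool(value)
	if err != nil {
		return defaultVal
	}
	return boolval
}

func main() {
	// Hypothetical annotation maps used only to exercise the helper.
	cases := []map[string]string{
		nil,                                     // no annotations: node stays schedulable
		{unschedulableAnnotation: "true"},       // excluded from the filter response
		{unschedulableAnnotation: "1"},          // ParseBool also accepts "1", "t", "T", "TRUE"
		{unschedulableAnnotation: "false"},      // explicitly schedulable
		{unschedulableAnnotation: "not-a-bool"}, // malformed value falls back to schedulable
	}
	for _, annotations := range cases {
		fmt.Println(getBoolVal(annotations, unschedulableAnnotation, false))
	}
}

Running it prints false, true, true, false, false, matching the default-to-schedulable behavior the filter relies on when the annotation is absent or unparsable.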