PWX-37335: honor operator's unschedulable annotation (#1778) (#1787)
If the operator has added the "unschedulable" annotation to a node, exclude that node
from the filter response.

Signed-off-by: Neelesh Thakur <[email protected]>
pureneelesh authored May 29, 2024
1 parent 8896b61 commit ea4524e
Showing 2 changed files with 140 additions and 4 deletions.
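
For context, here is a minimal client-go sketch (not part of this commit) of how a component such as the operator might set this annotation on a node; the helper name markUnschedulable and the clientset wiring are illustrative assumptions.

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// unschedulableAnnotation matches the constant added in extender.go below.
const unschedulableAnnotation = "operator.libopenstorage.org/unschedulable"

// markUnschedulable (hypothetical helper) annotates a node so that Stork's
// extender filter, after this change, excludes it from filter responses.
func markUnschedulable(ctx context.Context, client kubernetes.Interface, nodeName string) error {
	node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if node.Annotations == nil {
		node.Annotations = make(map[string]string)
	}
	node.Annotations[unschedulableAnnotation] = "true"
	// A real controller might prefer a Patch here to avoid update conflicts.
	_, err = client.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
	return err
}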
33 changes: 30 additions & 3 deletions pkg/extender/extender.go
@@ -63,6 +63,8 @@ const (
skipScoringLabel = "stork.libopenstorage.org/skipSchedulerScoring"
// annotation to disable hyperconvergence for a pod
disableHyperconvergenceAnnotation = "stork.libopenstorage.org/disableHyperconvergence"
// annotation used by the operator to mark node as unschedulable
unschedulableAnnotation = "operator.libopenstorage.org/unschedulable"
)

var (
@@ -234,18 +236,27 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
}
// Do driver check even if we only have pending WaitForFirstConsumer volumes
} else if len(driverVolumes) > 0 || len(WFFCVolumes) > 0 {
schedulableNodes := []v1.Node{}
for _, node := range args.Nodes.Items {
if nodeMarkedUnschedulable(&node) {
storklog.PodLog(pod).Debugf("Filtering out node %s because of annotation %s",
node.Name, unschedulableAnnotation)
continue
}
schedulableNodes = append(schedulableNodes, node)
}
driverNodes, err := e.Driver.GetNodes()
if err != nil {
storklog.PodLog(pod).Errorf("Error getting list of driver nodes, returning all nodes, err: %v", err)
} else {
for _, volumeInfo := range driverVolumes {
// Pod is using a volume that is labeled for Windows
// This Pod needs to run only on Windows node
-// Stork will return all nodes in the filter request
+// Stork will return all schedulable nodes in the filter request
if volumeInfo.WindowsVolume {
e.encodeFilterResponse(encoder,
pod,
-args.Nodes.Items)
+schedulableNodes)
return
}
onlineNodeFound := false
@@ -302,7 +313,7 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
}
}

-for _, node := range args.Nodes.Items {
+for _, node := range schedulableNodes {
for _, driverNode := range driverNodes {
storklog.PodLog(pod).Debugf("nodeInfo: %v", driverNode)
if (driverNode.Status == volume.NodeOnline || driverNode.Status == volume.NodeStorageDown) &&
@@ -1125,3 +1136,19 @@ func (e *Extender) updateVirtLauncherPodPrioritizeScores(
storklog.PodLog(pod).Errorf("Failed to encode response: %v", err)
}
}

func nodeMarkedUnschedulable(node *v1.Node) bool {
return getBoolVal(node.Annotations, unschedulableAnnotation, false)
}

func getBoolVal(m map[string]string, key string, defaultVal bool) bool {
value, exists := m[key]
if !exists {
return defaultVal
}
boolval, err := strconv.ParseBool(value)
if err != nil {
return defaultVal
}
return boolval
}
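
For illustration, a self-contained sketch (not part of the commit) of the fallback semantics of getBoolVal above: strconv.ParseBool accepts only "1", "t", "T", "TRUE", "true", "True" and the matching false spellings, so any other annotation value returns the default and leaves the node schedulable.

package main

import (
	"fmt"
	"strconv"
)

// getBoolVal mirrors the helper added in extender.go above.
func getBoolVal(m map[string]string, key string, defaultVal bool) bool {
	value, exists := m[key]
	if !exists {
		return defaultVal
	}
	boolval, err := strconv.ParseBool(value)
	if err != nil {
		return defaultVal
	}
	return boolval
}

func main() {
	const key = "operator.libopenstorage.org/unschedulable"
	fmt.Println(getBoolVal(map[string]string{key: "True"}, key, false)) // true: parseable value
	fmt.Println(getBoolVal(map[string]string{key: "yes"}, key, false))  // false: "yes" fails ParseBool, default wins
	fmt.Println(getBoolVal(map[string]string{}, key, false))            // false: key absent, default wins
}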
111 changes: 110 additions & 1 deletion pkg/extender/extender_test.go
Original file line number Diff line number Diff line change
@@ -370,6 +370,8 @@ func TestExtender(t *testing.T) {
t.Run("kubevirtPodScheduling", kubevirtPodScheduling)
t.Run("kubevirtPodSchedulingAttachedOnMismatch", kubevirtPodSchedulingAttachedOnMismatch)
t.Run("kubevirtPodSchedulingNonHyperconvergence", kubevirtPodSchedulingNonHyperconvergence)
t.Run("nodeMarkedUnschedulableTest", nodeMarkedUnschedulableTest)
t.Run("nodeMarkedUnschedulableWFFCVolTest", nodeMarkedUnschedulableWFFCVolTest)
t.Run("teardown", teardown)
}

@@ -610,7 +612,7 @@ func noDriverVolumeTest(t *testing.T) {
prioritizeResponse)
}

-// Create a pod with a PVC which uses the mocked WaitForFirstConusmer storage class
+// Create a pod with a PVC which uses the mocked WaitForFirstConsumer storage class
// The filter response should return all the input nodes
// The prioritize response should return all nodes with equal priority
func WFFCVolumeTest(t *testing.T) {
@@ -2268,6 +2270,113 @@ func multiVolumeWithStorageDownNodesAntiHyperConvergenceTest(t *testing.T) {
prioritizeResponse)
}

// Mark one of the nodes as unschedulable using the operator annotation.
// Verify that the unschedulable node is not returned in the filter response.
func nodeMarkedUnschedulableTest(t *testing.T) {
nodes := &v1.NodeList{}
nodes.Items = append(nodes.Items, *newNode("node1", "node1", "192.168.0.1", "rack1", "", ""))
nodes.Items = append(nodes.Items, *newNode("node2", "node2", "192.168.0.2", "rack2", "", ""))
nodes.Items = append(nodes.Items, *newNode("node3", "node3", "192.168.0.3", "rack3", "", ""))
nodes.Items = append(nodes.Items, *newNode("node4", "node4", "192.168.0.4", "rack1", "", ""))
nodes.Items = append(nodes.Items, *newNode("node5", "node5", "192.168.0.5", "rack2", "", ""))

// the node at index 1 is marked unschedulable by the operator
markNodeUnschedulable(&nodes.Items[1])

if err := driver.CreateCluster(5, nodes); err != nil {
t.Fatalf("Error creating cluster: %v", err)
}

pod := newPod("unschedTestPod", defaultNamespace, map[string]bool{"unschedVol": false})

// Normal volume
vol := v1.Volume{}
pvc := &v1.PersistentVolumeClaim{}
pvc.Name = "unschedPVC"
pvc.Spec.VolumeName = "unschedVol"
mockSC := driver.GetStorageClassName()
pvc.Spec.StorageClassName = &mockSC
pvcSpec := &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
}
_, err := core.Instance().CreatePersistentVolumeClaim(pvc)
require.NoError(t, err)
vol.PersistentVolumeClaim = pvcSpec
pod.Spec.Volumes = append(pod.Spec.Volumes, vol)
driver.AddPVC(pvc)
provNodes := []int{1}
if err := driver.ProvisionVolume("unschedVol", provNodes, 1, nil, false, false, ""); err != nil {
t.Fatalf("Error provisioning volume: %v", err)
}

filterResponse, err := sendFilterRequest(pod, nodes)
if err != nil {
t.Fatalf("Error sending filter request: %v", err)
}
verifyFilterResponse(t, nodes, []int{0, 2, 3, 4}, filterResponse)
}

// Create a pod with a PVC which uses the mocked WaitForFirstConsumer storage class.
// Mark some of the nodes as unschedulable using the operator annotation.
// The filter response should not return the unschedulable nodes.
func nodeMarkedUnschedulableWFFCVolTest(t *testing.T) {
// Reset the event recorder
recorder := record.NewFakeRecorder(100)
extender.Recorder = recorder

pod := newPod("unschedWFFCVolTestPod", defaultNamespace, nil)
nodes := &v1.NodeList{}
nodes.Items = append(nodes.Items, *newNode("node1", "node1", "192.168.0.1", "rack1", "a", "us-east-1"))
nodes.Items = append(nodes.Items, *newNode("node2", "node2", "192.168.0.2", "rack1", "a", "us-east-1"))
nodes.Items = append(nodes.Items, *newNode("node3", "node3", "192.168.0.3", "rack1", "a", "us-east-1"))
nodes.Items = append(nodes.Items, *newNode("node4", "node4", "192.168.0.4", "rack1", "a", "us-east-1"))
nodes.Items = append(nodes.Items, *newNode("node5", "node5", "192.168.0.5", "rack2", "a", "us-east-1"))

// nodes at indices 2 and 3 are marked unschedulable by the operator
for i := 2; i < 4; i++ {
markNodeUnschedulable(&nodes.Items[i])
}

if err := driver.CreateCluster(5, nodes); err != nil {
t.Fatalf("Error creating cluster: %v", err)
}

podVolume := v1.Volume{}
pvcClaim := &v1.PersistentVolumeClaim{}
pvcClaim.Name = "unschedWFFCPVC"
pvcClaim.Spec.VolumeName = "unschedWFFCVol"
mockSC := mock.MockStorageClassNameWFFC
pvcClaim.Spec.StorageClassName = &mockSC
pvcSpec := &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcClaim.Name,
}
_, err := core.Instance().CreatePersistentVolumeClaim(pvcClaim)
require.NoError(t, err)
podVolume.PersistentVolumeClaim = pvcSpec
pod.Spec.Volumes = append(pod.Spec.Volumes, podVolume)
driver.AddPVC(pvcClaim)
provNodes := []int{}
if err := driver.ProvisionVolume("unschedWFFCVol", provNodes, 1, nil, false, false, ""); err != nil {
t.Fatalf("Error provisioning volume: %v", err)
}

filterResponse, err := sendFilterRequest(pod, nodes)
if err != nil {
t.Fatalf("Error sending filter request: %v", err)
}
verifyFilterResponse(t, nodes, []int{0, 1, 4}, filterResponse)

// No events should be raised
require.Len(t, recorder.Events, 0)
}

func markNodeUnschedulable(node *v1.Node) {
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[unschedulableAnnotation] = "true"
}

// Verify disableHyperconvergenceAnnotation is honored
func disableHyperConvergenceTest(t *testing.T) {
nodes := &v1.NodeList{}