Skip to content

Commit

Permalink
controllers: update queue cache if podGroup is not found
Browse files Browse the repository at this point in the history
Signed-off-by: sceneryback <[email protected]>
  • Loading branch information
sceneryback committed Sep 26, 2024
1 parent 61ff6a7 commit f2e9781
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 22 deletions.
12 changes: 10 additions & 2 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
Expand All @@ -42,10 +43,17 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
// Ignore error here, tt can not occur.
ns, name, _ := cache.SplitMetaNamespaceKey(pgKey)

// TODO: check NotFound error and sync local cache.
pg, err := c.pgLister.PodGroups(ns).Get(name)
if err != nil {
return err
if !errors.IsNotFound(err) {
return err
}

klog.V(4).Infof("The podGroup %s is not found, skip it and continue to sync cache", pgKey)
c.pgMutex.Lock()
delete(c.podGroups[queue.Name], pgKey)
c.pgMutex.Unlock()
continue
}

switch pg.Status.Phase {
Expand Down
126 changes: 106 additions & 20 deletions pkg/controllers/queue/queue_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package queue
import (
"context"
"fmt"
"reflect"
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -242,55 +244,139 @@ func TestSyncQueue(t *testing.T) {
namespace := "c1"

testCases := []struct {
Name string
podGroup *schedulingv1beta1.PodGroup
queue *schedulingv1beta1.Queue
ExpectValue int32
Name string
pgsInCache []*schedulingv1beta1.PodGroup
pgsInInformer []*schedulingv1beta1.PodGroup
queue *schedulingv1beta1.Queue
ExpectStatus schedulingv1beta1.QueueStatus
}{
{
Name: "syncQueue",
podGroup: &schedulingv1beta1.PodGroup{
pgsInCache: []*schedulingv1beta1.PodGroup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg1",
Namespace: namespace,
},
Spec: schedulingv1beta1.PodGroupSpec{
Queue: "c1",
},
Status: schedulingv1beta1.PodGroupStatus{
Phase: schedulingv1beta1.PodGroupPending,
},
},
},
pgsInInformer: []*schedulingv1beta1.PodGroup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg1",
Namespace: namespace,
},
Spec: schedulingv1beta1.PodGroupSpec{
Queue: "c1",
},
Status: schedulingv1beta1.PodGroupStatus{
Phase: schedulingv1beta1.PodGroupPending,
},
},
},
queue: &schedulingv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "pg1",
Namespace: namespace,
Name: "c1",
},
Spec: schedulingv1beta1.PodGroupSpec{
Queue: "c1",
Spec: schedulingv1beta1.QueueSpec{
Weight: 1,
},
Status: schedulingv1beta1.PodGroupStatus{
Phase: schedulingv1beta1.PodGroupPending,
},
ExpectStatus: schedulingv1beta1.QueueStatus{
Pending: 1,
Reservation: schedulingv1beta1.Reservation{},
Allocated: v1.ResourceList{},
},
},
{
Name: "syncQueueHandlingNotFoundPg",
pgsInCache: []*schedulingv1beta1.PodGroup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg1",
Namespace: namespace,
},
Spec: schedulingv1beta1.PodGroupSpec{
Queue: "c2",
},
Status: schedulingv1beta1.PodGroupStatus{
Phase: schedulingv1beta1.PodGroupPending,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg2",
Namespace: namespace,
},
Spec: schedulingv1beta1.PodGroupSpec{
Queue: "c2",
},
Status: schedulingv1beta1.PodGroupStatus{
Phase: schedulingv1beta1.PodGroupPending,
},
},
},
pgsInInformer: []*schedulingv1beta1.PodGroup{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pg2",
Namespace: namespace,
},
Spec: schedulingv1beta1.PodGroupSpec{
Queue: "c2",
},
Status: schedulingv1beta1.PodGroupStatus{
Phase: schedulingv1beta1.PodGroupPending,
},
},
},
queue: &schedulingv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "c1",
Name: "c2",
},
Spec: schedulingv1beta1.QueueSpec{
Weight: 1,
},
},
ExpectValue: 1,
ExpectStatus: schedulingv1beta1.QueueStatus{
Pending: 1,
Reservation: schedulingv1beta1.Reservation{},
Allocated: v1.ResourceList{},
},
},
}

for i, testcase := range testCases {
c := newFakeController()

key, _ := cache.MetaNamespaceKeyFunc(testcase.podGroup)
c.podGroups[testcase.podGroup.Spec.Queue] = make(map[string]struct{})
c.podGroups[testcase.podGroup.Spec.Queue][key] = struct{}{}
for j := range testcase.pgsInCache {
key, _ := cache.MetaNamespaceKeyFunc(testcase.pgsInCache[j])
if _, ok := c.podGroups[testcase.pgsInCache[j].Spec.Queue]; !ok {
c.podGroups[testcase.pgsInCache[j].Spec.Queue] = make(map[string]struct{})
}
c.podGroups[testcase.pgsInCache[j].Spec.Queue][key] = struct{}{}
}

for j := range testcase.pgsInInformer {
c.pgInformer.Informer().GetIndexer().Add(testcase.pgsInInformer[j])
}

c.pgInformer.Informer().GetIndexer().Add(testcase.podGroup)
c.queueInformer.Informer().GetIndexer().Add(testcase.queue)
c.vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), testcase.queue, metav1.CreateOptions{})

err := c.syncQueue(testcase.queue, nil)

item, _ := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), testcase.queue.Name, metav1.GetOptions{})
if err != nil && testcase.ExpectValue != item.Status.Pending {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, c.queue.Len())
if err != nil && !reflect.DeepEqual(testcase.ExpectStatus, item.Status) {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectStatus, item.Status)
}
}

}

func TestProcessNextWorkItem(t *testing.T) {
Expand Down

0 comments on commit f2e9781

Please sign in to comment.