From 32dbd062affda152557bd6a15dc43a69fc9e4841 Mon Sep 17 00:00:00 2001 From: Rui-Gan Date: Mon, 23 Sep 2024 13:11:40 +0800 Subject: [PATCH] feat: add hierarchical queues for capacity plugin Signed-off-by: Rui-Gan --- pkg/scheduler/actions/preempt/preempt.go | 2 +- pkg/scheduler/actions/reclaim/reclaim.go | 2 +- pkg/scheduler/api/types.go | 3 + pkg/scheduler/cache/cache.go | 5 + pkg/scheduler/cache/interface.go | 4 + pkg/scheduler/framework/session.go | 96 +-- pkg/scheduler/framework/session_plugins.go | 64 +- pkg/scheduler/plugins/capacity/capacity.go | 648 +++++++++++++++--- .../plugins/capacity/capacity_test.go | 137 +++- .../mutate/mutate_hierarchical_queue.go | 132 ++++ .../mutate/mutate_hierarchical_queue_test.go | 145 ++++ .../validate/validate_hierarchical_queue.go | 133 ++++ .../validate_hierarchical_queue_test.go | 301 ++++++++ 13 files changed, 1522 insertions(+), 150 deletions(-) create mode 100644 pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue.go create mode 100644 pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue_test.go create mode 100644 pkg/webhooks/admission/queues/validate/validate_hierarchical_queue.go create mode 100644 pkg/webhooks/admission/queues/validate/validate_hierarchical_queue_test.go diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 04b650bceb1..501022d7cb6 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -273,7 +273,7 @@ func (pmpt *Action) preempt( continue } - victimsQueue := ssn.BuildVictimsPriorityQueue(victims) + victimsQueue := ssn.BuildVictimsPriorityQueue(victims, preemptor) // Preempt victims for tasks, pick lowest priority task first. preempted := api.EmptyResource() diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index a350ba0d4b6..c0191a618b1 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -175,7 +175,7 @@ func (ra *Action) Execute(ssn *framework.Session) { continue } - victimsQueue := ssn.BuildVictimsPriorityQueue(victims) + victimsQueue := ssn.BuildVictimsPriorityQueue(victims, task) resreq := task.InitResreq.Clone() reclaimed := api.EmptyResource() diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index da9acc0443b..9ca9eb11389 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -116,6 +116,9 @@ type LessFn func(interface{}, interface{}) bool // CompareFn is the func declaration used by sort or priority queue. type CompareFn func(interface{}, interface{}) int +// CompareFn is the func declaration used by sort or priority victims. +type VictimCompareFn func(interface{}, interface{}, interface{}) int + // ValidateFn is the func declaration used to check object's status. type ValidateFn func(interface{}) bool diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 334d06d90a0..797801c3153 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -941,6 +941,11 @@ func (sc *SchedulerCache) Client() kubernetes.Interface { return sc.kubeClient } +// Client returns the volcano clientSet +func (sc *SchedulerCache) VCClient() vcclient.Interface { + return sc.vcClient +} + // ClientConfig returns the rest config func (sc *SchedulerCache) ClientConfig() *rest.Config { return sc.restConfig diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 94ab18f373a..400eb9f33e7 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" + vcclient "volcano.sh/apis/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" ) @@ -74,6 +75,9 @@ type Cache interface { // Client returns the kubernetes clientSet, which can be used by plugins Client() kubernetes.Interface + // VCClient returns the volcano clientSet, which can be used by plugins + VCClient() vcclient.Interface + // ClientConfig returns the rest config ClientConfig() *rest.Config diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 03e40c30fd7..4b53d23df58 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -34,6 +34,7 @@ import ( "volcano.sh/apis/pkg/apis/scheduling" schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme" vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + vcclient "volcano.sh/apis/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/cache" "volcano.sh/volcano/pkg/scheduler/conf" @@ -46,6 +47,7 @@ type Session struct { UID types.UID kubeClient kubernetes.Interface + vcClient vcclient.Interface recorder record.EventRecorder cache cache.Cache restConfig *rest.Config @@ -72,22 +74,24 @@ type Session struct { Configurations []conf.Configuration NodeList []*api.NodeInfo - plugins map[string]Plugin - eventHandlers []*EventHandler - jobOrderFns map[string]api.CompareFn - queueOrderFns map[string]api.CompareFn - taskOrderFns map[string]api.CompareFn - clusterOrderFns map[string]api.CompareFn - predicateFns map[string]api.PredicateFn - prePredicateFns map[string]api.PrePredicateFn - bestNodeFns map[string]api.BestNodeFn - nodeOrderFns map[string]api.NodeOrderFn - batchNodeOrderFns map[string]api.BatchNodeOrderFn - nodeMapFns map[string]api.NodeMapFn - nodeReduceFns map[string]api.NodeReduceFn - preemptableFns map[string]api.EvictableFn - reclaimableFns map[string]api.EvictableFn - overusedFns map[string]api.ValidateFn + plugins map[string]Plugin + eventHandlers []*EventHandler + jobOrderFns map[string]api.CompareFn + queueOrderFns map[string]api.CompareFn + taskOrderFns map[string]api.CompareFn + victimJobOrderFns map[string]api.VictimCompareFn + victimTaskOrderFns map[string]api.VictimCompareFn + clusterOrderFns map[string]api.CompareFn + predicateFns map[string]api.PredicateFn + prePredicateFns map[string]api.PrePredicateFn + bestNodeFns map[string]api.BestNodeFn + nodeOrderFns map[string]api.NodeOrderFn + batchNodeOrderFns map[string]api.BatchNodeOrderFn + nodeMapFns map[string]api.NodeMapFn + nodeReduceFns map[string]api.NodeReduceFn + preemptableFns map[string]api.EvictableFn + reclaimableFns map[string]api.EvictableFn + overusedFns map[string]api.ValidateFn // preemptiveFns means whether current queue can reclaim from other queue, // while reclaimableFns means whether current queue's resources can be reclaimed. preemptiveFns map[string]api.ValidateWithCandidateFn @@ -107,6 +111,7 @@ func openSession(cache cache.Cache) *Session { ssn := &Session{ UID: uuid.NewUUID(), kubeClient: cache.Client(), + vcClient: cache.VCClient(), restConfig: cache.ClientConfig(), recorder: cache.EventRecorder(), cache: cache, @@ -121,32 +126,34 @@ func openSession(cache cache.Cache) *Session { RevocableNodes: map[string]*api.NodeInfo{}, Queues: map[api.QueueID]*api.QueueInfo{}, - plugins: map[string]Plugin{}, - jobOrderFns: map[string]api.CompareFn{}, - queueOrderFns: map[string]api.CompareFn{}, - taskOrderFns: map[string]api.CompareFn{}, - clusterOrderFns: map[string]api.CompareFn{}, - predicateFns: map[string]api.PredicateFn{}, - prePredicateFns: map[string]api.PrePredicateFn{}, - bestNodeFns: map[string]api.BestNodeFn{}, - nodeOrderFns: map[string]api.NodeOrderFn{}, - batchNodeOrderFns: map[string]api.BatchNodeOrderFn{}, - nodeMapFns: map[string]api.NodeMapFn{}, - nodeReduceFns: map[string]api.NodeReduceFn{}, - preemptableFns: map[string]api.EvictableFn{}, - reclaimableFns: map[string]api.EvictableFn{}, - overusedFns: map[string]api.ValidateFn{}, - preemptiveFns: map[string]api.ValidateWithCandidateFn{}, - allocatableFns: map[string]api.AllocatableFn{}, - jobReadyFns: map[string]api.ValidateFn{}, - jobPipelinedFns: map[string]api.VoteFn{}, - jobValidFns: map[string]api.ValidateExFn{}, - jobEnqueueableFns: map[string]api.VoteFn{}, - jobEnqueuedFns: map[string]api.JobEnqueuedFn{}, - targetJobFns: map[string]api.TargetJobFn{}, - reservedNodesFns: map[string]api.ReservedNodesFn{}, - victimTasksFns: map[string][]api.VictimTasksFn{}, - jobStarvingFns: map[string]api.ValidateFn{}, + plugins: map[string]Plugin{}, + jobOrderFns: map[string]api.CompareFn{}, + queueOrderFns: map[string]api.CompareFn{}, + taskOrderFns: map[string]api.CompareFn{}, + victimJobOrderFns: map[string]api.VictimCompareFn{}, + victimTaskOrderFns: map[string]api.VictimCompareFn{}, + clusterOrderFns: map[string]api.CompareFn{}, + predicateFns: map[string]api.PredicateFn{}, + prePredicateFns: map[string]api.PrePredicateFn{}, + bestNodeFns: map[string]api.BestNodeFn{}, + nodeOrderFns: map[string]api.NodeOrderFn{}, + batchNodeOrderFns: map[string]api.BatchNodeOrderFn{}, + nodeMapFns: map[string]api.NodeMapFn{}, + nodeReduceFns: map[string]api.NodeReduceFn{}, + preemptableFns: map[string]api.EvictableFn{}, + reclaimableFns: map[string]api.EvictableFn{}, + overusedFns: map[string]api.ValidateFn{}, + preemptiveFns: map[string]api.ValidateWithCandidateFn{}, + allocatableFns: map[string]api.AllocatableFn{}, + jobReadyFns: map[string]api.ValidateFn{}, + jobPipelinedFns: map[string]api.VoteFn{}, + jobValidFns: map[string]api.ValidateExFn{}, + jobEnqueueableFns: map[string]api.VoteFn{}, + jobEnqueuedFns: map[string]api.JobEnqueuedFn{}, + targetJobFns: map[string]api.TargetJobFn{}, + reservedNodesFns: map[string]api.ReservedNodesFn{}, + victimTasksFns: map[string][]api.VictimTasksFn{}, + jobStarvingFns: map[string]api.ValidateFn{}, } snapshot := cache.Snapshot() @@ -590,6 +597,11 @@ func (ssn Session) KubeClient() kubernetes.Interface { return ssn.kubeClient } +// VCClient returns the volcano client +func (ssn Session) VCClient() vcclient.Interface { + return ssn.vcClient +} + // ClientConfig returns the rest client func (ssn Session) ClientConfig() *rest.Config { return ssn.restConfig diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 36557ec587b..7b4a4b2b035 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -45,6 +45,16 @@ func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) { ssn.taskOrderFns[name] = cf } +// AddVictimJobOrderFn add victimjoborder function +func (ssn *Session) AddVictimJobOrderFn(name string, vcf api.VictimCompareFn) { + ssn.victimJobOrderFns[name] = vcf +} + +// AddVictimTaskOrderFn add victimtaskorder function +func (ssn *Session) AddVictimTaskOrderFn(name string, vcf api.VictimCompareFn) { + ssn.victimTaskOrderFns[name] = vcf +} + // AddPreemptableFn add preemptable function func (ssn *Session) AddPreemptableFn(name string, cf api.EvictableFn) { ssn.preemptableFns[name] = cf @@ -545,6 +555,29 @@ func (ssn *Session) JobOrderFn(l, r interface{}) bool { return lv.CreationTimestamp.Before(&rv.CreationTimestamp) } +// VictimJobOrderFn invoke victimjoborder function of the plugins +func (ssn *Session) VictimJobOrderFn(l, r, preemptor interface{}) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + jof, found := ssn.victimJobOrderFns[plugin.Name] + if !found { + continue + } + if j := jof(l, r, preemptor); j != 0 { + return j < 0 + } + } + } + + lv := l.(*api.JobInfo) + rv := r.(*api.JobInfo) + if lv.Queue != rv.Queue { + return !ssn.QueueOrderFn(ssn.Queues[lv.Queue], ssn.Queues[rv.Queue]) + } + + return !ssn.JobOrderFn(l, r) +} + // ClusterOrderFn invoke ClusterOrderFn function of the plugins func (ssn *Session) ClusterOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { @@ -626,6 +659,23 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool { return helpers.CompareTask(lv, rv) } +// VictimTaskOrderFn invoke victimtaskorder function of the plugins +func (ssn *Session) VictimTaskOrderFn(l, r, preemptor interface{}) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + tof, found := ssn.victimTaskOrderFns[plugin.Name] + if !found { + continue + } + if j := tof(l, r, preemptor); j != 0 { + return j < 0 + } + } + } + + return !ssn.TaskOrderFn(l, r) +} + // PredicateFn invoke predicate function of the plugins func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { for _, tier := range ssn.Tiers { @@ -791,21 +841,19 @@ func (ssn *Session) NodeOrderReduceFn(task *api.TaskInfo, pluginNodeScoreMap map // BuildVictimsPriorityQueue returns a priority queue with victims sorted by: // if victims has same job id, sorted by !ssn.TaskOrderFn // if victims has different job id, sorted by !ssn.JobOrderFn -func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo) *util.PriorityQueue { +func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo, preemptor *api.TaskInfo) *util.PriorityQueue { victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool { lv := l.(*api.TaskInfo) rv := r.(*api.TaskInfo) if lv.Job == rv.Job { - return !ssn.TaskOrderFn(l, r) + return ssn.VictimTaskOrderFn(l, r, preemptor) } - lvJob, lvJobFound := ssn.Jobs[lv.Job] - rvJob, rvJobFound := ssn.Jobs[rv.Job] - if lvJobFound && rvJobFound && lvJob.Queue != rvJob.Queue { - return !ssn.QueueOrderFn(ssn.Queues[lvJob.Queue], ssn.Queues[rvJob.Queue]) - } + preemptorJob := ssn.Jobs[preemptor.Job] + lvJob := ssn.Jobs[lv.Job] + rvJob := ssn.Jobs[rv.Job] - return !ssn.JobOrderFn(lvJob, rvJob) + return ssn.VictimJobOrderFn(lvJob, rvJob, preemptorJob) }) for _, victim := range victims { victimsQueue.Push(victim) diff --git a/pkg/scheduler/plugins/capacity/capacity.go b/pkg/scheduler/plugins/capacity/capacity.go index 3906b2ab5f7..38edc5393bd 100644 --- a/pkg/scheduler/plugins/capacity/capacity.go +++ b/pkg/scheduler/plugins/capacity/capacity.go @@ -17,12 +17,21 @@ limitations under the License. package capacity import ( + "context" + "fmt" "math" + "time" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "volcano.sh/apis/pkg/apis/scheduling" + vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + vcclient "volcano.sh/apis/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/helpers" "volcano.sh/volcano/pkg/scheduler/framework" @@ -31,10 +40,12 @@ import ( ) const ( - PluginName = "capacity" + PluginName = "capacity" + rootQueueID = "root" ) type capacityPlugin struct { + rootQueue string totalResource *api.Resource totalGuarantee *api.Resource @@ -44,9 +55,11 @@ type capacityPlugin struct { } type queueAttr struct { - queueID api.QueueID - name string - share float64 + queueID api.QueueID + name string + share float64 + parents []api.QueueID + children map[api.QueueID]*queueAttr deserved *api.Resource allocated *api.Resource @@ -75,11 +88,180 @@ func (cp *capacityPlugin) Name() string { return PluginName } +// HierarchyEnabled returns if hierarchy is enabled +func (cp *capacityPlugin) HierarchyEnabled(ssn *framework.Session) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if plugin.Name != PluginName { + continue + } + return plugin.EnabledHierarchy != nil && *plugin.EnabledHierarchy + } + } + return false +} + func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { // Prepare scheduling data for this session. cp.totalResource.Add(ssn.TotalResource) klog.V(4).Infof("The total resource is <%v>", cp.totalResource) + + hierarchyEnabled := cp.HierarchyEnabled(ssn) + if hierarchyEnabled { + cp.onSessionOpenWithHierarchy(ssn) + } else { + cp.onSessionOpenWithoutHierarchy(ssn) + } + + ssn.AddReclaimableFn(cp.Name(), func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) ([]*api.TaskInfo, int) { + var victims []*api.TaskInfo + allocations := map[api.QueueID]*api.Resource{} + + for _, reclaimee := range reclaimees { + job := ssn.Jobs[reclaimee.Job] + attr := cp.queueOpts[job.Queue] + + if _, found := allocations[job.Queue]; !found { + allocations[job.Queue] = attr.allocated.Clone() + } + allocated := allocations[job.Queue] + if allocated.LessPartly(reclaimer.Resreq, api.Zero) { + klog.V(3).Infof("Failed to allocate resource for Task <%s/%s> in Queue <%s>, not enough resource.", + reclaimee.Namespace, reclaimee.Name, job.Queue) + continue + } + + exceptReclaimee := allocated.Clone().Sub(reclaimee.Resreq) + // When scalar resource not specified in deserved such as "pods", we should skip it and consider it as infinity, + // so the following first condition will be true and the current queue will not be reclaimed. + if allocated.LessEqual(attr.deserved, api.Infinity) || !attr.guarantee.LessEqual(exceptReclaimee, api.Zero) { + continue + } + allocated.Sub(reclaimee.Resreq) + victims = append(victims, reclaimee) + } + klog.V(4).InfoS("Victims from capacity plugin", "victims", victims, "reclaimer", reclaimer) + return victims, util.Permit + }) + + ssn.AddPreemptiveFn(cp.Name(), func(obj interface{}, candidate interface{}) bool { + queue := obj.(*api.QueueInfo) + task := candidate.(*api.TaskInfo) + attr := cp.queueOpts[queue.UID] + + futureUsed := attr.allocated.Clone().Add(task.Resreq) + overused := !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq) + metrics.UpdateQueueOverused(attr.name, overused) + if overused { + klog.V(3).Infof("Queue <%v> can not reclaim, deserved <%v>, allocated <%v>, share <%v>", + queue.Name, attr.deserved, attr.allocated, attr.share) + } + + // PreemptiveFn is the opposite of OverusedFn in proportion plugin cause as long as there is a one-dimensional + // resource whose deserved is greater than allocated, current task can reclaim by preempt others. + return !overused + }) + + ssn.AddAllocatableFn(cp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool { + if hierarchyEnabled && !cp.isLeafQueue(queue.UID) { + return false + } + attr := cp.queueOpts[queue.UID] + + futureUsed := attr.allocated.Clone().Add(candidate.Resreq) + allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.Resreq) + if !allocatable { + klog.V(3).Infof("Queue <%v>: realCapability <%v>, allocated <%v>; Candidate <%v>: resource request <%v>", + queue.Name, attr.realCapability, attr.allocated, candidate.Name, candidate.Resreq) + } + + return allocatable + }) + + ssn.AddJobEnqueueableFn(cp.Name(), func(obj interface{}) int { + job := obj.(*api.JobInfo) + queueID := job.Queue + if hierarchyEnabled && !cp.isLeafQueue(queueID) { + return util.Reject + } + + attr := cp.queueOpts[queueID] + queue := ssn.Queues[queueID] + // If no capability is set, always enqueue the job. + if attr.realCapability == nil { + klog.V(4).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.", + queue.Name, job.Namespace, job.Name) + return util.Permit + } + + if job.PodGroup.Spec.MinResources == nil { + klog.V(4).Infof("job %s MinResources is null.", job.Name) + return util.Permit + } + minReq := job.GetMinResources() + + klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>", + job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String()) + // The queue resource quota limit has not reached + r := minReq.Clone().Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic) + + inqueue := r.LessEqualWithDimension(attr.realCapability, minReq) + klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue) + if inqueue { + attr.inqueue.Add(job.DeductSchGatedResources(minReq)) + return util.Permit + } + ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "queue resource quota insufficient") + return util.Reject + }) + + // Register event handlers. + ssn.AddEventHandler(&framework.EventHandler{ + AllocateFunc: func(event *framework.Event) { + job := ssn.Jobs[event.Task.Job] + attr := cp.queueOpts[job.Queue] + attr.allocated.Add(event.Task.Resreq) + metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) + + cp.updateShare(attr) + if hierarchyEnabled { + for _, parentID := range attr.parents { + parentAttr := cp.queueOpts[parentID] + parentAttr.allocated.Add(event.Task.Resreq) + } + } + + klog.V(4).Infof("Capacity AllocateFunc: task <%v/%v>, resreq <%v>, share <%v>", + event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) + }, + DeallocateFunc: func(event *framework.Event) { + job := ssn.Jobs[event.Task.Job] + attr := cp.queueOpts[job.Queue] + attr.allocated.Sub(event.Task.Resreq) + metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) + + cp.updateShare(attr) + if hierarchyEnabled { + for _, parentID := range attr.parents { + parentAttr := cp.queueOpts[parentID] + parentAttr.allocated.Sub(event.Task.Resreq) + } + } + + klog.V(4).Infof("Capacity EvictFunc: task <%v/%v>, resreq <%v>, share <%v>", + event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) + }, + }) +} + +func (cp *capacityPlugin) OnSessionClose(ssn *framework.Session) { + cp.totalResource = nil + cp.totalGuarantee = nil + cp.queueOpts = nil +} + +func (cp *capacityPlugin) onSessionOpenWithoutHierarchy(ssn *framework.Session) { for _, queue := range ssn.Queues { if len(queue.Queue.Spec.Guarantee.Resource) == 0 { continue @@ -220,133 +402,387 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) { return 1 }) +} - ssn.AddReclaimableFn(cp.Name(), func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) ([]*api.TaskInfo, int) { - var victims []*api.TaskInfo - allocations := map[api.QueueID]*api.Resource{} +func (cp *capacityPlugin) onSessionOpenWithHierarchy(ssn *framework.Session) { + // Set the root queue + cp.rootQueue = rootQueueID + ssn.Queues[api.QueueID(cp.rootQueue)] = &api.QueueInfo{ + UID: api.QueueID(cp.rootQueue), + Name: cp.rootQueue, + Queue: &scheduling.Queue{}, + } - for _, reclaimee := range reclaimees { - job := ssn.Jobs[reclaimee.Job] - attr := cp.queueOpts[job.Queue] + // Initialize queue attributes + for _, queue := range ssn.Queues { + _, found := cp.queueOpts[queue.UID] + if found { + continue + } - if _, found := allocations[job.Queue]; !found { - allocations[job.Queue] = attr.allocated.Clone() + attr := cp.newQueueAttr(queue) + cp.queueOpts[queue.UID] = attr + err := cp.updateParents(queue, ssn) + if err != nil { + klog.Errorf("Failed to update Queue <%s> attributes, error: %v", queue.Name, err) + return + } + } + + for _, job := range ssn.Jobs { + klog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name) + attr := cp.queueOpts[job.Queue] + if len(attr.children) > 0 { + klog.Errorf("The Queue <%s> of Job <%s/%s> is not leaf queue", attr.name, job.Namespace, job.Name) + return + } + + oldAllocated := attr.allocated.Clone() + oldRequest := attr.request.Clone() + oldInqueue := attr.inqueue.Clone() + oldElastic := attr.elastic.Clone() + + for status, tasks := range job.TaskStatusIndex { + if api.AllocatedStatus(status) { + for _, t := range tasks { + attr.allocated.Add(t.Resreq) + attr.request.Add(t.Resreq) + } + } else if status == api.Pending { + for _, t := range tasks { + attr.request.Add(t.Resreq) + } } - allocated := allocations[job.Queue] - if allocated.LessPartly(reclaimer.Resreq, api.Zero) { - klog.V(3).Infof("Failed to allocate resource for Task <%s/%s> in Queue <%s>, not enough resource.", - reclaimee.Namespace, reclaimee.Name, job.Queue) - continue + } + + if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { + attr.inqueue.Add(job.DeductSchGatedResources(job.GetMinResources())) + } + + // calculate inqueue resource for running jobs + // the judgement 'job.PodGroup.Status.Running >= job.PodGroup.Spec.MinMember' will work on cases such as the following condition: + // Considering a Spark job is completed(driver pod is completed) while the podgroup keeps running, the allocated resource will be reserved again if without the judgement. + if job.PodGroup.Status.Phase == scheduling.PodGroupRunning && + job.PodGroup.Spec.MinResources != nil && + int32(util.CalculateAllocatedTaskNum(job)) >= job.PodGroup.Spec.MinMember { + inqueued := util.GetInqueueResource(job, job.Allocated) + attr.inqueue.Add(job.DeductSchGatedResources(inqueued)) + } + attr.elastic.Add(job.GetElasticResources()) + + for _, parentID := range attr.parents { + parentAttr := cp.queueOpts[parentID] + parentAttr.allocated.Add(attr.allocated.Clone().Sub(oldAllocated)) + parentAttr.request.Add(attr.request.Clone().Sub(oldRequest)) + parentAttr.inqueue.Add(attr.inqueue.Clone().Sub(oldInqueue)) + parentAttr.elastic.Add(attr.elastic.Clone().Sub(oldElastic)) + } + + klog.V(5).Infof("Queue %s allocated <%s> request <%s> inqueue <%s> elastic <%s>", + attr.name, attr.allocated.String(), attr.request.String(), attr.inqueue.String(), attr.elastic.String()) + } + + // Check the hierarchical structure of queues + err := cp.checkHierarchicalQueue(cp.queueOpts[api.QueueID(cp.rootQueue)]) + if err != nil { + klog.Errorf("Failed to check queue's hierarchical structure, error: %v", err) + return + } + klog.V(4).Infof("Successfully checked queue's hierarchical structure.") + + err = cp.createOrUpdateRootQueue(ssn.VCClient()) + if err != nil { + klog.Errorf("Failed to create root queue, error: %v", err) + return + } + klog.V(4).Infof("Successfully created or updated root queue.") + + // Update share + for _, attr := range cp.queueOpts { + cp.updateShare(attr) + klog.V(4).Infof("The attributes of queue <%s> in capacity: deserved <%v>, realCapability <%v>, allocate <%v>, request <%v>, elastic <%v>, share <%0.2f>", + attr.name, attr.deserved, attr.realCapability, attr.allocated, attr.request, attr.elastic, attr.share) + } + + // Record metrics + for queueID := range ssn.Queues { + queue := ssn.Queues[queueID] + attr := cp.queueOpts[queueID] + metrics.UpdateQueueDeserved(attr.name, attr.deserved.MilliCPU, attr.deserved.Memory) + metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) + metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory) + metrics.UpdateQueuePodGroupInqueueCount(attr.name, queue.Queue.Status.Inqueue) + metrics.UpdateQueuePodGroupPendingCount(attr.name, queue.Queue.Status.Pending) + metrics.UpdateQueuePodGroupRunningCount(attr.name, queue.Queue.Status.Running) + metrics.UpdateQueuePodGroupUnknownCount(attr.name, queue.Queue.Status.Unknown) + } + + ssn.AddQueueOrderFn(cp.Name(), func(l, r interface{}) int { + lv := l.(*api.QueueInfo) + rv := r.(*api.QueueInfo) + + if lv.Queue.Spec.Priority != rv.Queue.Spec.Priority { + // return negative means high priority + return int(rv.Queue.Spec.Priority) - int(lv.Queue.Spec.Priority) + } + + lvLeaf := cp.isLeafQueue(lv.UID) + rvLeaf := cp.isLeafQueue(rv.UID) + + if lvLeaf && !rvLeaf { + return -1 + } else if !lvLeaf && rvLeaf { + return 1 + } else if !lvLeaf && !rvLeaf { + if cp.queueOpts[lv.UID].share == cp.queueOpts[rv.UID].share { + return 0 } - exceptReclaimee := allocated.Clone().Sub(reclaimee.Resreq) - // When scalar resource not specified in deserved such as "pods", we should skip it and consider it as infinity, - // so the following first condition will be true and the current queue will not be reclaimed. - if allocated.LessEqual(attr.deserved, api.Infinity) || !attr.guarantee.LessEqual(exceptReclaimee, api.Zero) { - continue + if cp.queueOpts[lv.UID].share < cp.queueOpts[rv.UID].share { + return -1 } - allocated.Sub(reclaimee.Resreq) - victims = append(victims, reclaimee) + return 1 } - klog.V(4).InfoS("Victims from capacity plugin", "victims", victims, "reclaimer", reclaimer) - return victims, util.Permit + + lvAttr := cp.queueOpts[lv.UID] + rvAttr := cp.queueOpts[rv.UID] + level := getQueueLevel(lvAttr, rvAttr) + lvParentID := lvAttr.queueID + rvParentID := rvAttr.queueID + if level+1 < len(lvAttr.parents) { + lvParentID = lvAttr.parents[level+1] + } + if level+1 < len(rvAttr.parents) { + rvParentID = rvAttr.parents[level+1] + } + + if cp.queueOpts[lvParentID].share == cp.queueOpts[rvParentID].share { + return 0 + } + + if cp.queueOpts[lvParentID].share < cp.queueOpts[rvParentID].share { + return -1 + } + + return 1 }) - ssn.AddPreemptiveFn(cp.Name(), func(obj interface{}, candidate interface{}) bool { - queue := obj.(*api.QueueInfo) - task := candidate.(*api.TaskInfo) - attr := cp.queueOpts[queue.UID] + ssn.AddVictimJobOrderFn(cp.Name(), func(l, r, preemptor interface{}) int { + lv := l.(*api.JobInfo) + rv := r.(*api.JobInfo) + pv := preemptor.(*api.JobInfo) - futureUsed := attr.allocated.Clone().Add(task.Resreq) - overused := !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq) - metrics.UpdateQueueOverused(attr.name, overused) - if overused { - klog.V(3).Infof("Queue <%v> can not reclaim, deserved <%v>, allocated <%v>, share <%v>", - queue.Name, attr.deserved, attr.allocated, attr.share) + lLevel := getQueueLevel(cp.queueOpts[lv.Queue], cp.queueOpts[pv.Queue]) + rLevel := getQueueLevel(cp.queueOpts[rv.Queue], cp.queueOpts[pv.Queue]) + + if lLevel == rLevel { + return 0 } - // PreemptiveFn is the opposite of OverusedFn in proportion plugin cause as long as there is a one-dimensional - // resource whose deserved is greater than allocated, current task can reclaim by preempt others. - return !overused + if lLevel > rLevel { + return -1 + } + + return 1 }) - ssn.AddAllocatableFn(cp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool { - attr := cp.queueOpts[queue.UID] + ssn.AddVictimTaskOrderFn(cp.Name(), func(l, r, preemptor interface{}) int { + lv := l.(*api.TaskInfo) + rv := r.(*api.TaskInfo) + pv := preemptor.(*api.TaskInfo) - futureUsed := attr.allocated.Clone().Add(candidate.Resreq) - allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.Resreq) - if !allocatable { - klog.V(3).Infof("Queue <%v>: realCapability <%v>, allocated <%v>; Candidate <%v>: resource request <%v>", - queue.Name, attr.realCapability, attr.allocated, candidate.Name, candidate.Resreq) + lJob := ssn.Jobs[lv.Job] + rJob := ssn.Jobs[rv.Job] + pJob := ssn.Jobs[pv.Job] + + lLevel := getQueueLevel(cp.queueOpts[lJob.Queue], cp.queueOpts[pJob.Queue]) + rLevel := getQueueLevel(cp.queueOpts[rJob.Queue], cp.queueOpts[pJob.Queue]) + + if lLevel == rLevel { + return 0 } - return allocatable + if lLevel > rLevel { + return -1 + } + + return 1 }) +} - ssn.AddJobEnqueueableFn(cp.Name(), func(obj interface{}) int { - job := obj.(*api.JobInfo) - queueID := job.Queue - attr := cp.queueOpts[queueID] - queue := ssn.Queues[queueID] - // If no capability is set, always enqueue the job. - if attr.realCapability == nil { - klog.V(4).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.", - queue.Name, job.Namespace, job.Name) - return util.Permit +func (cp *capacityPlugin) newQueueAttr(queue *api.QueueInfo) *queueAttr { + attr := &queueAttr{ + queueID: queue.UID, + name: queue.Name, + parents: make([]api.QueueID, 0), + children: make(map[api.QueueID]*queueAttr), + + deserved: api.NewResource(queue.Queue.Spec.Deserved), + allocated: api.EmptyResource(), + request: api.EmptyResource(), + elastic: api.EmptyResource(), + inqueue: api.EmptyResource(), + guarantee: api.EmptyResource(), + } + if len(queue.Queue.Spec.Capability) != 0 { + attr.capability = api.NewResource(queue.Queue.Spec.Capability) + if attr.capability.MilliCPU <= 0 { + attr.capability.MilliCPU = math.MaxFloat64 + } + if attr.capability.Memory <= 0 { + attr.capability.Memory = math.MaxFloat64 } + } - if job.PodGroup.Spec.MinResources == nil { - klog.V(4).Infof("job %s MinResources is null.", job.Name) - return util.Permit + if len(queue.Queue.Spec.Guarantee.Resource) != 0 { + attr.guarantee = api.NewResource(queue.Queue.Spec.Guarantee.Resource) + } + + return attr +} + +func (cp *capacityPlugin) updateParents(queue *api.QueueInfo, ssn *framework.Session) error { + if queue.Name == cp.rootQueue { + return nil + } + + parent := cp.rootQueue + if queue.Queue.Spec.Parent != "" { + parent = queue.Queue.Spec.Parent + } + if _, exist := ssn.Queues[api.QueueID(parent)]; !exist { + return fmt.Errorf("the queue %s has invalid parent queue %s", queue.Name, parent) + } + + parentInfo := ssn.Queues[api.QueueID(parent)] + if _, found := cp.queueOpts[parentInfo.UID]; !found { + parentAttr := cp.newQueueAttr(parentInfo) + cp.queueOpts[parentAttr.queueID] = parentAttr + err := cp.updateParents(parentInfo, ssn) + if err != nil { + return err } - minReq := job.GetMinResources() + } - klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>", - job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String()) - // The queue resource quota limit has not reached - r := minReq.Clone().Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic) + cp.queueOpts[parentInfo.UID].children[queue.UID] = cp.queueOpts[queue.UID] + cp.queueOpts[queue.UID].parents = append(cp.queueOpts[parentInfo.UID].parents, parentInfo.UID) + return nil +} - inqueue := r.LessEqualWithDimension(attr.realCapability, minReq) - klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue) - if inqueue { - attr.inqueue.Add(job.DeductSchGatedResources(minReq)) - return util.Permit +func (cp *capacityPlugin) checkHierarchicalQueue(attr *queueAttr) error { + totalGuarantee := api.EmptyResource() + totalDeserved := api.EmptyResource() + for _, childAttr := range attr.children { + totalDeserved.Add(childAttr.deserved) + totalGuarantee.Add(childAttr.guarantee) + // Check if the parent queue's capability is less than the child queue's capability + if attr.capability != nil && childAttr.capability != nil && attr.capability.LessPartly(childAttr.capability, api.Zero) { + return fmt.Errorf("queue <%s> capability is less than its child queue <%s>", attr.name, childAttr.name) } - ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "queue resource quota insufficient") - return util.Reject - }) + } - // Register event handlers. - ssn.AddEventHandler(&framework.EventHandler{ - AllocateFunc: func(event *framework.Event) { - job := ssn.Jobs[event.Task.Job] - attr := cp.queueOpts[job.Queue] - attr.allocated.Add(event.Task.Resreq) - metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) + if attr.name == cp.rootQueue { + attr.guarantee = totalGuarantee + cp.totalGuarantee = totalGuarantee + attr.realCapability = cp.totalResource + attr.deserved = cp.totalResource + } - cp.updateShare(attr) + for _, childAttr := range attr.children { + realCapability := attr.realCapability.Clone().Sub(totalGuarantee).Add(childAttr.guarantee) + if childAttr.capability == nil { + childAttr.realCapability = realCapability + } else { + realCapability.MinDimensionResource(childAttr.capability, api.Infinity) + childAttr.realCapability = realCapability + } + oldDeserved := childAttr.deserved.Clone() + childAttr.deserved.MinDimensionResource(childAttr.realCapability, api.Infinity) + childAttr.deserved.MinDimensionResource(childAttr.request, api.Infinity) - klog.V(4).Infof("Capacity AllocateFunc: task <%v/%v>, resreq <%v>, share <%v>", - event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) - }, - DeallocateFunc: func(event *framework.Event) { - job := ssn.Jobs[event.Task.Job] - attr := cp.queueOpts[job.Queue] - attr.allocated.Sub(event.Task.Resreq) - metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory) + childAttr.deserved = helpers.Max(childAttr.deserved, childAttr.guarantee) + totalDeserved.Sub(oldDeserved).Add(childAttr.deserved) + } - cp.updateShare(attr) + // Check if the parent queue's deserved resources are less than the total deserved resources of child queues + if attr.deserved.LessPartly(totalDeserved, api.Zero) { + return fmt.Errorf("deserved resources of queue <%s> are less than the sum of its child queues' deserved resources", attr.name) + } - klog.V(4).Infof("Capacity EvictFunc: task <%v/%v>, resreq <%v>, share <%v>", - event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) + // Check if the parent queue's guarantee resources are less than the total guarantee resources of child queues + if attr.guarantee.LessPartly(totalGuarantee, api.Zero) { + return fmt.Errorf("guarantee resources of queue <%s> are less than the sum of its child queues' guarantee resources", attr.name) + } + + for _, childAttr := range attr.children { + err := cp.checkHierarchicalQueue(childAttr) + if err != nil { + return err + } + } + + return nil +} + +func (cp *capacityPlugin) createOrUpdateRootQueue(vcClient vcclient.Interface) error { + queue := vcv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: string(cp.rootQueue), }, + Spec: vcv1beta1.QueueSpec{ + Deserved: v1.ResourceList{ + v1.ResourceCPU: api.ResFloat642Quantity(v1.ResourceCPU, cp.totalGuarantee.MilliCPU), + v1.ResourceMemory: api.ResFloat642Quantity(v1.ResourceMemory, cp.totalResource.Memory), + }, + Guarantee: vcv1beta1.Guarantee{ + Resource: v1.ResourceList{ + v1.ResourceCPU: api.ResFloat642Quantity(v1.ResourceCPU, cp.totalGuarantee.MilliCPU), + v1.ResourceMemory: api.ResFloat642Quantity(v1.ResourceMemory, cp.totalResource.Memory), + }, + }, + }, + } + for rn, rq := range cp.totalResource.ScalarResources { + queue.Spec.Deserved[rn] = api.ResFloat642Quantity(rn, rq) + } + for rn, rq := range cp.totalGuarantee.ScalarResources { + queue.Spec.Guarantee.Resource[rn] = api.ResFloat642Quantity(rn, rq) + } + + err := retry.OnError(wait.Backoff{ + Steps: 60, + Duration: time.Second, + Factor: 1, + Jitter: 0.1, + }, func(err error) bool { + return err != nil && !apierrors.IsNotFound(err) + }, func() error { + _, err := vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), &queue, metav1.UpdateOptions{}) + return err }) -} -func (cp *capacityPlugin) OnSessionClose(ssn *framework.Session) { - cp.totalResource = nil - cp.totalGuarantee = nil - cp.queueOpts = nil + if err != nil { + if apierrors.IsNotFound(err) { + err = retry.OnError(wait.Backoff{ + Steps: 60, + Duration: time.Second, + Factor: 1, + Jitter: 0.1, + }, func(err error) bool { + return err != nil && !apierrors.IsAlreadyExists(err) + }, func() error { + _, err := vcClient.SchedulingV1beta1().Queues().Create(context.TODO(), &queue, metav1.CreateOptions{}) + return err + }) + if err != nil && apierrors.IsAlreadyExists(err) { + return nil + } + } + + return err + } + + return nil } func (cp *capacityPlugin) updateShare(attr *queueAttr) { @@ -362,3 +798,21 @@ func (cp *capacityPlugin) updateShare(attr *queueAttr) { attr.share = res metrics.UpdateQueueShare(attr.name, attr.share) } + +func (cp *capacityPlugin) isLeafQueue(queueID api.QueueID) bool { + return len(cp.queueOpts[queueID].children) == 0 +} + +func getQueueLevel(l *queueAttr, r *queueAttr) int { + level := 0 + + for i := 0; i < min(len(l.parents), len(r.parents)); i++ { + if l.parents[i] == r.parents[i] { + level = i + } else { + return level + } + } + + return level +} diff --git a/pkg/scheduler/plugins/capacity/capacity_test.go b/pkg/scheduler/plugins/capacity/capacity_test.go index 71907f31607..164a4fbd714 100644 --- a/pkg/scheduler/plugins/capacity/capacity_test.go +++ b/pkg/scheduler/plugins/capacity/capacity_test.go @@ -40,7 +40,7 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func Test_capacityPlugin_OnSessionOpen(t *testing.T) { +func Test_capacityPlugin_OnSessionOpenWithoutHierarchy(t *testing.T) { plugins := map[string]framework.PluginBuilder{PluginName: New, predicates.PluginName: predicates.New} trueValue := true actions := []framework.Action{allocate.New(), reclaim.New()} @@ -333,3 +333,138 @@ func TestEnqueueAndAllocatable(t *testing.T) { }) } } + +func buildQueueWithParents(name string, parent string, deserved corev1.ResourceList, cap corev1.ResourceList) *schedulingv1beta1.Queue { + queue := util.BuildQueueWithResourcesQuantity(name, deserved, cap) + queue.Spec.Parent = parent + return queue +} + +func Test_capacityPlugin_OnSessionOpenWithHierarchy(t *testing.T) { + plugins := map[string]framework.PluginBuilder{PluginName: New, predicates.PluginName: predicates.New} + trueValue := true + actions := []framework.Action{enqueue.New(), reclaim.New(), allocate.New()} + + // nodes + n1 := util.BuildNode("n1", api.BuildResourceList("8", "8Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{}) + + // resources for test case 0 + // pod + p1 := util.BuildPod("ns1", "p1", "", corev1.PodPending, api.BuildResourceList("1", "1Gi"), "pg1", make(map[string]string), map[string]string{}) + // podgroup + pg1 := util.BuildPodGroup("pg1", "ns1", "q11", 1, nil, schedulingv1beta1.PodGroupInqueue) + // queue + queue1 := buildQueueWithParents("q1", "root", nil, api.BuildResourceList("4", "4Gi")) + queue2 := buildQueueWithParents("q2", "root", nil, api.BuildResourceList("4", "4Gi")) + queue11 := buildQueueWithParents("q11", "q1", nil, api.BuildResourceList("1", "1Gi")) + queue12 := buildQueueWithParents("q12", "q1", nil, api.BuildResourceList("3", "3Gi")) + + // resources for test case 1 + // pod + p2 := util.BuildPod("ns1", "p2", "", corev1.PodPending, api.BuildResourceList("1", "1Gi"), "pg2", make(map[string]string), map[string]string{}) + // podgroup + pg2 := util.BuildPodGroup("pg2", "ns1", "q1", 1, nil, schedulingv1beta1.PodGroupPending) + + // resources for test case 2 + // pod + p3 := util.BuildPod("ns1", "p3", "", corev1.PodPending, api.BuildResourceList("2", "2Gi"), "pg3", make(map[string]string), map[string]string{}) + p4 := util.BuildPod("ns1", "p4", "", corev1.PodPending, api.BuildResourceList("2", "2Gi"), "pg3", make(map[string]string), map[string]string{}) + // podgroup + pg3 := util.BuildPodGroup("pg3", "ns1", "q31", 2, nil, schedulingv1beta1.PodGroupInqueue) + // queue + queue3 := buildQueueWithParents("q3", "root", api.BuildResourceList("4", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), api.BuildResourceList("4", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...)) + queue4 := buildQueueWithParents("q4", "root", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "1"}}...), api.BuildResourceList("4", "4Gi", []api.ScalarResource{{Name: "pods", Value: "4"}}...)) + queue31 := buildQueueWithParents("q31", "q3", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "2"}}...), api.BuildResourceList("4", "4Gi", []api.ScalarResource{{Name: "pods", Value: "4"}}...)) + queue32 := buildQueueWithParents("q32", "q3", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "2"}}...), api.BuildResourceList("4", "4Gi", []api.ScalarResource{{Name: "pods", Value: "4"}}...)) + + // resources for test case 3 + // pod + p5 := util.BuildPod("ns1", "p5", "n1", corev1.PodRunning, api.BuildResourceList("4", "4Gi"), "pg4", map[string]string{}, make(map[string]string)) + p6 := util.BuildPod("ns1", "p6", "n1", corev1.PodRunning, api.BuildResourceList("2", "2Gi"), "pg5", map[string]string{schedulingv1beta1.PodPreemptable: "false"}, make(map[string]string)) + p7 := util.BuildPod("ns1", "p7", "n1", corev1.PodRunning, api.BuildResourceList("2", "2Gi"), "pg5", make(map[string]string), make(map[string]string)) + p8 := util.BuildPod("ns1", "p8", "", corev1.PodPending, api.BuildResourceList("2", "2Gi"), "pg6", make(map[string]string), map[string]string{}) + // podgroup + pg4 := util.BuildPodGroup("pg4", "ns1", "q4", 1, nil, schedulingv1beta1.PodGroupRunning) + pg5 := util.BuildPodGroup("pg5", "ns1", "q31", 1, nil, schedulingv1beta1.PodGroupRunning) + pg6 := util.BuildPodGroup("pg6", "ns1", "q32", 1, nil, schedulingv1beta1.PodGroupInqueue) + + tests := []uthelper.TestCommonStruct{ + { + Name: "case0: Pod allocatable when queue is leaf queue", + Plugins: plugins, + Pods: []*corev1.Pod{p1}, + Nodes: []*corev1.Node{n1}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2, queue11, queue12}, + ExpectBindMap: map[string]string{ + "ns1/p1": "n1", + }, + ExpectBindsNum: 1, + }, + { + Name: "case1: Pod not allocatable when queue is not leaf queue", + Plugins: plugins, + Pods: []*corev1.Pod{p2}, + Nodes: []*corev1.Node{n1}, + PodGroups: []*schedulingv1beta1.PodGroup{pg2}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2, queue11, queue12}, + ExpectBindMap: map[string]string{}, + ExpectBindsNum: 0, + }, + { + Name: "case2: Pod allocatable when queue has not exceed capability", + Plugins: plugins, + Pods: []*corev1.Pod{p3, p4}, + Nodes: []*corev1.Node{n1}, + PodGroups: []*schedulingv1beta1.PodGroup{pg3}, + Queues: []*schedulingv1beta1.Queue{queue3, queue4, queue31, queue32}, + ExpectBindMap: map[string]string{ + "ns1/p3": "n1", + "ns1/p4": "n1", + }, + ExpectBindsNum: 2, + }, + { + Name: "case3: Can reclaim from other queues when allocated < deserved", + Plugins: plugins, + Pods: []*corev1.Pod{p5, p6, p7, p8}, + Nodes: []*corev1.Node{n1}, + PodGroups: []*schedulingv1beta1.PodGroup{pg4, pg5, pg6}, + Queues: []*schedulingv1beta1.Queue{queue3, queue31, queue32, queue4}, + ExpectPipeLined: map[string][]string{ + "ns1/pg6": {"n1"}, + }, + ExpectEvicted: []string{"ns1/p7"}, + ExpectEvictNum: 1, + }, + } + + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledAllocatable: &trueValue, + EnablePreemptive: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledHierarchy: &trueValue, + }, + { + Name: predicates.PluginName, + EnabledPredicate: &trueValue, + }, + }, + }, + } + for i, test := range tests { + t.Run(test.Name, func(t *testing.T) { + test.RegisterSession(tiers, nil) + defer test.Close() + test.Run(actions) + if err := test.CheckAll(i); err != nil { + t.Fatal(err) + } + }) + } +} diff --git a/pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue.go b/pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue.go new file mode 100644 index 00000000000..dde2401caeb --- /dev/null +++ b/pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue.go @@ -0,0 +1,132 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutate + +import ( + "context" + "fmt" + "strings" + + admissionv1 "k8s.io/api/admission/v1" + whv1 "k8s.io/api/admissionregistration/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/klog/v2" + + busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1" + vcbus "volcano.sh/apis/pkg/apis/bus/v1alpha1" + "volcano.sh/apis/pkg/apis/helpers" + schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/webhooks/router" + "volcano.sh/volcano/pkg/webhooks/schema" + "volcano.sh/volcano/pkg/webhooks/util" +) + +func init() { + router.RegisterAdmission(hierarchyService) +} + +var hierarchyService = &router.AdmissionService{ + Path: "/hierarchicalqueues/mutate", + Func: HierarchicalQueues, + Config: config, + + MutatingConfig: &whv1.MutatingWebhookConfiguration{ + Webhooks: []whv1.MutatingWebhook{{ + Name: "mutatehierarchicalqueues.volcano.sh", + Rules: []whv1.RuleWithOperations{ + { + Operations: []whv1.OperationType{whv1.Create, whv1.Update}, + Rule: whv1.Rule{ + APIGroups: []string{schedulingv1beta1.SchemeGroupVersion.Group}, + APIVersions: []string{schedulingv1beta1.SchemeGroupVersion.Version}, + Resources: []string{"queues"}, + }, + }, + }, + }}, + }, +} + +var config = &router.AdmissionServiceConfig{} + +// HierarchicalQueues mutate queues when capacity plugin enables hierarchy. +func HierarchicalQueues(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { + klog.V(3).Infof("Mutating hierarchical queue %s for operation %s.", ar.Request.Name, ar.Request.Operation) + + queue, err := schema.DecodeQueue(ar.Request.Object, ar.Request.Resource) + if err != nil { + return util.ToAdmissionResponse(err) + } + + switch ar.Request.Operation { + case admissionv1.Create, admissionv1.Update: + err = closeQueue(queue) + default: + return util.ToAdmissionResponse(fmt.Errorf("invalid operation `%s`, "+ + "expect operation to be `CREATE` or `UPDATE`", ar.Request.Operation)) + } + + if err != nil { + return &admissionv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{Message: err.Error()}, + } + } + + return &admissionv1.AdmissionResponse{ + Allowed: true, + } +} + +func closeQueue(queue *schedulingv1beta1.Queue) error { + if queue.Status.State != schedulingv1beta1.QueueStateClosed && queue.Status.State != schedulingv1beta1.QueueStateClosing { + return nil + } + if queue.Name == "root" { + return fmt.Errorf("root queue can not be closed") + } + queueList, err := config.VolcanoClient.SchedulingV1beta1().Queues().List(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.parent", queue.Name).String()}) + if err != nil { + return fmt.Errorf("failed to list child queues of closing/closed queue %s/%s: %v", queue.Namespace, queue.Name, err) + } + + for _, childQueue := range queueList.Items { + if childQueue.Status.State != schedulingv1beta1.QueueStateClosed && childQueue.Status.State != schedulingv1beta1.QueueStateClosing { + ctrlRef := metav1.NewControllerRef(&childQueue, helpers.V1beta1QueueKind) + cmd := &vcbus.Command{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: fmt.Sprintf("%s-%s-", + childQueue.Name, strings.ToLower(string(busv1alpha1.CloseQueueAction))), + Namespace: childQueue.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *ctrlRef, + }, + }, + TargetObject: ctrlRef, + Action: string(busv1alpha1.CloseQueueAction), + } + if _, err = config.VolcanoClient.BusV1alpha1().Commands(childQueue.Namespace).Create(context.TODO(), cmd, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create close command for child queue %s/%s of closing/closed queue %s/%s: %v", childQueue.Namespace, childQueue.Name, queue.Namespace, queue.Name, err) + } + klog.V(3).Infof("Closing child queue %s/%s because of its closing/closed parent queue %s/%s.", childQueue.Namespace, childQueue.Name, queue.Namespace, queue.Name) + } + } + return nil +} diff --git a/pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue_test.go b/pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue_test.go new file mode 100644 index 00000000000..a6cecd899b8 --- /dev/null +++ b/pkg/webhooks/admission/queues/mutate/mutate_hierarchical_queue_test.go @@ -0,0 +1,145 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutate + +import ( + "encoding/json" + "testing" + + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + fakeclient "volcano.sh/apis/pkg/client/clientset/versioned/fake" +) + +func TestMutateHierarchicalQueues(t *testing.T) { + closeState := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "close-state-queue", + }, + Spec: schedulingv1beta1.QueueSpec{ + Parent: "root", + }, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateClosing, + }, + } + + closeStateJSON, err := json.Marshal(closeState) + if err != nil { + t.Errorf("Marshal queue with close state failed for %v.", err) + } + + rootQueue := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "root", + }, + Spec: schedulingv1beta1.QueueSpec{}, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateClosing, + }, + } + closeRootJSON, err := json.Marshal(rootQueue) + if err != nil { + t.Errorf("Marshal root queue with close state failed for %v.", err) + } + + config.VolcanoClient = fakeclient.NewSimpleClientset() + + testCases := []struct { + Name string + AR admissionv1.AdmissionReview + reviewResponse *admissionv1.AdmissionResponse + }{ + { + Name: "Close queue", + AR: admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1", + }, + Request: &admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Kind: "Queue", + }, + Resource: metav1.GroupVersionResource{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Resource: "queues", + }, + Name: "normal-case-set-close-state", + Operation: "UPDATE", + Object: runtime.RawExtension{ + Raw: closeStateJSON, + }, + }, + }, + reviewResponse: &admissionv1.AdmissionResponse{ + Allowed: true, + PatchType: nil, + Patch: nil, + }, + }, + { + Name: "Close root queue", + AR: admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1", + }, + Request: &admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Kind: "Queue", + }, + Resource: metav1.GroupVersionResource{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Resource: "queues", + }, + Name: "normal-case-set-close-state", + Operation: "UPDATE", + Object: runtime.RawExtension{ + Raw: closeRootJSON, + }, + }, + }, + reviewResponse: &admissionv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "root queue can not be closed", + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + reviewResponse := HierarchicalQueues(testCase.AR) + if !equality.Semantic.DeepEqual(reviewResponse, testCase.reviewResponse) { + t.Errorf("Test case '%s' failed, expect: %v, got: %v", testCase.Name, + testCase.reviewResponse, reviewResponse) + } + }) + } + +} diff --git a/pkg/webhooks/admission/queues/validate/validate_hierarchical_queue.go b/pkg/webhooks/admission/queues/validate/validate_hierarchical_queue.go new file mode 100644 index 00000000000..33af6af779d --- /dev/null +++ b/pkg/webhooks/admission/queues/validate/validate_hierarchical_queue.go @@ -0,0 +1,133 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validate + +import ( + "context" + "fmt" + "strings" + + admissionv1 "k8s.io/api/admission/v1" + whv1 "k8s.io/api/admissionregistration/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/klog/v2" + + schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/webhooks/router" + "volcano.sh/volcano/pkg/webhooks/schema" + "volcano.sh/volcano/pkg/webhooks/util" +) + +func init() { + router.RegisterAdmission(hierarchyService) +} + +var hierarchyService = &router.AdmissionService{ + Path: "/hierarchicalqueues/validate", + Func: HierarchicalQueues, + Config: config, + MutatingConfig: &whv1.MutatingWebhookConfiguration{ + Webhooks: []whv1.MutatingWebhook{{ + Name: "validatehierarchicalqueues.volcano.sh", + Rules: []whv1.RuleWithOperations{ + { + Operations: []whv1.OperationType{whv1.Create, whv1.Update, whv1.Delete}, + Rule: whv1.Rule{ + APIGroups: []string{schedulingv1beta1.SchemeGroupVersion.Group}, + APIVersions: []string{schedulingv1beta1.SchemeGroupVersion.Version}, + Resources: []string{"queues"}, + }, + }, + }, + }}, + }, +} + +// HierarchicalQueues validate queues when capacity plugin enables hierarchy. +func HierarchicalQueues(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { + klog.V(3).Infof("Validating hierarchical queue %s for operation %s.", ar.Request.Name, ar.Request.Operation) + + queue, err := schema.DecodeQueue(ar.Request.Object, ar.Request.Resource) + if err != nil { + return util.ToAdmissionResponse(err) + } + + switch ar.Request.Operation { + case admissionv1.Create, admissionv1.Update: + err = validateHierarchicalQueue(queue) + case admissionv1.Delete: + err = validateHierarchicalQueueDeleting(queue) + default: + return util.ToAdmissionResponse(fmt.Errorf("invalid operation `%s`, "+ + "expect operation to be `CREATE`, `UPDATE` or `DELETE`", ar.Request.Operation)) + } + + if err != nil { + return &admissionv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{Message: err.Error()}, + } + } + + return &admissionv1.AdmissionResponse{ + Allowed: true, + } +} + +func validateHierarchicalQueue(queue *schedulingv1beta1.Queue) error { + if queue.Spec.Parent == "" || queue.Spec.Parent == "root" { + return nil + } + parentQueue, err := config.VolcanoClient.SchedulingV1beta1().Queues().Get(context.TODO(), queue.Spec.Parent, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get parent queue of queue %s/%s: %v", queue.Namespace, queue.Name, err) + } + + if parentQueue.Status.Pending+parentQueue.Status.Running+parentQueue.Status.Unknown+parentQueue.Status.Inqueue > 0 { + return fmt.Errorf("queue %s/%s cannot be the parent queue of queue %s/%s because it has PodGroups (pending: %d, running: %d, unknown: %d, inqueue: %d)", + parentQueue.Namespace, parentQueue.Name, queue.Namespace, queue.Name, parentQueue.Status.Pending, + parentQueue.Status.Running, parentQueue.Status.Unknown, parentQueue.Status.Inqueue) + } + + klog.V(3).Infof("Validation passed for hierarchical queue %s/%s with parent queue %s/%s", + queue.Namespace, queue.Name, parentQueue.Namespace, parentQueue.Name) + return nil +} + +func validateHierarchicalQueueDeleting(queue *schedulingv1beta1.Queue) error { + if queue.Name == "root" { + return fmt.Errorf("root queue can not be deleted") + } + queueList, err := config.VolcanoClient.SchedulingV1beta1().Queues().List(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.parent", queue.Name).String()}) + if err != nil { + return fmt.Errorf("failed to list child queues of queue %s/%s: %v", queue.Namespace, queue.Name, err) + } + childQueueNames := make([]string, len(queueList.Items)) + for i, childQueue := range queueList.Items { + childQueueNames[i] = fmt.Sprintf("%s/%s", childQueue.Namespace, childQueue.Name) + } + + if len(queueList.Items) > 0 { + return fmt.Errorf("queue %s/%s can not be deleted because it has %d child queues: %s", + queue.Namespace, queue.Name, len(queueList.Items), strings.Join(childQueueNames, ", ")) + } + + klog.V(3).Infof("Validation passed for deleting hierarchical queue %s/%s", queue.Namespace, queue.Name) + return nil +} diff --git a/pkg/webhooks/admission/queues/validate/validate_hierarchical_queue_test.go b/pkg/webhooks/admission/queues/validate/validate_hierarchical_queue_test.go new file mode 100644 index 00000000000..ed16ecc61d1 --- /dev/null +++ b/pkg/webhooks/admission/queues/validate/validate_hierarchical_queue_test.go @@ -0,0 +1,301 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validate + +import ( + "context" + "encoding/json" + "testing" + + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + fakeclient "volcano.sh/apis/pkg/client/clientset/versioned/fake" +) + +func TestAdmitHierarchicalQueues(t *testing.T) { + stateNotSet := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "normal-case-not-set", + }, + Spec: schedulingv1beta1.QueueSpec{ + Weight: 1, + }, + } + + stateNotSetJSON, err := json.Marshal(stateNotSet) + if err != nil { + t.Errorf("Marshal queue without state set failed for %v.", err) + } + + openState := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "normal-case-set-open", + }, + Spec: schedulingv1beta1.QueueSpec{}, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateOpen, + }, + } + + openStateJSON, err := json.Marshal(openState) + if err != nil { + t.Errorf("Marshal queue with open state failed for %v.", err) + } + + closedState := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "normal-case-set-closed", + }, + Spec: schedulingv1beta1.QueueSpec{}, + Status: schedulingv1beta1.QueueStatus{ + State: schedulingv1beta1.QueueStateClosed, + }, + } + + closedStateJSON, err := json.Marshal(closedState) + if err != nil { + t.Errorf("Marshal queue with closed state failed for %v.", err) + } + + parentQueueWithJobs := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "parent-queue-with-jobs", + }, + Spec: schedulingv1beta1.QueueSpec{ + Parent: "queue-with-jobs", + }, + } + + parentQueueWithJobsJSON, err := json.Marshal(parentQueueWithJobs) + if err != nil { + t.Errorf("Marshal queue with parent queue failed for %v.", err) + } + + parentQueueWithoutJobs := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "parent-queue-without-jobs", + }, + Spec: schedulingv1beta1.QueueSpec{ + Parent: "queue-without-jobs", + }, + } + + parentQueueWithoutJobsJSON, err := json.Marshal(parentQueueWithoutJobs) + if err != nil { + t.Errorf("Marshal queue with parent queue failed for %v.", err) + } + + config.VolcanoClient = fakeclient.NewSimpleClientset() + queueWithJobs := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "queue-with-jobs", + }, + Spec: schedulingv1beta1.QueueSpec{ + Parent: "root", + }, + Status: schedulingv1beta1.QueueStatus{ + Running: 2, + }, + } + + queueWithoutJobs := schedulingv1beta1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "queue-without-jobs", + }, + Spec: schedulingv1beta1.QueueSpec{ + Parent: "root", + }, + } + + _, err = config.VolcanoClient.SchedulingV1beta1().Queues().Create(context.TODO(), &queueWithJobs, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Create queue with jobs failed for %v.", err) + } + + _, err = config.VolcanoClient.SchedulingV1beta1().Queues().Create(context.TODO(), &queueWithoutJobs, metav1.CreateOptions{}) + if err != nil { + t.Errorf("Create queue without jobs failed for %v.", err) + } + + testCases := []struct { + Name string + AR admissionv1.AdmissionReview + reviewResponse *admissionv1.AdmissionResponse + }{ + { + Name: "Normal Case State Not Set During Creating", + AR: admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1beta1", + }, + Request: &admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Kind: "Queue", + }, + Resource: metav1.GroupVersionResource{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Resource: "queues", + }, + Name: "normal-case-not-set", + Operation: "CREATE", + Object: runtime.RawExtension{ + Raw: stateNotSetJSON, + }, + }, + }, + reviewResponse: &admissionv1.AdmissionResponse{ + Allowed: true, + }, + }, + { + Name: "Normal Case Set State of Open During Creating", + AR: admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1beta1", + }, + Request: &admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Kind: "Queue", + }, + Resource: metav1.GroupVersionResource{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Resource: "queues", + }, + Name: "normal-case-set-open", + Operation: "CREATE", + Object: runtime.RawExtension{ + Raw: openStateJSON, + }, + }, + }, + reviewResponse: &admissionv1.AdmissionResponse{ + Allowed: true, + }, + }, + { + Name: "Normal Case Set State of Closed During Creating", + AR: admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1beta1", + }, + Request: &admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Kind: "Queue", + }, + Resource: metav1.GroupVersionResource{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Resource: "queues", + }, + Name: "normal-case-set-closed", + Operation: "CREATE", + Object: runtime.RawExtension{ + Raw: closedStateJSON, + }, + }, + }, + reviewResponse: &admissionv1.AdmissionResponse{ + Allowed: true, + }, + }, + { + Name: "Parent Queue is Leaf Queue", + AR: admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1beta1", + }, + Request: &admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Kind: "Queue", + }, + Resource: metav1.GroupVersionResource{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Resource: "queues", + }, + Name: "parent-queue-with-jobs", + Operation: "CREATE", + Object: runtime.RawExtension{ + Raw: parentQueueWithJobsJSON, + }, + }, + }, + reviewResponse: &admissionv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "queue /queue-with-jobs cannot be the parent queue of queue /parent-queue-with-jobs because it has PodGroups (pending: 0, running: 2, unknown: 0, inqueue: 0)", + }, + }, + }, + { + Name: "Parent Queue is not Leaf Queue", + AR: admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1beta1", + }, + Request: &admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Kind: "Queue", + }, + Resource: metav1.GroupVersionResource{ + Group: "scheduling.volcano.sh", + Version: "v1beta1", + Resource: "queues", + }, + Name: "parent-queue-without-jobs", + Operation: "CREATE", + Object: runtime.RawExtension{ + Raw: parentQueueWithoutJobsJSON, + }, + }, + }, + reviewResponse: &admissionv1.AdmissionResponse{ + Allowed: true, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + reviewResponse := HierarchicalQueues(testCase.AR) + if !equality.Semantic.DeepEqual(reviewResponse, testCase.reviewResponse) { + t.Errorf("Test case %s failed, expect %v, got %v", testCase.Name, + testCase.reviewResponse, reviewResponse) + } + }) + } +}