Skip to content

Commit

Permalink
feat: add hierarchical queues for capacity plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Rui-Gan <[email protected]>
  • Loading branch information
Rui-Gan committed Sep 23, 2024
1 parent 0843c0d commit 32dbd06
Show file tree
Hide file tree
Showing 13 changed files with 1,522 additions and 150 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (pmpt *Action) preempt(
continue
}

victimsQueue := ssn.BuildVictimsPriorityQueue(victims)
victimsQueue := ssn.BuildVictimsPriorityQueue(victims, preemptor)
// Preempt victims for tasks, pick lowest priority task first.
preempted := api.EmptyResource()

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

victimsQueue := ssn.BuildVictimsPriorityQueue(victims)
victimsQueue := ssn.BuildVictimsPriorityQueue(victims, task)

resreq := task.InitResreq.Clone()
reclaimed := api.EmptyResource()
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type LessFn func(interface{}, interface{}) bool
// CompareFn is the func declaration used by sort or priority queue.
type CompareFn func(interface{}, interface{}) int

// VictimCompareFn is the func declaration used to sort victims or to order a
// victim priority queue. It takes three arguments and returns an int; the
// framework's VictimJobOrderFn and VictimTaskOrderFn call it as
// (l, r, preemptor), treat a negative result as "l orders before r" and a zero
// result as "no decision".
type VictimCompareFn func(interface{}, interface{}, interface{}) int

// ValidateFn is the func declaration used to check object's status.
type ValidateFn func(interface{}) bool

Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,11 @@ func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
}

// VCClient returns the volcano clientSet stored in the cache's vcClient field.
func (sc *SchedulerCache) VCClient() vcclient.Interface {
return sc.vcClient
}

// ClientConfig returns the rest config
func (sc *SchedulerCache) ClientConfig() *rest.Config {
return sc.restConfig
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"

vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
)
Expand Down Expand Up @@ -74,6 +75,9 @@ type Cache interface {
// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface

// VCClient returns the volcano clientSet, which can be used by plugins
VCClient() vcclient.Interface

// ClientConfig returns the rest config
ClientConfig() *rest.Config

Expand Down
96 changes: 54 additions & 42 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
Expand All @@ -46,6 +47,7 @@ type Session struct {
UID types.UID

kubeClient kubernetes.Interface
vcClient vcclient.Interface
recorder record.EventRecorder
cache cache.Cache
restConfig *rest.Config
Expand All @@ -72,22 +74,24 @@ type Session struct {
Configurations []conf.Configuration
NodeList []*api.NodeInfo

plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
plugins map[string]Plugin
eventHandlers []*EventHandler
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
victimJobOrderFns map[string]api.VictimCompareFn
victimTaskOrderFns map[string]api.VictimCompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
nodeMapFns map[string]api.NodeMapFn
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
// preemptiveFns means whether current queue can reclaim from other queue,
// while reclaimableFns means whether current queue's resources can be reclaimed.
preemptiveFns map[string]api.ValidateWithCandidateFn
Expand All @@ -107,6 +111,7 @@ func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
kubeClient: cache.Client(),
vcClient: cache.VCClient(),
restConfig: cache.ClientConfig(),
recorder: cache.EventRecorder(),
cache: cache,
Expand All @@ -121,32 +126,34 @@ func openSession(cache cache.Cache) *Session {
RevocableNodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
victimJobOrderFns: map[string]api.VictimCompareFn{},
victimTaskOrderFns: map[string]api.VictimCompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
nodeMapFns: map[string]api.NodeMapFn{},
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
jobEnqueueableFns: map[string]api.VoteFn{},
jobEnqueuedFns: map[string]api.JobEnqueuedFn{},
targetJobFns: map[string]api.TargetJobFn{},
reservedNodesFns: map[string]api.ReservedNodesFn{},
victimTasksFns: map[string][]api.VictimTasksFn{},
jobStarvingFns: map[string]api.ValidateFn{},
}

snapshot := cache.Snapshot()
Expand Down Expand Up @@ -590,6 +597,11 @@ func (ssn Session) KubeClient() kubernetes.Interface {
return ssn.kubeClient
}

// VCClient returns the volcano clientSet held by the session. The value is the
// one taken from cache.VCClient() when the session was opened.
func (ssn Session) VCClient() vcclient.Interface {
return ssn.vcClient
}

// ClientConfig returns the rest client
func (ssn Session) ClientConfig() *rest.Config {
return ssn.restConfig
Expand Down
64 changes: 56 additions & 8 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) {
ssn.taskOrderFns[name] = cf
}

// AddVictimJobOrderFn registers vcf as the victim job order function for the
// given name. The function is stored in ssn.victimJobOrderFns keyed by name, so
// registering the same name again replaces the previous entry. VictimJobOrderFn
// looks entries up by plugin name.
func (ssn *Session) AddVictimJobOrderFn(name string, vcf api.VictimCompareFn) {
ssn.victimJobOrderFns[name] = vcf
}

// AddVictimTaskOrderFn registers vcf as the victim task order function for the
// given name. The function is stored in ssn.victimTaskOrderFns keyed by name,
// so registering the same name again replaces the previous entry.
// VictimTaskOrderFn looks entries up by plugin name.
func (ssn *Session) AddVictimTaskOrderFn(name string, vcf api.VictimCompareFn) {
ssn.victimTaskOrderFns[name] = vcf
}

// AddPreemptableFn add preemptable function
func (ssn *Session) AddPreemptableFn(name string, cf api.EvictableFn) {
ssn.preemptableFns[name] = cf
Expand Down Expand Up @@ -545,6 +555,29 @@ func (ssn *Session) JobOrderFn(l, r interface{}) bool {
return lv.CreationTimestamp.Before(&rv.CreationTimestamp)
}

// VictimJobOrderFn decides whether victim job l orders before victim job r
// with respect to the given preemptor.
//
// Plugins are consulted tier by tier in the order they appear in ssn.Tiers.
// The first registered victim job order function that returns a non-zero
// value settles the comparison: a negative value yields true, a positive
// value yields false.
//
// When no plugin decides, l and r are asserted to *api.JobInfo and the result
// is the inverse of the regular ordering: the negation of QueueOrderFn when
// the jobs belong to different queues, otherwise the negation of JobOrderFn.
func (ssn *Session) VictimJobOrderFn(l, r, preemptor interface{}) bool {
	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			if compare, ok := ssn.victimJobOrderFns[plugin.Name]; ok {
				if verdict := compare(l, r, preemptor); verdict != 0 {
					return verdict < 0
				}
			}
		}
	}

	leftJob, rightJob := l.(*api.JobInfo), r.(*api.JobInfo)
	if leftJob.Queue == rightJob.Queue {
		// Same queue: fall back to the reversed job order.
		return !ssn.JobOrderFn(l, r)
	}

	// Different queues: fall back to the reversed queue order.
	return !ssn.QueueOrderFn(ssn.Queues[leftJob.Queue], ssn.Queues[rightJob.Queue])
}

// ClusterOrderFn invoke ClusterOrderFn function of the plugins
func (ssn *Session) ClusterOrderFn(l, r interface{}) bool {
for _, tier := range ssn.Tiers {
Expand Down Expand Up @@ -626,6 +659,23 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
return helpers.CompareTask(lv, rv)
}

// VictimTaskOrderFn decides whether victim task l orders before victim task r
// with respect to the given preemptor.
//
// Plugins are consulted tier by tier in the order they appear in ssn.Tiers.
// The first registered victim task order function that returns a non-zero
// value settles the comparison: a negative value yields true, a positive
// value yields false.
//
// When no plugin decides, the result is the negation of ssn.TaskOrderFn(l, r).
func (ssn *Session) VictimTaskOrderFn(l, r, preemptor interface{}) bool {
	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			if compare, ok := ssn.victimTaskOrderFns[plugin.Name]; ok {
				if verdict := compare(l, r, preemptor); verdict != 0 {
					return verdict < 0
				}
			}
		}
	}

	// No plugin expressed a preference: use the reversed task order.
	reversed := !ssn.TaskOrderFn(l, r)
	return reversed
}

// PredicateFn invoke predicate function of the plugins
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
for _, tier := range ssn.Tiers {
Expand Down Expand Up @@ -791,21 +841,19 @@ func (ssn *Session) NodeOrderReduceFn(task *api.TaskInfo, pluginNodeScoreMap map
// BuildVictimsPriorityQueue returns a priority queue with victims sorted by:
// if two victims have the same job id, ordered by ssn.VictimTaskOrderFn
// if two victims have different job ids, ordered by ssn.VictimJobOrderFn
func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo) *util.PriorityQueue {
func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo, preemptor *api.TaskInfo) *util.PriorityQueue {
victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool {
lv := l.(*api.TaskInfo)
rv := r.(*api.TaskInfo)
if lv.Job == rv.Job {
return !ssn.TaskOrderFn(l, r)
return ssn.VictimTaskOrderFn(l, r, preemptor)
}

lvJob, lvJobFound := ssn.Jobs[lv.Job]
rvJob, rvJobFound := ssn.Jobs[rv.Job]
if lvJobFound && rvJobFound && lvJob.Queue != rvJob.Queue {
return !ssn.QueueOrderFn(ssn.Queues[lvJob.Queue], ssn.Queues[rvJob.Queue])
}
preemptorJob := ssn.Jobs[preemptor.Job]
lvJob := ssn.Jobs[lv.Job]
rvJob := ssn.Jobs[rv.Job]

return !ssn.JobOrderFn(lvJob, rvJob)
return ssn.VictimJobOrderFn(lvJob, rvJob, preemptorJob)
})
for _, victim := range victims {
victimsQueue.Push(victim)
Expand Down
Loading

0 comments on commit 32dbd06

Please sign in to comment.