Skip to content

Commit

Permalink
Add network-topology-aware plugin and hyperNode score callback
Browse files Browse the repository at this point in the history
Signed-off-by: ecosysbin <[email protected]>
  • Loading branch information
ecosysbin committed Feb 14, 2025
1 parent 67957a1 commit c0711fc
Show file tree
Hide file tree
Showing 11 changed files with 2,581 additions and 25 deletions.
37 changes: 17 additions & 20 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package allocate

import (
"sort"
"time"

"k8s.io/klog/v2"
Expand All @@ -34,7 +33,6 @@ type Action struct {
session *framework.Session
// configured flag for error cache
enablePredicateErrorCache bool
hyperNodesTiers []int

// hyperNodeScoresByJob stores the job's total score for all available hyperNodes; it is used to accumulate
// all nodes' scores in each available hyperNode only when the job has hard network topology constraints
Expand All @@ -45,7 +43,6 @@ type Action struct {
func New() *Action {
return &Action{
enablePredicateErrorCache: true, // default to enable it
hyperNodesTiers: []int{},
hyperNodeScoresByJob: make(map[string]map[string]float64),
}
}
Expand All @@ -61,26 +58,11 @@ func (alloc *Action) parseArguments(ssn *framework.Session) {
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

func (alloc *Action) parseHyperNodesTiers(ssn *framework.Session) {
if ssn.HyperNodesSetByTier == nil || len(ssn.HyperNodesSetByTier) == 0 {
return
}

// sort to guarantee the traversal order is from bottom to top.
var tiers []int
for tier := range ssn.HyperNodesSetByTier {
tiers = append(tiers, tier)
}
sort.Ints(tiers)
alloc.hyperNodesTiers = tiers
}

func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")

alloc.parseArguments(ssn)
alloc.parseHyperNodesTiers(ssn)

// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
Expand Down Expand Up @@ -241,7 +223,7 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
jobAllocatedHyperNode := job.PodGroup.Annotations[api.JobAllocatedHyperNode]

// Find a suitable hyperNode in one tier from bottom to top every time to ensure that the selected hyperNode spans the fewest tiers.
for _, tier := range alloc.hyperNodesTiers {
for _, tier := range ssn.HyperNodesTiers {
if tier > highestAllowedTier {
klog.V(4).ErrorS(nil, "Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier)
break
Expand Down Expand Up @@ -375,6 +357,8 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
ssn := alloc.session
stmt := framework.NewStatement(ssn)
ph := util.NewPredicateHelper()
jobAllocatedHyperNode := job.PodGroup.Annotations[api.JobAllocatedHyperNode]
jobAllocatedNewHyperNode := jobAllocatedHyperNode

for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)
Expand Down Expand Up @@ -414,11 +398,21 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
}
}

task.JobAllocatedNewHyperNode = jobAllocatedNewHyperNode
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes)
if bestNode == nil {
continue
}

if hyperNode == "" {
hyperNode = util.FindHyperNodeOfNode(bestNode.Name, ssn.RealNodesList, ssn.HyperNodesTiers, ssn.HyperNodesSetByTier)
if hyperNode != "" {
if jobAllocatedNewHyperNode == "" {
jobAllocatedNewHyperNode = hyperNode
} else {
jobAllocatedNewHyperNode = ssn.HyperNodes.GetLCAHyperNode(hyperNode, jobAllocatedNewHyperNode)
}
}
}
alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore)
alloc.allocateResourcesForTask(stmt, task, bestNode, job)

Expand All @@ -429,6 +423,9 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a

if ssn.JobReady(job) {
klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID)
if jobAllocatedNewHyperNode != "" && jobAllocatedNewHyperNode != jobAllocatedHyperNode {
job.PodGroup.GetAnnotations()[api.JobAllocatedHyperNode] = jobAllocatedNewHyperNode
}
return stmt
} else {
if !ssn.JobPipelined(job) {
Expand Down
252 changes: 252 additions & 0 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,132 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
ExpectBindsNum: 1,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can allocate job when highestTierAllowed not reached and hyperNodesInfo has three tier",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "s3", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 2),
},
Pods: []*v1.Pod{
// should use different roles, because the allocate action enables the role caches by default during predicate
util.BuildPod("c1", "p1", "s3-n1", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "s3-n2", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("s3-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s3-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
},
HyperNodesSetByTier: map[int]sets.Set[string]{
1: sets.New[string]("s3", "s4", "s5", "s6"),
2: sets.New[string]("s1", "s2"),
3: sets.New[string]("s0")},
HyperNodesMap: map[string]*api.HyperNodeInfo{
"s0": api.NewHyperNodeInfo(api.BuildHyperNode("s0", 3, []api.MemberConfig{
{
Name: "s1",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s2",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s1": api.NewHyperNodeInfo(api.BuildHyperNode("s1", 2, []api.MemberConfig{
{
Name: "s3",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s4",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s2": api.NewHyperNodeInfo(api.BuildHyperNode("s2", 2, []api.MemberConfig{
{
Name: "s5",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s6",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s3": api.NewHyperNodeInfo(api.BuildHyperNode("s3", 1, []api.MemberConfig{
{
Name: "s3-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s3-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s4": api.NewHyperNodeInfo(api.BuildHyperNode("s4", 1, []api.MemberConfig{
{
Name: "s4-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s4-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s5": api.NewHyperNodeInfo(api.BuildHyperNode("s5", 1, []api.MemberConfig{
{
Name: "s5-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s5-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s6": api.NewHyperNodeInfo(api.BuildHyperNode("s6", 1, []api.MemberConfig{
{
Name: "s6-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s6-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
},
HyperNodes: map[string]sets.Set[string]{
"s0": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2", "s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s1": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2"),
"s2": sets.New[string]("s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s3": sets.New[string]("s3-n1", "s3-n2"),
"s4": sets.New[string]("s4-n1", "s4-n2"),
"s5": sets.New[string]("s5-n1", "s5-n2"),
"s6": sets.New[string]("s6-n1", "s6-n2"),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectBindsNum: 1,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can not allocate job when cross highestTierAllowed tier",
PodGroups: []*schedulingv1.PodGroup{
Expand Down Expand Up @@ -687,6 +813,132 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
ExpectBindsNum: 0,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can not allocate job when cross highestTierAllowed tier and hyperNodesInfo has three tier",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "s3", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
},
Pods: []*v1.Pod{
// should use different roles, because the allocate action enables the role caches by default during predicate
util.BuildPod("c1", "p1", "s3-n1", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "s3-n2", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("s3-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s3-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
},
HyperNodesSetByTier: map[int]sets.Set[string]{
1: sets.New[string]("s3", "s4", "s5", "s6"),
2: sets.New[string]("s1", "s2"),
3: sets.New[string]("s0")},
HyperNodesMap: map[string]*api.HyperNodeInfo{
"s0": api.NewHyperNodeInfo(api.BuildHyperNode("s0", 3, []api.MemberConfig{
{
Name: "s1",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s2",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s1": api.NewHyperNodeInfo(api.BuildHyperNode("s1", 2, []api.MemberConfig{
{
Name: "s3",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s4",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s2": api.NewHyperNodeInfo(api.BuildHyperNode("s2", 2, []api.MemberConfig{
{
Name: "s5",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s6",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s3": api.NewHyperNodeInfo(api.BuildHyperNode("s3", 1, []api.MemberConfig{
{
Name: "s3-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s3-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s4": api.NewHyperNodeInfo(api.BuildHyperNode("s4", 1, []api.MemberConfig{
{
Name: "s4-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s4-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s5": api.NewHyperNodeInfo(api.BuildHyperNode("s5", 1, []api.MemberConfig{
{
Name: "s5-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s5-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s6": api.NewHyperNodeInfo(api.BuildHyperNode("s6", 1, []api.MemberConfig{
{
Name: "s6-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s6-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
},
HyperNodes: map[string]sets.Set[string]{
"s0": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2", "s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s1": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2"),
"s2": sets.New[string]("s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s3": sets.New[string]("s3-n1", "s3-n2"),
"s4": sets.New[string]("s4-n1", "s4-n2"),
"s5": sets.New[string]("s5-n1", "s5-n2"),
"s6": sets.New[string]("s6-n1", "s6-n2"),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectBindsNum: 0,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can not allocate job when LCAHyperNode is empty",
PodGroups: []*schedulingv1.PodGroup{
Expand Down
7 changes: 4 additions & 3 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ type TaskID types.UID

// TransactionContext holds all the fields that needed by scheduling transaction
type TransactionContext struct {
NodeName string
EvictionOccurred bool
Status TaskStatus
NodeName string
EvictionOccurred bool
JobAllocatedNewHyperNode string
Status TaskStatus
}

// Clone returns a clone of TransactionContext
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
Loading

0 comments on commit c0711fc

Please sign in to comment.