Improve allocatable resource calculation - nodeDb (#4197)
* Improve allocatable resource calculation - nodeDb

 - We no longer add nodes to the nodeDb if they are unschedulable and have no jobs assigned to them
   - These nodes should never get anything assigned to them, so there is no need to include them in the nodeDb
   - This means these nodes' resources are not counted towards the total allocatable resource, which is used for fairshare purposes
 - Don't allow Armada to schedule over unallocatableResources
   - node now has an allocatableResources field, calculated as allocatableResources = totalResources - sum(unallocatableResources) (see the sketch below)
   - This is how much resource is actually allocatable, and we no longer use totalResources for the fairshare calculation

The net result here is:
 - The resource used for the fairshare calculation better represents the amount of resource available to be scheduled on
     - Unallocatable resource is properly respected
     - Nodes that cannot be scheduled on do not skew the fairshare calculation

The fairshare calculation is still "wrong" in the case where an unschedulable node has jobs running on it but those jobs don't fill the node:
 - The remaining resource is used in the fairshare calculation, but can never actually be allocated
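
For illustration, a minimal standalone sketch of the calculation in Go (simplified map-based resources and illustrative names, not the real internaltypes.ResourceList API):

package main

import "fmt"

// Resources is a simplified stand-in for internaltypes.ResourceList.
type Resources map[string]int64

// subtract returns a - b per resource type, without mutating its inputs.
func subtract(a, b Resources) Resources {
	out := Resources{}
	for k, v := range a {
		out[k] = v - b[k]
	}
	return out
}

// allocatable mirrors allocatableResources = totalResources - sum(unallocatableResources),
// where unallocatable resources are recorded per priority.
func allocatable(total Resources, unallocatableByPriority map[int32]Resources) Resources {
	out := total
	for _, u := range unallocatableByPriority {
		out = subtract(out, u)
	}
	return out
}

func main() {
	total := Resources{"cpu": 16, "memoryGi": 64}
	unallocatable := map[int32]Resources{
		1: {"cpu": 2, "memoryGi": 8}, // e.g. capacity reserved away from Armada jobs
	}
	// Prints map[cpu:14 memoryGi:56]; fairshare now divides by this
	// allocatable total rather than the raw total.
	fmt.Println(allocatable(total, unallocatable))
}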

Signed-off-by: JamesMurkin <[email protected]>

* Simplify setting evicted priority allocatable resource

Signed-off-by: JamesMurkin <[email protected]>

* Remove unused func

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
JamesMurkin authored Feb 10, 2025
1 parent eb1324c commit 6569e24
Showing 11 changed files with 83 additions and 27 deletions.
2 changes: 2 additions & 0 deletions e2e/setup/kind.yaml
@@ -10,6 +10,8 @@ nodes:
image: kindest/node:v1.26.15
- role: control-plane
image: kindest/node:v1.26.15
labels:
armadaproject.io/pool: control-plane
kubeadmConfigPatches:
- |
kind: InitConfiguration
55 changes: 35 additions & 20 deletions internal/scheduler/internaltypes/node.go
@@ -2,7 +2,6 @@ package internaltypes

import (
"fmt"
"math"

"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"
@@ -41,8 +40,13 @@ type Node struct {
taints []v1.Taint
labels map[string]string

// Total space allocatable on this node
unschedulable bool

// Total space on this node
totalResources ResourceList
// Total space allocatable by armada jobs on this node
// allocatableResources = totalResources - sum(unallocatableResources)
allocatableResources ResourceList

unallocatableResources map[int32]ResourceList

@@ -63,20 +67,19 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
resourceListFactory *ResourceListFactory,
) *Node {
totalResources := resourceListFactory.FromNodeProto(node.TotalResources.Resources)

allocatableByPriority := map[int32]ResourceList{}
for _, p := range allowedPriorities {
allocatableByPriority[p] = totalResources
}
allocatableResources := totalResources
unallocatableResources := make(map[int32]ResourceList, len(node.UnallocatableResources))
for p, rl := range node.UnallocatableResources {
MarkAllocated(allocatableByPriority, p, resourceListFactory.FromJobResourceListIgnoreUnknown(rl.Resources))
resource := resourceListFactory.FromJobResourceListIgnoreUnknown(rl.Resources)
allocatableResources = allocatableResources.Subtract(resource)
unallocatableResources[p] = resource
}
allocatableByPriority[EvictedPriority] = allocatableByPriority[minInt32(allowedPriorities)]

unallocatableResources := map[int32]ResourceList{}
for p, u := range node.UnallocatableResources {
unallocatableResources[p] = resourceListFactory.FromJobResourceListIgnoreUnknown(u.Resources)
allocatableByPriority := map[int32]ResourceList{}
for _, p := range allowedPriorities {
allocatableByPriority[p] = allocatableResources
}
allocatableByPriority[EvictedPriority] = allocatableResources

return CreateNodeAndType(
node.Id,
@@ -90,6 +93,7 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
indexedTaints,
indexedNodeLabels,
totalResources,
allocatableResources,
unallocatableResources,
allocatableByPriority,
)
@@ -107,6 +111,7 @@ func CreateNodeAndType(
indexedTaints map[string]bool,
indexedNodeLabels map[string]bool,
totalResources ResourceList,
allocatableResources ResourceList,
unallocatableResources map[int32]ResourceList,
allocatableByPriority map[int32]ResourceList,
) *Node {
@@ -137,7 +142,9 @@ func CreateNodeAndType(
pool,
taints,
labels,
unschedulable,
totalResources,
allocatableResources,
unallocatableResources,
allocatableByPriority,
map[string]ResourceList{},
@@ -155,7 +162,9 @@ func CreateNode(
pool string,
taints []v1.Taint,
labels map[string]string,
unschedulable bool,
totalResources ResourceList,
allocatableResources ResourceList,
unallocatableResources map[int32]ResourceList,
allocatableByPriority map[int32]ResourceList,
allocatedByQueue map[string]ResourceList,
@@ -172,7 +181,9 @@ func CreateNode(
pool: pool,
taints: koTaint.DeepCopyTaints(taints),
labels: deepCopyLabels(labels),
unschedulable: unschedulable,
totalResources: totalResources,
allocatableResources: allocatableResources,
unallocatableResources: maps.Clone(unallocatableResources),
AllocatableByPriority: maps.Clone(allocatableByPriority),
AllocatedByQueue: maps.Clone(allocatedByQueue),
@@ -190,6 +201,10 @@ func (node *Node) GetName() string {
return node.name
}

func (node *Node) IsUnschedulable() bool {
return node.unschedulable
}

func (node *Node) GetPool() string {
return node.pool
}
@@ -243,6 +258,10 @@ func (node *Node) GetTotalResources() ResourceList {
return node.totalResources
}

func (node *Node) GetAllocatableResources() ResourceList {
return node.allocatableResources
}

func (node *Node) GetUnallocatableResources() map[int32]ResourceList {
return maps.Clone(node.unallocatableResources)
}
@@ -258,7 +277,9 @@ func (node *Node) DeepCopyNilKeys() *Node {
nodeType: node.nodeType,
taints: node.taints,
labels: node.labels,
unschedulable: node.unschedulable,
totalResources: node.totalResources,
allocatableResources: node.allocatableResources,
unallocatableResources: node.unallocatableResources,

// keys set to nil
@@ -282,7 +303,9 @@ func (node *Node) SummaryString() string {
result += fmt.Sprintf("Executor: %s\n", node.executor)
result += fmt.Sprintf("Name: %s\n", node.name)
result += fmt.Sprintf("Pool: %s\n", node.pool)
result += fmt.Sprintf("Unschedulable: %t\n", node.unschedulable)
result += fmt.Sprintf("TotalResources: %s\n", node.totalResources.String())
result += fmt.Sprintf("AllocatableResources: %s\n", node.allocatableResources.String())
result += fmt.Sprintf("Labels: %v\n", node.labels)
result += fmt.Sprintf("Taints: %v\n", node.taints)
for p, u := range node.unallocatableResources {
@@ -299,11 +322,3 @@ func deepCopyLabels(labels map[string]string) map[string]string {
}
return result
}

func minInt32(arr []int32) int32 {
result := int32(math.MaxInt32)
for _, val := range arr {
result = min(result, val)
}
return result
}
4 changes: 4 additions & 0 deletions internal/scheduler/internaltypes/node_factory.go
@@ -62,6 +62,7 @@ func (f *NodeFactory) CreateNodeAndType(
taints []v1.Taint,
labels map[string]string,
totalResources ResourceList,
allocatableResources ResourceList,
unallocatableResources map[int32]ResourceList,
allocatableByPriority map[int32]ResourceList,
) *Node {
@@ -77,6 +78,7 @@
f.indexedTaints,
f.indexedNodeLabels,
totalResources,
allocatableResources,
unallocatableResources,
allocatableByPriority,
)
@@ -125,6 +127,7 @@ func (f *NodeFactory) AddLabels(nodes []*Node, extraLabels map[string]string) []
f.indexedTaints,
f.indexedNodeLabels,
node.GetTotalResources(),
node.GetAllocatableResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
)
@@ -146,6 +149,7 @@ func (f *NodeFactory) AddTaints(nodes []*Node, extraTaints []v1.Taint) []*Node {
f.indexedTaints,
f.indexedNodeLabels,
node.GetTotalResources(),
node.GetAllocatableResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
)
8 changes: 8 additions & 0 deletions internal/scheduler/internaltypes/node_test.go
@@ -40,6 +40,12 @@ func TestNode(t *testing.T) {
"memory": resource.MustParse("32Gi"),
},
)
allocatableResources := resourceListFactory.FromNodeProto(
map[string]resource.Quantity{
"cpu": resource.MustParse("8"),
"memory": resource.MustParse("16Gi"),
},
)
unallocatableResources := map[int32]ResourceList{
1: resourceListFactory.FromJobResourceListIgnoreUnknown(
map[string]resource.Quantity{
@@ -110,7 +116,9 @@ func TestNode(t *testing.T) {
pool,
taints,
labels,
false,
totalResources,
allocatableResources,
unallocatableResources,
allocatableByPriority,
allocatedByQueue,
12 changes: 6 additions & 6 deletions internal/scheduler/nodedb/nodedb.go
@@ -34,7 +34,7 @@ func (nodeDb *NodeDb) addNodeToStats(node *internaltypes.Node) {
}
nodeType := node.GetNodeType()
nodeDb.numNodesByNodeType[nodeType.GetId()]++
nodeDb.totalResources = nodeDb.totalResources.Add(node.GetTotalResources())
nodeDb.totalAllocatableResources = nodeDb.totalAllocatableResources.Add(node.GetAllocatableResources())
nodeDb.nodeTypes[node.GetNodeTypeId()] = nodeType
}

@@ -123,8 +123,8 @@ type NodeDb struct {
numNodes uint64
// Number of nodes in the db by node type.
numNodesByNodeType map[uint64]int
// Total amount of resources, e.g., "cpu", "memory", "gpu", across all nodes in the db.
totalResources internaltypes.ResourceList
// Total amount of allocatable resources, e.g., "cpu", "memory", "gpu", across all nodes in the db.
totalAllocatableResources internaltypes.ResourceList
// Set of node types. Populated automatically as nodes are inserted.
// Node types are not cleaned up if all nodes of that type are removed from the NodeDb.
nodeTypes map[uint64]*internaltypes.NodeType
@@ -204,7 +204,7 @@ func NewNodeDb(
nodeTypes: make(map[uint64]*internaltypes.NodeType),
wellKnownNodeTypes: make(map[string]*configuration.WellKnownNodeType),
numNodesByNodeType: make(map[uint64]int),
totalResources: resourceListFactory.MakeAllZero(),
totalAllocatableResources: resourceListFactory.MakeAllZero(),
db: db,
// Set the initial capacity (somewhat arbitrarily) to 128 reasons.
podRequirementsNotMetReasonStringCache: make(map[uint64]string, 128),
@@ -297,7 +297,7 @@ func (nodeDb *NodeDb) NumNodes() int {
}

func (nodeDb *NodeDb) TotalKubernetesResources() internaltypes.ResourceList {
return nodeDb.totalResources
return nodeDb.totalAllocatableResources
}

func (nodeDb *NodeDb) Txn(write bool) *memdb.Txn {
@@ -1023,7 +1023,7 @@ func (nodeDb *NodeDb) ClearAllocated() error {
node = node.DeepCopyNilKeys()
node.AllocatableByPriority = newAllocatableByPriorityAndResourceType(
nodeDb.nodeDbPriorities,
node.GetTotalResources(),
node.GetAllocatableResources(),
)
newNodes = append(newNodes, node)
}
2 changes: 2 additions & 0 deletions internal/scheduler/nodedb/nodeidindex_test.go
@@ -55,6 +55,8 @@ func makeTestNode(id string) *internaltypes.Node {
"pool",
[]v1.Taint{},
map[string]string{},
false,
internaltypes.ResourceList{},
internaltypes.ResourceList{},
nil,
map[int32]internaltypes.ResourceList{},
2 changes: 2 additions & 0 deletions internal/scheduler/nodedb/nodematching.go
@@ -179,6 +179,8 @@ func StaticJobRequirementsMet(node *internaltypes.Node, jctx *schedulercontext.J
return matches, reason, err
}

// We are using total resources here, as this is an optimistic initial check
// Ideally we'd use allocatable; however, allocatable could be quite dynamic and have an adverse impact on the SubmitCheck
matches, reason = resourceRequirementsMet(node.GetTotalResources(), jctx.KubernetesResourceRequirements)
if !matches {
return matches, reason, nil
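To make the trade-off in that comment concrete, here is a sketch of the two fit checks (hypothetical names and simplified types, not Armada's actual functions): the submit-time check compares against stable total capacity to cheaply rule out jobs that could never fit, while the per-round scheduling check compares against what is currently allocatable.

package main

// Resources is a simplified stand-in for internaltypes.ResourceList.
type Resources map[string]int64

type node struct {
	total       Resources // rarely changes, so submit-time results can be cached
	allocatable Resources // shifts as unallocatable resources change
}

func fitsIn(req, capacity Resources) bool {
	for k, v := range req {
		if v > capacity[k] {
			return false
		}
	}
	return true
}

// staticFit is the optimistic submit-time check against total capacity.
func staticFit(n node, req Resources) bool { return fitsIn(req, n.total) }

// dynamicFit is the per-scheduling-round check against allocatable capacity.
func dynamicFit(n node, req Resources) bool { return fitsIn(req, n.allocatable) }

func main() {
	n := node{total: Resources{"cpu": 16}, allocatable: Resources{"cpu": 4}}
	req := Resources{"cpu": 8}
	println(staticFit(n, req), dynamicFit(n, req)) // true false
}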
4 changes: 4 additions & 0 deletions internal/scheduler/nodedb/nodematching_test.go
@@ -661,6 +661,8 @@ func makeTestNodeTaintsLabels(taints []v1.Taint, labels map[string]string) *inte
"pool",
taints,
labels,
false,
internaltypes.ResourceList{},
internaltypes.ResourceList{},
nil,
map[int32]internaltypes.ResourceList{},
@@ -685,6 +687,8 @@ func makeTestNodeResources(t *testing.T, allocatableByPriority map[int32]interna
"pool",
[]v1.Taint{},
map[string]string{},
false,
totalResources,
totalResources,
nil,
allocatableByPriority,
@@ -2498,8 +2498,10 @@ func testNodeWithTaints(node *internaltypes.Node, taints []v1.Taint) *internalty
node.GetPool(),
taints,
node.GetLabels(),
false,
node.GetTotalResources(),
nil,
node.GetAllocatableResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
node.AllocatedByQueue,
node.AllocatedByJobId,
8 changes: 8 additions & 0 deletions internal/scheduler/scheduling/scheduling_algo.go
@@ -654,6 +654,14 @@ func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, currentPoolJo
}

for _, node := range nodes {
if node.IsUnschedulable() && len(jobsByNodeId[node.GetId()]) == 0 {
// Don't add nodes that cannot be scheduled on to the nodeDb
//  - For efficiency
//  - So the node's resources are not counted for fairshare
// NOTE - Unschedulable nodes with jobs already scheduled onto them still need to be added to the nodeDb,
// so those jobs can be rescheduled onto them if evicted
continue
}
if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobsByNodeId[node.GetId()], node); err != nil {
return err
}
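The guard above reduces to a single predicate; a sketch under the assumption that only schedulability and the job count matter (illustrative name, not the actual code):

package scheduling

// shouldAddToNodeDb mirrors the guard in populateNodeDb: skip a node only
// when it is unschedulable AND nothing is running on it; unschedulable
// nodes with jobs are kept so evicted jobs can be rescheduled in place.
func shouldAddToNodeDb(unschedulable bool, jobsOnNode int) bool {
	return !unschedulable || jobsOnNode > 0
}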
9 changes: 9 additions & 0 deletions internal/scheduler/testfixtures/testfixtures.go
@@ -307,7 +307,9 @@ func WithNodeTypeNodes(nodeType *internaltypes.NodeType, nodes []*internaltypes.
node.GetPool(),
node.GetTaints(),
node.GetLabels(),
false,
node.GetTotalResources(),
node.GetAllocatableResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
node.AllocatedByQueue,
@@ -329,7 +331,9 @@ func WithIdNodes(nodeId string, nodes []*internaltypes.Node) []*internaltypes.No
node.GetPool(),
node.GetTaints(),
node.GetLabels(),
false,
node.GetTotalResources(),
node.GetAllocatableResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
node.AllocatedByQueue,
@@ -350,7 +354,9 @@ func WithIndexNode(idx uint64, node *internaltypes.Node) *internaltypes.Node {
node.GetPool(),
node.GetTaints(),
node.GetLabels(),
false,
node.GetTotalResources(),
node.GetAllocatableResources(),
node.GetUnallocatableResources(),
node.AllocatableByPriority,
node.AllocatedByQueue,
@@ -743,6 +749,8 @@ func TestSimpleNode(id string) *internaltypes.Node {
"",
nil,
nil,
false,
internaltypes.ResourceList{},
internaltypes.ResourceList{},
nil,
nil,
@@ -766,6 +774,7 @@ func TestNode(priorities []int32, resources map[string]resource.Quantity) *inter
schedulerconfiguration.NodeIdLabel: id,
},
rl,
rl,
map[int32]internaltypes.ResourceList{},
internaltypes.NewAllocatableByPriorityAndResourceType(priorities, rl))
}
