From 6c1a7da75966b008b5abdda9c4b5b1dface632a9 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Fri, 24 Jan 2025 15:01:28 +0000 Subject: [PATCH 1/2] Scheduler: refactor nodedb stats Signed-off-by: Robert Smith --- internal/scheduler/nodedb/nodedb.go | 17 +++-------------- internal/scheduler/nodedb/nodeiteration_test.go | 14 ++++++-------- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index ba74a66fc8d..1dea11811e3 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -4,8 +4,6 @@ import ( "fmt" "math" "strings" - "sync" - "sync/atomic" "text/tabwriter" "time" @@ -27,9 +25,8 @@ import ( var empty struct{} -func (nodeDb *NodeDb) AddNodeToDb(node *internaltypes.Node) { - nodeDb.mu.Lock() - defer nodeDb.mu.Unlock() +func (nodeDb *NodeDb) addNodeToStats(node *internaltypes.Node) { + nodeDb.numNodes += 1 for key := range nodeDb.indexedNodeLabels { if value, ok := node.GetLabelValue(key); ok { nodeDb.indexedNodeLabelValues[key][value] = empty @@ -42,9 +39,8 @@ func (nodeDb *NodeDb) AddNodeToDb(node *internaltypes.Node) { } func (nodeDb *NodeDb) CreateAndInsertWithJobDbJobsWithTxn(txn *memdb.Txn, jobs []*jobdb.Job, entry *internaltypes.Node) error { - _ = atomic.AddUint64(&nodeDb.numNodes, 1) - nodeDb.AddNodeToDb(entry) + nodeDb.addNodeToStats(entry) for _, job := range jobs { priority, ok := job.ScheduledAtPriority() @@ -122,9 +118,6 @@ type NodeDb struct { // If not set, no labels are indexed. indexedNodeLabels map[string]bool - // Mutex for the remaining fields of this struct, which are mutated after initialization. - mu sync.Mutex - // Map from indexed label names to the set of values that label takes across all nodes in the NodeDb. indexedNodeLabelValues map[string]map[string]struct{} // Total number of nodes in the db. @@ -305,8 +298,6 @@ func (nodeDb *NodeDb) NumNodes() int { } func (nodeDb *NodeDb) TotalKubernetesResources() internaltypes.ResourceList { - nodeDb.mu.Lock() - defer nodeDb.mu.Unlock() return nodeDb.totalResources } @@ -1126,8 +1117,6 @@ func nodeIndexName(keyIndex int) string { // using a cache to avoid allocating new strings when possible. func (nodeDb *NodeDb) stringFromPodRequirementsNotMetReason(reason PodRequirementsNotMetReason) string { h := reason.Sum64() - nodeDb.mu.Lock() - defer nodeDb.mu.Unlock() if s, ok := nodeDb.podRequirementsNotMetReasonStringCache[h]; ok { return s } else { diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index f70b1dce23a..b0f1f5063af 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -312,15 +312,15 @@ func TestNodeTypeIterator(t *testing.T) { require.NoError(t, err) entries := make([]*internaltypes.Node, len(tc.nodes)) + txn := nodeDb.Txn(true) for i, node := range tc.nodes { // Set monotonically increasing node IDs to ensure nodes appear in predictable order. newNodeId := fmt.Sprintf("%d", i) entry := testfixtures.WithIdNodes(newNodeId, []*internaltypes.Node{node})[0] - - nodeDb.AddNodeToDb(entry) + require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, entry)) entries[i] = entry } - require.NoError(t, nodeDb.UpsertMany(entries)) + txn.Commit() indexedResourceRequests := make([]int64, len(testfixtures.TestResources)) @@ -639,20 +639,18 @@ func TestNodeTypesIterator(t *testing.T) { nodeDb, err := newNodeDbWithNodes(nil) require.NoError(t, err) + txn := nodeDb.Txn(true) entries := make([]*internaltypes.Node, len(tc.nodes)) for i, node := range tc.nodes { // Set monotonically increasing node IDs to ensure nodes appear in predictable order. nodeId := fmt.Sprintf("%d", i) entry := testfixtures.WithIdNodes(nodeId, []*internaltypes.Node{node})[0] entry = testfixtures.WithIndexNode(uint64(i), entry) - require.NoError(t, err) - - nodeDb.AddNodeToDb(entry) - + require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, entry)) entries[i] = entry } - require.NoError(t, nodeDb.UpsertMany(entries)) + txn.Commit() rr := tc.resourceRequests From 764acd3b7c6c93d452fea7ded130c0d2004a5745 Mon Sep 17 00:00:00 2001 From: Robert Smith Date: Fri, 24 Jan 2025 15:08:09 +0000 Subject: [PATCH 2/2] gofumpt Signed-off-by: Robert Smith --- internal/scheduler/nodedb/nodedb.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 1dea11811e3..c8159253c17 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -39,7 +39,6 @@ func (nodeDb *NodeDb) addNodeToStats(node *internaltypes.Node) { } func (nodeDb *NodeDb) CreateAndInsertWithJobDbJobsWithTxn(txn *memdb.Txn, jobs []*jobdb.Job, entry *internaltypes.Node) error { - nodeDb.addNodeToStats(entry) for _, job := range jobs {