Skip to content

Commit

Permalink
Scheduler: refactor nodedb stats (#4165)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert Smith <[email protected]>
  • Loading branch information
robertdavidsmith authored Jan 27, 2025
1 parent 1ee7196 commit 6cfde08
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 23 deletions.
18 changes: 3 additions & 15 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"text/tabwriter"
"time"

Expand All @@ -27,9 +25,8 @@ import (

var empty struct{}

func (nodeDb *NodeDb) AddNodeToDb(node *internaltypes.Node) {
nodeDb.mu.Lock()
defer nodeDb.mu.Unlock()
func (nodeDb *NodeDb) addNodeToStats(node *internaltypes.Node) {
nodeDb.numNodes += 1
for key := range nodeDb.indexedNodeLabels {
if value, ok := node.GetLabelValue(key); ok {
nodeDb.indexedNodeLabelValues[key][value] = empty
Expand All @@ -42,9 +39,7 @@ func (nodeDb *NodeDb) AddNodeToDb(node *internaltypes.Node) {
}

func (nodeDb *NodeDb) CreateAndInsertWithJobDbJobsWithTxn(txn *memdb.Txn, jobs []*jobdb.Job, entry *internaltypes.Node) error {
_ = atomic.AddUint64(&nodeDb.numNodes, 1)

nodeDb.AddNodeToDb(entry)
nodeDb.addNodeToStats(entry)

for _, job := range jobs {
priority, ok := job.ScheduledAtPriority()
Expand Down Expand Up @@ -122,9 +117,6 @@ type NodeDb struct {
// If not set, no labels are indexed.
indexedNodeLabels map[string]bool

// Mutex for the remaining fields of this struct, which are mutated after initialization.
mu sync.Mutex

// Map from indexed label names to the set of values that label takes across all nodes in the NodeDb.
indexedNodeLabelValues map[string]map[string]struct{}
// Total number of nodes in the db.
Expand Down Expand Up @@ -305,8 +297,6 @@ func (nodeDb *NodeDb) NumNodes() int {
}

func (nodeDb *NodeDb) TotalKubernetesResources() internaltypes.ResourceList {
nodeDb.mu.Lock()
defer nodeDb.mu.Unlock()
return nodeDb.totalResources
}

Expand Down Expand Up @@ -1126,8 +1116,6 @@ func nodeIndexName(keyIndex int) string {
// using a cache to avoid allocating new strings when possible.
func (nodeDb *NodeDb) stringFromPodRequirementsNotMetReason(reason PodRequirementsNotMetReason) string {
h := reason.Sum64()
nodeDb.mu.Lock()
defer nodeDb.mu.Unlock()
if s, ok := nodeDb.podRequirementsNotMetReasonStringCache[h]; ok {
return s
} else {
Expand Down
14 changes: 6 additions & 8 deletions internal/scheduler/nodedb/nodeiteration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,15 @@ func TestNodeTypeIterator(t *testing.T) {
require.NoError(t, err)

entries := make([]*internaltypes.Node, len(tc.nodes))
txn := nodeDb.Txn(true)
for i, node := range tc.nodes {
// Set monotonically increasing node IDs to ensure nodes appear in predictable order.
newNodeId := fmt.Sprintf("%d", i)
entry := testfixtures.WithIdNodes(newNodeId, []*internaltypes.Node{node})[0]

nodeDb.AddNodeToDb(entry)
require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, entry))
entries[i] = entry
}
require.NoError(t, nodeDb.UpsertMany(entries))
txn.Commit()

indexedResourceRequests := make([]int64, len(testfixtures.TestResources))

Expand Down Expand Up @@ -639,20 +639,18 @@ func TestNodeTypesIterator(t *testing.T) {
nodeDb, err := newNodeDbWithNodes(nil)
require.NoError(t, err)

txn := nodeDb.Txn(true)
entries := make([]*internaltypes.Node, len(tc.nodes))
for i, node := range tc.nodes {
// Set monotonically increasing node IDs to ensure nodes appear in predictable order.
nodeId := fmt.Sprintf("%d", i)
entry := testfixtures.WithIdNodes(nodeId, []*internaltypes.Node{node})[0]
entry = testfixtures.WithIndexNode(uint64(i), entry)

require.NoError(t, err)

nodeDb.AddNodeToDb(entry)

require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, entry))
entries[i] = entry
}
require.NoError(t, nodeDb.UpsertMany(entries))
txn.Commit()

rr := tc.resourceRequests

Expand Down

0 comments on commit 6cfde08

Please sign in to comment.