From e0b8bd23906591f7d7d6e711739ac849d7bc4a96 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Sun, 13 Oct 2024 10:09:28 -0400 Subject: [PATCH] rac2: reduce how long rangeController.mu is held Also, never hold it and then acquire replicaSendStream.mu, since the latter is held when doing IO. Additionally, changed some Lock calls to RLock. Epic: CRDB-37515 Release note: None --- .../kvflowcontrol/rac2/range_controller.go | 68 +++++++++++-------- .../rac2/range_controller_test.go | 12 +--- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 3f5d89a2d268..68b982686ee2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -538,7 +538,9 @@ type rangeController struct { mu struct { // All the fields in this struct are modified while holding raftMu and - // this mutex. So readers can hold either mutex. + // this mutex. So readers can hold either mutex. This mutex must be + // released quickly, since it is needed by rangeController.WaitForEval + // which can have high concurrency. syncutil.RWMutex // State for waiters. When anything in voterSets or nonVoterSets changes, @@ -572,7 +574,8 @@ type rangeController struct { // to call into the replicaSendStreams that have asked to be scheduled. replicas map[roachpb.ReplicaID]struct{} } - entryFCStateScratch []entryFCState + entryFCStateScratch []entryFCState + lastSendQueueStatsScratch RangeSendQueueStats } // voterStateForWaiters informs whether WaitForEval is required to wait for @@ -1090,7 +1093,7 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra // It may have been longer than the sendQueueStatRefreshInterval since we // last updated the send queue stats. Maybe update them now. - rc.maybeUpdateSendQueueStats() + rc.maybeUpdateSendQueueStatsRaftMuLocked() return nil } @@ -1373,13 +1376,15 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) { if log.V(1) { log.VInfof(ctx, 1, "r%v closing range controller", rc.opts.RangeID) } - rc.mu.Lock() - defer rc.mu.Unlock() + func() { + rc.mu.Lock() + defer rc.mu.Unlock() - rc.mu.voterSets = nil - rc.mu.nonVoterSet = nil - close(rc.mu.waiterSetRefreshCh) - rc.mu.waiterSetRefreshCh = nil + rc.mu.voterSets = nil + rc.mu.nonVoterSet = nil + close(rc.mu.waiterSetRefreshCh) + rc.mu.waiterSetRefreshCh = nil + }() // Return any tracked token deductions, as we don't expect to receive more // AdmittedVector updates. for _, rs := range rc.replicaMap { @@ -1430,8 +1435,8 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) { panic(errors.AssertionFailedf("statsToSet is non-empty %v", statsToSet.internal)) } statsToSet.Clear() - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() statsToSet.internal = slices.Grow(statsToSet.internal, len(rc.mu.lastSendQueueStats)) // We will update the cheaper stats to ensure they are up-to-date. For the @@ -1459,23 +1464,28 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) { } } -func (rc *rangeController) maybeUpdateSendQueueStats() { +func (rc *rangeController) maybeUpdateSendQueueStatsRaftMuLocked() { now := rc.opts.Clock.PhysicalTime() - rc.mu.Lock() - defer rc.mu.Unlock() - - if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add( - sendQueueStatRefreshInterval); now.After(nextUpdateTime) { - // We should update the stats, it has been longer than - // sendQueueStatRefreshInterval. - rc.updateSendQueueStatsRaftMuRCLocked(now) + updateStats := false + func() { + rc.mu.Lock() + defer rc.mu.Unlock() + if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add( + sendQueueStatRefreshInterval); now.After(nextUpdateTime) { + // We should update the stats, it has been longer than + // sendQueueStatRefreshInterval. + updateStats = true + } + }() + if !updateStats { + // Common case. + return } + rc.updateSendQueueStatsRaftMuLocked(now) } -func (rc *rangeController) updateSendQueueStatsRaftMuRCLocked(now time.Time) { - rc.mu.AssertHeld() - - rc.mu.lastSendQueueStats.Clear() +func (rc *rangeController) updateSendQueueStatsRaftMuLocked(now time.Time) { + rc.lastSendQueueStatsScratch.Clear() for _, rs := range rc.replicaMap { stats := ReplicaSendQueueStats{ ReplicaID: rs.desc.ReplicaID, @@ -1488,8 +1498,12 @@ func (rc *rangeController) updateSendQueueStatsRaftMuRCLocked(now time.Time) { stats.SendQueueCount = rs.sendStream.queueLengthLocked() }() } - rc.mu.lastSendQueueStats.Set(stats) + rc.lastSendQueueStatsScratch.Set(stats) } + rc.mu.Lock() + defer rc.mu.Unlock() + rc.mu.lastSendQueueStats, rc.lastSendQueueStatsScratch = + rc.lastSendQueueStatsScratch, rc.mu.lastSendQueueStats rc.mu.lastSendQueueStatRefresh = now } @@ -1620,8 +1634,8 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) { replicas := map[roachpb.ReplicaID]stateForWaiters{} func() { var leaderID, leaseholderID roachpb.ReplicaID - rc.mu.Lock() - defer rc.mu.Unlock() + rc.mu.RLock() + defer rc.mu.RUnlock() for _, vs := range rc.mu.voterSets { for _, v := range vs { if v.isLeader { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 5fa146f87da5..270fa0fd9a53 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -1323,11 +1323,7 @@ func TestRangeController(t *testing.T) { var sizeCount, sizeBytes int64 for _, rcState := range state.ranges { stats := RangeSendStreamStats{} - func() { - rcState.rc.mu.Lock() - defer rcState.rc.mu.Unlock() - rcState.rc.updateSendQueueStatsRaftMuRCLocked(state.ts.Now()) - }() + rcState.rc.updateSendQueueStatsRaftMuLocked(state.ts.Now()) rcState.rc.SendStreamStats(&stats) count, bytes := stats.SumSendQueues() sizeCount += count @@ -1371,11 +1367,7 @@ func TestRangeController(t *testing.T) { r := state.ranges[roachpb.RangeID(rangeID)] if refresh { - func() { - r.rc.mu.Lock() - defer r.rc.mu.Unlock() - r.rc.updateSendQueueStatsRaftMuRCLocked(state.ts.Now()) - }() + r.rc.updateSendQueueStatsRaftMuLocked(state.ts.Now()) } stats := RangeSendStreamStats{} r.rc.SendStreamStats(&stats)