
rac2: reduce how long rangeController.mu is held
Also, never hold it and then acquire replicaSendStream.mu, since the
latter is held when doing IO. Additionally, changed some Lock calls to
RLock.

Epic: CRDB-37515

Release note: None
sumeerbhola committed Oct 13, 2024
1 parent 30dbb17 commit e0b8bd2
Showing 2 changed files with 43 additions and 37 deletions.
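
Before the diffs, the locking discipline in a nutshell. A minimal sketch with hypothetical stand-in types (not the real rac2 code): release rangeController.mu quickly, and never acquire replicaSendStream.mu while holding it, since the send-stream mutex can be held across IO.

```go
package main

import "sync"

// Hypothetical, trimmed-down stand-ins for the real structs; only the
// ordering rule matters here.
type replicaSendStream struct {
	mu sync.Mutex // may be held while doing IO
}

type rangeController struct {
	mu    sync.RWMutex // must be released quickly; WaitForEval readers are hot
	state int
}

// Wrong: rc.mu is held across the acquisition of rs.mu, so slow IO done
// under rs.mu stalls every WaitForEval caller blocked on rc.mu.
func bad(rc *rangeController, rs *replicaSendStream) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rs.mu.Lock()
	defer rs.mu.Unlock()
}

// Right: finish with rc.mu first, then take rs.mu.
func good(rc *rangeController, rs *replicaSendStream) {
	func() {
		rc.mu.Lock()
		defer rc.mu.Unlock()
		rc.state++ // quick state update only
	}()
	rs.mu.Lock() // acquired only after rc.mu has been released
	defer rs.mu.Unlock()
	// ... potentially slow work ...
}
```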
68 changes: 41 additions & 27 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -538,7 +538,9 @@ type rangeController struct {

 	mu struct {
 		// All the fields in this struct are modified while holding raftMu and
-		// this mutex. So readers can hold either mutex.
+		// this mutex. So readers can hold either mutex. This mutex must be
+		// released quickly, since it is needed by rangeController.WaitForEval
+		// which can have high concurrency.
 		syncutil.RWMutex

 		// State for waiters. When anything in voterSets or nonVoterSets changes,
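
The added comment is the motivation for the whole commit: WaitForEval can have many concurrent callers, all contending on this mutex. A simplified sketch of a WaitForEval-style reader, against a trimmed-down stand-in struct (this is not the real implementation): snapshot state under RLock, then block outside the lock.

```go
package main

import (
	"context"
	"sync"
)

// Trimmed-down stand-in for rangeController, enough to show the pattern.
type rangeController struct {
	mu struct {
		sync.RWMutex
		voterSets          []any
		waiterSetRefreshCh chan struct{}
	}
}

// waitSketch holds the read lock only long enough to snapshot the waiter
// state and the refresh channel, then waits with the lock released.
func (rc *rangeController) waitSketch(ctx context.Context) error {
	for {
		rc.mu.RLock()
		refreshCh := rc.mu.waiterSetRefreshCh
		vss := rc.mu.voterSets
		rc.mu.RUnlock()
		if refreshCh == nil {
			return nil // the controller was closed
		}
		_ = vss // evaluate token availability against the snapshot ...
		select {
		case <-refreshCh: // waiter sets changed; take a fresh snapshot
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```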
@@ -572,7 +574,8 @@ type rangeController struct {
 		// to call into the replicaSendStreams that have asked to be scheduled.
 		replicas map[roachpb.ReplicaID]struct{}
 	}
-	entryFCStateScratch []entryFCState
+	entryFCStateScratch       []entryFCState
+	lastSendQueueStatsScratch RangeSendQueueStats
 }

 // voterStateForWaiters informs whether WaitForEval is required to wait for
@@ -1090,7 +1093,7 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra

 	// It may have been longer than the sendQueueStatRefreshInterval since we
 	// last updated the send queue stats. Maybe update them now.
-	rc.maybeUpdateSendQueueStats()
+	rc.maybeUpdateSendQueueStatsRaftMuLocked()

 	return nil
 }
@@ -1373,13 +1376,15 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
 	if log.V(1) {
 		log.VInfof(ctx, 1, "r%v closing range controller", rc.opts.RangeID)
 	}
-	rc.mu.Lock()
-	defer rc.mu.Unlock()
-
-	rc.mu.voterSets = nil
-	rc.mu.nonVoterSet = nil
-	close(rc.mu.waiterSetRefreshCh)
-	rc.mu.waiterSetRefreshCh = nil
+	func() {
+		rc.mu.Lock()
+		defer rc.mu.Unlock()
+
+		rc.mu.voterSets = nil
+		rc.mu.nonVoterSet = nil
+		close(rc.mu.waiterSetRefreshCh)
+		rc.mu.waiterSetRefreshCh = nil
+	}()
 	// Return any tracked token deductions, as we don't expect to receive more
 	// AdmittedVector updates.
 	for _, rs := range rc.replicaMap {
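
The rewrite above uses an immediately invoked func() so that the deferred Unlock fires at the end of the closure rather than at the end of CloseRaftMuLocked; the subsequent loop over replicaMap, which touches the send streams, then runs with rc.mu already released. A standalone sketch of the idiom, with hypothetical names:

```go
package main

import "sync"

// Hypothetical type; only the scoping idiom matters.
type controller struct {
	mu    sync.Mutex
	state []string
}

func (c *controller) close() {
	// Scope the lock to an immediately invoked closure: the defer runs
	// when the closure returns, not when close() returns, so the slow
	// tail of the function executes with c.mu already released.
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.state = nil
	}()
	// c.mu is no longer held here; safe to take other mutexes or do IO.
	c.releaseTokensSlowly()
}

func (c *controller) releaseTokensSlowly() { /* ... */ }
```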
@@ -1430,8 +1435,8 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
 		panic(errors.AssertionFailedf("statsToSet is non-empty %v", statsToSet.internal))
 	}
 	statsToSet.Clear()
-	rc.mu.Lock()
-	defer rc.mu.Unlock()
+	rc.mu.RLock()
+	defer rc.mu.RUnlock()

 	statsToSet.internal = slices.Grow(statsToSet.internal, len(rc.mu.lastSendQueueStats))
 	// We will update the cheaper stats to ensure they are up-to-date. For the
@@ -1459,23 +1464,28 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
 	}
 }

-func (rc *rangeController) maybeUpdateSendQueueStats() {
+func (rc *rangeController) maybeUpdateSendQueueStatsRaftMuLocked() {
 	now := rc.opts.Clock.PhysicalTime()
-	rc.mu.Lock()
-	defer rc.mu.Unlock()
-
-	if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add(
-		sendQueueStatRefreshInterval); now.After(nextUpdateTime) {
-		// We should update the stats, it has been longer than
-		// sendQueueStatRefreshInterval.
-		rc.updateSendQueueStatsRaftMuRCLocked(now)
+	updateStats := false
+	func() {
+		rc.mu.Lock()
+		defer rc.mu.Unlock()
+		if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add(
+			sendQueueStatRefreshInterval); now.After(nextUpdateTime) {
+			// We should update the stats, it has been longer than
+			// sendQueueStatRefreshInterval.
+			updateStats = true
+		}
+	}()
+	if !updateStats {
+		// Common case.
+		return
 	}
+	rc.updateSendQueueStatsRaftMuLocked(now)
 }

-func (rc *rangeController) updateSendQueueStatsRaftMuRCLocked(now time.Time) {
-	rc.mu.AssertHeld()
-
-	rc.mu.lastSendQueueStats.Clear()
+func (rc *rangeController) updateSendQueueStatsRaftMuLocked(now time.Time) {
+	rc.lastSendQueueStatsScratch.Clear()
 	for _, rs := range rc.replicaMap {
 		stats := ReplicaSendQueueStats{
 			ReplicaID: rs.desc.ReplicaID,
@@ -1488,8 +1498,12 @@ func (rc *rangeController) updateSendQueueStatsRaftMuRCLocked(now time.Time) {
 				stats.SendQueueCount = rs.sendStream.queueLengthLocked()
 			}()
 		}
-		rc.mu.lastSendQueueStats.Set(stats)
+		rc.lastSendQueueStatsScratch.Set(stats)
 	}
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+	rc.mu.lastSendQueueStats, rc.lastSendQueueStatsScratch =
+		rc.lastSendQueueStatsScratch, rc.mu.lastSendQueueStats
 	rc.mu.lastSendQueueStatRefresh = now
 }
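
The rewritten updateSendQueueStatsRaftMuLocked builds the stats into a scratch buffer protected only by raftMu, then takes rc.mu for nothing more than an O(1) buffer swap. A self-contained sketch of this double-buffering pattern, using hypothetical names:

```go
package main

import "sync"

// Hypothetical double-buffered stats cache. The expensive rebuild happens
// without the mutex; the mutex is held only for a constant-time swap.
type statsCache struct {
	scratch []int // owned by the single writer; rebuilt without holding mu
	mu      struct {
		sync.Mutex
		current []int // published stats; read under mu
	}
}

func (c *statsCache) refresh(compute func(dst []int) []int) {
	c.scratch = compute(c.scratch[:0]) // slow part, lock not held
	c.mu.Lock()
	defer c.mu.Unlock()
	// Publish by swapping buffers; the previously published slice becomes
	// the next scratch buffer.
	c.mu.current, c.scratch = c.scratch, c.mu.current
}
```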

@@ -1620,8 +1634,8 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) {
 	replicas := map[roachpb.ReplicaID]stateForWaiters{}
 	func() {
 		var leaderID, leaseholderID roachpb.ReplicaID
-		rc.mu.Lock()
-		defer rc.mu.Unlock()
+		rc.mu.RLock()
+		defer rc.mu.RUnlock()
 		for _, vs := range rc.mu.voterSets {
 			for _, v := range vs {
 				if v.isLeader {
12 changes: 2 additions & 10 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -1323,11 +1323,7 @@ func TestRangeController(t *testing.T) {
 			var sizeCount, sizeBytes int64
 			for _, rcState := range state.ranges {
 				stats := RangeSendStreamStats{}
-				func() {
-					rcState.rc.mu.Lock()
-					defer rcState.rc.mu.Unlock()
-					rcState.rc.updateSendQueueStatsRaftMuRCLocked(state.ts.Now())
-				}()
+				rcState.rc.updateSendQueueStatsRaftMuLocked(state.ts.Now())
 				rcState.rc.SendStreamStats(&stats)
 				count, bytes := stats.SumSendQueues()
 				sizeCount += count
@@ -1371,11 +1367,7 @@ func TestRangeController(t *testing.T) {

 			r := state.ranges[roachpb.RangeID(rangeID)]
 			if refresh {
-				func() {
-					r.rc.mu.Lock()
-					defer r.rc.mu.Unlock()
-					r.rc.updateSendQueueStatsRaftMuRCLocked(state.ts.Now())
-				}()
+				r.rc.updateSendQueueStatsRaftMuLocked(state.ts.Now())
 			}
 			stats := RangeSendStreamStats{}
 			r.rc.SendStreamStats(&stats)
