Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
132550: rac2: reduce how long rangeController.mu is held r=kvoli a=sumeerbhola

Also, never hold it and then acquire replicaSendStream.mu, since the latter is held when doing IO. Additionally, changed some Lock calls to RLock.

Epic: CRDB-37515

Release note: None

132568: rac2: use the real raft LogSlice r=sumeerbhola a=pav-kv

Related to #128779

Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
3 people committed Oct 14, 2024
3 parents 80041b8 + e0b8bd2 + 4479113 commit ea1419c
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 159 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_test(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/raft/tracker",
"//pkg/roachpb",
Expand Down
75 changes: 44 additions & 31 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -144,7 +145,7 @@ type RaftInterface interface {
//
// If it returns true, all the entries in the slice are in the message, and
// Next is advanced to be equal to end.
SendMsgAppRaftMuLocked(replicaID roachpb.ReplicaID, slice RaftLogSlice) (raftpb.Message, bool)
SendMsgAppRaftMuLocked(replicaID roachpb.ReplicaID, slice raft.LogSlice) (raftpb.Message, bool)
}

// RaftLogSnapshot abstracts raft.LogSnapshot.
Expand All @@ -163,11 +164,9 @@ type RaftLogSnapshot interface {
//
// NB: the [start, end) interval is different from RawNode.LogSlice which
// accepts an open-closed interval.
LogSlice(start, end uint64, maxSize uint64) (RaftLogSlice, error)
LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error)
}

type RaftLogSlice interface{}

// RaftMsgAppMode specifies how Raft (at the leader) generates MsgApps. In
// both modes, Raft knows that (Match(i), Next(i)) are in-flight for a
// follower i.
Expand Down Expand Up @@ -538,7 +537,9 @@ type rangeController struct {

mu struct {
// All the fields in this struct are modified while holding raftMu and
// this mutex. So readers can hold either mutex.
// this mutex. So readers can hold either mutex. This mutex must be
// released quickly, since it is needed by rangeController.WaitForEval
// which can have high concurrency.
syncutil.RWMutex

// State for waiters. When anything in voterSets or nonVoterSets changes,
Expand Down Expand Up @@ -572,7 +573,8 @@ type rangeController struct {
// to call into the replicaSendStreams that have asked to be scheduled.
replicas map[roachpb.ReplicaID]struct{}
}
entryFCStateScratch []entryFCState
entryFCStateScratch []entryFCState
lastSendQueueStatsScratch RangeSendQueueStats
}

// voterStateForWaiters informs whether WaitForEval is required to wait for
Expand Down Expand Up @@ -1090,7 +1092,7 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra

// It may have been longer than the sendQueueStatRefreshInterval since we
// last updated the send queue stats. Maybe update them now.
rc.maybeUpdateSendQueueStats()
rc.maybeUpdateSendQueueStatsRaftMuLocked()

return nil
}
Expand Down Expand Up @@ -1373,13 +1375,15 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
if log.V(1) {
log.VInfof(ctx, 1, "r%v closing range controller", rc.opts.RangeID)
}
rc.mu.Lock()
defer rc.mu.Unlock()
func() {
rc.mu.Lock()
defer rc.mu.Unlock()

rc.mu.voterSets = nil
rc.mu.nonVoterSet = nil
close(rc.mu.waiterSetRefreshCh)
rc.mu.waiterSetRefreshCh = nil
rc.mu.voterSets = nil
rc.mu.nonVoterSet = nil
close(rc.mu.waiterSetRefreshCh)
rc.mu.waiterSetRefreshCh = nil
}()
// Return any tracked token deductions, as we don't expect to receive more
// AdmittedVector updates.
for _, rs := range rc.replicaMap {
Expand Down Expand Up @@ -1430,8 +1434,8 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
panic(errors.AssertionFailedf("statsToSet is non-empty %v", statsToSet.internal))
}
statsToSet.Clear()
rc.mu.Lock()
defer rc.mu.Unlock()
rc.mu.RLock()
defer rc.mu.RUnlock()

statsToSet.internal = slices.Grow(statsToSet.internal, len(rc.mu.lastSendQueueStats))
// We will update the cheaper stats to ensure they are up-to-date. For the
Expand Down Expand Up @@ -1459,23 +1463,28 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
}
}

func (rc *rangeController) maybeUpdateSendQueueStats() {
func (rc *rangeController) maybeUpdateSendQueueStatsRaftMuLocked() {
now := rc.opts.Clock.PhysicalTime()
rc.mu.Lock()
defer rc.mu.Unlock()

if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add(
sendQueueStatRefreshInterval); now.After(nextUpdateTime) {
// We should update the stats, it has been longer than
// sendQueueStatRefreshInterval.
rc.updateSendQueueStatsRaftMuRCLocked(now)
updateStats := false
func() {
rc.mu.Lock()
defer rc.mu.Unlock()
if nextUpdateTime := rc.mu.lastSendQueueStatRefresh.Add(
sendQueueStatRefreshInterval); now.After(nextUpdateTime) {
// We should update the stats, it has been longer than
// sendQueueStatRefreshInterval.
updateStats = true
}
}()
if !updateStats {
// Common case.
return
}
rc.updateSendQueueStatsRaftMuLocked(now)
}

func (rc *rangeController) updateSendQueueStatsRaftMuRCLocked(now time.Time) {
rc.mu.AssertHeld()

rc.mu.lastSendQueueStats.Clear()
func (rc *rangeController) updateSendQueueStatsRaftMuLocked(now time.Time) {
rc.lastSendQueueStatsScratch.Clear()
for _, rs := range rc.replicaMap {
stats := ReplicaSendQueueStats{
ReplicaID: rs.desc.ReplicaID,
Expand All @@ -1488,8 +1497,12 @@ func (rc *rangeController) updateSendQueueStatsRaftMuRCLocked(now time.Time) {
stats.SendQueueCount = rs.sendStream.queueLengthLocked()
}()
}
rc.mu.lastSendQueueStats.Set(stats)
rc.lastSendQueueStatsScratch.Set(stats)
}
rc.mu.Lock()
defer rc.mu.Unlock()
rc.mu.lastSendQueueStats, rc.lastSendQueueStatsScratch =
rc.lastSendQueueStatsScratch, rc.mu.lastSendQueueStats
rc.mu.lastSendQueueStatRefresh = now
}

Expand Down Expand Up @@ -1620,8 +1633,8 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) {
replicas := map[roachpb.ReplicaID]stateForWaiters{}
func() {
var leaderID, leaseholderID roachpb.ReplicaID
rc.mu.Lock()
defer rc.mu.Unlock()
rc.mu.RLock()
defer rc.mu.RUnlock()
for _, vs := range rc.mu.voterSets {
for _, v := range vs {
if v.isLeader {
Expand Down
39 changes: 18 additions & 21 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -422,31 +423,35 @@ func (r *testingRCRange) ScheduleControllerEvent(rangeID roachpb.RangeID) {
r.mu.scheduleControllerEventCount++
}

func (r *testingRCRange) LogSlice(start, end uint64, maxSize uint64) (RaftLogSlice, error) {
func (r *testingRCRange) LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error) {
if start >= end {
panic("start >= end")
}
msg := raftpb.Message{
Type: raftpb.MsgApp,
}
var size uint64
var entries []raftpb.Entry
for _, entry := range r.entries {
if entry.Index >= start && entry.Index < end {
msg.Entries = append(msg.Entries, entry)
entries = append(entries, entry)
size += uint64(len(entry.Data))
if size > maxSize {
break
}
}
}
return msg, nil
// TODO(pav-kv): use a real LogSnapshot and construct a correct LogSlice.
return raft.MakeLogSlice(entries), nil
}

func (r *testingRCRange) SendMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, slice RaftLogSlice,
replicaID roachpb.ReplicaID, ls raft.LogSlice,
) (raftpb.Message, bool) {
msg := slice.(raftpb.Message)
msg.To = raftpb.PeerID(replicaID)
// TODO(pav-kv): populate the message correctly.
// TODO(pav-kv): use the real RawNode instead of fakes.
msg := raftpb.Message{
Type: raftpb.MsgApp,
To: raftpb.PeerID(replicaID),
Entries: ls.Entries(),
}
r.mu.Lock()
defer r.mu.Unlock()
testR, ok := r.mu.r.replicaSet[replicaID]
Expand Down Expand Up @@ -1323,11 +1328,7 @@ func TestRangeController(t *testing.T) {
var sizeCount, sizeBytes int64
for _, rcState := range state.ranges {
stats := RangeSendStreamStats{}
func() {
rcState.rc.mu.Lock()
defer rcState.rc.mu.Unlock()
rcState.rc.updateSendQueueStatsRaftMuRCLocked(state.ts.Now())
}()
rcState.rc.updateSendQueueStatsRaftMuLocked(state.ts.Now())
rcState.rc.SendStreamStats(&stats)
count, bytes := stats.SumSendQueues()
sizeCount += count
Expand Down Expand Up @@ -1371,11 +1372,7 @@ func TestRangeController(t *testing.T) {

r := state.ranges[roachpb.RangeID(rangeID)]
if refresh {
func() {
r.rc.mu.Lock()
defer r.rc.mu.Unlock()
r.rc.updateSendQueueStatsRaftMuRCLocked(state.ts.Now())
}()
r.rc.updateSendQueueStatsRaftMuLocked(state.ts.Now())
}
stats := RangeSendStreamStats{}
r.rc.SendStreamStats(&stats)
Expand Down Expand Up @@ -1512,8 +1509,8 @@ func testingFirst(args ...interface{}) interface{} {

type testLogSnapshot struct{}

func (testLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (RaftLogSlice, error) {
return nil, nil
func (testLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error) {
return raft.LogSlice{}, nil
}

func TestRaftEventFromMsgStorageAppendAndMsgAppsBasic(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/settings/cluster",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (rn *testRaftNode) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
}

func (rn *testRaftNode) SendMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, slice rac2.RaftLogSlice,
_ roachpb.ReplicaID, _ raft.LogSlice,
) (raftpb.Message, bool) {
panic("unimplemented")
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ func (rn raftNodeForRACv2) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {

// SendMsgAppRaftMuLocked implements rac2.RaftInterface.
func (rn raftNodeForRACv2) SendMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, slice rac2.RaftLogSlice,
replicaID roachpb.ReplicaID, ls raft.LogSlice,
) (raftpb.Message, bool) {
ls := slice.(raft.LogSlice)
rn.r.MuLock()
defer rn.r.MuUnlock()
return rn.RawNode.SendMsgApp(raftpb.PeerID(replicaID), ls)
Expand All @@ -79,6 +78,6 @@ type RaftLogSnapshot raft.LogSnapshot
var _ rac2.RaftLogSnapshot = RaftLogSnapshot{}

// LogSlice implements rac2.RaftLogSnapshot.
func (l RaftLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (rac2.RaftLogSlice, error) {
func (l RaftLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error) {
return (raft.LogSnapshot(l)).LogSlice(start-1, end-1, maxSize)
}
2 changes: 1 addition & 1 deletion pkg/raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// TODO(tbg): remove StartNode and give the application the right tools to
// bootstrap the initial membership in a cleaner way.
rn.raft.becomeFollower(1, None)
app := logSlice{term: 1, entries: make([]pb.Entry, 0, len(peers))}
app := LogSlice{term: 1, entries: make([]pb.Entry, 0, len(peers))}
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
Expand Down
18 changes: 9 additions & 9 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type LogSnapshot struct {
// storage contains the stable log entries.
storage LogStorage
// unstable contains the unstable log entries.
unstable logSlice
unstable LogSlice
// logger gives access to logging errors.
logger raftlogger.Logger
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (l *raftLog) accTerm() uint64 {
// the log (so this log slice is insufficient to make our log consistent with
// the leader log), the slice is out of bounds (appending it would introduce a
// gap), or a.term is outdated.
func (l *raftLog) maybeAppend(a logSlice) bool {
func (l *raftLog) maybeAppend(a LogSlice) bool {
match, ok := l.match(a)
if !ok {
return false
Expand All @@ -179,7 +179,7 @@ func (l *raftLog) maybeAppend(a logSlice) bool {
//
// Returns false if the operation can not be done: entry a.prev does not match
// the lastEntryID of this log, or a.term is outdated.
func (l *raftLog) append(a logSlice) bool {
func (l *raftLog) append(a LogSlice) bool {
return l.unstable.append(a)
}

Expand All @@ -191,8 +191,8 @@ func (l *raftLog) append(a logSlice) bool {
//
// All the entries up to the returned index are already present in the log, and
// do not need to be rewritten. The caller can safely fast-forward the appended
// logSlice to this index.
func (l *raftLog) match(s logSlice) (uint64, bool) {
// LogSlice to this index.
func (l *raftLog) match(s LogSlice) (uint64, bool) {
if !l.matchTerm(s.prev) {
return 0, false
}
Expand Down Expand Up @@ -558,13 +558,13 @@ func (l LogSnapshot) LogSlice(lo, hi uint64, maxSize uint64) (LogSlice, error) {
if err != nil {
// The log is probably compacted at index > lo (err == ErrCompacted), or it
// can be a custom storage error.
return logSlice{}, err
return LogSlice{}, err
}
ents, err := l.slice(lo, hi, entryEncodingSize(maxSize))
if err != nil {
return logSlice{}, err
return LogSlice{}, err
}
return logSlice{
return LogSlice{
term: l.unstable.term,
prev: entryID{term: prevTerm, index: lo},
entries: ents,
Expand Down Expand Up @@ -659,7 +659,7 @@ func (l *raftLog) snap(storage LogStorage) LogSnapshot {
return LogSnapshot{
first: l.firstIndex(),
storage: storage,
unstable: l.unstable.logSlice,
unstable: l.unstable.LogSlice,
logger: l.logger,
}
}
Loading

0 comments on commit ea1419c

Please sign in to comment.