132478: roachprod: preserve error object in Get r=herkolategan a=DarrylWong

We need to keep the error object so that our SSH flake detection works.

Fixes: #132434
Release note: none
Epic: none
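
For context on why preserving the object matters: errors.Newf builds a brand-new error carrying only the message text, while errors.Wrapf keeps the original error in the chain, so marker-based checks via errors.Is still fire. A minimal sketch of the difference — the errSSHFlake sentinel and the Mark-based classification are hypothetical stand-ins for roachprod's actual SSH flake detection:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// errSSHFlake is a hypothetical sentinel standing in for whatever marker
// the real SSH flake detection looks for.
var errSSHFlake = errors.New("ssh flake")

func main() {
	// Pretend the SSH layer classified this failure as a flake.
	cause := errors.Mark(errors.New("connection reset by peer"), errSSHFlake)

	// Before the fix: a fresh error is constructed and the original
	// object (with its flake marker) is dropped.
	before := errors.Newf("get %s failed", "logs")
	fmt.Println(errors.Is(before, errSSHFlake)) // false

	// After the fix: wrapping keeps the cause in the error chain.
	after := errors.Wrapf(cause, "get %s failed", "logs")
	fmt.Println(errors.Is(after, errSSHFlake)) // true
}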

132567: rac2: use the observed sizes when popping from the send-queue to approximate r=kvoli a=sumeerbhola

We were using a fixed size of 500 tokens per entry. We still do that for the first set of entries we pop, but for subsequent attempts we use the mean of the sizes observed so far (see the worked example after the entryTokensApproximator definition below).

Epic: CRDB-37515

Release note: None

Co-authored-by: DarrylWong <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
3 people committed Oct 14, 2024
3 parents 4ef3e64 + 12fa7da + 0149962 commit 79ef79a
Showing 3 changed files with 81 additions and 33 deletions.
58 changes: 45 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -1853,13 +1853,11 @@ type replicaSendStream struct {
 	// LowPri.
 	originalEvalTokens [admissionpb.NumWorkClasses]kvflowcontrol.Tokens
 
-	// Approximate size stat for send-queue. For indices <
-	// nextRaftIndexInitial.
+	// entryTokensApproximator approximates the tokens needed per entry, for
+	// indices < nextRaftIndexInitial, in the send-queue.
 	//
-	// approxMeanSizeBytes is useful since it guides how many bytes to grab
-	// in deductedForScheduler.tokens. If each entry is 100 bytes, and half
-	// the entries are subject to AC, this should be ~50.
-	approxMeanSizeBytes kvflowcontrol.Tokens
+	// It guides how many bytes to grab in deductedForScheduler.tokens.
+	entryTokensApproximator entryTokensApproximator
 
 	// preciseSizeSum is the total size of entries subject to AC, and have
 	// an index >= nextRaftIndexInitial and >= indexToSend.
@@ -1952,9 +1950,6 @@ func (rs *replicaState) createReplicaSendStream(
 	rss.mu.nextRaftIndexInitial = nextRaftIndex
 	rss.mu.sendQueue.indexToSend = indexToSend
 	rss.mu.sendQueue.nextRaftIndex = nextRaftIndex
-	// TODO(sumeer): initialize based on recent appends seen by the
-	// RangeController.
-	rss.mu.sendQueue.approxMeanSizeBytes = 500
 	if mode == MsgAppPull && !rs.sendStream.isEmptySendQueueLocked() {
 		// NB: need to lock rss.mu since
 		// startAttemptingToEmptySendQueueViaWatcherLocked can hand a reference to
@@ -2538,6 +2533,8 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendLocked(
 ) {
 	rss.mu.AssertHeld()
 	var tokensNeeded kvflowcontrol.Tokens
+	var approximatedNumEntries int
+	var approximatedNumActualTokens kvflowcontrol.Tokens
 	for _, entry := range msg.Entries {
 		entryState := getEntryFCStateOrFatal(ctx, entry)
 		if entryState.id.index != rss.mu.sendQueue.indexToSend {
@@ -2549,18 +2546,27 @@
 				rss.mu.sendQueue.nextRaftIndex))
 		}
 		rss.mu.sendQueue.indexToSend++
+		isApproximatedEntry := entryState.id.index < rss.mu.nextRaftIndexInitial
+		if isApproximatedEntry {
+			approximatedNumEntries++
+			if entryState.usesFlowControl {
+				approximatedNumActualTokens += entryState.tokens
+			}
+		}
 		if entryState.usesFlowControl {
-			if entryState.id.index >= rss.mu.nextRaftIndexInitial {
+			if !isApproximatedEntry {
 				rss.mu.sendQueue.preciseSizeSum -= entryState.tokens
 				rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entryState.pri)] -=
 					entryState.tokens
 			}
-			// TODO(sumeer): use knowledge from entries < nextRaftIndexInitial to
-			// adjust approxMeanSizeBytes.
 			tokensNeeded += entryState.tokens
 			rss.mu.tracker.Track(ctx, entryState.id, raftpb.LowPri, entryState.tokens)
 		}
 	}
+	if approximatedNumEntries > 0 {
+		rss.mu.sendQueue.entryTokensApproximator.addStats(
+			approximatedNumEntries, approximatedNumActualTokens)
+	}
 	if !rss.mu.sendQueue.forceFlushScheduled {
 		// Subtract from already deducted tokens.
 		beforeDeductedTokens := rss.mu.sendQueue.deductedForSchedulerTokens
@@ -2716,7 +2722,8 @@ func (rss *replicaSendStream) approxQueueSizeLocked() kvflowcontrol.Tokens {
 	var size kvflowcontrol.Tokens
 	countWithApproxStats := int64(rss.mu.nextRaftIndexInitial) - int64(rss.mu.sendQueue.indexToSend)
 	if countWithApproxStats > 0 {
-		size = kvflowcontrol.Tokens(countWithApproxStats) * rss.mu.sendQueue.approxMeanSizeBytes
+		size = kvflowcontrol.Tokens(countWithApproxStats) *
+			rss.mu.sendQueue.entryTokensApproximator.meanTokensPerEntry()
 	}
 	size += rss.mu.sendQueue.preciseSizeSum
 	return size
@@ -2838,3 +2845,28 @@ func (cs connectedState) SafeFormat(w redact.SafePrinter, _ rune) {
 		panic(fmt.Sprintf("unknown connectedState %v", cs))
 	}
 }
+
+// entryTokensApproximator simply uses a mean of the entries observed to
+// approximate the tokens needed. More sophisticated heuristics can be
+// devised, if needed.
+type entryTokensApproximator struct {
+	numEntries int
+	numTokens  kvflowcontrol.Tokens
+}
+
+// REQUIRES: numEntries > 0.
+func (a *entryTokensApproximator) addStats(numEntries int, numTokens kvflowcontrol.Tokens) {
+	a.numEntries += numEntries
+	a.numTokens += numTokens
+}
+
+func (a *entryTokensApproximator) meanTokensPerEntry() kvflowcontrol.Tokens {
+	if a.numEntries == 0 {
+		return 500
+	}
+	mean := a.numTokens / kvflowcontrol.Tokens(a.numEntries)
+	if mean == 0 {
+		mean = 1
+	}
+	return mean
+}
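
To make the approximator concrete, here is a standalone restatement of its arithmetic with int64 in place of kvflowcontrol.Tokens and invented sample values. Note the 500-token default before anything has been observed, and that entries not subject to AC contribute to the entry count but not the token sum, diluting the mean (the same intuition as the old comment's "100 bytes each, half subject to AC, so ~50"):

package main

import "fmt"

// approximator mirrors entryTokensApproximator above.
type approximator struct {
	numEntries int
	numTokens  int64
}

func (a *approximator) addStats(numEntries int, numTokens int64) {
	a.numEntries += numEntries
	a.numTokens += numTokens
}

func (a *approximator) meanTokensPerEntry() int64 {
	if a.numEntries == 0 {
		return 500 // default until something has been observed
	}
	mean := a.numTokens / int64(a.numEntries)
	if mean == 0 {
		mean = 1 // never estimate zero tokens per entry
	}
	return mean
}

func main() {
	var a approximator
	fmt.Println(a.meanTokensPerEntry()) // 500: nothing observed yet

	// Two entries subject to AC, 100 tokens each.
	a.addStats(2, 200)
	fmt.Println(a.meanTokensPerEntry()) // 100

	// Two more entries, only one of them subject to AC: the pair adds
	// 2 to the count but only 100 tokens, diluting the mean.
	a.addStats(2, 100)
	fmt.Println(a.meanTokensPerEntry()) // 75
}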
35 changes: 24 additions & 11 deletions
@@ -255,9 +255,20 @@ eval original in send-q: reg=+0 B ela=+0 B
 ++++
 schedule-controller-event-count: 2
 
+# Noop. Note that s3 has 0 send tokens.
+adjust_tokens send
+  store_id=2 pri=HighPri tokens=0
+----
+t1/s1: eval reg=-3.0 MiB/+16 MiB ela=-3.0 MiB/+8.0 MiB
+       send reg=-3.0 MiB/+16 MiB ela=-3.0 MiB/+8.0 MiB
+t1/s2: eval reg=+0 B/+16 MiB ela=-3.0 MiB/+8.0 MiB
+       send reg=+2.5 MiB/+16 MiB ela=-512 KiB/+8.0 MiB
+t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB
+       send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB
+
 # Provide 10KiB of elastic send tokens to s3. Only 4KiB are deducted, leaving
-# 6KiB, since we are using a 4KiB estimate for entries in the send-queue that
-# are < nextRaftIndexInitial.
+# 6KiB, since we deduct a minimum of 4KiB of send tokens when trying to empty
+# the send-queue.
 adjust_tokens send
   store_id=3 pri=HighPri tokens=10KiB
 ----
Expand Down Expand Up @@ -325,19 +336,21 @@ MsgApps sent in pull mode:
++++
schedule-controller-event-count: 3

# Add 1MiB of elastic send tokens, to return to 10KiB of tokens. 4KiB is again
# deducted, so 6KiB remains.
# Add 3MiB of elastic send tokens, to return to 2MiB+10KiB of tokens. The
# send-stream now has an estimate of 1MiB needed per entry, and deduct 1.1 x
# queue size = 1.1MiB.
adjust_tokens send
store_id=3 pri=HighPri tokens=1MiB
store_id=3 pri=HighPri tokens=3MiB
----
t1/s1: eval reg=-3.0 MiB/+16 MiB ela=-3.0 MiB/+8.0 MiB
send reg=-3.0 MiB/+16 MiB ela=-3.0 MiB/+8.0 MiB
t1/s2: eval reg=+0 B/+16 MiB ela=-3.0 MiB/+8.0 MiB
send reg=+2.5 MiB/+16 MiB ela=-512 KiB/+8.0 MiB
t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB
send reg=+1.0 MiB/+16 MiB ela=+6.0 KiB/+8.0 MiB
send reg=+3.0 MiB/+16 MiB ela=+932 KiB/+8.0 MiB

# Note the deducted value of 4KiB. Replica 3 is waiting for a scheduler event.
# Note the deducted value of 1.1MiB. Replica 3 is waiting for a scheduler
# event.
stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
@@ -356,7 +369,7 @@ LowPri:
   term=1 index=2 tokens=1048576
   term=1 index=3 tokens=1048576
 ++++
-(n3,s3):3: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+0 B deducted=+4.0 KiB
+(n3,s3):3: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+0 B deducted=+1.1 MiB
 eval deducted: reg=+0 B ela=+0 B
 eval original in send-q: reg=+0 B ela=+0 B
 LowPri:
@@ -365,8 +378,8 @@ LowPri:
 schedule-controller-event-count: 4
 scheduled-replicas: 3
 
-# Scheduler event. Replica 3 deducts 1MiB-4KiB without waiting since the entry
-# was actually 1MiB. It no longer has a send-queue.
+# Scheduler event. Replica 3 returns 0.1MiB since the entry was actually 1MiB.
+# It no longer has a send-queue.
 handle_scheduler_event range_id=1
 ----
 (n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
@@ -406,4 +419,4 @@ t1/s1: eval reg=-3.0 MiB/+16 MiB ela=-3.0 MiB/+8.0 MiB
 t1/s2: eval reg=+0 B/+16 MiB ela=-3.0 MiB/+8.0 MiB
        send reg=+2.5 MiB/+16 MiB ela=-512 KiB/+8.0 MiB
 t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB
-       send reg=+1.0 MiB/+16 MiB ela=-1014 KiB/+8.0 MiB
+       send reg=+3.0 MiB/+16 MiB ela=+1.0 MiB/+8.0 MiB
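
Tying the numbers in this test together: once the earlier 1MiB entries have been popped and their observed sizes recorded, the approximator's mean is 1MiB per entry. With one entry left in the send-queue, the estimated queue size is 1 x 1MiB, so the stream deducts 1.1 x that estimate = 1.1MiB of send tokens up front. The entry turns out to be exactly 1MiB, so the 0.1MiB surplus is returned, which is how s3's elastic balance lands at +1.0 MiB above.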
21 changes: 12 additions & 9 deletions pkg/roachprod/install/cluster_synced.go
@@ -2250,13 +2250,10 @@ func (c *SyncedCluster) Put(
 		close(results)
 	}()
 
-	var errOnce sync.Once
 	var finalErr error
 	setErr := func(e error) {
-		if e != nil {
-			errOnce.Do(func() {
-				finalErr = e
-			})
+		if finalErr == nil {
+			finalErr = e
 		}
 	}
 
@@ -2597,24 +2594,30 @@ func (c *SyncedCluster) Get(
 		close(results)
 	}()
 
+	var finalErr error
+	setErr := func(e error) {
+		if finalErr == nil {
+			finalErr = e
+		}
+	}
+
 	defer spinner.Start()()
-	haveErr := false
 	for {
 		r, ok := <-results
 		if !ok {
 			break
 		}
 		if r.err != nil {
-			haveErr = true
+			setErr(r.err)
 			nodeTaskStatus(nodes[r.index], r.err.Error(), true)
 		} else {
 			nodeTaskStatus(nodes[r.index], "done", true)
 		}
 	}
 	spinner.MaybeLogTasks(l)
 
-	if haveErr {
-		return errors.Newf("get %s failed", src)
+	if finalErr != nil {
+		return errors.Wrapf(finalErr, "get %s failed", src)
 	}
 	return nil
 }
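
Restated standalone, the collection pattern Put and Get now share: remember the first error while still draining and reporting every result, then wrap it for the caller. A minimal sketch with illustrative names and output:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

type result struct {
	index int
	err   error
}

// drain mirrors the loop above: the first error wins, every result is
// still consumed, and the winner is wrapped rather than flattened.
func drain(results <-chan result, src string) error {
	var finalErr error
	setErr := func(e error) {
		if finalErr == nil {
			finalErr = e
		}
	}
	for r := range results {
		if r.err != nil {
			setErr(r.err)
			fmt.Printf("node %d: %v\n", r.index, r.err)
		} else {
			fmt.Printf("node %d: done\n", r.index)
		}
	}
	if finalErr != nil {
		return errors.Wrapf(finalErr, "get %s failed", src)
	}
	return nil
}

func main() {
	ch := make(chan result, 2)
	ch <- result{index: 1, err: errors.New("scp: connection reset")}
	ch <- result{index: 2}
	close(ch)
	fmt.Println(drain(ch, "logs"))
}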
