diff --git a/pkg/cmd/roachtest/roachtestutil/disk_stall.go b/pkg/cmd/roachtest/roachtestutil/disk_stall.go index 79951b359e76..d68f9df617bd 100644 --- a/pkg/cmd/roachtest/roachtestutil/disk_stall.go +++ b/pkg/cmd/roachtest/roachtestutil/disk_stall.go @@ -23,11 +23,24 @@ type DiskStaller interface { Setup(ctx context.Context) Cleanup(ctx context.Context) Stall(ctx context.Context, nodes option.NodeListOption) + Slow(ctx context.Context, nodes option.NodeListOption, bytesPerSecond int) Unstall(ctx context.Context, nodes option.NodeListOption) DataDir() string LogDir() string } +type NoopDiskStaller struct{} + +var _ DiskStaller = NoopDiskStaller{} + +func (n NoopDiskStaller) Cleanup(ctx context.Context) {} +func (n NoopDiskStaller) DataDir() string { return "{store-dir}" } +func (n NoopDiskStaller) LogDir() string { return "logs" } +func (n NoopDiskStaller) Setup(ctx context.Context) {} +func (n NoopDiskStaller) Slow(_ context.Context, _ option.NodeListOption, _ int) {} +func (n NoopDiskStaller) Stall(_ context.Context, _ option.NodeListOption) {} +func (n NoopDiskStaller) Unstall(_ context.Context, _ option.NodeListOption) {} + type Fataler interface { Fatal(args ...interface{}) Fatalf(format string, args ...interface{}) @@ -68,15 +81,20 @@ func (s *cgroupDiskStaller) Setup(ctx context.Context) { func (s *cgroupDiskStaller) Cleanup(ctx context.Context) {} func (s *cgroupDiskStaller) Stall(ctx context.Context, nodes option.NodeListOption) { + // NB: I don't understand why, but attempting to set a bytesPerSecond={0,1} + // results in Invalid argument from the io.max cgroupv2 API. + s.Slow(ctx, nodes, 4) +} + +func (s *cgroupDiskStaller) Slow( + ctx context.Context, nodes option.NodeListOption, bytesPerSecond int, +) { // Shuffle the order of read and write stall initiation. rand.Shuffle(len(s.readOrWrite), func(i, j int) { s.readOrWrite[i], s.readOrWrite[j] = s.readOrWrite[j], s.readOrWrite[i] }) for _, rw := range s.readOrWrite { - // NB: I don't understand why, but attempting to set a - // bytesPerSecond={0,1} results in Invalid argument from the io.max - // cgroupv2 API. - if err := s.setThroughput(ctx, nodes, rw, throughput{limited: true, bytesPerSecond: 4}); err != nil { + if err := s.setThroughput(ctx, nodes, rw, throughput{limited: true, bytesPerSecond: bytesPerSecond}); err != nil { s.f.Fatal(err) } } @@ -225,6 +243,13 @@ func (s *dmsetupDiskStaller) Stall(ctx context.Context, nodes option.NodeListOpt s.c.Run(ctx, option.WithNodes(nodes), `sudo dmsetup suspend --noflush --nolockfs data1`) } +func (s *dmsetupDiskStaller) Slow( + ctx context.Context, nodes option.NodeListOption, bytesPerSecond int, +) { + // TODO(baptist): Consider https://github.com/kawamuray/ddi. 
+ s.f.Fatal("Slow is not supported for dmsetupDiskStaller") +} + func (s *dmsetupDiskStaller) Unstall(ctx context.Context, nodes option.NodeListOption) { s.c.Run(ctx, option.WithNodes(nodes), `sudo dmsetup resume data1`) } diff --git a/pkg/cmd/roachtest/tests/admission_control_latency.go b/pkg/cmd/roachtest/tests/admission_control_latency.go index f33c8fcba3a8..695983a1e286 100644 --- a/pkg/cmd/roachtest/tests/admission_control_latency.go +++ b/pkg/cmd/roachtest/tests/admission_control_latency.go @@ -12,6 +12,7 @@ import ( "encoding/json" "fmt" "math" + "math/rand" "path/filepath" "reflect" "sort" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/install" @@ -82,11 +84,9 @@ type variations struct { splits int numNodes int numWorkloadNodes int - partitionSite bool vcpu int disks int leaseType registry.LeaseType - cleanRestart bool perturbation perturbation workload workloadType acceptableChange float64 @@ -111,10 +111,10 @@ var leases = []registry.LeaseType{ func (v variations) String() string { return fmt.Sprintf("seed: %d, fillDuration: %s, maxBlockBytes: %d, perturbationDuration: %s, "+ "validationDuration: %s, ratioOfMax: %f, splits: %d, numNodes: %d, numWorkloadNodes: %d, "+ - "partitionSite: %t, vcpu: %d, disks: %d, leaseType: %s, cloud: %v", + "vcpu: %d, disks: %d, leaseType: %s, cloud: %v, perturbation: %s", v.seed, v.fillDuration, v.maxBlockBytes, v.perturbationDuration, v.validationDuration, v.ratioOfMax, v.splits, v.numNodes, v.numWorkloadNodes, - v.partitionSite, v.vcpu, v.disks, v.leaseType, v.cloud) + v.vcpu, v.disks, v.leaseType, v.cloud, v.perturbation) } // Normally a single worker can handle 20-40 nodes. If we find this is @@ -141,12 +141,11 @@ func setupMetamorphic(p perturbation) variations { v.numWorkloadNodes = v.numNodes/numNodesPerWorker + 1 v.vcpu = numVCPUs[rng.Intn(len(numVCPUs))] v.disks = numDisks[rng.Intn(len(numDisks))] - v.partitionSite = rng.Intn(2) == 0 - v.cleanRestart = rng.Intn(2) == 0 v.cloud = cloudSets[rng.Intn(len(cloudSets))] // TODO(baptist): Temporarily disable the metamorphic tests on other clouds // as they have limitations on configurations that can run. v.cloud = registry.OnlyGCE + p.setupMetamorphic(rng) v.perturbation = p return v } @@ -160,14 +159,12 @@ func setupFull(p perturbation) variations { v.splits = 10000 v.numNodes = 12 v.numWorkloadNodes = v.numNodes/numNodesPerWorker + 1 - v.partitionSite = true v.vcpu = 16 v.disks = 2 v.fillDuration = 10 * time.Minute v.validationDuration = 5 * time.Minute v.perturbationDuration = 10 * time.Minute v.ratioOfMax = 0.5 - v.cleanRestart = true v.cloud = registry.OnlyGCE v.perturbation = p return v @@ -184,12 +181,10 @@ func setupDev(p perturbation) variations { v.numWorkloadNodes = 1 v.vcpu = 4 v.disks = 1 - v.partitionSite = true v.fillDuration = 20 * time.Second v.validationDuration = 10 * time.Second v.perturbationDuration = 30 * time.Second v.ratioOfMax = 0.5 - v.cleanRestart = true v.cloud = registry.AllClouds v.perturbation = p return v @@ -199,32 +194,44 @@ func registerLatencyTests(r registry.Registry) { // NB: If these tests fail because they are flaky, increase the numbers // until they pass. 
Additionally add the seed (from the log) that caused // them to fail as a comment in the test. - addMetamorphic(r, restart{}, math.Inf(1)) - addMetamorphic(r, partition{}, math.Inf(1)) + addMetamorphic(r, &restart{}, math.Inf(1)) + addMetamorphic(r, &partition{}, math.Inf(1)) addMetamorphic(r, addNode{}, 3.0) - addMetamorphic(r, decommission{}, 3.0) + addMetamorphic(r, &decommission{}, 3.0) addMetamorphic(r, backfill{}, 40.0) + addMetamorphic(r, &slowDisk{}, math.Inf(1)) // NB: If these tests fail, it likely signals a regression. Investigate the // history of the test on roachperf to see what changed. - addFull(r, restart{}, math.Inf(1)) - addFull(r, partition{}, math.Inf(1)) + addFull(r, &restart{cleanRestart: true}, math.Inf(1)) + addFull(r, &partition{partitionSite: true}, math.Inf(1)) addFull(r, addNode{}, 3.0) - addFull(r, decommission{}, 3.0) + addFull(r, &decommission{drain: true}, 3.0) addFull(r, backfill{}, 40.0) + addFull(r, &slowDisk{slowLiveness: true, walFailover: true}, math.Inf(1)) // NB: These tests will never fail and are not enabled, but they are useful // for development. - addDev(r, restart{}, math.Inf(1)) - addDev(r, partition{}, math.Inf(1)) + addDev(r, &restart{cleanRestart: true}, math.Inf(1)) + addDev(r, &partition{partitionSite: true}, math.Inf(1)) addDev(r, addNode{}, math.Inf(1)) - addDev(r, decommission{}, math.Inf(1)) + addDev(r, &decommission{drain: true}, math.Inf(1)) addDev(r, backfill{}, math.Inf(1)) + addDev(r, &slowDisk{slowLiveness: true, walFailover: true}, math.Inf(1)) } func (v variations) makeClusterSpec() spec.ClusterSpec { // NB: We use low memory to force non-cache disk reads earlier. - return spec.MakeClusterSpec(v.numNodes+v.numWorkloadNodes, spec.CPU(v.vcpu), spec.SSD(v.disks), spec.Mem(spec.Low)) + // TODO(baptist): Allow the non-disk tests to reuse the cluster. + return spec.MakeClusterSpec(v.numNodes+v.numWorkloadNodes, spec.CPU(v.vcpu), spec.SSD(v.disks), spec.Mem(spec.Low), spec.ReuseNone()) +} + +func (v variations) perturbationName() string { + t := reflect.TypeOf(v.perturbation) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.Name() } func addMetamorphic(r registry.Registry, p perturbation, acceptableChange float64) { @@ -233,7 +240,7 @@ func addMetamorphic(r registry.Registry, p perturbation, acceptableChange float6 // TODO(baptist): Make the cloud be metamorphic for repeatable results with // a given seed. 
r.Add(registry.TestSpec{ - Name: fmt.Sprintf("perturbation/metamorphic/%s", reflect.TypeOf(p).Name()), + Name: fmt.Sprintf("perturbation/metamorphic/%s", v.perturbationName()), CompatibleClouds: v.cloud, Suites: registry.Suites(registry.Nightly), Owner: registry.OwnerKV, @@ -248,7 +255,7 @@ func addFull(r registry.Registry, p perturbation, acceptableChange float64) { v := setupFull(p) v.acceptableChange = acceptableChange r.Add(registry.TestSpec{ - Name: fmt.Sprintf("perturbation/full/%s", reflect.TypeOf(p).Name()), + Name: fmt.Sprintf("perturbation/full/%s", v.perturbationName()), CompatibleClouds: v.cloud, Suites: registry.Suites(registry.Nightly), Owner: registry.OwnerKV, @@ -263,7 +270,7 @@ func addDev(r registry.Registry, p perturbation, acceptableChange float64) { v := setupDev(p) v.acceptableChange = acceptableChange r.Add(registry.TestSpec{ - Name: fmt.Sprintf("perturbation/dev/%s", reflect.TypeOf(p).Name()), + Name: fmt.Sprintf("perturbation/dev/%s", v.perturbationName()), CompatibleClouds: v.cloud, Suites: registry.ManualOnly, Owner: registry.OwnerKV, @@ -274,6 +281,9 @@ } type perturbation interface { + // setupMetamorphic is called at the start of the test to randomize the perturbation. + setupMetamorphic(rng *rand.Rand) + // startTargetNode is called for custom logic starting the target node(s). // Some of the perturbations need special logic for starting the target // node. @@ -297,6 +307,8 @@ type backfill struct{} var _ perturbation = backfill{} +func (b backfill) setupMetamorphic(rng *rand.Rand) {} + // startTargetNode starts the target node and creates the backfill table. func (b backfill) startTargetNode(ctx context.Context, t test.Test, v variations) { v.startNoBackup(ctx, t, v.targetNodes()) @@ -351,21 +363,90 @@ func (b backfill) endPerturbation(ctx context.Context, t test.Test, v variations return v.validationDuration } +type slowDisk struct { + // slowLiveness will place the liveness range on the slow node (may not be the slow disk). + slowLiveness bool + // walFailover will add WAL failover to the slow node. + walFailover bool + staller roachtestutil.DiskStaller +} + +var _ perturbation = &slowDisk{} + +func (s *slowDisk) setupMetamorphic(rng *rand.Rand) { + s.slowLiveness = rng.Intn(2) == 0 + s.walFailover = rng.Intn(2) == 0 +} + +func (s *slowDisk) String() string { + return fmt.Sprintf("slowDisk{slowLiveness: %t, walFailover: %t}", s.slowLiveness, s.walFailover) +} + +// startTargetNode implements perturbation. +func (s *slowDisk) startTargetNode(ctx context.Context, t test.Test, v variations) { + extraArgs := []string{} + if s.walFailover && v.disks > 1 { + extraArgs = append(extraArgs, "--wal-failover=among-stores") + } + v.startNoBackup(ctx, t, v.targetNodes(), extraArgs...) + + if s.slowLiveness { + // TODO(baptist): Handle multiple target nodes. + target := v.targetNodes()[0] + db := v.Conn(ctx, t.L(), 1) + defer db.Close() + cmd := fmt.Sprintf(`ALTER RANGE liveness CONFIGURE ZONE USING CONSTRAINTS='{"+node%d":1}', lease_preferences='[[+node%d]]'`, target, target) + _, err := db.ExecContext(ctx, cmd) + require.NoError(t, err) + } + + if v.IsLocal() { + s.staller = roachtestutil.NoopDiskStaller{} + } else { + s.staller = roachtestutil.MakeCgroupDiskStaller(t, v, false /* readsToo */, false /* logsToo */) + } +} + +// startPerturbation implements perturbation.
+func (s *slowDisk) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { + // TODO(baptist): Do this more dynamically? + s.staller.Slow(ctx, v.targetNodes(), 20_000_000) + waitDuration(ctx, v.validationDuration) + return v.validationDuration +} + +// endPerturbation implements perturbation. +func (s *slowDisk) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { + s.staller.Unstall(ctx, v.targetNodes()) + waitDuration(ctx, v.validationDuration) + return v.validationDuration +} + // restart will gracefully stop and then restart a node after a custom duration. -type restart struct{} +type restart struct { + cleanRestart bool +} + +var _ perturbation = &restart{} -var _ perturbation = restart{} +func (r *restart) String() string { + return fmt.Sprintf("restart{cleanRestart: %t}", r.cleanRestart) +} + +func (r *restart) setupMetamorphic(rng *rand.Rand) { + r.cleanRestart = rng.Intn(2) == 0 +} -func (r restart) startTargetNode(ctx context.Context, t test.Test, v variations) { +func (r *restart) startTargetNode(ctx context.Context, t test.Test, v variations) { v.startNoBackup(ctx, t, v.targetNodes()) } // startPerturbation stops the target node with a graceful shutdown. -func (r restart) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { +func (r *restart) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { startTime := timeutil.Now() gracefulOpts := option.DefaultStopOpts() // SIGTERM for clean shutdown - if v.cleanRestart { + if r.cleanRestart { gracefulOpts.RoachprodOpts.Sig = 15 } else { gracefulOpts.RoachprodOpts.Sig = 9 @@ -373,7 +454,7 @@ func (r restart) startPerturbation(ctx context.Context, t test.Test, v variation gracefulOpts.RoachprodOpts.Wait = true v.Stop(ctx, t.L(), gracefulOpts, v.targetNodes()) waitDuration(ctx, v.perturbationDuration) - if v.cleanRestart { + if r.cleanRestart { return timeutil.Since(startTime) } // If it is not a clean restart, we ignore the first 10 seconds to allow for lease movement. @@ -388,10 +469,10 @@ func (r restart) endPerturbation(ctx context.Context, t test.Test, v variations) return timeutil.Since(startTime) } -// partition nodes in the first region. -func (v variations) withPartitionedNodes(c cluster.Cluster) install.RunOptions { +// partition either the first node or all nodes in the first region. +func (v variations) withPartitionedNodes(c cluster.Cluster, partitionSite bool) install.RunOptions { numPartitionNodes := 1 - if v.partitionSite { + if partitionSite { numPartitionNodes = v.numNodes / NUM_REGIONS } return option.WithNodes(c.Range(1, numPartitionNodes)) @@ -399,22 +480,34 @@ func (v variations) withPartitionedNodes(c cluster.Cluster) install.RunOptions { // partition will partition the target node from either one other node or all // other nodes in a different AZ. 
-type partition struct{} +type partition struct { + partitionSite bool +} + +var _ perturbation = &partition{} + +func (p *partition) String() string { + return fmt.Sprintf("partition{partitionSite: %t}", p.partitionSite) +} -var _ perturbation = partition{} +func (p *partition) setupMetamorphic(rng *rand.Rand) { + p.partitionSite = rng.Intn(2) == 0 +} -func (p partition) startTargetNode(ctx context.Context, t test.Test, v variations) { +func (p *partition) startTargetNode(ctx context.Context, t test.Test, v variations) { v.startNoBackup(ctx, t, v.targetNodes()) } -func (p partition) startPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { +func (p *partition) startPerturbation( + ctx context.Context, t test.Test, v variations, +) time.Duration { targetIPs, err := v.InternalIP(ctx, t.L(), v.targetNodes()) require.NoError(t, err) if !v.IsLocal() { v.Run( ctx, - v.withPartitionedNodes(v), + v.withPartitionedNodes(v, p.partitionSite), fmt.Sprintf( `sudo iptables -A INPUT -p tcp -s %s -j DROP`, targetIPs[0])) } @@ -424,10 +517,10 @@ func (p partition) startPerturbation(ctx context.Context, t test.Test, v variati return v.perturbationDuration - 20*time.Second } -func (p partition) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { +func (p *partition) endPerturbation(ctx context.Context, t test.Test, v variations) time.Duration { startTime := timeutil.Now() if !v.IsLocal() { - v.Run(ctx, v.withPartitionedNodes(v), `sudo iptables -F`) + v.Run(ctx, v.withPartitionedNodes(v, p.partitionSite), `sudo iptables -F`) } waitDuration(ctx, v.validationDuration) return timeutil.Since(startTime) @@ -439,6 +532,8 @@ type addNode struct{} var _ perturbation = addNode{} +func (addNode) setupMetamorphic(rng *rand.Rand) {} + func (addNode) startTargetNode(ctx context.Context, t test.Test, v variations) { } @@ -461,33 +556,44 @@ func (addNode) endPerturbation(ctx context.Context, t test.Test, v variations) t } // restart will gracefully stop and then restart a node after a custom duration. -type decommission struct{} +type decommission struct { + drain bool +} + +var _ perturbation = &decommission{} -var _ perturbation = restart{} +func (d *decommission) String() string { + return fmt.Sprintf("decommission{drain: %t}", d.drain) +} + +func (d *decommission) setupMetamorphic(rng *rand.Rand) { + d.drain = rng.Intn(2) == 0 +} -func (d decommission) startTargetNode(ctx context.Context, t test.Test, v variations) { +func (d *decommission) startTargetNode(ctx context.Context, t test.Test, v variations) { v.startNoBackup(ctx, t, v.targetNodes()) } -func (d decommission) startPerturbation( +func (d *decommission) startPerturbation( ctx context.Context, t test.Test, v variations, ) time.Duration { startTime := timeutil.Now() // TODO(baptist): If we want to support multiple decommissions in parallel, // run drain and decommission in separate goroutine. 
- t.L().Printf("draining target nodes") - for _, node := range v.targetNodes() { - drainCmd := fmt.Sprintf( - "./cockroach node drain --self --certs-dir=%s --port={pgport:%d}", - install.CockroachNodeCertsDir, - node, - ) - v.Run(ctx, option.WithNodes(v.Node(node)), drainCmd) + if d.drain { + t.L().Printf("draining target nodes") + for _, node := range v.targetNodes() { + drainCmd := fmt.Sprintf( + "./cockroach node drain --self --certs-dir=%s --port={pgport:%d}", + install.CockroachNodeCertsDir, + node, + ) + v.Run(ctx, option.WithNodes(v.Node(node)), drainCmd) + } + // Wait for all the other nodes to see the drain over gossip. + time.Sleep(10 * time.Second) } - // Wait for all the other nodes to see the drain over gossip. - time.Sleep(10 * time.Second) - t.L().Printf("decommissioning nodes") for _, node := range v.targetNodes() { decommissionCmd := fmt.Sprintf( @@ -505,7 +611,7 @@ func (d decommission) startPerturbation( // endPerturbation already waited for completion as part of start, so it doesn't // need to wait again here. -func (d decommission) endPerturbation( +func (d *decommission) endPerturbation( ctx context.Context, t test.Test, v variations, ) time.Duration { waitDuration(ctx, v.validationDuration) @@ -794,7 +900,9 @@ func isAcceptableChange( } // startNoBackup starts the nodes without enabling backup. -func (v variations) startNoBackup(ctx context.Context, t test.Test, nodes option.NodeListOption) { +func (v variations) startNoBackup( + ctx context.Context, t test.Test, nodes option.NodeListOption, extraArgs ...string, +) { nodesPerRegion := v.numNodes / NUM_REGIONS for _, node := range nodes { // Don't start a backup schedule because this test is timing sensitive. @@ -802,6 +910,7 @@ func (v variations) startNoBackup(ctx context.Context, t test.Test, nodes option opts.RoachprodOpts.StoreCount = v.disks opts.RoachprodOpts.ExtraArgs = append(opts.RoachprodOpts.ExtraArgs, fmt.Sprintf("--locality=region=fake-%d", (node-1)/nodesPerRegion)) + opts.RoachprodOpts.ExtraArgs = append(opts.RoachprodOpts.ExtraArgs, extraArgs...) v.Start(ctx, t.L(), opts, install.MakeClusterSettings(), v.Node(node)) } } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index bb473e5cea0d..9d674a3fc4a0 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -566,6 +566,7 @@ type rangeController struct { // to call into the replicaSendStreams that have asked to be scheduled. replicas map[roachpb.ReplicaID]struct{} } + entryFCStateScratch []entryFCState } // voterStateForWaiters informs whether WaitForEval is required to wait for @@ -635,7 +636,8 @@ func (rc *rangeController) WaitForEval( if wc == admissionpb.ElasticWorkClass { waitForAllReplicateHandles = true } - var handles []tokenWaitingHandleInfo + var handlesScratch [5]tokenWaitingHandleInfo + handles := handlesScratch[:] var scratch []reflect.SelectCase rc.opts.EvalWaitMetrics.OnWaiting(wc) @@ -938,7 +940,11 @@ func constructRaftEventForReplica( func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error { // Compute the flow control state for each new entry. We do this once // here, instead of decoding each entry multiple times for all replicas. 
- newEntries := make([]entryFCState, len(e.Entries)) + numEntries := len(e.Entries) + if cap(rc.entryFCStateScratch) < numEntries { + rc.entryFCStateScratch = make([]entryFCState, 0, 2*numEntries) + } + newEntries := rc.entryFCStateScratch[:numEntries:numEntries] // needsTokens tracks which classes need tokens for the new entries. This // informs first-pass decision-making on replicas that don't have // send-queues, in MsgAppPull mode, and therefore can potentially send the @@ -2246,6 +2252,7 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( event.sendingEntries[0].id.index, rss.mu.sendQueue.indexToSend)) } rss.mu.sendQueue.indexToSend = event.sendingEntries[n-1].id.index + 1 + var sendTokensToDeduct [admissionpb.NumWorkClasses]kvflowcontrol.Tokens for _, entry := range event.sendingEntries { if !entry.usesFlowControl { continue @@ -2275,12 +2282,17 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] -= tokens rss.mu.sendQueue.preciseSizeSum -= tokens } - flag := AdjNormal - if directive.preventSendQNoForceFlush { - flag = AdjPreventSendQueue - } - rss.parent.sendTokenCounter.Deduct(ctx, WorkClassFromRaftPriority(pri), tokens, flag) rss.mu.tracker.Track(ctx, entry.id, pri, tokens) + sendTokensToDeduct[WorkClassFromRaftPriority(pri)] += tokens + } + flag := AdjNormal + if directive.preventSendQNoForceFlush { + flag = AdjPreventSendQueue + } + for wc, tokens := range sendTokensToDeduct { + if tokens != 0 { + rss.parent.sendTokenCounter.Deduct(ctx, admissionpb.WorkClass(wc), tokens, flag) + } } if directive.preventSendQNoForceFlush { rss.parent.parent.opts.RangeControllerMetrics.SendQueue.PreventionCount.Inc(1) @@ -2292,6 +2304,7 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( event.newEntries[0].id.index, rss.mu.sendQueue.nextRaftIndex)) } rss.mu.sendQueue.nextRaftIndex = event.newEntries[n-1].id.index + 1 + var evalTokensToDeduct [admissionpb.NumWorkClasses]kvflowcontrol.Tokens for _, entry := range event.newEntries { if !entry.usesFlowControl { continue @@ -2327,9 +2340,14 @@ func (rss *replicaSendStream) handleReadyEntriesLocked( rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] += tokens } wc := WorkClassFromRaftPriority(pri) - rss.parent.evalTokenCounter.Deduct(ctx, wc, tokens, AdjNormal) + evalTokensToDeduct[wc] += tokens rss.mu.eval.tokensDeducted[wc] += tokens } + for wc, tokens := range evalTokensToDeduct { + if tokens != 0 { + rss.parent.evalTokenCounter.Deduct(ctx, admissionpb.WorkClass(wc), tokens, AdjNormal) + } + } } if n := len(event.sendingEntries); n > 0 && event.mode == MsgAppPull { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go index ced5867e3f64..c666b7e46813 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go @@ -30,7 +30,7 @@ func MakeRaftNodeBasicStateLocked( // held, at least for read. 
func MakeReplicaStateInfos(rn *raft.RawNode, infoMap map[roachpb.ReplicaID]rac2.ReplicaStateInfo) { clear(infoMap) - rn.WithProgress(func(peerID raftpb.PeerID, _ raft.ProgressType, progress tracker.Progress) { + rn.WithBasicProgress(func(peerID raftpb.PeerID, progress tracker.BasicProgress) { infoMap[roachpb.ReplicaID(peerID)] = rac2.ReplicaStateInfo{ Match: progress.Match, Next: progress.Next, diff --git a/pkg/kv/kvserver/replica_proposal_quota.go b/pkg/kv/kvserver/replica_proposal_quota.go index 03dc487ffefc..81473c94595e 100644 --- a/pkg/kv/kvserver/replica_proposal_quota.go +++ b/pkg/kv/kvserver/replica_proposal_quota.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" - "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -211,7 +210,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // to consider progress beyond it as meaningful. minIndex := kvpb.RaftIndex(status.Applied) - r.mu.internalRaftGroup.WithProgress(func(id raftpb.PeerID, _ raft.ProgressType, progress tracker.Progress) { + r.mu.internalRaftGroup.WithBasicProgress(func(id raftpb.PeerID, progress tracker.BasicProgress) { rep, ok := r.shMu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id)) if !ok { return diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 5e5814c96c2a..ff7fea50843d 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1993,7 +1993,7 @@ func (r *Replica) sendRaftMessage( // below for more context: _ = maybeDropMsgApp // NB: this code is allocation free. - r.mu.internalRaftGroup.WithProgress(func(id raftpb.PeerID, _ raft.ProgressType, pr tracker.Progress) { + r.mu.internalRaftGroup.WithBasicProgress(func(id raftpb.PeerID, pr tracker.BasicProgress) { if id == msg.To && pr.State == tracker.StateProbe { // It is moderately expensive to attach a full key to the message, but note that // a probing follower will only be appended to once per heartbeat interval (i.e. diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index 3bf532ab535d..179f415b748a 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -630,6 +630,12 @@ func (rn *RawNode) WithProgress(visitor func(id pb.PeerID, typ ProgressType, pr withProgress(rn.raft, visitor) } +// WithBasicProgress is a helper to introspect the BasicProgress for this node +// and its peers. +func (rn *RawNode) WithBasicProgress(visitor func(id pb.PeerID, pr tracker.BasicProgress)) { + rn.raft.trk.WithBasicProgress(visitor) +} + // ReportUnreachable reports the given node is not reachable for the last send. func (rn *RawNode) ReportUnreachable(id pb.PeerID) { _ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id}) diff --git a/pkg/raft/tracker/inflights.go b/pkg/raft/tracker/inflights.go index 6a09ff37a21e..b9bb090ed9ed 100644 --- a/pkg/raft/tracker/inflights.go +++ b/pkg/raft/tracker/inflights.go @@ -64,13 +64,15 @@ func (in *Inflights) Clone() *Inflights { } // Add notifies the Inflights that a new message with the given index and byte -// size is being dispatched. Full() must be called prior to Add() to verify that -// there is room for one more message, and consecutive calls to Add() must -// provide a monotonic sequence of indexes. +// size is being dispatched. 
Consecutive calls to Add() must provide a monotonic +// sequence of log indices. +// +// The caller should check that the tracker is not Full(), before calling Add(). +// If the tracker is full, the caller should hold off sending entries to the +// peer. However, Add() is still valid and allowed, for cases when this pacing +// is implemented at the higher app level. The tracker correctly tracks all the +// in-flight entries. func (in *Inflights) Add(index, bytes uint64) { - if in.Full() { - panic("cannot add into a Full inflights") - } next := in.start + in.count size := in.size if next >= size { @@ -134,7 +136,7 @@ func (in *Inflights) FreeLE(to uint64) { // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { - return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) + return in.count >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) } // Count returns the number of inflight messages. diff --git a/pkg/raft/tracker/inflights_test.go b/pkg/raft/tracker/inflights_test.go index a7b1c7f5c2db..dafcd15d2a08 100644 --- a/pkg/raft/tracker/inflights_test.go +++ b/pkg/raft/tracker/inflights_test.go @@ -1,3 +1,6 @@ +// This code has been modified from its original form by The Cockroach Authors. +// All modifications are Copyright 2024 The Cockroach Authors. +// // Copyright 2019 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -203,26 +206,18 @@ func TestInflightsFull(t *testing.T) { addUntilFull := func(begin, end int) { for i := begin; i < end; i++ { - if in.Full() { - t.Fatalf("full at %d, want %d", i, end) - } + require.False(t, in.Full(), "full at %d, want %d", i, end) in.Add(uint64(i), uint64(100+i)) } - if !in.Full() { - t.Fatalf("not full at %d", end) - } + require.True(t, in.Full()) } addUntilFull(0, tc.fullAt) in.FreeLE(tc.freeLE) addUntilFull(tc.fullAt, tc.againAt) - defer func() { - if r := recover(); r == nil { - t.Errorf("Add() did not panic") - } - }() in.Add(100, 1024) + require.True(t, in.Full()) // the full tracker remains full }) } } diff --git a/pkg/raft/tracker/progress.go b/pkg/raft/tracker/progress.go index 55ee39b3b64f..8b4376cdd07b 100644 --- a/pkg/raft/tracker/progress.go +++ b/pkg/raft/tracker/progress.go @@ -433,3 +433,13 @@ func (m ProgressMap) String() string { } return buf.String() } + +// BasicProgress contains a subset of fields from Progress. +type BasicProgress struct { + // Match corresponds to Progress.Match. + Match uint64 + // Next corresponds to Progress.Next. + Next uint64 + // State corresponds to Progress.State. + State StateType +} diff --git a/pkg/raft/tracker/progresstracker.go b/pkg/raft/tracker/progresstracker.go index 3f888e82e941..cda7faf18060 100644 --- a/pkg/raft/tracker/progresstracker.go +++ b/pkg/raft/tracker/progresstracker.go @@ -137,3 +137,15 @@ func (p *ProgressTracker) LearnerNodes() []pb.PeerID { slices.Sort(nodes) return nodes } + +// WithBasicProgress is a helper to introspect the BasicProgress for this node +// and its peers. +func (p *ProgressTracker) WithBasicProgress(visitor func(id pb.PeerID, pr BasicProgress)) { + for id, p := range p.progress { + visitor(id, BasicProgress{ + Match: p.Match, + Next: p.Next, + State: p.State, + }) + } +}
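The rangeController change above replaces a per-call make with a reusable scratch slice (entryFCStateScratch): the buffer is regrown to twice the needed capacity when too small and then resliced with a full slice expression, which caps the result's capacity at its length; WaitForEval uses a related variant backed by a small fixed-size array (handlesScratch). The following is a minimal, self-contained sketch of that pattern, not code from this patch; the ring, item, and collect names are illustrative only.

package main

import "fmt"

// item stands in for a per-entry state struct (hypothetical, not from the patch).
type item struct{ id int }

// ring owns a scratch buffer reused across calls to avoid a heap allocation
// on every call.
type ring struct {
	scratch []item
}

// collect returns a slice of n items backed by the scratch buffer. The result
// is only valid until the next call to collect.
func (r *ring) collect(n int) []item {
	if cap(r.scratch) < n {
		// Grow geometrically so subsequent calls of a similar size do not reallocate.
		r.scratch = make([]item, 0, 2*n)
	}
	// The full slice expression s[:n:n] caps capacity at n, so an append by the
	// caller allocates a fresh array instead of writing into scratch capacity.
	out := r.scratch[:n:n]
	for i := range out {
		out[i] = item{id: i}
	}
	return out
}

func main() {
	r := &ring{}
	a := r.collect(3)
	fmt.Println(a, cap(a)) // [{0} {1} {2}] 3
	b := r.collect(2)      // reuses the same backing array; a now aliases overwritten memory
	fmt.Println(b, cap(b)) // [{0} {1}] 2
}

As with any scratch-buffer reuse, the pattern is only sound if the returned slice is not retained across calls, since the next call overwrites the same backing array.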