Skip to content

Commit

Permalink
kvserver/rangefeed: add buffered sender cdc_bench
Browse files Browse the repository at this point in the history
to discuss:we can't use cluster settings to change things without involving buffered registration at the beginning
  • Loading branch information
wenyihu6 committed Oct 14, 2024
1 parent 1efd2f9 commit 2179fb1
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions pkg/cmd/roachtest/tests/cdc_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ const (
// practice it can.
cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"

cdcBenchNoServer cdcBenchServer = ""
cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler
cdcBenchNoServer cdcBenchServer = ""
cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
cdcBenchSchedulerServer cdcBenchServer = "scheduler_with_unbuffered_sender" // new scheduler
cdcBenchSchedulerServerWithBufferedSender cdcBenchServer = "scheduler_with_buffered_sender" // new scheduler
)

var (
cdcBenchScanTypes = []cdcBenchScanType{
cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan}
cdcBenchServers = []cdcBenchServer{cdcBenchProcessorServer, cdcBenchSchedulerServer}
cdcBenchServers = []cdcBenchServer{cdcBenchProcessorServer, cdcBenchSchedulerServer, cdcBenchSchedulerServerWithBufferedSender}
)

func registerCDCBench(r registry.Registry) {
Expand Down Expand Up @@ -121,7 +122,7 @@ func registerCDCBench(r registry.Registry) {
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Timeout: time.Hour,
Timeout: 2 * time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "", nullSink)
},
Expand All @@ -139,7 +140,7 @@ func registerCDCBench(r registry.Registry) {
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Timeout: time.Hour,
Timeout: 2 * time.Hour,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, format, nullSink)
},
Expand Down Expand Up @@ -425,6 +426,9 @@ func runCDCBenchWorkload(
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "false"
case cdcBenchSchedulerServer:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "true"
case cdcBenchSchedulerServerWithBufferedSender:
settings.ClusterSettings["kv.rangefeed.scheduler.enabled"] = "true"
settings.ClusterSettings["kv.rangefeed.buffered_stream_sender.enabled"] = "true"
case cdcBenchNoServer:
default:
t.Fatalf("unknown server type %q", server)
Expand Down

0 comments on commit 2179fb1

Please sign in to comment.