Skip to content

Commit

Permalink
added roachtest
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyihu6 committed Jan 29, 2024
1 parent 7c54194 commit 2aeb28a
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
kafka.install(ct.ctx)
kafka.start(ct.ctx, "kafka")

if args.kafkaQuota > 0 {
kafka.setProducerQuota(ct.ctx, args.kafkaQuota)
}

if args.kafkaChaos {
ct.mon.Go(func(ctx context.Context) error {
period, downTime := 2*time.Minute, 20*time.Second
Expand Down Expand Up @@ -505,6 +509,7 @@ type feedArgs struct {
assumeRole string
tolerateErrors bool
sinkURIOverride string
kafkaQuota int
cdcFeatureFlags
}

Expand Down Expand Up @@ -1242,6 +1247,34 @@ func registerCDC(r registry.Registry) {
ct.waitForWorkload()
},
})
r.Add(registry.TestSpec{
Name: "cdc/kafka-quota",
Owner: `cdc`,
Benchmark: true,
Cluster: r.MakeClusterSpec(4, spec.CPU(16)),
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
ct := newCDCTester(ctx, t, c)
defer ct.Close()

ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "10m"})

feed := ct.newChangefeed(feedArgs{
sinkType: kafkaSink,
targets: allTpccTargets,
opts: map[string]string{"initial_scan": "'no'"},
kafkaQuota: 1024,
})
ct.runFeedLatencyVerifier(feed, latencyTargets{
steadyLatency: 5 * time.Minute,
})
ct.waitForWorkload()
t.Fatal("failed statement")
},
})
r.Add(registry.TestSpec{
Name: "cdc/crdb-chaos",
Owner: `cdc`,
Expand Down Expand Up @@ -1995,6 +2028,40 @@ type kafkaManager struct {
useKafka2 bool
}

func (k kafkaManager) setProducerQuota(ctx context.Context, bytesPerSecond int) {
// bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
k.t.Status("setting producer quota to %d bytes per second for all users", bytesPerSecond)
//k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"),
// "--bootstrap-server", "localhost:9092",
// "--alter",
// "--add-config", fmt.Sprintf("producer_byte_rate=%d", bytesPerSecond),
// "--entity-type", "users",
// "--entity-name", "default")
//

k.c.Run(ctx, option.WithNodes(k.kafkaSinkNode), filepath.Join(k.binDir(), "kafka-configs"),
// bootstrap-server=localhost:9092
"--bootstrap-server", "localhost:9092",
"--alter",
"--add-config", fmt.Sprintf("producer_byte_rate=%d", bytesPerSecond),
"--entity-type", "users",
"--entity-default")

//k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"),
// "--bootstrap-server", "localhost:9092",
// "--alter",
// "--add-config", "SCRAM-SHA-512=[password=scram512-secret]",
// "--entity-type", "users",
// "--entity-name", "scram512")
//
//k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"),
// "--bootstrap-server", "localhost:9092",
// "--alter",
// "--add-config", "SCRAM-SHA-256=[password=scram256-secret]",
// "--entity-type", "users",
// "--entity-name", "scram256")
}

func (k kafkaManager) basePath() string {
if k.c.IsLocal() {
return `/tmp/confluent`
Expand Down

0 comments on commit 2aeb28a

Please sign in to comment.