changefeedccl: revert Dial() to fetch all metadata topics
Patch cockroachdb#114740 changed Dial() in pkg/ccl/changefeedccl/sink_kafka.go to refresh
metadata only for kafkaSink.Topics() rather than for all topics. This
triggered a sarama error (an invalid replication factor reported by the Kafka
server). The root cause is hard to pin down because there are too many unknowns
on both the sarama and the Kafka side (more in cockroachdb#116872). This commit reverts
Dial() to the behavior it previously got from sarama, which is to fetch metadata
for all topics. The investigation will continue afterwards.

Part of: cockroachdb#116358, cockroachdb#116872
Release note: None
wenyihu6 committed Dec 20, 2023
1 parent c3943c8 commit 07ab1f9
Showing 2 changed files with 16 additions and 5 deletions.
19 changes: 15 additions & 4 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -257,10 +257,21 @@ func (s *kafkaSink) Dial() error {
 		return err
 	}

-	if err = client.RefreshMetadata(s.Topics()...); err != nil {
-		// Now that we do not fetch metadata for all topics by default, we try
-		// RefreshMetadata manually to check for any connection error.
-		return errors.CombineErrors(err, client.Close())
+	if err = client.RefreshMetadata(); err != nil {
+		// Dial() seems to be a weird state while topics are still setting up and
+		// calling RefreshMetadata with specific topics s.Topics() here triggered a
+		// sarama error (pq: kafka server: Replication-factor is invalid). To match
+		// the previous behaviour in sarama prior to #114740 during Dial(), we
+		// manually fetch metadata for all topics just during Dial(). See more in
+		// #116872.
+		if errors.Is(err, sarama.ErrLeaderNotAvailable) || errors.Is(err, sarama.ErrReplicaNotAvailable) ||
+			errors.Is(err, sarama.ErrTopicAuthorizationFailed) || errors.Is(err, sarama.ErrClusterAuthorizationFailed) {
+			// To match sarama code in NewClient with conf.Metadata.Full set to true,
+			// we swallow the error above.
+			log.Infof(s.ctx, "kafka sink unable to refreshMetadata during Dial() due to: %s", err.Error())
+		} else {
+			return errors.CombineErrors(err, client.Close())
+		}
 	}

 	producer, err := s.newAsyncProducer(client)
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cdc.go
@@ -1523,7 +1523,7 @@ func registerCDC(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "cdc/kafka-topics",
Owner: `cdc`,
Benchmark: true,
Skip: "#116872",
Cluster: r.MakeClusterSpec(4, spec.Arch(vm.ArchAMD64)),
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.AllExceptAWS,