Skip to content

Commit

Permalink
changefeedccl: revert Dial() to fetch all metadata topics
Browse files Browse the repository at this point in the history
Patch (cockroachdb#114740) changed Dial() in pkg/ccl/changefeedccl/sink_kafka.go to refresh
metadata exclusively for kafkaSink.topics() rather than for all topics. This
triggered a sarama error ( an invalid replication factor in Kafka servers). It
seems hard to get to the bottom of it given that there are too many unknowns
from sarama and kafka side (More in cockroachdb#116872). We decided to revert back to what
Dial() previously did in sarama code and will continue investigating afterwards.

Part of: cockroachdb#116872
Fixes: cockroachdb#116358
Release note: None
  • Loading branch information
wenyihu6 committed Dec 21, 2023
1 parent 4870247 commit d22f882
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
18 changes: 14 additions & 4 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,20 @@ func (s *kafkaSink) Dial() error {
return err
}

if err = client.RefreshMetadata(s.Topics()...); err != nil {
// Now that we do not fetch metadata for all topics by default, we try
// RefreshMetadata manually to check for any connection error.
return errors.CombineErrors(err, client.Close())
if err = client.RefreshMetadata(); err != nil {
// Dial() seems to be at a weird state while topics are still setting up and
// passing specific topics s.Topics() RefreshMetadata here can trigger a
// sarama error. To match the previous behaviour in sarama prior to #114740
// during Dial(), we manually fetch metadata for all topics just during
// Dial(). See more in #116872.
if errors.Is(err, sarama.ErrLeaderNotAvailable) || errors.Is(err, sarama.ErrReplicaNotAvailable) ||
errors.Is(err, sarama.ErrTopicAuthorizationFailed) || errors.Is(err, sarama.ErrClusterAuthorizationFailed) {
// To match sarama code in NewClient with conf.Metadata.Full == true, we
// swallow the error when it is one of the errors above.
log.Infof(s.ctx, "kafka sink unable to refreshMetadata during Dial() due to: %s", err.Error())
} else {
return errors.CombineErrors(err, client.Close())
}
}

producer, err := s.newAsyncProducer(client)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,7 @@ func registerCDC(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "cdc/kafka-topics",
Owner: `cdc`,
Benchmark: true,
Skip: "#116872",
Cluster: r.MakeClusterSpec(4, spec.Arch(vm.ArchAMD64)),
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.AllExceptAWS,
Expand Down

0 comments on commit d22f882

Please sign in to comment.