diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go
index 026bb0fff34a..bc29271872a6 100644
--- a/pkg/ccl/changefeedccl/sink_kafka.go
+++ b/pkg/ccl/changefeedccl/sink_kafka.go
@@ -257,10 +257,21 @@ func (s *kafkaSink) Dial() error {
 		return err
 	}
 
-	if err = client.RefreshMetadata(s.Topics()...); err != nil {
-		// Now that we do not fetch metadata for all topics by default, we try
-		// RefreshMetadata manually to check for any connection error.
-		return errors.CombineErrors(err, client.Close())
+	if err = client.RefreshMetadata(); err != nil {
+		// Dial() seems to be a weird state while topics are still setting up and
+		// calling RefreshMetadata with specific topics s.Topics() here triggered a
+		// sarama error (pq: kafka server: Replication-factor is invalid). To match
+		// the previous behaviour in sarama prior to #114740 during Dial(), we
+		// manually fetch metadata for all topics just during Dial(). See more in
+		// #116872.
+		if errors.Is(err, sarama.ErrLeaderNotAvailable) || errors.Is(err, sarama.ErrReplicaNotAvailable) ||
+			errors.Is(err, sarama.ErrTopicAuthorizationFailed) || errors.Is(err, sarama.ErrClusterAuthorizationFailed) {
+			// To match sarama code in NewClient with conf.Metadata.Full set to true,
+			// we swallow the error above.
+			log.Infof(s.ctx, "kafka sink unable to refreshMetadata during Dial() due to: %s", err.Error())
+		} else {
+			return errors.CombineErrors(err, client.Close())
+		}
 	}
 
 	producer, err := s.newAsyncProducer(client)
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index a813b369bc61..0d30a421efb9 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -1523,7 +1523,7 @@ func registerCDC(r registry.Registry) {
 	r.Add(registry.TestSpec{
 		Name:             "cdc/kafka-topics",
 		Owner:            `cdc`,
-		Benchmark:        true,
+		Skip:             "#116872",
 		Cluster:          r.MakeClusterSpec(4, spec.Arch(vm.ArchAMD64)),
 		Leases:           registry.MetamorphicLeases,
 		CompatibleClouds: registry.AllExceptAWS,
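
Not part of the patch: a minimal standalone sketch of the benign-error check the first hunk introduces, i.e. treating certain RefreshMetadata failures as tolerable during Dial() the same way sarama's NewClient does when conf.Metadata.Full is true. The helper name isBenignMetadataErr and the github.com/IBM/sarama import path are assumptions for illustration; the changefeed code above inlines this check and uses its own vendored sarama.

package main

import (
	"errors"
	"fmt"

	"github.com/IBM/sarama"
)

// isBenignMetadataErr reports whether a RefreshMetadata error is one of the
// transient or authorization errors that the patch above chooses to log and
// swallow during Dial() rather than fail the sink.
func isBenignMetadataErr(err error) bool {
	return errors.Is(err, sarama.ErrLeaderNotAvailable) ||
		errors.Is(err, sarama.ErrReplicaNotAvailable) ||
		errors.Is(err, sarama.ErrTopicAuthorizationFailed) ||
		errors.Is(err, sarama.ErrClusterAuthorizationFailed)
}

func main() {
	// A leader election in progress is tolerated; running out of brokers is not.
	fmt.Println(isBenignMetadataErr(sarama.ErrLeaderNotAvailable)) // true
	fmt.Println(isBenignMetadataErr(sarama.ErrOutOfBrokers))       // false
}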