From d225e3a21505fcb94053ef13eb9071925779612b Mon Sep 17 00:00:00 2001 From: Hao Sun Date: Thu, 12 Oct 2023 23:36:34 -0700 Subject: [PATCH] fix(client): Metadata refresh should skip updates when metadata response is empty Fix #2664 We should skip the metadata refresh if the startup phase broker returns empty brokers in metadata response. The Java client skips the empty response to update the metadata cache (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1149) and we should make a feature parity in Sarama too Signed-off-by: Hao Sun --- client.go | 7 ++++ client_test.go | 86 ++++++++++++++++++++++++++++++++++++++---- client_tls_test.go | 4 +- offset_manager_test.go | 4 +- sync_producer_test.go | 1 + 5 files changed, 92 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index 5e665eaef..c6364eead 100644 --- a/client.go +++ b/client.go @@ -1035,6 +1035,13 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, var kerror KError var packetEncodingError PacketEncodingError if err == nil { + // When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924). + if len(response.Brokers) == 0 { + Logger.Println("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr) + _ = broker.Close() + client.deregisterBroker(broker) + continue + } allKnownMetaData := len(topics) == 0 // valid response, use it shouldRetry, err := client.updateMetadata(response, allKnownMetaData) diff --git a/client_test.go b/client_test.go index 78243bce0..155827a00 100644 --- a/client_test.go +++ b/client_test.go @@ -23,7 +23,9 @@ func safeClose(t testing.TB, c io.Closer) { func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { @@ -92,6 +94,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -111,6 +114,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -358,6 +362,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) retryCount := int32(0) @@ -375,6 +380,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { metadataUnknownTopic := new(MetadataResponse) metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) + metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -395,6 +401,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) config := NewTestConfig() @@ -406,6 +413,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { } metadataUnknownTopic := new(MetadataResponse) + metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -481,6 +489,53 @@ func TestClientReceivingPartialMetadata(t *testing.T) { leader.Close() } +func TestClientRefreshBehaviourWhenEmptyMetadataResponse(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + broker := NewMockBroker(t, 2) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse1) + + c, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + client := c.(*client) + if len(client.seedBrokers) != 1 { + t.Error("incorrect number of live seeds") + } + if len(client.deadSeeds) != 0 { + t.Error("incorrect number of dead seeds") + } + if len(client.brokers) != 1 { + t.Error("incorrect number of brokers") + } + + // Empty metadata response + seedBroker.Returns(new(MetadataResponse)) + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + metadataResponse2.AddBroker(broker.Addr(), broker.BrokerID()) + seedBroker.Returns(metadataResponse2) + err = c.RefreshMetadata() + if err != nil { + t.Fatal(err) + } + if len(client.seedBrokers) != 1 { + t.Error("incorrect number of live seeds") + } + if len(client.deadSeeds) != 0 { + t.Error("incorrect number of dead seeds") + } + if len(client.brokers) != 2 { + t.Error("incorrect number of brokers") + } + broker.Close() + seedBroker.Close() + safeClose(t, client) +} + func TestClientRefreshBehaviour(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -633,8 +688,9 @@ func TestClientGetBroker(t *testing.T) { func TestClientResurrectDeadSeeds(t *testing.T) { initialSeed := NewMockBroker(t, 0) - emptyMetadata := new(MetadataResponse) - initialSeed.Returns(emptyMetadata) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) + initialSeed.Returns(metadataResponse) conf := NewTestConfig() conf.Metadata.Retry.Backoff = 0 @@ -643,7 +699,6 @@ func TestClientResurrectDeadSeeds(t *testing.T) { if err != nil { t.Fatal(err) } - initialSeed.Close() client := c.(*client) @@ -658,6 +713,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, client.seedBrokers[0]) client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)} client.deadSeeds = []*Broker{} + client.brokers = map[int32]*Broker{} wg := sync.WaitGroup{} wg.Add(1) @@ -676,7 +732,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) { seed3.Close() seed1.Close() - seed2.Returns(emptyMetadata) + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seed2.Addr(), seed2.BrokerID()) + seed2.Returns(metadataResponse2) wg.Wait() @@ -767,6 +825,7 @@ func TestClientMetadataTimeout(t *testing.T) { // Use a responsive broker to create a working client initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) + emptyMetadata.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) initialSeed.Returns(emptyMetadata) conf := NewTestConfig() @@ -996,6 +1055,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { coordinator := NewMockBroker(t, 2) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) config := NewTestConfig() @@ -1011,11 +1071,13 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { seedBroker.Returns(coordinatorResponse1) metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse2) replicas := []int32{coordinator.BrokerID()} metadataResponse3 := new(MetadataResponse) + metadataResponse3.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse3) @@ -1049,6 +1111,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) { defer seedBroker.Close() metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse) conf := NewTestConfig() @@ -1105,7 +1168,9 @@ func TestClientConnectionRefused(t *testing.T) { func TestClientCoordinatorConnectionRefused(t *testing.T) { t.Parallel() seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { @@ -1130,7 +1195,10 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) { func TestInitProducerIDConnectionRefused(t *testing.T) { t.Parallel() seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(&MetadataResponse{Version: 4}) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + metadataResponse.Version = 4 + seedBroker.Returns(metadataResponse) config := NewTestConfig() config.Producer.Idempotent = true @@ -1161,7 +1229,9 @@ func TestInitProducerIDConnectionRefused(t *testing.T) { func TestMetricsCleanup(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) config := NewTestConfig() metrics.GetOrRegisterMeter("a", config.MetricRegistry) diff --git a/client_tls_test.go b/client_tls_test.go index aa01d63a6..7c2432c99 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -197,7 +197,9 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon seedBroker := NewMockBrokerListener(childT, 1, seedListener) defer seedBroker.Close() - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) config := NewTestConfig() config.Net.TLS.Enable = true diff --git a/offset_manager_test.go b/offset_manager_test.go index c3ac33641..def296be6 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -78,7 +78,9 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, func TestNewOffsetManager(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) diff --git a/sync_producer_test.go b/sync_producer_test.go index 8d366b011..776ae5f69 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -271,6 +271,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) broker.Returns(metadataResponse)