-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: use least loaded broker to refresh metadata #2645
Conversation
8d06ab9
to
ede5508
Compare
functional_test.go
Outdated
@@ -228,7 +228,7 @@ mainLoop: | |||
} | |||
for _, broker := range brokers { | |||
err := broker.Open(client.Config()) | |||
if err != nil { | |||
if err != nil && err != ErrAlreadyConnected { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the functional tests to skip the ErrAlreadyConnected error because this change will use one broker from the broker list to refresh metadata, which is already connected when reaching the following validation.
@dnwe Could you please review my change? Thanks |
19b1221
to
f5c9e47
Compare
Seed brokers never change after client initialization. If the first seed broker became stale (still online, but moved to other Kafka cluster), Sarama client may use this stale broker to get the wrong metadata. To avoid using the stale broker to do metadata refresh, we will choose the least loaded broker in the cached broker list which is the similar to how the Java client implementation works: https://github.com/apache/kafka/blob/7483991a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736 Contributes-to: IBM#2637 Signed-off-by: Hao Sun <[email protected]>
f5c9e47
to
98ec384
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good to me. One minor query on the existing deregisterBroker behaviour for the seedBrokers list, but happy to approve and merge as-is
// deregisterBroker removes a broker from the broker list, and if it's | ||
// not in the broker list, removes it from seedBrokers. | ||
func (client *client) deregisterBroker(broker *Broker) { | ||
client.lock.Lock() | ||
defer client.lock.Unlock() | ||
|
||
_, ok := client.brokers[broker.ID()] | ||
if ok { | ||
Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr()) | ||
delete(client.brokers, broker.ID()) | ||
return | ||
} | ||
if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] { | ||
client.deadSeeds = append(client.deadSeeds, broker) | ||
client.seedBrokers = client.seedBrokers[1:] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently we only seem to deregister the broker from the seedBrokers list if it's the first element in the list (after the most recent shuffle) — is that still the desired behaviour?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think it is expected. seed brokers only shuffle during client initialization or hard RefreshBrokers. After that, the cached broker list is empty, and the client will use the first seed broker to fetch metadata. deregisterBroker method deregisters the first seed broker will apply that moment when this first seed broker is unavailable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know when we have a new release including this change so that our side can use it? @dnwe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seed brokers never change after client initialization. If the first seed
broker became stale (still online, but moved to other Kafka cluster),
Sarama client may use this stale broker to get the wrong metadata. To
avoid using the stale broker to do metadata refresh, we will choose the
least loaded broker in the cached broker list which is the similar to
how the Java client implementation works:
https://github.com/apache/kafka/blob/7483991a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736
Contributes-to: #2637