From fab7238cd609373b1f0353fb1e3cf1be6449b69e Mon Sep 17 00:00:00 2001 From: doxsch <28098153+doxsch@users.noreply.github.com> Date: Tue, 11 Jan 2022 10:36:11 +0100 Subject: [PATCH] fix: add broker connection check in metadata refresh loop. Closes broken broker connections --- broker.go | 17 +++++++++++++++++ client.go | 21 +++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/broker.go b/broker.go index ff8cc8125..1996770fd 100644 --- a/broker.go +++ b/broker.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "sync/atomic" + "syscall" "time" "github.com/rcrowley/go-metrics" @@ -1765,3 +1766,19 @@ func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config { c.ServerName = sn return c } + +// isBroken checks if the connection on the broker is still working. For this purpose it sends +// an ApiVersions request. If the connection returns an EOF, ECONNRESET or EPIPE error, it is broken. +func (b *Broker) isBroken() bool { + _, err := b.ApiVersions(&ApiVersionsRequest{ + ClientSoftwareName: defaultClientSoftwareName, + ClientSoftwareVersion: version(), + }) + + if err == io.EOF || errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) { + DebugLogger.Printf("Connection is broken on broker %s: %v", b.addr, err) + return true + } + + return false +} diff --git a/client.go b/client.go index b3d356954..9679398cc 100644 --- a/client.go +++ b/client.go @@ -987,6 +987,8 @@ func (client *client) refreshMetadata() error { } } + client.checkAndCloseBrokenBroker() + if err := client.RefreshMetadata(topics...); err != nil { return err } @@ -1090,6 +1092,25 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, return retry(error) } +func (client *client) checkAndCloseBrokenBroker() { + client.lock.RLock() + defer client.lock.RUnlock() + + DebugLogger.Println("check broker connections and close broken brokers") + + for _, broker := range client.brokers { + broker.lock.Lock() + if broker.conn != nil { + broker.lock.Unlock() + if broker.isBroken() { + _ = broker.Close() + } + } else { + broker.lock.Unlock() + } + } +} + // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) { if client.Closed() {