Skip to content

Commit

Permalink
fix: add broker connection check in metadata refresh loop. Closes bro…
Browse files Browse the repository at this point in the history
…ken broker connections
  • Loading branch information
doxsch authored and dnwe committed Aug 19, 2023
1 parent 8681621 commit fab7238
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
17 changes: 17 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/rcrowley/go-metrics"
Expand Down Expand Up @@ -1765,3 +1766,19 @@ func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
c.ServerName = sn
return c
}

// isBroken checks if the connection on the broker is still working. For this purpose it sends
// an ApiVersions request. If the connection returns an EOF, ECONNRESET or EPIPE error, it is broken.
func (b *Broker) isBroken() bool {
_, err := b.ApiVersions(&ApiVersionsRequest{
ClientSoftwareName: defaultClientSoftwareName,
ClientSoftwareVersion: version(),
})

if err == io.EOF || errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) {

Check failure on line 1778 in broker.go

View workflow job for this annotation

GitHub Actions / Linting with Go 1.21.x

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
DebugLogger.Printf("Connection is broken on broker %s: %v", b.addr, err)
return true
}

return false
}
21 changes: 21 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,8 @@ func (client *client) refreshMetadata() error {
}
}

client.checkAndCloseBrokenBroker()

if err := client.RefreshMetadata(topics...); err != nil {
return err
}
Expand Down Expand Up @@ -1090,6 +1092,25 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
return retry(error)
}

// checkAndCloseBrokenBroker walks over every broker known to the client and
// closes those whose connection is reported as broken by isBroken.
//
// The client read lock is held for the whole iteration. Brokers without a
// connection (conn == nil) are skipped without being probed.
func (client *client) checkAndCloseBrokenBroker() {
	client.lock.RLock()
	defer client.lock.RUnlock()

	DebugLogger.Println("check broker connections and close broken brokers")

	for _, b := range client.brokers {
		// Only the conn field is inspected under the broker lock; the lock is
		// released before probing. NOTE(review): presumably isBroken and Close
		// acquire the broker lock themselves — confirm against broker.go.
		b.lock.Lock()
		connected := b.conn != nil
		b.lock.Unlock()

		if !connected {
			continue
		}

		if b.isBroken() {
			// Best effort: the error returned by Close is deliberately discarded.
			_ = b.Close()
		}
	}
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
if client.Closed() {
Expand Down

0 comments on commit fab7238

Please sign in to comment.