Skip to content

Commit

Permalink
The producer can be easily blocked due to a race condition in Broker.…
Browse files Browse the repository at this point in the history
…throttleTimer, which may result in a panic. add throttleTimerLock to protect

Fixes #2823

Signed-off-by: shacheng <[email protected]>
  • Loading branch information
shacheng committed Mar 7, 2024
1 parent fd84c2b commit 7d6b70b
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Broker struct {
kerberosAuthenticator GSSAPIKerberosAuth
clientSessionReauthenticationTimeMs int64

throttleTimer *time.Timer
throttleTimer *time.Timer
throttleTimerLock sync.Mutex
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand Down Expand Up @@ -1697,6 +1698,8 @@ func (b *Broker) handleThrottledResponse(resp protocolBody) {
}

func (b *Broker) setThrottle(throttleTime time.Duration) {
b.throttleTimerLock.Lock()
defer b.throttleTimerLock.Unlock()
if b.throttleTimer != nil {
// if there is an existing timer stop/clear it
if !b.throttleTimer.Stop() {
Expand All @@ -1707,6 +1710,8 @@ func (b *Broker) setThrottle(throttleTime time.Duration) {
}

func (b *Broker) waitIfThrottled() {
b.throttleTimerLock.Lock()
defer b.throttleTimerLock.Unlock()
if b.throttleTimer != nil {
DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID())
<-b.throttleTimer.C
Expand Down

0 comments on commit 7d6b70b

Please sign in to comment.