Skip to content

Commit

Permalink
feat: support for sleeping when throttled
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Hindess <[email protected]>
  • Loading branch information
hindessm committed Aug 1, 2023
1 parent 7d7ac52 commit 5ac5dc0
Showing 1 changed file with 31 additions and 3 deletions.
34 changes: 31 additions & 3 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type Broker struct {

kerberosAuthenticator GSSAPIKerberosAuth
clientSessionReauthenticationTimeMs int64

throttleTimer *time.Timer
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand Down Expand Up @@ -456,7 +458,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
}

// Well-formed response
b.updateThrottleMetric(res)
b.handleThrottledResponse(res)
cb(res, nil)
},
}
Expand Down Expand Up @@ -999,6 +1001,9 @@ func (b *Broker) sendInternal(rb protocolBody, promise *responsePromise) error {
return err
}

// check and wait if throttled
b.waitIfThrottled()

requestTime := time.Now()
// Will be decremented in responseReceiver (except error or request with NoResponse)
b.addRequestInFlightMetrics(1)
Expand Down Expand Up @@ -1046,7 +1051,7 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
return err
}
if res != nil {
b.updateThrottleMetric(res)
b.handleThrottledResponse(res)
}
return nil
}
Expand Down Expand Up @@ -1645,7 +1650,7 @@ type throttleSupport interface {
throttleTime() time.Duration
}

func (b *Broker) updateThrottleMetric(resp protocolBody) {
func (b *Broker) handleThrottledResponse(resp protocolBody) {
throttledResponse, ok := resp.(throttleSupport)
if !ok {
return
Expand All @@ -1656,6 +1661,29 @@ func (b *Broker) updateThrottleMetric(resp protocolBody) {
}
DebugLogger.Printf(
"broker/%d %T throttled %v\n", b.ID(), resp, throttleTime)
b.setThrottle(throttleTime)
b.updateThrottleMetric(throttleTime)
}

func (b *Broker) setThrottle(throttleTime time.Duration) {
if b.throttleTimer != nil {
// if there is an existing timer stop/clear it
if !b.throttleTimer.Stop() {
<-b.throttleTimer.C
}
}
b.throttleTimer = time.NewTimer(throttleTime)
}

func (b *Broker) waitIfThrottled() {
if b.throttleTimer != nil {
DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID())
<-b.throttleTimer.C
b.throttleTimer = nil
}
}

func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
if b.brokerThrottleTime != nil {
throttleTimeInMs := int64(throttleTime / time.Millisecond)
b.brokerThrottleTime.Update(throttleTimeInMs)
Expand Down

0 comments on commit 5ac5dc0

Please sign in to comment.