Skip to content

Commit

Permalink
test: add throttling support test
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Hindess <[email protected]>
  • Loading branch information
hindessm committed Aug 1, 2023
1 parent 5ac5dc0 commit 102513a
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,3 +1435,98 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) {
broker.Close()
}
}

func Test_handleThrottledResponse(t *testing.T) {
mb := NewMockBroker(nil, 0)
broker := NewBroker(mb.Addr())
broker.id = 0
conf := NewTestConfig()
conf.Version = V1_0_0_0
throttleTimeMs := 100
throttleTime := time.Duration(throttleTimeMs) * time.Millisecond
tests := []struct {
name string
response protocolBody
expectDelay bool
}{
{
name: "throttled response w/millisecond field",
response: &MetadataResponse{
ThrottleTimeMs: int32(throttleTimeMs),
},
expectDelay: true,
},
{
name: "not throttled response w/millisecond field",
response: &MetadataResponse{
ThrottleTimeMs: 0,
},
},
{
name: "throttled response w/time.Duration field",
response: &ProduceResponse{
ThrottleTime: throttleTime,
},
expectDelay: true,
},
{
name: "not throttled response w/time.Duration field",
response: &ProduceResponse{
ThrottleTime: time.Duration(0),
},
},
{
name: "not throttled response with no throttle time field",
response: &SaslHandshakeResponse{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
broker.metricRegistry = metrics.NewRegistry()
broker.brokerThrottleTime = broker.registerHistogram("throttle-time-in-ms")
broker.handleThrottledResponse(tt.response)
startTime := time.Now()
broker.waitIfThrottled()
if tt.expectDelay {
if time.Since(startTime) < throttleTime {
t.Fatal("expected throttling to cause delay")
}
if broker.brokerThrottleTime.Min() != int64(throttleTimeMs) {
t.Fatal("expected throttling to update metrics")
}
} else {
if time.Since(startTime) > throttleTime {
t.Fatal("expected no throttling delay")
}
if broker.brokerThrottleTime.Count() != 0 {
t.Fatal("expected no metrics update")
}
}
})
}
t.Run("test second throttle timer overrides first", func(t *testing.T) {
broker.metricRegistry = metrics.NewRegistry()
broker.brokerThrottleTime = broker.registerHistogram("throttle-time-in-ms")
broker.handleThrottledResponse(&MetadataResponse{
ThrottleTimeMs: int32(throttleTimeMs),
})
firstTimer := broker.throttleTimer
broker.handleThrottledResponse(&MetadataResponse{
ThrottleTimeMs: int32(throttleTimeMs * 2),
})
if firstTimer.Stop() {
t.Fatal("expected first timer to be stopped")
}
startTime := time.Now()
broker.waitIfThrottled()
if time.Since(startTime) < throttleTime*2 {
t.Fatal("expected throttling to use second delay")
}
if broker.brokerThrottleTime.Min() != int64(throttleTimeMs) {
t.Fatal("expected throttling to update metrics")
}
if broker.brokerThrottleTime.Max() != int64(throttleTimeMs*2) {
t.Fatal("expected throttling to update metrics")
}
})
}

0 comments on commit 102513a

Please sign in to comment.