diff --git a/broker_test.go b/broker_test.go index 32ddb8694..186e92d16 100644 --- a/broker_test.go +++ b/broker_test.go @@ -1435,3 +1435,98 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) { broker.Close() } } + +func Test_handleThrottledResponse(t *testing.T) { + mb := NewMockBroker(nil, 0) + broker := NewBroker(mb.Addr()) + broker.id = 0 + conf := NewTestConfig() + conf.Version = V1_0_0_0 + throttleTimeMs := 100 + throttleTime := time.Duration(throttleTimeMs) * time.Millisecond + tests := []struct { + name string + response protocolBody + expectDelay bool + }{ + { + name: "throttled response w/millisecond field", + response: &MetadataResponse{ + ThrottleTimeMs: int32(throttleTimeMs), + }, + expectDelay: true, + }, + { + name: "not throttled response w/millisecond field", + response: &MetadataResponse{ + ThrottleTimeMs: 0, + }, + }, + { + name: "throttled response w/time.Duration field", + response: &ProduceResponse{ + ThrottleTime: throttleTime, + }, + expectDelay: true, + }, + { + name: "not throttled response w/time.Duration field", + response: &ProduceResponse{ + ThrottleTime: time.Duration(0), + }, + }, + { + name: "not throttled response with no throttle time field", + response: &SaslHandshakeResponse{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + broker.metricRegistry = metrics.NewRegistry() + broker.brokerThrottleTime = broker.registerHistogram("throttle-time-in-ms") + broker.handleThrottledResponse(tt.response) + startTime := time.Now() + broker.waitIfThrottled() + if tt.expectDelay { + if time.Since(startTime) < throttleTime { + t.Fatal("expected throttling to cause delay") + } + if broker.brokerThrottleTime.Min() != int64(throttleTimeMs) { + t.Fatal("expected throttling to update metrics") + } + } else { + if time.Since(startTime) > throttleTime { + t.Fatal("expected no throttling delay") + } + if broker.brokerThrottleTime.Count() != 0 { + t.Fatal("expected no metrics update") + } + } + }) + } + t.Run("test second throttle timer overrides first", func(t *testing.T) { + broker.metricRegistry = metrics.NewRegistry() + broker.brokerThrottleTime = broker.registerHistogram("throttle-time-in-ms") + broker.handleThrottledResponse(&MetadataResponse{ + ThrottleTimeMs: int32(throttleTimeMs), + }) + firstTimer := broker.throttleTimer + broker.handleThrottledResponse(&MetadataResponse{ + ThrottleTimeMs: int32(throttleTimeMs * 2), + }) + if firstTimer.Stop() { + t.Fatal("expected first timer to be stopped") + } + startTime := time.Now() + broker.waitIfThrottled() + if time.Since(startTime) < throttleTime*2 { + t.Fatal("expected throttling to use second delay") + } + if broker.brokerThrottleTime.Min() != int64(throttleTimeMs) { + t.Fatal("expected throttling to update metrics") + } + if broker.brokerThrottleTime.Max() != int64(throttleTimeMs*2) { + t.Fatal("expected throttling to update metrics") + } + }) +}