Skip to content

Commit

Permalink
chore(test): speedup some slow tests
Browse files Browse the repository at this point in the history
- make consumer and produce retries zero backoff
- use parallel on TLS test
- reduce mockbroker noise
- add mockresponses support for per-topic Err in MetadataResponse
- use microseconds in test debug logger

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 10, 2023
1 parent fa7db9a commit 991b2b0
Show file tree
Hide file tree
Showing 20 changed files with 170 additions and 151 deletions.
9 changes: 0 additions & 9 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2323,12 +2323,3 @@ ProducerLoop:

log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
}

// NewTestConfig returns a config meant to be used by tests.
// Due to inconsistencies with the request versions the clients send using the default Kafka version
// and the response versions our mocks use, we default to the minimum Kafka version in most tests
func NewTestConfig() *Config {
config := NewConfig()
config.Version = MinVersion
return config
}
1 change: 1 addition & 0 deletions client_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func TestTLS(t *testing.T) {
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client)
})
}
Expand Down
11 changes: 11 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"github.com/rcrowley/go-metrics"
)

// NewTestConfig returns a config meant to be used by tests.
// Due to inconsistencies with the request versions the clients send using the default Kafka version
// and the response versions our mocks use, we default to the minimum Kafka version in most tests
func NewTestConfig() *Config {
config := NewConfig()
config.Consumer.Retry.Backoff = 0
config.Producer.Retry.Backoff = 0
config.Version = MinVersion
return config
}

func TestDefaultConfigValidates(t *testing.T) {
config := NewTestConfig()
if err := config.Validate(); err != nil {
Expand Down
87 changes: 31 additions & 56 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
config.Version = V2_0_0_0
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Retry.Max = 2
config.Consumer.Group.Rebalance.Retry.Backoff = 0
config.Consumer.Offsets.AutoCommit.Enable = false

broker0 := NewMockBroker(t, 0)
Expand Down Expand Up @@ -100,72 +101,46 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
}

func TestConsume_RaceTest(t *testing.T) {
const groupID = "test-group"
const topic = "test-topic"
const offsetStart = int64(1234)
const (
groupID = "test-group"
topic = "test-topic"
offsetStart = int64(1234)
)

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_8_1_0
cfg.Consumer.Return.Errors = true
cfg.Metadata.Full = true

seedBroker := NewMockBroker(t, 1)

joinGroupResponse := &JoinGroupResponse{}

syncGroupResponse := &SyncGroupResponse{
Version: 3, // sarama > 2.3.0.0 uses version 3
}
// Leverage mock response to get the MemberAssignment bytes
mockSyncGroupResponse := NewMockSyncGroupResponse(t).SetMemberAssignment(&ConsumerGroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0
UserData: []byte{0x01},
})
syncGroupResponse.MemberAssignment = mockSyncGroupResponse.MemberAssignment

heartbeatResponse := &HeartbeatResponse{
Err: ErrNoError,
}
offsetFetchResponse := &OffsetFetchResponse{
Version: 1,
ThrottleTimeMs: 0,
Err: ErrNoError,
}
offsetFetchResponse.AddBlock(topic, 0, &OffsetFetchResponseBlock{
Offset: offsetStart,
LeaderEpoch: 0,
Metadata: "",
Err: ErrNoError,
})

offsetResponse := &OffsetResponse{
Version: 1,
}
offsetResponse.AddTopicPartition(topic, 0, offsetStart)

metadataResponse := new(MetadataResponse)
metadataResponse.Version = 10
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
metadataResponse.AddTopic("mismatched-topic", ErrUnknownTopicOrPartition)
defer seedBroker.Close()

handlerMap := map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"MetadataRequest": NewMockSequence(metadataResponse),
"OffsetRequest": NewMockSequence(offsetResponse),
"OffsetFetchRequest": NewMockSequence(offsetFetchResponse),
"FindCoordinatorRequest": NewMockSequence(NewMockFindCoordinatorResponse(t).
SetCoordinator(CoordinatorGroup, groupID, seedBroker)),
"JoinGroupRequest": NewMockSequence(joinGroupResponse),
"SyncGroupRequest": NewMockSequence(syncGroupResponse),
"HeartbeatRequest": NewMockSequence(heartbeatResponse),
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetError("mismatched-topic", ErrUnknownTopicOrPartition),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset(topic, 0, -1, offsetStart),
"OffsetFetchRequest": NewMockOffsetFetchResponse(t).
SetOffset(groupID, topic, 0, offsetStart, "", ErrNoError),
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
SetCoordinator(CoordinatorGroup, groupID, seedBroker),
"JoinGroupRequest": NewMockJoinGroupResponse(t),
"SyncGroupRequest": NewMockSyncGroupResponse(t).SetMemberAssignment(
&ConsumerGroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0
UserData: []byte{0x01},
},
),
"HeartbeatRequest": NewMockHeartbeatResponse(t),
}
seedBroker.SetHandlerByMap(handlerMap)

cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))

defer seedBroker.Close()
cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))

retryWait := 20 * time.Millisecond
retryWait := 10 * time.Millisecond
var err error
clientRetries := 0
outerFor:
Expand Down Expand Up @@ -195,8 +170,8 @@ outerFor:
t.Fatalf("should not proceed to Consume")
}

if clientRetries <= 0 {
t.Errorf("clientRetries = %v; want > 0", clientRetries)
if clientRetries <= 1 {
t.Errorf("clientRetries = %v; want > 1", clientRetries)
}

if err != nil && !errors.Is(err, context.DeadlineExceeded) {
Expand Down
13 changes: 7 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
block4 := fetchResponse4.GetBlock("my_topic", 0)
block4.PreferredReadReplica = -1

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -925,7 +925,7 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) {
block2 := fetchResponse2.GetBlock("my_topic", 0)
block2.PreferredReadReplica = -1

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -981,7 +981,7 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) {
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -1051,7 +1051,7 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -1109,9 +1109,10 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
//
// See https://github.com/IBM/sarama/issues/1927
func TestConsumeMessagesTrackLeader(t *testing.T) {
cfg := NewConfig()
cfg := NewTestConfig()
cfg.ClientID = t.Name()
cfg.Metadata.RefreshFrequency = time.Millisecond * 50
cfg.Consumer.Retry.Backoff = 0
cfg.Net.MaxOpenRequests = 1
cfg.Version = V2_1_0_0

Expand Down Expand Up @@ -1996,7 +1997,7 @@ func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) {
broker: &brokerConsumer{
broker: &Broker{},
},
conf: NewConfig(),
conf: NewTestConfig(),
topic: "my_topic",
partition: 0,
}
Expand Down
8 changes: 4 additions & 4 deletions functional_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestFuncAdminQuotas(t *testing.T) {
t.Fatal(err)
}

config := NewTestConfig()
config := NewFunctionalTestConfig()
config.Version = kafkaVersion
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
Expand Down Expand Up @@ -137,21 +137,21 @@ func TestFuncAdminDescribeGroups(t *testing.T) {
t.Fatal(err)
}

config := NewTestConfig()
config := NewFunctionalTestConfig()
config.Version = kafkaVersion
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}

config1 := NewTestConfig()
config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4")
defer m1.Close()

config2 := NewTestConfig()
config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
Expand Down
6 changes: 3 additions & 3 deletions functional_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestFuncConnectionFailure(t *testing.T) {
FunctionalTestEnv.Proxies["kafka1"].Enabled = false
SaveProxy(t, "kafka1")

config := NewTestConfig()
config := NewFunctionalTestConfig()
config.Metadata.Retry.Max = 1

_, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
Expand All @@ -30,7 +30,7 @@ func TestFuncClientMetadata(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewTestConfig()
config := NewFunctionalTestConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 10 * time.Millisecond
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestFuncClientCoordinator(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions functional_consumer_follower_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestConsumerFetchFollowerFailover(t *testing.T) {
)

newConfig := func() *Config {
config := NewConfig()
config := NewFunctionalTestConfig()
config.ClientID = t.Name()
config.Version = V2_8_0_0
config.Producer.Return.Successes = true
Expand Down Expand Up @@ -81,7 +81,8 @@ func TestConsumerFetchFollowerFailover(t *testing.T) {
go func() {
for i := 0; i < numMsg; i++ {
msg := &ProducerMessage{
Topic: topic, Key: nil, Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i))}
Topic: topic, Key: nil, Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i)),
}
if _, offset, err := producer.SendMessage(msg); err != nil {
t.Error(i, err)
} else if offset%50 == 0 {
Expand Down
8 changes: 4 additions & 4 deletions functional_consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestFuncConsumerGroupRebalanceAfterAddingPartitions(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewTestConfig()
config := NewFunctionalTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestFuncConsumerGroupOffsetDeletion(t *testing.T) {
defer teardownFunctionalTest(t)
// create a client with 2.4.0 version as it is the minimal version
// that supports DeleteOffsets request
config := NewTestConfig()
config := NewFunctionalTestConfig()
config.Version = V2_4_0_0
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
defer safeClose(t, client)
Expand Down Expand Up @@ -315,7 +315,7 @@ func markOffset(t *testing.T, offsetMgr OffsetManager, topic string, partition i
}

func testFuncConsumerGroupFuzzySeed(topic string) error {
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
if err != nil {
return err
}
Expand Down Expand Up @@ -399,7 +399,7 @@ type testFuncConsumerGroupMember struct {
}

func defaultConfig(clientID string) *Config {
config := NewConfig()
config := NewFunctionalTestConfig()
config.ClientID = clientID
config.Version = V0_10_2_0
config.Consumer.Return.Errors = true
Expand Down
Loading

0 comments on commit 991b2b0

Please sign in to comment.