Skip to content

Commit

Permalink
fix: RemoveMemberFromConsumerGroup needs >= 2.4.0
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 2, 2023
1 parent 15e0ebd commit 12e0507
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
4 changes: 4 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,10 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie
}

func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
}

controller, err := ca.client.Coordinator(groupId)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions functional_consumer_staticmembership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) {
}

func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) {
checkKafkaVersion(t, "2.3.0")
checkKafkaVersion(t, "2.4.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
groupID := testFuncConsumerGroupID(t)
Expand All @@ -76,15 +76,15 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) {

config1 := NewTestConfig()
config1.ClientID = "M1"
config1.Version = V2_3_0_0
config1.Version = V2_4_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
defer m1.Close()

config2 := NewTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Version = V2_4_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
Expand Down

0 comments on commit 12e0507

Please sign in to comment.