From 41754335c4c3d5678f98f0bc7fbd1906299e04cc Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Fri, 4 Aug 2023 12:28:10 +0100 Subject: [PATCH] fix(proto): use full range of SyncGroupRequest Signed-off-by: Dominic Evans --- .pre-commit-config.yaml | 2 +- consumer_group.go | 11 +++++++++-- mockresponses.go | 2 ++ sync_group_request.go | 4 +++- sync_group_response.go | 4 +++- 5 files changed, 18 insertions(+), 5 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6d0b13672..d7271ee2e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,7 +15,7 @@ repos: - id: conventional-commit-msg-validation name: commit message conventional validation language: pygrep - entry: '^(breaking|build|chore|ci|docs|feat|fix|perf|refactor|revert|style|test){1}(\([\w\-\.]+\))?(!)?: ([\w `])+([\s\S]*)' + entry: '^(?:fixup! )?(breaking|build|chore|ci|docs|feat|fix|perf|refactor|revert|style|test){1}(\([\w\-\.]+\))?(!)?: ([\w `])+([\s\S]*)' args: [--multiline, --negate] stages: [commit-msg] - id: commit-msg-needs-to-be-signed-off diff --git a/consumer_group.go b/consumer_group.go index de119d520..a0a1e1a41 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -507,12 +507,19 @@ func (c *consumerGroup) syncGroupRequest( GenerationId: generationID, } + // Versions 1 and 2 are the same as version 0. + if c.config.Version.IsAtLeast(V0_11_0_0) { + req.Version = 1 + } + if c.config.Version.IsAtLeast(V2_0_0_0) { + req.Version = 2 + } + // Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts. if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 3 - } - if c.groupInstanceId != nil { req.GroupInstanceId = c.groupInstanceId } + for memberID, topics := range plan { assignment := &ConsumerGroupMemberAssignment{Topics: topics} userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID) diff --git a/mockresponses.go b/mockresponses.go index 15b4367f9..ef127769e 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -1305,7 +1305,9 @@ func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse { } func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*SyncGroupRequest) resp := &SyncGroupResponse{ + Version: req.version(), Err: m.Err, MemberAssignment: m.MemberAssignment, } diff --git a/sync_group_request.go b/sync_group_request.go index 581b1ea29..95efc2858 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -135,8 +135,10 @@ func (r *SyncGroupRequest) requiredVersion() KafkaVersion { return V2_0_0_0 case 1: return V0_11_0_0 - default: + case 0: return V0_9_0_0 + default: + return V2_3_0_0 } } diff --git a/sync_group_response.go b/sync_group_response.go index ca47f790d..f7da15b4f 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -71,8 +71,10 @@ func (r *SyncGroupResponse) requiredVersion() KafkaVersion { return V2_0_0_0 case 1: return V0_11_0_0 - default: + case 0: return V0_9_0_0 + default: + return V2_3_0_0 } }