Skip to content

Commit

Permalink
fix(consumer): use newer LeaveGroup as appropriate
Browse files Browse the repository at this point in the history
Use the correct version and format of LeaveGroup as determined by the
configured Version field.

Fixes #2486

Signed-off-by: Dominic Evans <[email protected]>
  • Loading branch information
dnwe committed Aug 2, 2023
1 parent a9126ad commit 5b9058f
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
userData: config.Consumer.Group.Member.UserData,
metricRegistry: newCleanupRegistry(config.MetricRegistry),
}
if client.Config().Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) {
cg.groupInstanceId = &client.Config().Consumer.Group.InstanceId
if config.Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) {
cg.groupInstanceId = &config.Consumer.Group.InstanceId
}
return cg, nil
}
Expand Down Expand Up @@ -556,32 +556,43 @@ func (c *consumerGroup) leave() error {
return err
}

// KIP-345 if groupInstanceId is set, don not leave group when consumer closed.
// Since we do not discover ApiVersion for brokers, LeaveGroupRequest still use the old version request for now
if c.groupInstanceId == nil {
resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
GroupId: c.groupID,
// as per KIP-345 if groupInstanceId is set, i.e. static membership is in action, then do not leave group when consumer closed, just clear memberID
if c.groupInstanceId != nil {
c.memberID = ""
return nil
}
req := &LeaveGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
}
if c.config.Version.IsAtLeast(V0_11_0_0) {
req.Version = 1
}
if c.config.Version.IsAtLeast(V2_0_0_0) {
req.Version = 2
}
if c.config.Version.IsAtLeast(V2_4_0_0) {
req.Version = 3
req.Members = append(req.Members, MemberIdentity{
MemberId: c.memberID,
})
if err != nil {
_ = coordinator.Close()
return err
}
}

// Unset memberID
c.memberID = ""
resp, err := coordinator.LeaveGroup(req)
if err != nil {
_ = coordinator.Close()
return err
}

// Check response
switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
return nil
default:
return resp.Err
}
} else {
c.memberID = ""
// clear the memberID
c.memberID = ""

switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
return nil
default:
return resp.Err
}
return nil
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
Expand Down

0 comments on commit 5b9058f

Please sign in to comment.