fix: loopCheckPartitionNumbers may not cause rebalance after adding new partitions

Signed-off-by: napallday <[email protected]>
napallday committed Aug 1, 2023
1 parent bbee916 commit 51ff5be
Showing 2 changed files with 156 additions and 10 deletions.
31 changes: 21 additions & 10 deletions consumer_group.go
@@ -79,12 +79,13 @@ type ConsumerGroup interface {
type consumerGroup struct {
client Client

-	config          *Config
-	consumer        Consumer
-	groupID         string
-	groupInstanceId *string
-	memberID        string
-	errors          chan error
+	config               *Config
+	consumer             Consumer
+	groupID              string
+	groupInstanceId      *string
+	memberID             string
+	errors               chan error
+	startTopicPartitions map[string][]int32

lock sync.Mutex
errorsLock sync.RWMutex
@@ -201,6 +202,16 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
if err := c.client.RefreshMetadata(topics...); err != nil {
return err
}
+	topicPartitions := make(map[string][]int32)
+	for _, topic := range topics {
+		partitions, err := c.client.Partitions(topic)
+		if err != nil {
+			return err
+		}
+		topicPartitions[topic] = partitions
+	}
+
+	c.startTopicPartitions = topicPartitions

// Init session
sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
@@ -622,10 +633,10 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons
pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
defer session.cancel()
defer pause.Stop()
-	var oldTopicToPartitionNum map[string]int
-	var err error
-	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
-		return
+	oldTopicToPartitionNum := make(map[string]int, len(c.startTopicPartitions))
+
+	for topic, partitions := range c.startTopicPartitions {
+		oldTopicToPartitionNum[topic] = len(partitions)
}
for {
if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
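The hunk above replaces the baseline that loopCheckPartitionNumbers used to fetch at its own start with the partition snapshot captured in Consume. The following standalone toy program (not Sarama code; every identifier in it is made up for illustration) shows why that ordering matters: if another metadata refresh discovers a new partition between the refresh in Consume and the start of the check loop, a baseline fetched at loop start already includes the new partition and the loop never sees a difference.

// Toy simulation only; all names below are invented, none are Sarama APIs.
package main

import (
	"fmt"
	"time"
)

// fakePartitionCount stands in for a metadata lookup: it reports 1 partition
// until newPartitionAt, and 2 partitions afterwards.
func fakePartitionCount(newPartitionAt time.Time) int {
	if time.Now().After(newPartitionAt) {
		return 2
	}
	return 1
}

// checkLoop mimics the shape of loopCheckPartitionNumbers: poll the current
// count and report true ("cancel the session, trigger a rebalance") if it
// ever differs from the baseline within the polling window.
func checkLoop(baseline int, newPartitionAt time.Time) bool {
	deadline := time.Now().Add(100 * time.Millisecond)
	for time.Now().Before(deadline) {
		if fakePartitionCount(newPartitionAt) != baseline {
			return true
		}
		time.Sleep(5 * time.Millisecond)
	}
	return false
}

func main() {
	// The "Consume-time" snapshot sees 1 partition; a second partition shows
	// up 10ms later, before the check loop gets going another 20ms after that.
	snapshot := 1
	newPartitionAt := time.Now().Add(10 * time.Millisecond)
	time.Sleep(30 * time.Millisecond)

	// Old behaviour: baseline fetched when the loop starts, i.e. after the
	// new partition is already visible, so no change is ever detected.
	fmt.Println("baseline fetched at loop start -> rebalance detected:",
		checkLoop(fakePartitionCount(newPartitionAt), newPartitionAt)) // false

	// Fixed behaviour: baseline taken from the Consume-time snapshot, so the
	// extra partition is noticed on the first poll.
	fmt.Println("baseline from Consume snapshot -> rebalance detected:",
		checkLoop(snapshot, newPartitionAt)) // true
}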
135 changes: 135 additions & 0 deletions consumer_group_test.go
@@ -245,3 +245,138 @@ func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) {

wg.Wait()
}

// TestConsumerGroupLoopCheckPartitionRace covers an edge case: if another
// metadata refresh picks up new partitions between the first metadata refresh
// in group.Consume and the start of loopCheckPartitionNumbers, the consumer
// group still needs to rebalance.
// See issue #2460 for more details.
func TestConsumerGroupLoopCheckPartitionRace(t *testing.T) {
// Logger = log.New(os.Stdout, "[sarama]", log.LstdFlags)
config := NewTestConfig()
config.ClientID = t.Name()
config.Version = V2_0_0_0
config.Metadata.RefreshFrequency = 0 // disable backgroundMetadataUpdater
config.Metadata.Full = false // disable automatic metadata refresh when creating a client

broker0 := NewMockBroker(t, 0)

broker0.SetHandlerByMap(map[string]MockResponse{
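// The mock sequences below model a topic growing from one partition to two:
// the first metadata response advertises only partition 0, the second also
// advertises partition 1, and the SyncGroup sequence mirrors that by
// assigning {0} first and {0, 1} on the next generation.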
"MetadataRequest": NewMockSequence(
NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my-topic", 0, broker0.BrokerID()),
NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my-topic", 0, broker0.BrokerID()).
SetLeader("my-topic", 1, broker0.BrokerID()),
),
"OffsetRequest": NewMockSequence(
NewMockOffsetResponse(t).
SetOffset("my-topic", 0, OffsetOldest, 0).
SetOffset("my-topic", 0, OffsetNewest, 1).
SetOffset("my-topic", 1, OffsetOldest, 0).
SetOffset("my-topic", 1, OffsetNewest, 1),
),

"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
SetCoordinator(CoordinatorGroup, "my-group", broker0),
"HeartbeatRequest": NewMockHeartbeatResponse(t),
"JoinGroupRequest": NewMockSequence(
NewMockJoinGroupResponse(t).SetGroupProtocol(RangeBalanceStrategyName),
),
"SyncGroupRequest": NewMockSequence(
NewMockSyncGroupResponse(t).SetMemberAssignment(
&ConsumerGroupMemberAssignment{
Topics: map[string][]int32{
"my-topic": {0},
},
}),
NewMockSyncGroupResponse(t).SetMemberAssignment(
&ConsumerGroupMemberAssignment{
Topics: map[string][]int32{
"my-topic": {0, 1},
},
}),
),
"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(
"my-group", "my-topic", 0, 0, "", ErrNoError,
).SetOffset(
"my-group", "my-topic", 1, 0, "", ErrNoError,
).SetError(ErrNoError),
"FetchRequest": NewMockSequence(
NewMockFetchResponse(t, 1),
),
})

client, err := NewClient([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer func() { _ = client.Close() }()

group, err := NewConsumerGroupFromClient("my-group", client)
if err != nil {
t.Fatal(err)
}
defer func() { _ = group.Close() }()

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)

h := &loopCheckerHandler{t, cancel, ctx, group.(*consumerGroup)}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
topics := []string{"my-topic"}
for {
if err := group.Consume(ctx, topics, h); err != nil {
t.Error(err)
}
select {
case <-ctx.Done():
return
default:
}
}
}()
wg.Wait()
}

type loopCheckerHandler struct {
*testing.T
cancel context.CancelFunc
ctx context.Context
group *consumerGroup
}

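// Setup drives the two generations of the test: on the first generation
// (claims = {0}) it refreshes metadata so the mock broker reveals the new
// partition, bumps RefreshFrequency, and starts loopCheckPartitionNumbers by
// hand; on the second generation (claims = {0, 1}) the rebalance has already
// happened, so it cancels the test context.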
func (h *loopCheckerHandler) Setup(s ConsumerGroupSession) error {
if len(s.Claims()["my-topic"]) == 1 {
if err := h.group.client.RefreshMetadata("my-topic"); err != nil {
h.Error(err)
}
// bump RefreshFrequency so that loopCheckPartitionNumbers doesn't return immediately
h.group.client.(*nopCloserClient).Client.(*client).conf.Metadata.RefreshFrequency = 5 * time.Millisecond
go h.group.loopCheckPartitionNumbers([]string{"my-topic"}, s.(*consumerGroupSession))
} else {
h.cancel()
}
return nil
}
func (h *loopCheckerHandler) Cleanup(_ ConsumerGroupSession) error {
return nil
}

func (h *loopCheckerHandler) ConsumeClaim(sess ConsumerGroupSession, _ ConsumerGroupClaim) error {
for {
select {
case <-sess.Context().Done():
return nil
case <-h.ctx.Done():
if len(sess.Claims()["my-topic"]) == 1 {
h.Error("test timeout")
}
return nil
}
}
}
