Fix: #1314 partition watcher doesn't react to partition number changes #1365

Open · wants to merge 6 commits into base: main

Changes from 3 commits
93 changes: 49 additions & 44 deletions consumergroup.go
@@ -497,7 +497,7 @@ func (g *Generation) heartbeatLoop(interval time.Duration) {
// a bad spot and should rebalance. Commonly you will see an error here if there
// is a problem with the connection to the coordinator and a rebalance will
// establish a new connection to the coordinator.
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
func (g *Generation) partitionWatcher(interval time.Duration, topic string, startPartitions int) {
g.Start(func(ctx context.Context) {
g.log(func(l Logger) {
l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
@@ -509,14 +509,6 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

ops, err := g.conn.readPartitions(topic)
if err != nil {
g.logError(func(l Logger) {
l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
})
return
}
oParts := len(ops)
for {
select {
case <-ctx.Done():
@@ -525,7 +517,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
ops, err := g.conn.readPartitions(topic)
switch {
case err == nil, errors.Is(err, UnknownTopicOrPartition):
if len(ops) != oParts {
if len(ops) != startPartitions {
g.log(func(l Logger) {
l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
})
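For context, a compact, self-contained sketch of the behavioral change (readPartitions and rebalance here are hypothetical stand-ins, not the library's API): the watcher now receives its baseline partition count from the generation's assignment instead of reading it, and possibly failing, inside the watcher at startup.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch compares the live partition count against an injected baseline, as the
// new signature does, instead of reading the baseline itself at startup.
func watch(ctx context.Context, interval time.Duration, startPartitions int,
	readPartitions func() (int, error), rebalance func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			n, err := readPartitions()
			if err != nil {
				continue // transient metadata errors: retry on the next tick
			}
			if n != startPartitions {
				rebalance() // partition count changed since assignment
				return
			}
		}
	}
}

func main() {
	// Baseline of 1 partition; the first poll sees 2, so rebalance fires.
	watch(context.Background(), 10*time.Millisecond, 1,
		func() (int, error) { return 2, nil },
		func() { fmt.Println("partition change detected; rebalancing") })
}
```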
@@ -651,11 +643,17 @@ func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
}

cg := &ConsumerGroup{
config: config,
next: make(chan *Generation),
errs: make(chan error),
done: make(chan struct{}),
config: config,
partitionsPerTopic: make(map[string]int, len(config.Topics)),
next: make(chan *Generation),
errs: make(chan error),
done: make(chan struct{}),
}

for _, topic := range config.Topics {
cg.partitionsPerTopic[topic] = 0
Contributor (petedannemann): Why is this being set to zero instead of the actual value?

Author (arxon31): @petedannemann This code can be removed. I forgot that I fill this map in the assignTopicPartitions function, and only after that do I run partitionWatcher. Should I remove this?

Contributor (petedannemann): Yeah, if it's not necessary let's remove this.

Author (arxon31): @petedannemann Removed this part of the code.

}

cg.wg.Add(1)
go func() {
cg.run()
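For reference, a minimal, self-contained sketch of the ordering the author describes in the thread above, using hypothetical sample data: assignTopicPartitions rebuilds the per-topic counts from scratch (see the hunk near the end of this diff), and only afterwards does nextGeneration start the watchers, which is why the zero-initialization loop is dead code.

```go
package main

import "fmt"

// partition mirrors only the field this sketch needs from the real type.
type partition struct{ Topic string }

func main() {
	// Hypothetical assignment returned by the coordinator: three partitions
	// of "events", one of "audit".
	partitions := []partition{{"events"}, {"events"}, {"events"}, {"audit"}}

	// assignTopicPartitions rebuilds the per-topic counts from scratch, so
	// any earlier zero-initialization is overwritten...
	partitionsPerTopic := make(map[string]int)
	for _, p := range partitions {
		partitionsPerTopic[p.Topic]++
	}

	// ...and only afterwards does nextGeneration start one watcher per topic
	// with that count as its baseline.
	for topic, startPartitions := range partitionsPerTopic {
		fmt.Printf("start partitionWatcher(%q, startPartitions=%d)\n", topic, startPartitions)
	}
}
```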
@@ -670,9 +668,10 @@ func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
// Generation is where partition assignments and offset management occur.
// Callers will use Next to get a handle to the Generation.
type ConsumerGroup struct {
config ConsumerGroupConfig
next chan *Generation
errs chan error
config ConsumerGroupConfig
partitionsPerTopic map[string]int
next chan *Generation
errs chan error

closeOnce sync.Once
wg sync.WaitGroup
Expand Down Expand Up @@ -789,25 +788,21 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
}
defer conn.Close()

var generationID int32
var groupAssignments GroupMemberAssignments
var assignments map[string][]int32

// join group. this will join the group and prepare assignments if our
// consumer is elected leader. it may also change or assign the member ID.
memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
memberID, generationID, groupAssignments, iAmLeader, err := cg.joinGroup(conn, memberID)
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Failed to join group %s: %v", cg.config.ID, err)
})
return memberID, err
}
cg.withLogger(func(log Logger) {
log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
log.Printf("Joined group %s as member %s (leader %t) in generation %d", cg.config.ID, memberID, iAmLeader, generationID)
})

// sync group
assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
assignments, err := cg.syncGroup(conn, memberID, generationID, groupAssignments)
if err != nil {
cg.withErrorLogger(func(log Logger) {
log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
@@ -843,9 +838,9 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
// any of these functions exit, then the generation is determined to be
// complete.
gen.heartbeatLoop(cg.config.HeartbeatInterval)
if cg.config.WatchPartitionChanges {
for _, topic := range cg.config.Topics {
gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
if cg.config.WatchPartitionChanges && iAmLeader {
Contributor (petedannemann): Can you provide where you found this in the Kafka docs or code base? I guess this makes sense, as the leader will trigger a consumer rebalance, which will let followers pick up partition changes.

Author (arxon31, Jan 29, 2025): @petedannemann It was my mistake to say that Kafka documents this as the correct behavior. I misunderstood Jun Rao in this video; he did not say who exactly should run the partition watcher mechanism. But it still seems to me that partitionWatcher should only be launched by the leader. This way we create less load on the cluster, and if the leader disconnects, the broker also triggers a rebalance and leader election for the group.

Contributor (petedannemann): I am a bit worried about this causing some unintended regression. Could we break that change out into a separate PR and let us think it through for a bit?

Author (arxon31, Feb 2, 2025): @petedannemann Removed from this PR. Should I make an issue where we can discuss this, or just open a new PR?

for topic, startPartitions := range cg.partitionsPerTopic {
gen.partitionWatcher(cg.config.PartitionWatchInterval, topic, startPartitions)
}
}
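To make the load argument in the thread above concrete, here is a toy, self-contained sketch (not the library's API) of the leader-only gating this PR originally proposed: with many group members, only the elected leader polls topic metadata, while followers rely on the broker-triggered rebalance.

```go
package main

import "fmt"

// startWatchers mimics the gating in nextGeneration above: only the leader
// starts per-topic watchers; followers start none and add no metadata load.
func startWatchers(iAmLeader, watchPartitionChanges bool, partitionsPerTopic map[string]int) int {
	if !watchPartitionChanges || !iAmLeader {
		return 0 // follower: wait for the broker-triggered rebalance instead
	}
	started := 0
	for topic, baseline := range partitionsPerTopic {
		fmt.Printf("leader watches %q (baseline %d partitions)\n", topic, baseline)
		started++
	}
	return started
}

func main() {
	topics := map[string]int{"events": 3, "audit": 1}
	fmt.Println("watchers started by leader:", startWatchers(true, true, topics))
	fmt.Println("watchers started by follower:", startWatchers(false, true, topics))
}
```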

@@ -925,24 +920,24 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
// - GroupLoadInProgress:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - InconsistentGroupProtocol:
// - InvalidSessionTimeout:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, bool, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
return "", 0, nil, err
return "", 0, nil, false, err
}

response, err := conn.joinGroup(request)
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
}
if err != nil {
return "", 0, nil, err
return "", 0, nil, false, err
}

memberID = response.MemberID
@@ -953,10 +948,12 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
})

var assignments GroupMemberAssignments
if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
iAmLeader := response.MemberID == response.LeaderID

if iAmLeader {
v, err := cg.assignTopicPartitions(conn, response)
if err != nil {
return memberID, 0, nil, err
return memberID, 0, nil, false, err
}
assignments = v

@@ -973,7 +970,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
})

return memberID, generationID, assignments, nil
return memberID, generationID, assignments, iAmLeader, nil
}

// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
@@ -1036,6 +1033,14 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
return nil, err
}

// resetting old values of partitions per topic
cg.partitionsPerTopic = make(map[string]int, len(cg.config.Topics))

// setting new values of partitions per topic
for _, partition := range partitions {
cg.partitionsPerTopic[partition.Topic] += 1
}

cg.withLogger(func(l Logger) {
l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
for _, member := range members {
@@ -1073,11 +1078,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - IllegalGeneration:
// - RebalanceInProgress:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
2 changes: 1 addition & 1 deletion consumergroup_test.go
@@ -637,7 +637,7 @@ func TestGenerationExitsOnPartitionChange(t *testing.T) {

done := make(chan struct{})
go func() {
gen.partitionWatcher(watchTime, "topic-1")
gen.partitionWatcher(watchTime, "topic-1", 1)
close(done)
}()
