Skip to content

Commit

Permalink
feat(consumer): incremental cooperative balance strategy
Browse files Browse the repository at this point in the history
Add support for the incremental cooperative protocol as outlined in
KIP-429

https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol

Signed-off-by: napallday <[email protected]>
  • Loading branch information
napallday committed Sep 12, 2023
1 parent 98ec384 commit 4c83bd0
Show file tree
Hide file tree
Showing 16 changed files with 1,651 additions and 58 deletions.
197 changes: 186 additions & 11 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const (
// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
StickyBalanceStrategyName = "sticky"

CooperativeStickyBalanceStrategyName = "cooperative-sticky"

defaultGeneration = -1
)

Expand All @@ -40,6 +42,45 @@ func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {

// --------------------------------------------------------------------

type RebalanceProtocol int

const (
EAGER RebalanceProtocol = iota
COOPERATIVE
)

func (p RebalanceProtocol) String() string {
switch p {
case EAGER:
return "EAGER"
case COOPERATIVE:
return "COOPERATIVE"
default:
return "UNKNOWN"
}
}

type RebalanceProtocolSlice []RebalanceProtocol

func (p RebalanceProtocolSlice) Len() int { return len(p) }
func (p RebalanceProtocolSlice) Less(i, j int) bool { return p[i] < p[j] }
func (p RebalanceProtocolSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

func (p RebalanceProtocolSlice) retainAll(p2 RebalanceProtocolSlice) RebalanceProtocolSlice {
var result RebalanceProtocolSlice
set := make(map[RebalanceProtocol]bool, len(p2))
for _, v := range p2 {
set[v] = true
}

for _, v := range p {
if set[v] {
result = append(result, v)
}
}
return result
}

// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group
type BalanceStrategy interface {
Expand All @@ -53,6 +94,8 @@ type BalanceStrategy interface {
// AssignmentData returns the serialized assignment data for the specified
// memberID
AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)

SupportedProtocols() RebalanceProtocolSlice
}

// --------------------------------------------------------------------
Expand Down Expand Up @@ -111,6 +154,14 @@ func NewBalanceStrategySticky() BalanceStrategy {
// Deprecated: use NewBalanceStrategySticky to avoid data race issue
var BalanceStrategySticky = NewBalanceStrategySticky()

func NewBalanceStrategyCooperativeSticky() BalanceStrategy {
cs := &cooperativeStickyBalanceStrategy{
stickyBalanceStrategy: &stickyBalanceStrategy{},
}
cs.stickyBalanceStrategy.parent = cs
return cs
}

// --------------------------------------------------------------------

type balanceStrategy struct {
Expand Down Expand Up @@ -161,8 +212,13 @@ func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]in
return nil, nil
}

func (s *balanceStrategy) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{EAGER}
}

type stickyBalanceStrategy struct {
movements partitionMovements
parent *cooperativeStickyBalanceStrategy
}

// Name implements BalanceStrategy.
Expand All @@ -177,7 +233,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad
}

// prepopulate the current assignment state from userdata on the consumer group members
currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
currentAssignment, prevAssignment, err := s.prepopulateCurrentAssignments(members)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,6 +335,10 @@ func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[strin
}, nil)
}

func (s *stickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{EAGER}
}

func strsContains(s []string, value string) bool {
for _, entry := range s {
if entry == value {
Expand Down Expand Up @@ -343,6 +403,75 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
}
}

type cooperativeStickyBalanceStrategy struct {
stickyBalanceStrategy *stickyBalanceStrategy
}

func (cs *cooperativeStickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
assignments, err := cs.stickyBalanceStrategy.Plan(members, topics)
if err != nil {
return nil, err
}
partitionsTransferredOwnership := computePartitionsTransferringOwnership(members, assignments)
return adjustAssignment(assignments, partitionsTransferredOwnership), nil
}

// Following the cooperative rebalancing protocol requires removing partitions that must first be revoked from the assignment
func adjustAssignment(assignments BalanceStrategyPlan, partitionsTransferredOwnership map[topicPartitionAssignment]bool) BalanceStrategyPlan {
newAssignments := make(BalanceStrategyPlan)
for memberID, assignment := range assignments {
newAssignments[memberID] = make(map[string][]int32)
for topic, partitions := range assignment {
for _, partition := range partitions {
tp := topicPartitionAssignment{Topic: topic, Partition: partition}
if !partitionsTransferredOwnership[tp] {
newAssignments[memberID][topic] = append(newAssignments[memberID][topic], partition)
}
}
}
}
return newAssignments
}

func computePartitionsTransferringOwnership(members map[string]ConsumerGroupMemberMetadata, assignments BalanceStrategyPlan) map[topicPartitionAssignment]bool {
partitionsTransferringOwnership := make(map[topicPartitionAssignment]bool)
previousAssignmentSet := make(map[topicPartitionAssignment]string)
for prevMember, metadata := range members {
for _, ownedTopicPartitions := range metadata.OwnedPartitions {
for _, partition := range ownedTopicPartitions.Partitions {
previousAssignmentSet[topicPartitionAssignment{Topic: ownedTopicPartitions.Topic, Partition: partition}] = prevMember
}
}
}

for currMember, assignment := range assignments {
for topic, partitions := range assignment {
for _, partition := range partitions {
tp := topicPartitionAssignment{Topic: topic, Partition: partition}
prevMember, exist := previousAssignmentSet[tp]
if exist && prevMember != currMember {
partitionsTransferringOwnership[tp] = true
}
}
}
}
return partitionsTransferringOwnership
}

func (cs *cooperativeStickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return encode(&CooperativeStickyAssignorUserDataV0{
Generation: generationID,
}, nil)
}

func (cs *cooperativeStickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{COOPERATIVE, EAGER}
}

func (cs *cooperativeStickyBalanceStrategy) Name() string {
return CooperativeStickyBalanceStrategyName
}

// NewBalanceStrategyRoundRobin returns a round-robin balance strategy,
// which assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
Expand Down Expand Up @@ -416,6 +545,10 @@ func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][
return nil, nil // do nothing for now
}

func (b *roundRobinBalancer) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{EAGER}
}

type topicAndPartition struct {
topic string
partition int32
Expand Down Expand Up @@ -619,7 +752,7 @@ func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscripti
}

// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
func deserializeStickyUserData(userDataBytes []byte) (MemberData, error) {
userDataV1 := &StickyAssignorUserDataV1{}
if err := decode(userDataBytes, userDataV1, nil); err != nil {
userDataV0 := &StickyAssignorUserDataV0{}
Expand All @@ -631,6 +764,37 @@ func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUs
return userDataV1, nil
}

func deserializeCooperativeStickyMemberData(metadata ConsumerGroupMemberMetadata) (MemberData, error) {
// metadata.OwnedPartitions to []topicPartitionAssignment
var partitions []topicPartitionAssignment
for _, ownedPartitions := range metadata.OwnedPartitions {
for _, ownedPartition := range ownedPartitions.Partitions {
partitions = append(partitions, topicPartitionAssignment{Topic: ownedPartitions.Topic, Partition: ownedPartition})
}
}

if metadata.Version >= 2 {
return &CooperativeStickyMemberData{
PartitionsAssignments: partitions,
Generation: metadata.GenerationID,
}, nil
}

userDataV0 := &CooperativeStickyAssignorUserDataV0{}
if err := decode(metadata.UserData, userDataV0, nil); err != nil {
return nil, err
}

memberData := &CooperativeStickyMemberData{
PartitionsAssignments: partitions,
Generation: userDataV0.Generation,
}
if metadata.RackID != nil {
memberData.RackID = *metadata.RackID
}
return memberData, nil
}

// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
// to those topic partitions currently reported by the Kafka cluster.
func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
Expand Down Expand Up @@ -832,35 +996,46 @@ func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitio
// We need to process subscriptions' user data with each consumer's reported generation in mind
// higher generations overwrite lower generations in case of a conflict
// note that a conflict could exist only if user data is for different generations
func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
func (s *stickyBalanceStrategy) prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
currentAssignment := make(map[string][]topicPartitionAssignment)
prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)

var err error
var memberData MemberData
// for each partition we create a sorted map of its consumers by generation
sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
for memberID, meta := range members {
consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
if len(meta.UserData) == 0 {
continue
}
if s.parent != nil {
// cooperative sticky assignor
memberData, err = deserializeCooperativeStickyMemberData(meta)
} else {
// sticky assignor
memberData, err = deserializeStickyUserData(meta.UserData)
}
if err != nil {
return nil, nil, err
}
for _, partition := range consumerUserData.partitions() {
for _, partition := range memberData.partitions() {
if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
if consumerUserData.hasGeneration() {
if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
if memberData.hasGeneration() {
if _, generationExists := consumers[memberData.generation()]; generationExists {
// same partition is assigned to two consumers during the same rebalance.
// log a warning and skip this record
Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, memberData.generation())
continue
} else {
consumers[consumerUserData.generation()] = memberID
consumers[memberData.generation()] = memberID
}
} else {
consumers[defaultGeneration] = memberID
}
} else {
generation := defaultGeneration
if consumerUserData.hasGeneration() {
generation = consumerUserData.generation()
if memberData.hasGeneration() {
generation = memberData.generation()
}
sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
}
Expand Down
19 changes: 12 additions & 7 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,14 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
}
}

func Test_deserializeTopicPartitionAssignment(t *testing.T) {
func Test_deserializeStickyUserData(t *testing.T) {
type args struct {
userDataBytes []byte
}
tests := []struct {
name string
args args
want StickyAssignorUserData
want MemberData
wantErr bool
}{
{
Expand Down Expand Up @@ -271,18 +271,22 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes)
got, err := deserializeStickyUserData(tt.args.userDataBytes)
if (err != nil) != tt.wantErr {
t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("deserializeStickyUserData() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("deserializeTopicPartitionAssignment() = %v, want %v", got, tt.want)
t.Errorf("deserializeStickyUserData() = %v, want %v", got, tt.want)
}
})
}
}

func Test_deserializeCooperativeStickyUserData(t *testing.T) {
// todo: add unit test
}

func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) {
strategy := NewBalanceStrategyRoundRobin()

Expand Down Expand Up @@ -443,14 +447,15 @@ func Test_prepopulateCurrentAssignments(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
_, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members)
s := &stickyBalanceStrategy{}
_, gotPrevAssignments, err := s.prepopulateCurrentAssignments(tt.args.members)

if (err != nil) != tt.wantErr {
t.Errorf("prepopulateCurrentAssignments() error = %v, wantErr %v", err, tt.wantErr)
}

if !reflect.DeepEqual(gotPrevAssignments, tt.wantPrevAssignments) {
t.Errorf("deserializeTopicPartitionAssignment() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
t.Errorf("deserializeStickyUserData() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
}
})
}
Expand Down
Loading

0 comments on commit 4c83bd0

Please sign in to comment.