Skip to content

Commit

Permalink
feat[POC]: incremental cooperative balance strategy
Browse files Browse the repository at this point in the history
Signed-off-by: napallday <[email protected]>
  • Loading branch information
napallday committed Aug 20, 2023
1 parent 8681621 commit 1c8e617
Show file tree
Hide file tree
Showing 16 changed files with 1,569 additions and 515 deletions.
128 changes: 128 additions & 0 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const (
// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
StickyBalanceStrategyName = "sticky"

CooperativeStickyBalanceStrategyName = "cooperative-sticky"

defaultGeneration = -1
)

Expand All @@ -40,6 +42,45 @@ func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {

// --------------------------------------------------------------------

type RebalanceProtocol int

const (
EAGER RebalanceProtocol = iota
COOPERATIVE
)

func (p RebalanceProtocol) String() string {
switch p {
case EAGER:
return "EAGER"
case COOPERATIVE:
return "COOPERATIVE"
default:
return "UNKNOWN"
}
}

type RebalanceProtocolSlice []RebalanceProtocol

func (p RebalanceProtocolSlice) Len() int { return len(p) }
func (p RebalanceProtocolSlice) Less(i, j int) bool { return p[i] < p[j] }
func (p RebalanceProtocolSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

func (p RebalanceProtocolSlice) retainAll(p2 RebalanceProtocolSlice) RebalanceProtocolSlice {
var result RebalanceProtocolSlice
set := make(map[RebalanceProtocol]bool, len(p2))
for _, v := range p2 {
set[v] = true
}

for _, v := range p {
if set[v] {
result = append(result, v)
}
}
return result
}

// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group
type BalanceStrategy interface {
Expand All @@ -53,6 +94,8 @@ type BalanceStrategy interface {
// AssignmentData returns the serialized assignment data for the specified
// memberID
AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)

SupportedProtocols() RebalanceProtocolSlice
}

// --------------------------------------------------------------------
Expand Down Expand Up @@ -111,6 +154,12 @@ func NewBalanceStrategySticky() BalanceStrategy {
// Deprecated: use NewBalanceStrategySticky to avoid data race issue
var BalanceStrategySticky = NewBalanceStrategySticky()

func NewBalanceStrategyCooperativeSticky() BalanceStrategy {
return &cooperativeStickyBalanceStrategy{
stickyBalanceStrategy: &stickyBalanceStrategy{},
}
}

// --------------------------------------------------------------------

type balanceStrategy struct {
Expand Down Expand Up @@ -161,6 +210,10 @@ func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]in
return nil, nil
}

func (s *balanceStrategy) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{EAGER}
}

type stickyBalanceStrategy struct {
movements partitionMovements
}
Expand Down Expand Up @@ -279,6 +332,10 @@ func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[strin
}, nil)
}

func (s *stickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{EAGER}
}

func strsContains(s []string, value string) bool {
for _, entry := range s {
if entry == value {
Expand Down Expand Up @@ -343,6 +400,73 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
}
}

type cooperativeStickyBalanceStrategy struct {
stickyBalanceStrategy *stickyBalanceStrategy
}

func (cs *cooperativeStickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
assignments, err := cs.stickyBalanceStrategy.Plan(members, topics)
if err != nil {
return nil, err
}
partitionsTransferredOwnership := computePartitionsTransferringOwnership(members, assignments)
return adjustAssignment(assignments, partitionsTransferredOwnership), nil
}

// Following the cooperative rebalancing protocol requires removing partitions that must first be revoked from the assignment
func adjustAssignment(assignments BalanceStrategyPlan, partitionsTransferredOwnership map[topicPartitionAssignment]bool) BalanceStrategyPlan {
newAssignments := make(BalanceStrategyPlan)
for memberID, assignment := range assignments {
newAssignments[memberID] = make(map[string][]int32)
for topic, partitions := range assignment {
for _, partition := range partitions {
tp := topicPartitionAssignment{Topic: topic, Partition: partition}
if !partitionsTransferredOwnership[tp] {
newAssignments[memberID][topic] = append(newAssignments[memberID][topic], partition)
}
}
}
}
return newAssignments
}

func computePartitionsTransferringOwnership(members map[string]ConsumerGroupMemberMetadata, assignments BalanceStrategyPlan) map[topicPartitionAssignment]bool {
partitionsTransferringOwnership := make(map[topicPartitionAssignment]bool)
previousAssignmentSet := make(map[topicPartitionAssignment]string)
for prevMember, metadata := range members {
for _, ownedTopicPartitions := range metadata.OwnedPartitions {
for _, partition := range ownedTopicPartitions.Partitions {
previousAssignmentSet[topicPartitionAssignment{Topic: ownedTopicPartitions.Topic, Partition: partition}] = prevMember
}
}
}

for currMember, assignment := range assignments {
for topic, partitions := range assignment {
for _, partition := range partitions {
tp := topicPartitionAssignment{Topic: topic, Partition: partition}
prevMember, exist := previousAssignmentSet[tp]
if exist && prevMember != currMember {
partitionsTransferringOwnership[tp] = true
}
}
}
}
return partitionsTransferringOwnership
}

func (cs *cooperativeStickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return cs.stickyBalanceStrategy.AssignmentData(memberID, topics, generationID)
}

func (cs *cooperativeStickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{COOPERATIVE, EAGER}
}

func (cs *cooperativeStickyBalanceStrategy) Name() string {
return CooperativeStickyBalanceStrategyName
}

// NewBalanceStrategyRoundRobin returns a round-robin balance strategy,
// which assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
Expand Down Expand Up @@ -416,6 +540,10 @@ func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][
return nil, nil // do nothing for now
}

func (b *roundRobinBalancer) SupportedProtocols() RebalanceProtocolSlice {
return []RebalanceProtocol{EAGER}
}

type topicAndPartition struct {
topic string
partition int32
Expand Down
Loading

0 comments on commit 1c8e617

Please sign in to comment.