Skip to content

Commit

Permalink
Merge pull request #26 from sweet-security/0.1.6
Browse files Browse the repository at this point in the history
0.1.6
  • Loading branch information
TsurEdoe authored Sep 23, 2024
2 parents 312bd53 + 0d55c48 commit 632d178
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 36 deletions.
1 change: 0 additions & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module.exports = {
moduleNameMapper: {
testHelpers: '<rootDir>/testHelpers/index.js',
},
setupFilesAfterEnv: ['<rootDir>/testHelpers/setup.js'],
testPathIgnorePatterns: ['/node_modules/'],
testRunner: 'jest-circus/runner',
testEnvironment: 'node',
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@sweet-security/kafkas",
"version": "0.1.5",
"version": "0.1.6",
"description": "An kafkajs decorator with cooperative sticky assigner support",
"main": "index.js",
"types": "types/index.d.ts",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
const {
extractTopicPartitions,
hasImbalance,
unloadOverloadedMembers,
getUnassignedPartitions,
assignUnassignedPartitions,
calculateAvgPartitions,
} = require('./utils')

describe('CooperativeSticky', () => {
const members = [
{ memberId: 'member0' },
{ memberId: 'member1' },
{ memberId: 'member2' },
{ memberId: 'member3' },
{ memberId: 'member4' },
{ memberId: 'member5' },
]
const topics = ['topic1', 'topic2', 'topic3']
const allTopicsPartitions = {
topic1: [0, 1, 2, 3, 4, 5],
topic2: [0, 1, 2, 3, 4, 5],
topic3: [0, 1, 2, 3, 4, 5],
}

let assignment, topicsPartitions

beforeEach(() => {
topicsPartitions = topics.flatMap(topic => {
const partitionsMetadata = allTopicsPartitions[topic]
return partitionsMetadata.map(partitionMetadata => ({
topic,
partitionId: partitionMetadata,
}))
})

assignment = {}

for (const member of members) {
assignment[member.memberId] = {}
}
})

it('avgPartitions', async () => {
const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length)
expect(avgPartitions).toEqual(3)
expect(hasImbalance(assignment, avgPartitions)).toBeFalsy()
})

it('first assignment no imbalance', async () => {
const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length)
expect(hasImbalance(assignment, avgPartitions)).toBeFalsy()
})

it('first assignment unassigned partitions', async () => {
const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions)
expect(unassignedPartitions).toEqual(topicsPartitions)
})

it('first assignment sanity', async () => {
const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions)
assignUnassignedPartitions(assignment, unassignedPartitions, members)
for (const member of members) {
expect(assignment[member.memberId]).toBeDefined()
}
})

it('first assignment is balanced', async () => {
const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length)
const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions)
assignUnassignedPartitions(assignment, unassignedPartitions, members)
for (const member of members) {
const assignedMemberPartitions = assignment[member.memberId]
let assignedMemberPartitionsCount = 0

for (const topic in assignedMemberPartitions) {
assignedMemberPartitionsCount += assignedMemberPartitions[topic].length
}

expect(assignedMemberPartitionsCount).toEqual(avgPartitions)
}
})
})
35 changes: 7 additions & 28 deletions src/consumer/assigners/cooperativeStickyAssigner/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
const { MemberMetadata, MemberAssignment } = require('../../assignerProtocol')
const {
extractTopicPartitions,
calculateAvgPartitions,
hasImbalance,
unloadOverloadedMembers,
getUnassignedPartitions,
getMemberAssignedPartitionCount,
assignUnassignedPartitions,
} = require('./utils')
const { minBy } = require('lodash')

/**
* CooperativeStickyAssigner
Expand All @@ -15,7 +16,6 @@ module.exports = ({ cluster }) => ({
name: 'CooperativeStickyAssigner',
version: 0,
async assign({ members, topics, currentAssignment }) {
const membersCount = members.length
const assignment = {}

// // Initialize assignment map for each member
Expand All @@ -24,38 +24,17 @@ module.exports = ({ cluster }) => ({
}

// Step 0: Fetch current partition metadata for topics
const topicsPartitions = topics.flatMap(topic => {
const partitionsMetadata = cluster.findTopicPartitionMetadata(topic)
return partitionsMetadata.map(partitionMetadata => ({
topic,
partitionId: partitionMetadata.partitionId,
}))
})
const topicsPartitions = extractTopicPartitions(topics, cluster)

// Step 1: Detect imbalance and redistribute partitions if necessary
const totalPartitions = topicsPartitions.length
const avgPartitions = Math.ceil(totalPartitions / membersCount)
const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length)
if (hasImbalance(assignment, avgPartitions)) {
unloadOverloadedMembers(assignment, avgPartitions)
}

// Step 2: If not already assigned, distribute using round-robin balancing
const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions)
for (const unassignedPartition of unassignedPartitions) {
const memberWithLeastPartitions = minBy(members, member =>
getMemberAssignedPartitionCount(assignment, member.memberId)
)?.memberId

if (!memberWithLeastPartitions) {
continue
}

if (!assignment[memberWithLeastPartitions][unassignedPartition.topic]) {
assignment[memberWithLeastPartitions][unassignedPartition.topic] = []
}
assignment[memberWithLeastPartitions][unassignedPartition.topic].push(
unassignedPartition.partitionId
)
}
assignUnassignedPartitions(assignment, unassignedPartitions, members)

return encodeAssignment(assignment, this.version)
},
Expand Down
48 changes: 42 additions & 6 deletions src/consumer/assigners/cooperativeStickyAssigner/utils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
const { minBy } = require('lodash')
const { maxBy, minBy } = require('lodash')

const extractTopicPartitions = (topics, cluster) => {
return topics.flatMap(topic => {
const partitionsMetadata = cluster.findTopicPartitionMetadata(topic)
return partitionsMetadata.map(partitionMetadata => ({
topic,
partitionId: partitionMetadata.partitionId,
}))
})
}

const calculateAvgPartitions = (topicsPartitionsCount, membersCount) => {
return Math.ceil(topicsPartitionsCount / membersCount);
}

const hasImbalance = (assignment, avgPartitions) => {
return Object.values(assignment).some(
topicPartitions => Object.values(topicPartitions).flat().length > avgPartitions
Expand All @@ -15,20 +30,20 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => {
let partitionsRemovedCount = 0

while (partitionsRemovedCount < partitionsToRemove) {
// Sort by partition count
const [topic, partitions] = minBy(Object.entries(assignment[memberId]), [
// Find the topic with the largest partition count in the member assignment
const [topicWithMostPartitions, partitions] = maxBy(Object.entries(assignment[memberId]), [
([_, assignedTopicPartitions]) => assignedTopicPartitions.length,
])

const removedPartitionId = partitions.pop()
partitionsRemovedCount++

if (removedPartitionId !== undefined) {
removedPartitions.push({ partitionId: removedPartitionId, topic })
removedPartitions.push({ partitionId: removedPartitionId, topic: topicWithMostPartitions })
}

if (partitions.length === 0) {
delete assignment[memberId][topic]
delete assignment[memberId][topicWithMostPartitions]
}
}
}
Expand All @@ -37,6 +52,25 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => {
return removedPartitions
}

const assignUnassignedPartitions = (assignment, unassignedPartitions, members) => {
for (const unassignedPartition of unassignedPartitions) {
const memberWithLeastPartitions = minBy(members, member =>
getMemberAssignedPartitionCount(assignment, member.memberId)
)?.memberId

if (!memberWithLeastPartitions) {
continue
}

if (!assignment[memberWithLeastPartitions][unassignedPartition.topic]) {
assignment[memberWithLeastPartitions][unassignedPartition.topic] = []
}
assignment[memberWithLeastPartitions][unassignedPartition.topic].push(
unassignedPartition.partitionId
)
}
}

const getMemberAssignedPartitionCount = (assignment, memberId) => {
return Object.values(assignment[memberId] || {}).flat().length
}
Expand All @@ -52,8 +86,10 @@ const getUnassignedPartitions = (currentAssignment, topicsPartitions) => {
}

module.exports = {
extractTopicPartitions,
calculateAvgPartitions,
getUnassignedPartitions,
getMemberAssignedPartitionCount,
unloadOverloadedMembers,
hasImbalance,
assignUnassignedPartitions,
}

0 comments on commit 632d178

Please sign in to comment.