diff --git a/jest.config.js b/jest.config.js index f60f269..b99ecff 100644 --- a/jest.config.js +++ b/jest.config.js @@ -3,7 +3,6 @@ module.exports = { moduleNameMapper: { testHelpers: '/testHelpers/index.js', }, - setupFilesAfterEnv: ['/testHelpers/setup.js'], testPathIgnorePatterns: ['/node_modules/'], testRunner: 'jest-circus/runner', testEnvironment: 'node', diff --git a/package.json b/package.json index fbafa43..539291b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sweet-security/kafkas", - "version": "0.1.5", + "version": "0.1.6", "description": "An kafkajs decorator with cooperative sticky assigner support", "main": "index.js", "types": "types/index.d.ts", diff --git a/src/consumer/assigners/cooperativeStickyAssigner/cooperativeStickyAssigner.spec.js b/src/consumer/assigners/cooperativeStickyAssigner/cooperativeStickyAssigner.spec.js new file mode 100644 index 0000000..da13047 --- /dev/null +++ b/src/consumer/assigners/cooperativeStickyAssigner/cooperativeStickyAssigner.spec.js @@ -0,0 +1,83 @@ +const { + extractTopicPartitions, + hasImbalance, + unloadOverloadedMembers, + getUnassignedPartitions, + assignUnassignedPartitions, + calculateAvgPartitions, +} = require('./utils') + +describe('CooperativeSticky', () => { + const members = [ + { memberId: 'member0' }, + { memberId: 'member1' }, + { memberId: 'member2' }, + { memberId: 'member3' }, + { memberId: 'member4' }, + { memberId: 'member5' }, + ] + const topics = ['topic1', 'topic2', 'topic3'] + const allTopicsPartitions = { + topic1: [0, 1, 2, 3, 4, 5], + topic2: [0, 1, 2, 3, 4, 5], + topic3: [0, 1, 2, 3, 4, 5], + } + + let assignment, topicsPartitions + + beforeEach(() => { + topicsPartitions = topics.flatMap(topic => { + const partitionsMetadata = allTopicsPartitions[topic] + return partitionsMetadata.map(partitionMetadata => ({ + topic, + partitionId: partitionMetadata, + })) + }) + + assignment = {} + + for (const member of members) { + assignment[member.memberId] = {} + } + }) + + it('avgPartitions', async () => { + const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length) + expect(avgPartitions).toEqual(3) + expect(hasImbalance(assignment, avgPartitions)).toBeFalsy() + }) + + it('first assignment no imbalance', async () => { + const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length) + expect(hasImbalance(assignment, avgPartitions)).toBeFalsy() + }) + + it('first assignment unassigned partitions', async () => { + const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions) + expect(unassignedPartitions).toEqual(topicsPartitions) + }) + + it('first assignment sanity', async () => { + const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions) + assignUnassignedPartitions(assignment, unassignedPartitions, members) + for (const member of members) { + expect(assignment[member.memberId]).toBeDefined() + } + }) + + it('first assignment is balanced', async () => { + const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length) + const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions) + assignUnassignedPartitions(assignment, unassignedPartitions, members) + for (const member of members) { + const assignedMemberPartitions = assignment[member.memberId] + let assignedMemberPartitionsCount = 0 + + for (const topic in assignedMemberPartitions) { + assignedMemberPartitionsCount += assignedMemberPartitions[topic].length + } + + expect(assignedMemberPartitionsCount).toEqual(avgPartitions) + } + }) +}) diff --git a/src/consumer/assigners/cooperativeStickyAssigner/index.js b/src/consumer/assigners/cooperativeStickyAssigner/index.js index ab80252..af84e2b 100644 --- a/src/consumer/assigners/cooperativeStickyAssigner/index.js +++ b/src/consumer/assigners/cooperativeStickyAssigner/index.js @@ -1,11 +1,12 @@ const { MemberMetadata, MemberAssignment } = require('../../assignerProtocol') const { + extractTopicPartitions, + calculateAvgPartitions, hasImbalance, unloadOverloadedMembers, getUnassignedPartitions, - getMemberAssignedPartitionCount, + assignUnassignedPartitions, } = require('./utils') -const { minBy } = require('lodash') /** * CooperativeStickyAssigner @@ -15,7 +16,6 @@ module.exports = ({ cluster }) => ({ name: 'CooperativeStickyAssigner', version: 0, async assign({ members, topics, currentAssignment }) { - const membersCount = members.length const assignment = {} // // Initialize assignment map for each member @@ -24,38 +24,17 @@ module.exports = ({ cluster }) => ({ } // Step 0: Fetch current partition metadata for topics - const topicsPartitions = topics.flatMap(topic => { - const partitionsMetadata = cluster.findTopicPartitionMetadata(topic) - return partitionsMetadata.map(partitionMetadata => ({ - topic, - partitionId: partitionMetadata.partitionId, - })) - }) + const topicsPartitions = extractTopicPartitions(topics, cluster) // Step 1: Detect imbalance and redistribute partitions if necessary - const totalPartitions = topicsPartitions.length - const avgPartitions = Math.ceil(totalPartitions / membersCount) + const avgPartitions = calculateAvgPartitions(topicsPartitions.length, members.length) if (hasImbalance(assignment, avgPartitions)) { unloadOverloadedMembers(assignment, avgPartitions) } + // Step 2: If not already assigned, distribute using round-robin balancing const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions) - for (const unassignedPartition of unassignedPartitions) { - const memberWithLeastPartitions = minBy(members, member => - getMemberAssignedPartitionCount(assignment, member.memberId) - )?.memberId - - if (!memberWithLeastPartitions) { - continue - } - - if (!assignment[memberWithLeastPartitions][unassignedPartition.topic]) { - assignment[memberWithLeastPartitions][unassignedPartition.topic] = [] - } - assignment[memberWithLeastPartitions][unassignedPartition.topic].push( - unassignedPartition.partitionId - ) - } + assignUnassignedPartitions(assignment, unassignedPartitions, members) return encodeAssignment(assignment, this.version) }, diff --git a/src/consumer/assigners/cooperativeStickyAssigner/utils.js b/src/consumer/assigners/cooperativeStickyAssigner/utils.js index b602d49..121b114 100644 --- a/src/consumer/assigners/cooperativeStickyAssigner/utils.js +++ b/src/consumer/assigners/cooperativeStickyAssigner/utils.js @@ -1,4 +1,19 @@ -const { minBy } = require('lodash') +const { maxBy, minBy } = require('lodash') + +const extractTopicPartitions = (topics, cluster) => { + return topics.flatMap(topic => { + const partitionsMetadata = cluster.findTopicPartitionMetadata(topic) + return partitionsMetadata.map(partitionMetadata => ({ + topic, + partitionId: partitionMetadata.partitionId, + })) + }) +} + +const calculateAvgPartitions = (topicsPartitionsCount, membersCount) => { + return Math.ceil(topicsPartitionsCount / membersCount); +} + const hasImbalance = (assignment, avgPartitions) => { return Object.values(assignment).some( topicPartitions => Object.values(topicPartitions).flat().length > avgPartitions @@ -15,8 +30,8 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => { let partitionsRemovedCount = 0 while (partitionsRemovedCount < partitionsToRemove) { - // Sort by partition count - const [topic, partitions] = minBy(Object.entries(assignment[memberId]), [ + // Find the topic with the largest partition count in the member assignment + const [topicWithMostPartitions, partitions] = maxBy(Object.entries(assignment[memberId]), [ ([_, assignedTopicPartitions]) => assignedTopicPartitions.length, ]) @@ -24,11 +39,11 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => { partitionsRemovedCount++ if (removedPartitionId !== undefined) { - removedPartitions.push({ partitionId: removedPartitionId, topic }) + removedPartitions.push({ partitionId: removedPartitionId, topic: topicWithMostPartitions }) } if (partitions.length === 0) { - delete assignment[memberId][topic] + delete assignment[memberId][topicWithMostPartitions] } } } @@ -37,6 +52,25 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => { return removedPartitions } +const assignUnassignedPartitions = (assignment, unassignedPartitions, members) => { + for (const unassignedPartition of unassignedPartitions) { + const memberWithLeastPartitions = minBy(members, member => + getMemberAssignedPartitionCount(assignment, member.memberId) + )?.memberId + + if (!memberWithLeastPartitions) { + continue + } + + if (!assignment[memberWithLeastPartitions][unassignedPartition.topic]) { + assignment[memberWithLeastPartitions][unassignedPartition.topic] = [] + } + assignment[memberWithLeastPartitions][unassignedPartition.topic].push( + unassignedPartition.partitionId + ) + } +} + const getMemberAssignedPartitionCount = (assignment, memberId) => { return Object.values(assignment[memberId] || {}).flat().length } @@ -52,8 +86,10 @@ const getUnassignedPartitions = (currentAssignment, topicsPartitions) => { } module.exports = { + extractTopicPartitions, + calculateAvgPartitions, getUnassignedPartitions, - getMemberAssignedPartitionCount, unloadOverloadedMembers, hasImbalance, + assignUnassignedPartitions, }