diff --git a/docs/Producing.md b/docs/Producing.md index bb9a9b47a..a72897053 100644 --- a/docs/Producing.md +++ b/docs/Producing.md @@ -213,16 +213,20 @@ kafka.producer({ createPartitioner: MyPartitioner }) ### Default Partitioners -KafkaJS ships with 2 partitioners: `DefaultPartitioner` and `JavaCompatiblePartitioner`. - -The `JavaCompatiblePartitioner` should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the [co-partitioning requirement](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements) when joining multiple topics. - -Use the `JavaCompatiblePartitioner` by importing it and providing it to the Producer constructor: - -```javascript -const { Partitioners } = require('kafkajs') -kafka.producer({ createPartitioner: Partitioners.JavaCompatiblePartitioner }) -``` +KafkaJS ships with 2 partitioners: `DefaultPartitioner` and `LegacyPartitioner`. + +The `DefaultPartitioner` should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the [co-partitioning requirement](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements) when joining multiple topics. + +> 🚨 **Important** 🚨 +> +> **The `LegacyPartitioner` was the default until v2.0.0. If you are upgrading from a version +older and want to retain the previous partitioning behavior, use the `LegacyPartitioner` +by importing it and providing it to the Producer constructor:** +> +> ```javascript +> const { Partitioners } = require('kafkajs') +> kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }) +> ``` ## Retry diff --git a/src/producer/partitioners/defaultJava/Test.java b/src/producer/partitioners/default/Test.java similarity index 100% rename from src/producer/partitioners/defaultJava/Test.java rename to src/producer/partitioners/default/Test.java diff --git a/src/producer/partitioners/default/index.js b/src/producer/partitioners/default/index.js index d441fd6ab..cbdc4f073 100644 --- a/src/producer/partitioners/default/index.js +++ b/src/producer/partitioners/default/index.js @@ -1,4 +1,4 @@ const murmur2 = require('./murmur2') -const createDefaultPartitioner = require('./partitioner') +const createDefaultPartitioner = require('../legacy/partitioner') module.exports = createDefaultPartitioner(murmur2) diff --git a/src/producer/partitioners/default/murmur2.js b/src/producer/partitioners/default/murmur2.js index 3b5661e21..02e531d5c 100644 --- a/src/producer/partitioners/default/murmur2.js +++ b/src/producer/partitioners/default/murmur2.js @@ -1,22 +1,23 @@ /* eslint-disable */ +const Long = require('../../../utils/long') // Based on the kafka client 0.10.2 murmur2 implementation // https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L364 -const SEED = 0x9747b28c +const SEED = Long.fromValue(0x9747b28c) // 'm' and 'r' are mixing constants generated offline. // They're not really 'magic', they just happen to work well. -const M = 0x5bd1e995 -const R = 24 +const M = Long.fromValue(0x5bd1e995) +const R = Long.fromValue(24) module.exports = key => { const data = Buffer.isBuffer(key) ? key : Buffer.from(String(key)) const length = data.length // Initialize the hash to a random value - let h = SEED ^ length - let length4 = length / 4 + let h = Long.fromValue(SEED.xor(length)) + let length4 = Math.floor(length / 4) for (let i = 0; i < length4; i++) { const i4 = i * 4 @@ -25,27 +26,28 @@ module.exports = key => { ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24) - k *= M - k ^= k >>> R - k *= M - h *= M - h ^= k + k = Long.fromValue(k) + k = k.multiply(M) + k = k.xor(k.toInt() >>> R) + k = Long.fromValue(k).multiply(M) + h = h.multiply(M) + h = h.xor(k) } // Handle the last few bytes of the input array switch (length % 4) { case 3: - h ^= (data[(length & ~3) + 2] & 0xff) << 16 + h = h.xor((data[(length & ~3) + 2] & 0xff) << 16) case 2: - h ^= (data[(length & ~3) + 1] & 0xff) << 8 + h = h.xor((data[(length & ~3) + 1] & 0xff) << 8) case 1: - h ^= data[length & ~3] & 0xff - h *= M + h = h.xor(data[length & ~3] & 0xff) + h = h.multiply(M) } - h ^= h >>> 13 - h *= M - h ^= h >>> 15 + h = h.xor(h.toInt() >>> 13) + h = h.multiply(M) + h = h.xor(h.toInt() >>> 15) - return h + return h.toInt() } diff --git a/src/producer/partitioners/default/murmur2.spec.js b/src/producer/partitioners/default/murmur2.spec.js index 2a681578c..6723b9607 100644 --- a/src/producer/partitioners/default/murmur2.spec.js +++ b/src/producer/partitioners/default/murmur2.spec.js @@ -8,29 +8,31 @@ describe('Producer > Partitioner > Default > murmur2', () => { }) test('it handles numeric input', () => { - expect(murmur2(0)).toEqual(272173970) + expect(murmur2(0)).toEqual(971027396) }) test('it handles buffer input', () => { - expect(murmur2(Buffer.from('1'))).toEqual(1311020360) + expect(murmur2(Buffer.from('1'))).toEqual(-1993445489) }) }) +// Generated with src/producer/partitioners/defaultJava/Test.java const testData = { - '0': 272173970, - '1': 1311020360, - '128': 2053105854, - '2187': -2081355488, - '16384': 204404061, - '78125': -677491393, - '279936': -622460209, - '823543': 651276451, - '2097152': 944683677, - '4782969': -892695770, - '10000000': -1778616326, - '19487171': -518311627, - '35831808': 556972389, - '62748517': -233806557, - '105413504': -109398538, - '170859375': 102939717, + '0': 971027396, + '1': -1993445489, + '128': -326012175, + '2187': -1508407203, + '16384': -325739742, + '78125': -1654490814, + '279936': 1462227128, + '823543': -2014198330, + '2097152': 607668903, + '4782969': -1182699775, + '10000000': -1830336757, + '19487171': -1603849305, + '35831808': -857013643, + '62748517': -1167431028, + '105413504': -381294639, + '170859375': -1658323481, + '100:48069': 1009543857, } diff --git a/src/producer/partitioners/defaultJava/index.js b/src/producer/partitioners/defaultJava/index.js deleted file mode 100644 index 57a7d9f05..000000000 --- a/src/producer/partitioners/defaultJava/index.js +++ /dev/null @@ -1,4 +0,0 @@ -const murmur2 = require('./murmur2') -const createDefaultPartitioner = require('../default/partitioner') - -module.exports = createDefaultPartitioner(murmur2) diff --git a/src/producer/partitioners/defaultJava/murmur2.spec.js b/src/producer/partitioners/defaultJava/murmur2.spec.js deleted file mode 100644 index 452b4908b..000000000 --- a/src/producer/partitioners/defaultJava/murmur2.spec.js +++ /dev/null @@ -1,38 +0,0 @@ -const murmur2 = require('./murmur2') - -describe('Producer > Partitioner > DefaultJava > murmur2', () => { - test('it works', () => { - Object.keys(testData).forEach(key => { - expect(murmur2(key)).toEqual(testData[key]) - }) - }) - - test('it handles numeric input', () => { - expect(murmur2(0)).toEqual(971027396) - }) - - test('it handles buffer input', () => { - expect(murmur2(Buffer.from('1'))).toEqual(-1993445489) - }) -}) - -// Generated with src/producer/partitioners/defaultJava/Test.java -const testData = { - '0': 971027396, - '1': -1993445489, - '128': -326012175, - '2187': -1508407203, - '16384': -325739742, - '78125': -1654490814, - '279936': 1462227128, - '823543': -2014198330, - '2097152': 607668903, - '4782969': -1182699775, - '10000000': -1830336757, - '19487171': -1603849305, - '35831808': -857013643, - '62748517': -1167431028, - '105413504': -381294639, - '170859375': -1658323481, - '100:48069': 1009543857, -} diff --git a/src/producer/partitioners/index.js b/src/producer/partitioners/index.js index 6cb104902..44d8233bb 100644 --- a/src/producer/partitioners/index.js +++ b/src/producer/partitioners/index.js @@ -1,7 +1,7 @@ const DefaultPartitioner = require('./default') -const JavaCompatiblePartitioner = require('./defaultJava') +const LegacyPartitioner = require('./legacy') module.exports = { DefaultPartitioner, - JavaCompatiblePartitioner, + LegacyPartitioner, } diff --git a/src/producer/partitioners/legacy/index.js b/src/producer/partitioners/legacy/index.js new file mode 100644 index 000000000..be3ce76d6 --- /dev/null +++ b/src/producer/partitioners/legacy/index.js @@ -0,0 +1,4 @@ +const murmur2 = require('./murmur2') +const createLegacyPartitioner = require('./partitioner') + +module.exports = createLegacyPartitioner(murmur2) diff --git a/src/producer/partitioners/default/index.spec.js b/src/producer/partitioners/legacy/index.spec.js similarity index 98% rename from src/producer/partitioners/default/index.spec.js rename to src/producer/partitioners/legacy/index.spec.js index ca796f5eb..9d2a9b735 100644 --- a/src/producer/partitioners/default/index.spec.js +++ b/src/producer/partitioners/legacy/index.spec.js @@ -1,6 +1,6 @@ const createPartitioner = require('./index') -describe('Producer > Partitioner > Default', () => { +describe('Producer > Partitioner > Legacy', () => { let topic, partitioner, partitionMetadata beforeEach(() => { diff --git a/src/producer/partitioners/defaultJava/murmur2.js b/src/producer/partitioners/legacy/murmur2.js similarity index 55% rename from src/producer/partitioners/defaultJava/murmur2.js rename to src/producer/partitioners/legacy/murmur2.js index 02e531d5c..3b5661e21 100644 --- a/src/producer/partitioners/defaultJava/murmur2.js +++ b/src/producer/partitioners/legacy/murmur2.js @@ -1,23 +1,22 @@ /* eslint-disable */ -const Long = require('../../../utils/long') // Based on the kafka client 0.10.2 murmur2 implementation // https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L364 -const SEED = Long.fromValue(0x9747b28c) +const SEED = 0x9747b28c // 'm' and 'r' are mixing constants generated offline. // They're not really 'magic', they just happen to work well. -const M = Long.fromValue(0x5bd1e995) -const R = Long.fromValue(24) +const M = 0x5bd1e995 +const R = 24 module.exports = key => { const data = Buffer.isBuffer(key) ? key : Buffer.from(String(key)) const length = data.length // Initialize the hash to a random value - let h = Long.fromValue(SEED.xor(length)) - let length4 = Math.floor(length / 4) + let h = SEED ^ length + let length4 = length / 4 for (let i = 0; i < length4; i++) { const i4 = i * 4 @@ -26,28 +25,27 @@ module.exports = key => { ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24) - k = Long.fromValue(k) - k = k.multiply(M) - k = k.xor(k.toInt() >>> R) - k = Long.fromValue(k).multiply(M) - h = h.multiply(M) - h = h.xor(k) + k *= M + k ^= k >>> R + k *= M + h *= M + h ^= k } // Handle the last few bytes of the input array switch (length % 4) { case 3: - h = h.xor((data[(length & ~3) + 2] & 0xff) << 16) + h ^= (data[(length & ~3) + 2] & 0xff) << 16 case 2: - h = h.xor((data[(length & ~3) + 1] & 0xff) << 8) + h ^= (data[(length & ~3) + 1] & 0xff) << 8 case 1: - h = h.xor(data[length & ~3] & 0xff) - h = h.multiply(M) + h ^= data[length & ~3] & 0xff + h *= M } - h = h.xor(h.toInt() >>> 13) - h = h.multiply(M) - h = h.xor(h.toInt() >>> 15) + h ^= h >>> 13 + h *= M + h ^= h >>> 15 - return h.toInt() + return h } diff --git a/src/producer/partitioners/legacy/murmur2.spec.js b/src/producer/partitioners/legacy/murmur2.spec.js new file mode 100644 index 000000000..2a681578c --- /dev/null +++ b/src/producer/partitioners/legacy/murmur2.spec.js @@ -0,0 +1,36 @@ +const murmur2 = require('./murmur2') + +describe('Producer > Partitioner > Default > murmur2', () => { + test('it works', () => { + Object.keys(testData).forEach(key => { + expect(murmur2(key)).toEqual(testData[key]) + }) + }) + + test('it handles numeric input', () => { + expect(murmur2(0)).toEqual(272173970) + }) + + test('it handles buffer input', () => { + expect(murmur2(Buffer.from('1'))).toEqual(1311020360) + }) +}) + +const testData = { + '0': 272173970, + '1': 1311020360, + '128': 2053105854, + '2187': -2081355488, + '16384': 204404061, + '78125': -677491393, + '279936': -622460209, + '823543': 651276451, + '2097152': 944683677, + '4782969': -892695770, + '10000000': -1778616326, + '19487171': -518311627, + '35831808': 556972389, + '62748517': -233806557, + '105413504': -109398538, + '170859375': 102939717, +} diff --git a/src/producer/partitioners/default/partitioner.js b/src/producer/partitioners/legacy/partitioner.js similarity index 100% rename from src/producer/partitioners/default/partitioner.js rename to src/producer/partitioners/legacy/partitioner.js diff --git a/src/producer/partitioners/default/randomBytes.js b/src/producer/partitioners/legacy/randomBytes.js similarity index 100% rename from src/producer/partitioners/default/randomBytes.js rename to src/producer/partitioners/legacy/randomBytes.js diff --git a/src/producer/partitioners/default/randomBytes.spec.js b/src/producer/partitioners/legacy/randomBytes.spec.js similarity index 100% rename from src/producer/partitioners/default/randomBytes.spec.js rename to src/producer/partitioners/legacy/randomBytes.spec.js diff --git a/types/index.d.ts b/types/index.d.ts index 480ee0701..271fc95ee 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -91,11 +91,11 @@ export interface PartitionerArgs { export type ICustomPartitioner = () => (args: PartitionerArgs) => number export type DefaultPartitioner = ICustomPartitioner -export type JavaCompatiblePartitioner = ICustomPartitioner +export type LegacyPartitioner = ICustomPartitioner export const Partitioners: { - DefaultPartitioner: DefaultPartitioner - JavaCompatiblePartitioner: JavaCompatiblePartitioner + DefaultPartitioner: DefaultPartitioner, + LegacyPartitioner: LegacyPartitioner, } export type PartitionMetadata = {