From ab5c14e678c3c66709f48874b71078762cb8dbd5 Mon Sep 17 00:00:00 2001 From: Jules Bovet Date: Thu, 31 Jan 2019 17:49:32 +0100 Subject: [PATCH 01/15] [Kafka] Introduce partition selector with message key Currently, there is only one partition selection mode, which is the round robin one. There's a need to introduce a new selection strategy that uses the keys of messages to determine the partition the messages should go to. It would allow the same messages to be compacted and heavily reduce the number of messages. This new partition selection strategy guarantees that messages with the same key will be sent to the same partition, given the same number of available partitions. It does not use consistent hashing though, which means that if new nodes are introduced or old ones are removed from the cluster, new messages with the same key as old messages are no longer guaranteed to be sent to the same partition. JIRA: WBSC-3909 Change-Id: Ie75b40123fd90550aecafd9c33d2b82952470469 --- .../kafka-sharp.UTest/Kafka.UTest.csproj | 1 + kafka-sharp/kafka-sharp.UTest/TestGeneral.cs | 8 ++ .../TestMessageKeyPartitionSelection.cs | 86 +++++++++++++++++++ .../kafka-sharp.UTest/TestPartitioner.cs | 44 ++++++---- kafka-sharp/kafka-sharp.UTest/TestRouter.cs | 2 +- .../kafka-sharp.UTest/TestSerialization.cs | 3 - kafka-sharp/kafka-sharp/Kafka.csproj | 8 +- kafka-sharp/kafka-sharp/Protocol/Message.cs | 7 +- .../kafka-sharp/Public/ClusterClient.cs | 3 +- .../kafka-sharp/Public/Configuration.cs | 7 ++ .../Public/PartitionSelectionConfig.cs | 59 +++++++++++++ .../kafka-sharp/Public/Serialization.cs | 2 +- .../PartitionSelection/IPartitionSelection.cs | 23 +++++ .../MessageKeyPartitionSelection.cs | 71 +++++++++++++++ .../PartitionSelection/PartitionSelector.cs | 52 +++++++++++ .../RoundRobinPartitionSelection.cs | 63 ++++++++++++++ .../kafka-sharp/Routing/PartitionSelector.cs | 79 ----------------- .../kafka-sharp/Routing/ProducerRouter.cs | 13 +-- 18 files changed, 418 insertions(+), 113 deletions(-) create 
mode 100644 kafka-sharp/kafka-sharp.UTest/TestMessageKeyPartitionSelection.cs create mode 100644 kafka-sharp/kafka-sharp/Public/PartitionSelectionConfig.cs create mode 100644 kafka-sharp/kafka-sharp/Routing/PartitionSelection/IPartitionSelection.cs create mode 100644 kafka-sharp/kafka-sharp/Routing/PartitionSelection/MessageKeyPartitionSelection.cs create mode 100644 kafka-sharp/kafka-sharp/Routing/PartitionSelection/PartitionSelector.cs create mode 100644 kafka-sharp/kafka-sharp/Routing/PartitionSelection/RoundRobinPartitionSelection.cs delete mode 100644 kafka-sharp/kafka-sharp/Routing/PartitionSelector.cs diff --git a/kafka-sharp/kafka-sharp.UTest/Kafka.UTest.csproj b/kafka-sharp/kafka-sharp.UTest/Kafka.UTest.csproj index 2ec4f32..75adf68 100644 --- a/kafka-sharp/kafka-sharp.UTest/Kafka.UTest.csproj +++ b/kafka-sharp/kafka-sharp.UTest/Kafka.UTest.csproj @@ -69,6 +69,7 @@ + diff --git a/kafka-sharp/kafka-sharp.UTest/TestGeneral.cs b/kafka-sharp/kafka-sharp.UTest/TestGeneral.cs index 92f7ebe..d07a709 100644 --- a/kafka-sharp/kafka-sharp.UTest/TestGeneral.cs +++ b/kafka-sharp/kafka-sharp.UTest/TestGeneral.cs @@ -12,6 +12,8 @@ namespace tests_kafka_sharp [TestFixture] internal class TestGeneral { + private static ISerializer Serializer = new StringSerializer(); + private static ClusterClient InitCluster(Configuration configuration, ILogger logger, MetadataResponse metadata, bool forceErrors = false, bool forceConnectionErrors = false, int responseDelay = 0) { TestData.Reset(); @@ -36,6 +38,7 @@ public void TestOneProduce() ErrorStrategy = ErrorStrategy.Discard, Seeds = "localhost:1,localhost:2,localhost:3" }; + configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer); const int expectedLatency = 5; var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: false, forceConnectionErrors: false, responseDelay: expectedLatency); @@ -106,6 +109,7 @@ public void TestMultipleProduce() ErrorStrategy = 
ErrorStrategy.Discard, Seeds = "localhost:1,localhost:2,localhost:3" }; + configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer); TestMultipleProduce(configuration); } @@ -120,6 +124,7 @@ public void TestMultipleProduceGlobalAccumulator() Seeds = "localhost:1,localhost:2,localhost:3", BatchStrategy = BatchStrategy.Global }; + configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer); TestMultipleProduce(configuration); } @@ -134,6 +139,7 @@ public void TestMultipleProduceConcurrencyOne() Seeds = "localhost:1,localhost:2,localhost:3", TaskScheduler = new ActionBlockTaskScheduler(1) }; + configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer); TestMultipleProduce(configuration); } @@ -196,6 +202,7 @@ public void TestMultipleProduceWithNetworkErrorsAndRetry() MaxSuccessiveNodeErrors = 10, Seeds = "localhost:1,localhost:2,localhost:3" }; + configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer); var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: false, forceConnectionErrors: true); cluster.Produce("topic1", "key", "value"); @@ -245,6 +252,7 @@ public void TestBigShake() ErrorStrategy = ErrorStrategy.Retry, Seeds = "localhost:1,localhost:2,localhost:3" }; + configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer); var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: true, forceConnectionErrors: true); cluster.Produce("topic1", "key", "value");
diff --git a/kafka-sharp/kafka-sharp.UTest/TestMessageKeyPartitionSelection.cs b/kafka-sharp/kafka-sharp.UTest/TestMessageKeyPartitionSelection.cs
new file mode 100644
index 0000000..2beaa4a
--- /dev/null
+++ b/kafka-sharp/kafka-sharp.UTest/TestMessageKeyPartitionSelection.cs
@@ -0,0 +1,102 @@
+using System;
+using System.Collections.Generic;
+using Kafka.Cluster;
+using Kafka.Protocol;
+using Kafka.Public;
+using Kafka.Routing;
+using Kafka.Routing.PartitionSelection;
+using NUnit.Framework;
+
+namespace tests_kafka_sharp
+{
+    /// <summary>
+    /// Unit tests for <see cref="MessageKeyPartitionSelection"/>, exercised through <see cref="PartitionSelector"/>.
+    /// </summary>
+    [TestFixture]
+    internal class TestMessageKeyPartitionSelection
+    {
+        private static readonly ISerializer Serializer = new StringSerializer();
+
+        /// <summary>
+        /// Builds three partitions (ids 0 to 2) that all share one mocked leader node.
+        /// </summary>
+        private static Partition[] CreatePartitions()
+        {
+            var nodeMock = new NodeMock();
+            return new[]
+            {
+                new Partition {Id = 0, Leader = nodeMock},
+                new Partition {Id = 1, Leader = nodeMock},
+                new Partition {Id = 2, Leader = nodeMock},
+            };
+        }
+
+        /// <summary>
+        /// Builds a selector backed by the message key strategy. Every call creates its own round robin
+        /// strategy: round robin selection keeps a position between calls, so sharing a single instance
+        /// would couple the tests through that state.
+        /// </summary>
+        private static PartitionSelector CreatePartitioner()
+        {
+            var partitionStrategy = new MessageKeyPartitionSelection(Serializer, new RoundRobinPartitionSelection());
+            return new PartitionSelector(partitionStrategy);
+        }
+
+        /// <summary>
+        /// The same key must map to the same partition on every call while the partition list is unchanged.
+        /// </summary>
+        [Test]
+        public void Test_MessageKeyPartitionSelection_Is_Consistent()
+        {
+            var partitions = CreatePartitions();
+            var partitioner = CreatePartitioner();
+            var message1 = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyKey" }, new DateTime());
+            var message2 = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyOtherKey" }, new DateTime());
+
+            var expectedPartition1 = partitioner.GetPartition(message1, partitions);
+            var expectedPartition2 = partitioner.GetPartition(message2, partitions);
+            for (var i = 0; i < 300; i++)
+            {
+                var currentPartition1 = partitioner.GetPartition(message1, partitions);
+                var currentPartition2 = partitioner.GetPartition(message2, partitions);
+                Assert.AreEqual(expectedPartition1.Id, currentPartition1.Id);
+                Assert.AreEqual(expectedPartition2.Id, currentPartition2.Id);
+            }
+        }
+
+        /// <summary>
+        /// A message without a key must still be assigned a real partition (never <see cref="Partition.None"/>).
+        /// </summary>
+        [Test]
+        public void Test_MessageKeyPartitionSelection_Fallbacks_To_RoundRobin_If_MessageKey_Null()
+        {
+            var partitions = CreatePartitions();
+            var partitioner = CreatePartitioner();
+            var message = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = null }, new DateTime());
+
+            var partition = partitioner.GetPartition(message, partitions);
+            Assert.IsTrue(partition.Id != Partition.None.Id);
+        }
+
+        /// <summary>
+        /// Whichever partition is blacklisted, a keyed message must go to a partition that is neither
+        /// <see cref="Partition.None"/> nor the blacklisted one.
+        /// </summary>
+        [TestCase(0)]
+        [TestCase(1)]
+        [TestCase(2)]
+        public void Test_MessageKeyPartitionSelection_Fallbacks_To_RoundRobin_If_Partition_Blacklisted(int partitionIdBlacklisted)
+        {
+            var partitions = CreatePartitions();
+            var blacklistedPartitions = new Dictionary<int, DateTime> { { partitionIdBlacklisted, DateTime.MaxValue } };
+            var partitioner = CreatePartitioner();
+            var message = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyKey" }, new DateTime());
+
+            for (var i = 0; i < 300; i++)
+            {
+                var partition = partitioner.GetPartition(message, partitions, blacklistedPartitions);
+                Assert.IsTrue(partition.Id != Partition.None.Id);
+                Assert.IsTrue(partition.Id != partitionIdBlacklisted);
+            }
+        }
+    }
+}
diff --git a/kafka-sharp/kafka-sharp.UTest/TestPartitioner.cs b/kafka-sharp/kafka-sharp.UTest/TestPartitioner.cs index 9b2093c..3f1a78d 100644 --- a/kafka-sharp/kafka-sharp.UTest/TestPartitioner.cs +++ b/kafka-sharp/kafka-sharp.UTest/TestPartitioner.cs @@ -1,8 +1,11 @@ using System; using System.Collections.Generic; using System.Linq; +using Kafka.Cluster; +using Kafka.Protocol; using Kafka.Public; using Kafka.Routing; +using Kafka.Routing.PartitionSelection; using NUnit.Framework; namespace tests_kafka_sharp @@ -27,13 +30,16 @@ public void TestRoundRobinPartitionAssign(int delay) new Partition {Id = 3, Leader = nodeMock}, new Partition {Id = 4, Leader = nodeMock}, }; - var partitioner = new PartitionSelector(delay); + var partitionStrategy = new
RoundRobinPartitionSelection(delay); + var partitioner = new PartitionSelector(partitionStrategy); delay = delay <= 0 ? 1 : delay; foreach (var partition in partitions) { for (var j = 0; j < delay; ++j) { - Assert.AreEqual(partition.Id, partitioner.GetPartition(Partitions.Any, partitions).Id); + Assert.AreEqual(partition.Id, partitioner + .GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions) + .Id); } } } @@ -42,8 +48,10 @@ public void TestRoundRobinPartitionAssign(int delay) public void TestRoundRobinPartitionAssignNoPartitionReturnsNone() { var partitions = new Partition[0]; - var partitioner = new PartitionSelector(); - Assert.AreEqual(0, Partition.None.CompareTo(partitioner.GetPartition(Partitions.Any, partitions))); + var partitionStrategy = new RoundRobinPartitionSelection(); + var partitioner = new PartitionSelector(partitionStrategy); + Assert.AreEqual(0, Partition.None.CompareTo(partitioner.GetPartition( + ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions))); } [Test] @@ -62,15 +70,16 @@ public void TestFilter() filter[0] = DateTime.UtcNow; filter[2] = DateTime.UtcNow; filter[4] = DateTime.UtcNow; - var partitioner = new PartitionSelector(); + var partitionStrategy = new RoundRobinPartitionSelection(); + var partitioner = new PartitionSelector(partitionStrategy); - var partition = partitioner.GetPartition(Partitions.Any, partitions, filter); + var partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()),partitions, filter); Assert.AreEqual(1, partition.Id); - partition = partitioner.GetPartition(Partitions.Any, partitions, filter); + partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter); Assert.AreEqual(3, partition.Id); - partition = partitioner.GetPartition(Partitions.Any, partitions, filter); + partition = 
partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter); Assert.AreEqual(1, partition.Id); } @@ -93,9 +102,10 @@ public void TestRobinPartitionAssignWhenFiltered() int delay = partitions.Length + 2; - var partitioner = new PartitionSelector(delay); + var partitionStrategy = new RoundRobinPartitionSelection(delay); + var partitioner = new PartitionSelector(partitionStrategy); - var partition = partitioner.GetPartition(Partitions.Any, partitions, filter); + var partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter); Assert.AreEqual(1, partition.Id); @@ -128,7 +138,8 @@ public void TestFilterWithHighDelay() // Pick a delay greater than the number of partitions int delay = partitions.Length + 2; - var partitioner = new PartitionSelector(delay); + var partitionStrategy = new RoundRobinPartitionSelection(delay); + var partitioner = new PartitionSelector(partitionStrategy); var firstBatch = GetPartitions(delay, partitioner, partitions, filter); @@ -152,7 +163,8 @@ public void TestOverflow() var partitions = Enumerable.Range(0, 10).Select(i => new Partition { Id = i, Leader = nodeMock }).ToArray(); - var partitioner = new PartitionSelector(1, int.MaxValue); + var partitionStrategy = new RoundRobinPartitionSelection(delay: 1, startSeed: int.MaxValue); + var partitioner = new PartitionSelector(partitionStrategy); var batch = GetPartitions(partitions.Length, partitioner, partitions, null); @@ -180,12 +192,14 @@ public void TestRoundRobinPartitionWithStartSeed(int startSeed, int delay) new Partition {Id = 3, Leader = nodeMock}, new Partition {Id = 4, Leader = nodeMock}, }; - var partitioner = new PartitionSelector(delay, startSeed); + var partitionStrategy = new RoundRobinPartitionSelection(delay: delay, startSeed: startSeed); + var partitioner = new PartitionSelector(partitionStrategy); foreach (var partition in partitions) { 
for (var j = 0; j < delay; ++j) { - Assert.AreEqual((partition.Id + startSeed) % partitions.Length, partitioner.GetPartition(Partitions.Any, partitions).Id); + Assert.AreEqual((partition.Id + startSeed) % partitions.Length, partitioner.GetPartition( + ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions).Id); } } } @@ -196,7 +210,7 @@ private static List GetPartitions(int count, PartitionSelector partit for (int i = 0; i < count; i++) { - result.Add(partitioner.GetPartition(Partitions.Any, partitions, filter)); + result.Add(partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter)); } return result; diff --git a/kafka-sharp/kafka-sharp.UTest/TestRouter.cs b/kafka-sharp/kafka-sharp.UTest/TestRouter.cs index 9b836d2..b8a2b45 100644 --- a/kafka-sharp/kafka-sharp.UTest/TestRouter.cs +++ b/kafka-sharp/kafka-sharp.UTest/TestRouter.cs @@ -183,7 +183,7 @@ public void TestSerializeOnProduce() n.Produce( It.Is( p => - p.Message.SerializedKeyValue != null && p.Message.Key == null && p.Message.Value == null))); + p.Message.SerializedKeyValue != null && p.Message.Key != null && p.Message.Value == null))); } [Test] diff --git a/kafka-sharp/kafka-sharp.UTest/TestSerialization.cs b/kafka-sharp/kafka-sharp.UTest/TestSerialization.cs index 0434d6a..2fdd9a7 100644 --- a/kafka-sharp/kafka-sharp.UTest/TestSerialization.cs +++ b/kafka-sharp/kafka-sharp.UTest/TestSerialization.cs @@ -81,7 +81,6 @@ public void TestSerializeOneMessageWithPreserializedKeyValue() { var message = new Message { Key = Key, Value = Value }; message.SerializeKeyValue(new ReusableMemoryStream(null), new Tuple(null, null)); - Assert.IsNull(message.Key); Assert.IsNull(message.Value); Assert.IsNotNull(message.SerializedKeyValue); TestSerializeOneMessageCommon(message); @@ -128,7 +127,6 @@ public void TestSerializeOneMessageIMemorySerializableWithPreserializedKeyValue( { var message = new Message { Key = new 
SimpleSerializable(Key), Value = new SimpleSerializable(Value) }; message.SerializeKeyValue(new ReusableMemoryStream(null), new Tuple(null, null)); - Assert.IsNull(message.Key); Assert.IsNull(message.Value); Assert.IsNotNull(message.SerializedKeyValue); TestSerializeOneMessageCommon(message); @@ -226,7 +224,6 @@ public void TestSerializeOneEmptyMessageWithPreserializedKeyValue(MessageVersion { var message = new Message { Key = new byte[0], Value = new byte[0] }; message.SerializeKeyValue(new ReusableMemoryStream(null), new Tuple(null, null)); - Assert.IsNull(message.Key); Assert.IsNull(message.Value); Assert.IsNotNull(message.SerializedKeyValue); TestSerializeOneEmptyMessageCommon(message, messageVersion); diff --git a/kafka-sharp/kafka-sharp/Kafka.csproj b/kafka-sharp/kafka-sharp/Kafka.csproj index f0b11cb..a3b0f13 100644 --- a/kafka-sharp/kafka-sharp/Kafka.csproj +++ b/kafka-sharp/kafka-sharp/Kafka.csproj @@ -120,11 +120,15 @@ + - + + + + @@ -132,7 +136,7 @@ -