Skip to content
This repository has been archived by the owner on Apr 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #47 from verdie-g/criteo/syncFebruary2020
Browse files Browse the repository at this point in the history
Version 1.4.0
  • Loading branch information
ychuzevi authored Feb 26, 2020
2 parents e64f781 + 4287a2a commit db9416b
Show file tree
Hide file tree
Showing 37 changed files with 1,149 additions and 494 deletions.
1 change: 1 addition & 0 deletions kafka-sharp/kafka-sharp.UTest/Kafka.UTest.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="TestMessageKeyPartitionSelection.cs" />
<Compile Include="TestVarIntConverter.cs" />
<Compile Include="TestBatching.cs" />
<Compile Include="TestClient.cs" />
Expand Down
37 changes: 30 additions & 7 deletions kafka-sharp/kafka-sharp.UTest/Mocks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,24 @@ public static void Reset()
}
}

class NodeMock : INode
class NodeMock : INode, IEquatable<NodeMock>
{
private MetadataResponse _response;
public NodeMock() : this(new MetadataResponse()) { }
public NodeMock()
: this("node1")
{ }

public NodeMock(MetadataResponse response)
public NodeMock(string name)
: this(new MetadataResponse(), name)
{ }

public NodeMock(MetadataResponse response, string name)
{
Name = name;
_response = response;
}

public string Name
{
get { return "Some node"; }
}
public string Name { get; set; }

public bool Produce(ProduceMessage message)
{
Expand Down Expand Up @@ -261,6 +265,25 @@ public bool Post(IBatchByTopic<OffsetMessage> batch)
{
throw new NotImplementedException();
}

public override bool Equals(object obj)
{
return Equals(obj as NodeMock);
}

public bool Equals(NodeMock other)
{
if (ReferenceEquals(null, other))
return false;
if (ReferenceEquals(this, other))
return true;
return string.Equals(Name, other.Name);
}

public override int GetHashCode()
{
return Name == null ? 0 : Name.GetHashCode();
}
}

class ClusterMock : ICluster
Expand Down
8 changes: 4 additions & 4 deletions kafka-sharp/kafka-sharp.UTest/TestBatching.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public void TestAccumulatorByTopicByPartitionTimeElapsed()
[TestCase(500000, 100)]
public void TestAccumulatorByNodeByTopic(int batchSize, int time)
{
INode n1 = new NodeMock();
INode n2 = new NodeMock();
INode n1 = new NodeMock("n1");
INode n2 = new NodeMock("n2");
var accumulator = new AccumulatorByNodeByTopic<Tuple<string, int>>(t => t.Item1, batchSize, TimeSpan.FromMilliseconds(time));

var count = new CountdownEvent(2);
Expand Down Expand Up @@ -198,8 +198,8 @@ public void TestAccumulatorByNodeByTopic(int batchSize, int time)
[TestCase(500000, 100)]
public void TestAccumulatorByNodeByTopicByPartition(int batchSize, int time)
{
INode n1 = new NodeMock();
INode n2 = new NodeMock();
INode n1 = new NodeMock("n1");
INode n2 = new NodeMock("n2");
var accumulator = new AccumulatorByNodeByTopicByPartition<Tuple<string, int, int>>(t => t.Item1, t => t.Item2, batchSize, TimeSpan.FromMilliseconds(time));

var count = new CountdownEvent(2);
Expand Down
4 changes: 4 additions & 0 deletions kafka-sharp/kafka-sharp.UTest/TestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ private void Init(Configuration configuration)
{
_node = new Mock<INode>();
var brokerMeta = new BrokerMeta();
brokerMeta.Host = "brokerHost";
_node.Setup(n => n.FetchMetadata())
.Returns(Task.FromResult(new MetadataResponse
{
BrokersMeta = new[] {brokerMeta}, TopicsMeta = new TopicMeta[0]
}));
_node.Setup(n => n.Equals(It.IsAny<object>())).Returns(true);
_node.Setup(n => n.Equals(It.IsAny<INode>())).Returns(true);
_node.Setup(n => n.GetHashCode()).Returns(0);
_producer = new Mock<IProduceRouter>();
_consumer = new Mock<IConsumeRouter>();
var logger = new Mock<ILogger>();
Expand Down
5 changes: 4 additions & 1 deletion kafka-sharp/kafka-sharp.UTest/TestCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ public void Setup()
private Mock<INode> GenerateNodeMock(int port)
{
var nodeMock = new Mock<INode>();
nodeMock.Setup(n => n.Name).Returns("localhost:" + port);
var name = "localhost:" + port;
nodeMock.Setup(n => n.Name).Returns(name);
nodeMock.Setup(n => n.FetchMetadata()).Returns(Task.FromResult(TestData.TestMetadataResponse));
nodeMock.Setup(n => n.FetchMetadata(It.IsAny<IEnumerable<string>>())).Returns(Task.FromResult(TestData.TestMetadataResponse));
nodeMock.Setup(n => n.Stop()).Returns(Task.FromResult(true));
nodeMock.Setup(n => n.Equals(It.IsAny<INode>())).Returns((INode other) => name == other.Name);
nodeMock.Setup(n => n.GetHashCode()).Returns(name.GetHashCode);
return nodeMock;
}

Expand Down
8 changes: 8 additions & 0 deletions kafka-sharp/kafka-sharp.UTest/TestGeneral.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace tests_kafka_sharp
[TestFixture]
internal class TestGeneral
{
private static ISerializer Serializer = new StringSerializer();

private static ClusterClient InitCluster(Configuration configuration, ILogger logger, MetadataResponse metadata, bool forceErrors = false, bool forceConnectionErrors = false, int responseDelay = 0)
{
TestData.Reset();
Expand All @@ -36,6 +38,7 @@ public void TestOneProduce()
ErrorStrategy = ErrorStrategy.Discard,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
const int expectedLatency = 5;
var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: false, forceConnectionErrors: false, responseDelay: expectedLatency);

Expand Down Expand Up @@ -106,6 +109,7 @@ public void TestMultipleProduce()
ErrorStrategy = ErrorStrategy.Discard,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
TestMultipleProduce(configuration);
}

Expand All @@ -120,6 +124,7 @@ public void TestMultipleProduceGlobalAccumulator()
Seeds = "localhost:1,localhost:2,localhost:3",
BatchStrategy = BatchStrategy.Global
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
TestMultipleProduce(configuration);
}

Expand All @@ -134,6 +139,7 @@ public void TestMultipleProduceConcurrencyOne()
Seeds = "localhost:1,localhost:2,localhost:3",
TaskScheduler = new ActionBlockTaskScheduler(1)
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
TestMultipleProduce(configuration);
}

Expand Down Expand Up @@ -196,6 +202,7 @@ public void TestMultipleProduceWithNetworkErrorsAndRetry()
MaxSuccessiveNodeErrors = 10,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: false, forceConnectionErrors: true);

cluster.Produce("topic1", "key", "value");
Expand Down Expand Up @@ -245,6 +252,7 @@ public void TestBigShake()
ErrorStrategy = ErrorStrategy.Retry,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: true, forceConnectionErrors: true);

cluster.Produce("topic1", "key", "value");
Expand Down
87 changes: 87 additions & 0 deletions kafka-sharp/kafka-sharp.UTest/TestMessageKeyPartitionSelection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;
using Kafka.Cluster;
using Kafka.Protocol;
using Kafka.Public;
using Kafka.Routing;
using Kafka.Routing.PartitionSelection;
using Moq;
using NUnit.Framework;

namespace tests_kafka_sharp
{
internal class TestMessageKeyPartitionSelection
{
private static readonly ISerializer Serializer = new StringSerializer();
private static readonly RoundRobinPartitionSelection RoundRobinPartitionSelection = new RoundRobinPartitionSelection();

[Test]
public void Test_MessageKeyPartitionSelection_Is_Consistent()
{
var nodeMock = new NodeMock();
var partitions = new[]
{
new Partition {Id = 0, Leader = nodeMock},
new Partition {Id = 1, Leader = nodeMock},
new Partition {Id = 2, Leader = nodeMock},
};
var partitionStrategy = new MessageKeyPartitionSelection(Serializer, RoundRobinPartitionSelection, Mock.Of<ILogger>());
var partitioner = new PartitionSelector(partitionStrategy);
var message1 = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyKey" }, new DateTime());
var message2 = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyOtherKey" }, new DateTime());

var expectedPartition1 = partitioner.GetPartition(message1, partitions);
var expectedPartition2 = partitioner.GetPartition(message2, partitions);
for (var i = 0; i < 300; i++)
{
var currentPartition1 = partitioner.GetPartition(message1, partitions);
var currentPartition2 = partitioner.GetPartition(message2, partitions);
Assert.AreEqual(expectedPartition1.Id, currentPartition1.Id);
Assert.AreEqual(expectedPartition2.Id, currentPartition2.Id);
}
}

[Test]
public void Test_MessageKeyPartitionSelection_Fallbacks_To_RoundRobin_If_MessageKey_Null()
{
var nodeMock = new NodeMock();
var partitions = new[]
{
new Partition {Id = 0, Leader = nodeMock},
new Partition {Id = 1, Leader = nodeMock},
new Partition {Id = 2, Leader = nodeMock},
};
var partitionStrategy = new MessageKeyPartitionSelection(Serializer, RoundRobinPartitionSelection, Mock.Of<ILogger>());
var partitioner = new PartitionSelector(partitionStrategy);
var message = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = null }, new DateTime());

var partition = partitioner.GetPartition(message, partitions);
Assert.IsTrue(partition.Id != Partition.None.Id);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
public void Test_MessageKeyPartitionSelection_Fallbacks_To_RoundRobin_If_Partition_Blacklisted(int partitionIdBlacklisted)
{
var nodeMock = new NodeMock();
var partitions = new[]
{
new Partition {Id = 0, Leader = nodeMock},
new Partition {Id = 1, Leader = nodeMock},
new Partition {Id = 2, Leader = nodeMock},
};
var blacklistedPartitions = new Dictionary<int, DateTime> { { partitionIdBlacklisted, DateTime.MaxValue } };
var partitionStrategy = new MessageKeyPartitionSelection(Serializer, RoundRobinPartitionSelection, Mock.Of<ILogger>());
var partitioner = new PartitionSelector(partitionStrategy);
var message = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyKey" }, new DateTime());

for (var i = 0; i < 300; i++)
{
var partition = partitioner.GetPartition(message, partitions, blacklistedPartitions);
Assert.IsTrue(partition.Id != Partition.None.Id);
Assert.IsTrue(partition.Id != partitionIdBlacklisted);
}
}
}
}
44 changes: 29 additions & 15 deletions kafka-sharp/kafka-sharp.UTest/TestPartitioner.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Kafka.Cluster;
using Kafka.Protocol;
using Kafka.Public;
using Kafka.Routing;
using Kafka.Routing.PartitionSelection;
using NUnit.Framework;

namespace tests_kafka_sharp
Expand All @@ -27,13 +30,16 @@ public void TestRoundRobinPartitionAssign(int delay)
new Partition {Id = 3, Leader = nodeMock},
new Partition {Id = 4, Leader = nodeMock},
};
var partitioner = new PartitionSelector(delay);
var partitionStrategy = new RoundRobinPartitionSelection(delay);
var partitioner = new PartitionSelector(partitionStrategy);
delay = delay <= 0 ? 1 : delay;
foreach (var partition in partitions)
{
for (var j = 0; j < delay; ++j)
{
Assert.AreEqual(partition.Id, partitioner.GetPartition(Partitions.Any, partitions).Id);
Assert.AreEqual(partition.Id, partitioner
.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions)
.Id);
}
}
}
Expand All @@ -42,8 +48,10 @@ public void TestRoundRobinPartitionAssign(int delay)
public void TestRoundRobinPartitionAssignNoPartitionReturnsNone()
{
var partitions = new Partition[0];
var partitioner = new PartitionSelector();
Assert.AreEqual(0, Partition.None.CompareTo(partitioner.GetPartition(Partitions.Any, partitions)));
var partitionStrategy = new RoundRobinPartitionSelection();
var partitioner = new PartitionSelector(partitionStrategy);
Assert.AreEqual(0, Partition.None.CompareTo(partitioner.GetPartition(
ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions)));
}

[Test]
Expand All @@ -62,15 +70,16 @@ public void TestFilter()
filter[0] = DateTime.UtcNow;
filter[2] = DateTime.UtcNow;
filter[4] = DateTime.UtcNow;
var partitioner = new PartitionSelector();
var partitionStrategy = new RoundRobinPartitionSelection();
var partitioner = new PartitionSelector(partitionStrategy);

var partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
var partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()),partitions, filter);
Assert.AreEqual(1, partition.Id);

partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter);
Assert.AreEqual(3, partition.Id);

partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter);
Assert.AreEqual(1, partition.Id);
}

Expand All @@ -93,9 +102,10 @@ public void TestRobinPartitionAssignWhenFiltered()

int delay = partitions.Length + 2;

var partitioner = new PartitionSelector(delay);
var partitionStrategy = new RoundRobinPartitionSelection(delay);
var partitioner = new PartitionSelector(partitionStrategy);

var partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
var partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter);

Assert.AreEqual(1, partition.Id);

Expand Down Expand Up @@ -128,7 +138,8 @@ public void TestFilterWithHighDelay()
// Pick a delay greater than the number of partitions
int delay = partitions.Length + 2;

var partitioner = new PartitionSelector(delay);
var partitionStrategy = new RoundRobinPartitionSelection(delay);
var partitioner = new PartitionSelector(partitionStrategy);

var firstBatch = GetPartitions(delay, partitioner, partitions, filter);

Expand All @@ -152,7 +163,8 @@ public void TestOverflow()

var partitions = Enumerable.Range(0, 10).Select(i => new Partition { Id = i, Leader = nodeMock }).ToArray();

var partitioner = new PartitionSelector(1, int.MaxValue);
var partitionStrategy = new RoundRobinPartitionSelection(delay: 1, startSeed: int.MaxValue);
var partitioner = new PartitionSelector(partitionStrategy);

var batch = GetPartitions(partitions.Length, partitioner, partitions, null);

Expand Down Expand Up @@ -180,12 +192,14 @@ public void TestRoundRobinPartitionWithStartSeed(int startSeed, int delay)
new Partition {Id = 3, Leader = nodeMock},
new Partition {Id = 4, Leader = nodeMock},
};
var partitioner = new PartitionSelector(delay, startSeed);
var partitionStrategy = new RoundRobinPartitionSelection(delay: delay, startSeed: startSeed);
var partitioner = new PartitionSelector(partitionStrategy);
foreach (var partition in partitions)
{
for (var j = 0; j < delay; ++j)
{
Assert.AreEqual((partition.Id + startSeed) % partitions.Length, partitioner.GetPartition(Partitions.Any, partitions).Id);
Assert.AreEqual((partition.Id + startSeed) % partitions.Length, partitioner.GetPartition(
ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions).Id);
}
}
}
Expand All @@ -196,7 +210,7 @@ private static List<Partition> GetPartitions(int count, PartitionSelector partit

for (int i = 0; i < count; i++)
{
result.Add(partitioner.GetPartition(Partitions.Any, partitions, filter));
result.Add(partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter));
}

return result;
Expand Down
Loading

0 comments on commit db9416b

Please sign in to comment.