Skip to content
This repository has been archived by the owner on Apr 9, 2022. It is now read-only.

Version 1.4.0 #47

Merged
merged 15 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka-sharp/kafka-sharp.UTest/Kafka.UTest.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="TestMessageKeyPartitionSelection.cs" />
<Compile Include="TestVarIntConverter.cs" />
<Compile Include="TestBatching.cs" />
<Compile Include="TestClient.cs" />
Expand Down
37 changes: 30 additions & 7 deletions kafka-sharp/kafka-sharp.UTest/Mocks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,24 @@ public static void Reset()
}
}

class NodeMock : INode
class NodeMock : INode, IEquatable<NodeMock>
{
private MetadataResponse _response;
public NodeMock() : this(new MetadataResponse()) { }
public NodeMock()
: this("node1")
{ }

public NodeMock(MetadataResponse response)
public NodeMock(string name)
: this(new MetadataResponse(), name)
{ }

public NodeMock(MetadataResponse response, string name)
{
Name = name;
_response = response;
}

public string Name
{
get { return "Some node"; }
}
public string Name { get; set; }

public bool Produce(ProduceMessage message)
{
Expand Down Expand Up @@ -261,6 +265,25 @@ public bool Post(IBatchByTopic<OffsetMessage> batch)
{
throw new NotImplementedException();
}

/// <summary>
/// Object-typed equality entry point; delegates to the strongly-typed
/// overload. Non-NodeMock instances (including null) compare unequal.
/// </summary>
public override bool Equals(object obj)
{
    return obj is NodeMock other && Equals(other);
}

/// <summary>
/// Two NodeMock instances are considered equal when their Name values
/// match (same instance trivially matches); a null other is never equal.
/// </summary>
public bool Equals(NodeMock other)
{
    if (other is null)
        return false;
    return ReferenceEquals(this, other) || string.Equals(Name, other.Name);
}

/// <summary>
/// Hash derived solely from Name so it is consistent with Equals.
/// NOTE(review): Name has a public setter, so the hash can change after
/// insertion into a hash-based collection — presumably acceptable for a
/// test mock, but worth confirming with the test authors.
/// </summary>
public override int GetHashCode()
{
    var name = Name;
    return name != null ? name.GetHashCode() : 0;
}
}

class ClusterMock : ICluster
Expand Down
8 changes: 4 additions & 4 deletions kafka-sharp/kafka-sharp.UTest/TestBatching.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public void TestAccumulatorByTopicByPartitionTimeElapsed()
[TestCase(500000, 100)]
public void TestAccumulatorByNodeByTopic(int batchSize, int time)
{
INode n1 = new NodeMock();
INode n2 = new NodeMock();
INode n1 = new NodeMock("n1");
INode n2 = new NodeMock("n2");
var accumulator = new AccumulatorByNodeByTopic<Tuple<string, int>>(t => t.Item1, batchSize, TimeSpan.FromMilliseconds(time));

var count = new CountdownEvent(2);
Expand Down Expand Up @@ -198,8 +198,8 @@ public void TestAccumulatorByNodeByTopic(int batchSize, int time)
[TestCase(500000, 100)]
public void TestAccumulatorByNodeByTopicByPartition(int batchSize, int time)
{
INode n1 = new NodeMock();
INode n2 = new NodeMock();
INode n1 = new NodeMock("n1");
INode n2 = new NodeMock("n2");
var accumulator = new AccumulatorByNodeByTopicByPartition<Tuple<string, int, int>>(t => t.Item1, t => t.Item2, batchSize, TimeSpan.FromMilliseconds(time));

var count = new CountdownEvent(2);
Expand Down
4 changes: 4 additions & 0 deletions kafka-sharp/kafka-sharp.UTest/TestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ private void Init(Configuration configuration)
{
_node = new Mock<INode>();
var brokerMeta = new BrokerMeta();
brokerMeta.Host = "brokerHost";
_node.Setup(n => n.FetchMetadata())
.Returns(Task.FromResult(new MetadataResponse
{
BrokersMeta = new[] {brokerMeta}, TopicsMeta = new TopicMeta[0]
}));
_node.Setup(n => n.Equals(It.IsAny<object>())).Returns(true);
_node.Setup(n => n.Equals(It.IsAny<INode>())).Returns(true);
_node.Setup(n => n.GetHashCode()).Returns(0);
_producer = new Mock<IProduceRouter>();
_consumer = new Mock<IConsumeRouter>();
var logger = new Mock<ILogger>();
Expand Down
5 changes: 4 additions & 1 deletion kafka-sharp/kafka-sharp.UTest/TestCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ public void Setup()
private Mock<INode> GenerateNodeMock(int port)
{
var nodeMock = new Mock<INode>();
nodeMock.Setup(n => n.Name).Returns("localhost:" + port);
var name = "localhost:" + port;
nodeMock.Setup(n => n.Name).Returns(name);
nodeMock.Setup(n => n.FetchMetadata()).Returns(Task.FromResult(TestData.TestMetadataResponse));
nodeMock.Setup(n => n.FetchMetadata(It.IsAny<IEnumerable<string>>())).Returns(Task.FromResult(TestData.TestMetadataResponse));
nodeMock.Setup(n => n.Stop()).Returns(Task.FromResult(true));
nodeMock.Setup(n => n.Equals(It.IsAny<INode>())).Returns((INode other) => name == other.Name);
nodeMock.Setup(n => n.GetHashCode()).Returns(name.GetHashCode);
return nodeMock;
}

Expand Down
8 changes: 8 additions & 0 deletions kafka-sharp/kafka-sharp.UTest/TestGeneral.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace tests_kafka_sharp
[TestFixture]
internal class TestGeneral
{
private static ISerializer Serializer = new StringSerializer();

private static ClusterClient InitCluster(Configuration configuration, ILogger logger, MetadataResponse metadata, bool forceErrors = false, bool forceConnectionErrors = false, int responseDelay = 0)
{
TestData.Reset();
Expand All @@ -36,6 +38,7 @@ public void TestOneProduce()
ErrorStrategy = ErrorStrategy.Discard,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
const int expectedLatency = 5;
var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: false, forceConnectionErrors: false, responseDelay: expectedLatency);

Expand Down Expand Up @@ -106,6 +109,7 @@ public void TestMultipleProduce()
ErrorStrategy = ErrorStrategy.Discard,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
TestMultipleProduce(configuration);
}

Expand All @@ -120,6 +124,7 @@ public void TestMultipleProduceGlobalAccumulator()
Seeds = "localhost:1,localhost:2,localhost:3",
BatchStrategy = BatchStrategy.Global
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
TestMultipleProduce(configuration);
}

Expand All @@ -134,6 +139,7 @@ public void TestMultipleProduceConcurrencyOne()
Seeds = "localhost:1,localhost:2,localhost:3",
TaskScheduler = new ActionBlockTaskScheduler(1)
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
TestMultipleProduce(configuration);
}

Expand Down Expand Up @@ -196,6 +202,7 @@ public void TestMultipleProduceWithNetworkErrorsAndRetry()
MaxSuccessiveNodeErrors = 10,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: false, forceConnectionErrors: true);

cluster.Produce("topic1", "key", "value");
Expand Down Expand Up @@ -245,6 +252,7 @@ public void TestBigShake()
ErrorStrategy = ErrorStrategy.Retry,
Seeds = "localhost:1,localhost:2,localhost:3"
};
configuration.SerializationConfig.SetDefaultSerializers(Serializer, Serializer);
var cluster = InitCluster(configuration, logger, TestData.TestMetadataResponse, forceErrors: true, forceConnectionErrors: true);

cluster.Produce("topic1", "key", "value");
Expand Down
87 changes: 87 additions & 0 deletions kafka-sharp/kafka-sharp.UTest/TestMessageKeyPartitionSelection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;
using Kafka.Cluster;
using Kafka.Protocol;
using Kafka.Public;
using Kafka.Routing;
using Kafka.Routing.PartitionSelection;
using Moq;
using NUnit.Framework;

namespace tests_kafka_sharp
{
/// <summary>
/// Tests for <see cref="MessageKeyPartitionSelection"/>: key-based partition
/// selection must be deterministic per key, and must fall back to round-robin
/// when the key is null or the chosen partition is blacklisted.
/// </summary>
[TestFixture]
internal class TestMessageKeyPartitionSelection
{
    private static readonly ISerializer Serializer = new StringSerializer();
    private static readonly RoundRobinPartitionSelection RoundRobinPartitionSelection = new RoundRobinPartitionSelection();

    // Builds the three-partition topology shared by every test, all
    // partitions led by the same mocked node.
    private static Partition[] CreatePartitions(NodeMock nodeMock)
    {
        return new[]
        {
            new Partition {Id = 0, Leader = nodeMock},
            new Partition {Id = 1, Leader = nodeMock},
            new Partition {Id = 2, Leader = nodeMock},
        };
    }

    // Creates the selector under test: hashes the message key to pick a
    // partition, delegating to round-robin when the key cannot be used.
    private static PartitionSelector CreateKeyBasedSelector()
    {
        var partitionStrategy = new MessageKeyPartitionSelection(Serializer, RoundRobinPartitionSelection, Mock.Of<ILogger>());
        return new PartitionSelector(partitionStrategy);
    }

    [Test]
    public void Test_MessageKeyPartitionSelection_Is_Consistent()
    {
        var partitions = CreatePartitions(new NodeMock());
        var partitioner = CreateKeyBasedSelector();
        var message1 = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyKey" }, new DateTime());
        var message2 = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyOtherKey" }, new DateTime());

        // The same key must map to the same partition on every call.
        var expectedPartition1 = partitioner.GetPartition(message1, partitions);
        var expectedPartition2 = partitioner.GetPartition(message2, partitions);
        for (var i = 0; i < 300; i++)
        {
            var currentPartition1 = partitioner.GetPartition(message1, partitions);
            var currentPartition2 = partitioner.GetPartition(message2, partitions);
            Assert.AreEqual(expectedPartition1.Id, currentPartition1.Id);
            Assert.AreEqual(expectedPartition2.Id, currentPartition2.Id);
        }
    }

    [Test]
    public void Test_MessageKeyPartitionSelection_Fallbacks_To_RoundRobin_If_MessageKey_Null()
    {
        var partitions = CreatePartitions(new NodeMock());
        var partitioner = CreateKeyBasedSelector();
        var message = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = null }, new DateTime());

        // A null key cannot be hashed: the selector must still return a valid partition.
        var partition = partitioner.GetPartition(message, partitions);
        Assert.IsTrue(partition.Id != Partition.None.Id);
    }

    [TestCase(0)]
    [TestCase(1)]
    [TestCase(2)]
    public void Test_MessageKeyPartitionSelection_Fallbacks_To_RoundRobin_If_Partition_Blacklisted(int partitionIdBlacklisted)
    {
        var partitions = CreatePartitions(new NodeMock());
        var blacklistedPartitions = new Dictionary<int, DateTime> { { partitionIdBlacklisted, DateTime.MaxValue } };
        var partitioner = CreateKeyBasedSelector();
        var message = ProduceMessage.New(string.Empty, Partitions.Any, new Message { Key = "ThisIsMyKey" }, new DateTime());

        // Whatever partition the key hashes to, a blacklisted partition
        // must never be returned, and selection must still succeed.
        for (var i = 0; i < 300; i++)
        {
            var partition = partitioner.GetPartition(message, partitions, blacklistedPartitions);
            Assert.IsTrue(partition.Id != Partition.None.Id);
            Assert.IsTrue(partition.Id != partitionIdBlacklisted);
        }
    }
}
}
44 changes: 29 additions & 15 deletions kafka-sharp/kafka-sharp.UTest/TestPartitioner.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Kafka.Cluster;
using Kafka.Protocol;
using Kafka.Public;
using Kafka.Routing;
using Kafka.Routing.PartitionSelection;
using NUnit.Framework;

namespace tests_kafka_sharp
Expand All @@ -27,13 +30,16 @@ public void TestRoundRobinPartitionAssign(int delay)
new Partition {Id = 3, Leader = nodeMock},
new Partition {Id = 4, Leader = nodeMock},
};
var partitioner = new PartitionSelector(delay);
var partitionStrategy = new RoundRobinPartitionSelection(delay);
var partitioner = new PartitionSelector(partitionStrategy);
delay = delay <= 0 ? 1 : delay;
foreach (var partition in partitions)
{
for (var j = 0; j < delay; ++j)
{
Assert.AreEqual(partition.Id, partitioner.GetPartition(Partitions.Any, partitions).Id);
Assert.AreEqual(partition.Id, partitioner
.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions)
.Id);
}
}
}
Expand All @@ -42,8 +48,10 @@ public void TestRoundRobinPartitionAssign(int delay)
public void TestRoundRobinPartitionAssignNoPartitionReturnsNone()
{
var partitions = new Partition[0];
var partitioner = new PartitionSelector();
Assert.AreEqual(0, Partition.None.CompareTo(partitioner.GetPartition(Partitions.Any, partitions)));
var partitionStrategy = new RoundRobinPartitionSelection();
var partitioner = new PartitionSelector(partitionStrategy);
Assert.AreEqual(0, Partition.None.CompareTo(partitioner.GetPartition(
ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions)));
}

[Test]
Expand All @@ -62,15 +70,16 @@ public void TestFilter()
filter[0] = DateTime.UtcNow;
filter[2] = DateTime.UtcNow;
filter[4] = DateTime.UtcNow;
var partitioner = new PartitionSelector();
var partitionStrategy = new RoundRobinPartitionSelection();
var partitioner = new PartitionSelector(partitionStrategy);

var partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
var partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()),partitions, filter);
Assert.AreEqual(1, partition.Id);

partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter);
Assert.AreEqual(3, partition.Id);

partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter);
Assert.AreEqual(1, partition.Id);
}

Expand All @@ -93,9 +102,10 @@ public void TestRobinPartitionAssignWhenFiltered()

int delay = partitions.Length + 2;

var partitioner = new PartitionSelector(delay);
var partitionStrategy = new RoundRobinPartitionSelection(delay);
var partitioner = new PartitionSelector(partitionStrategy);

var partition = partitioner.GetPartition(Partitions.Any, partitions, filter);
var partition = partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter);

Assert.AreEqual(1, partition.Id);

Expand Down Expand Up @@ -128,7 +138,8 @@ public void TestFilterWithHighDelay()
// Pick a delay greater than the number of partitions
int delay = partitions.Length + 2;

var partitioner = new PartitionSelector(delay);
var partitionStrategy = new RoundRobinPartitionSelection(delay);
var partitioner = new PartitionSelector(partitionStrategy);

var firstBatch = GetPartitions(delay, partitioner, partitions, filter);

Expand All @@ -152,7 +163,8 @@ public void TestOverflow()

var partitions = Enumerable.Range(0, 10).Select(i => new Partition { Id = i, Leader = nodeMock }).ToArray();

var partitioner = new PartitionSelector(1, int.MaxValue);
var partitionStrategy = new RoundRobinPartitionSelection(delay: 1, startSeed: int.MaxValue);
var partitioner = new PartitionSelector(partitionStrategy);

var batch = GetPartitions(partitions.Length, partitioner, partitions, null);

Expand Down Expand Up @@ -180,12 +192,14 @@ public void TestRoundRobinPartitionWithStartSeed(int startSeed, int delay)
new Partition {Id = 3, Leader = nodeMock},
new Partition {Id = 4, Leader = nodeMock},
};
var partitioner = new PartitionSelector(delay, startSeed);
var partitionStrategy = new RoundRobinPartitionSelection(delay: delay, startSeed: startSeed);
var partitioner = new PartitionSelector(partitionStrategy);
foreach (var partition in partitions)
{
for (var j = 0; j < delay; ++j)
{
Assert.AreEqual((partition.Id + startSeed) % partitions.Length, partitioner.GetPartition(Partitions.Any, partitions).Id);
Assert.AreEqual((partition.Id + startSeed) % partitions.Length, partitioner.GetPartition(
ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions).Id);
}
}
}
Expand All @@ -196,7 +210,7 @@ private static List<Partition> GetPartitions(int count, PartitionSelector partit

for (int i = 0; i < count; i++)
{
result.Add(partitioner.GetPartition(Partitions.Any, partitions, filter));
result.Add(partitioner.GetPartition(ProduceMessage.New(string.Empty, Partitions.Any, new Message(), new DateTime()), partitions, filter));
}

return result;
Expand Down
Loading