From 5b562340d960990ac775f98d5dea311b03907cf6 Mon Sep 17 00:00:00 2001 From: Eran Ofer Date: Wed, 18 May 2016 19:14:27 +0300 Subject: [PATCH 1/2] Fix bug in manual consumer regarding leader -1 Kafka doc: "The node id for the kafka broker currently acting as leader for this partition. If no leader exists because we are in the middle of a leader election this id will be -1." --- src/KafkaNetClient/BrokerRouter.cs | 10 ++++- src/KafkaNetClient/Consumer.cs | 11 +++++ src/KafkaNetClient/Protocol/Protocol.cs | 20 +++++++++ src/KafkaNetClient/ProtocolGateway.cs | 40 ++++++++++------- src/kafka-tests/Fakes/BrokerRouterProxy.cs | 44 +++++++++++++++++++ .../ProducerConsumerIntegrationTests.cs | 1 + src/kafka-tests/Unit/BrokerRouterTests.cs | 12 +++++ src/kafka-tests/Unit/ProtocolGatewayTest.cs | 10 +++-- 8 files changed, 129 insertions(+), 19 deletions(-) diff --git a/src/KafkaNetClient/BrokerRouter.cs b/src/KafkaNetClient/BrokerRouter.cs index 10a78c84..90aa5e45 100644 --- a/src/KafkaNetClient/BrokerRouter.cs +++ b/src/KafkaNetClient/BrokerRouter.cs @@ -248,7 +248,7 @@ private BrokerRoute GetCachedRoute(string topic, Partition partition) var route = TryGetRouteFromCache(topic, partition); if (route != null) return route; - throw new LeaderNotFoundException(string.Format("Lead broker cannot be found for parition: {0}, leader: {1}", partition.PartitionId, partition.LeaderId)); + throw new LeaderNotFoundException(string.Format("Lead broker cannot be found for partition: {0}, leader: {1}", partition.PartitionId, partition.LeaderId)); } private BrokerRoute TryGetRouteFromCache(string topic, Partition partition) @@ -269,6 +269,14 @@ private BrokerRoute TryGetRouteFromCache(string topic, Partition partition) private void UpdateInternalMetadataCache(MetadataResponse metadata) { + var noLeaderElectedForPartition = + metadata.Topics.Select(x => new {topic = x.Name, partition = x.Partitions.FirstOrDefault(i => i.LeaderId == -1)}) + .FirstOrDefault(x => x.partition != null); + + if (noLeaderElectedForPartition != null) + throw new NoLeaderElectedForPartition(string.Format("topic:{0} partition:{1}", + noLeaderElectedForPartition.topic, noLeaderElectedForPartition.partition)); + //resolve each broker var brokerEndpoints = metadata.Brokers.Select(broker => new { diff --git a/src/KafkaNetClient/Consumer.cs b/src/KafkaNetClient/Consumer.cs index a4dedcfa..f6f16b7f 100644 --- a/src/KafkaNetClient/Consumer.cs +++ b/src/KafkaNetClient/Consumer.cs @@ -226,6 +226,17 @@ await Task.Run(() => needToRefreshMetadata = true; _options.Log.ErrorFormat(ex.Message); } + catch (NoLeaderElectedForPartition ex) + { + needToRefreshMetadata = true; + _options.Log.ErrorFormat(ex.Message); + } + catch (LeaderNotFoundException ex)//the numbar partition of can be change + { + needToRefreshMetadata = true; + _options.Log.ErrorFormat(ex.Message); + } + catch (TaskCanceledException ex) { //TODO :LOG diff --git a/src/KafkaNetClient/Protocol/Protocol.cs b/src/KafkaNetClient/Protocol/Protocol.cs index 98314ba7..fa0188cd 100644 --- a/src/KafkaNetClient/Protocol/Protocol.cs +++ b/src/KafkaNetClient/Protocol/Protocol.cs @@ -223,6 +223,26 @@ public InvalidPartitionException(string message, Exception innerException) } } + [Serializable] + public class NoLeaderElectedForPartition : ApplicationException + { + public NoLeaderElectedForPartition(string message, params object[] args) + : base(string.Format(message, args)) + { + } + + public NoLeaderElectedForPartition(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + + public NoLeaderElectedForPartition(string message, Exception innerException) + : base(message, innerException) + { + } + } + + [Serializable] public class BrokerException : ApplicationException { diff --git a/src/KafkaNetClient/ProtocolGateway.cs b/src/KafkaNetClient/ProtocolGateway.cs index 58687855..01377591 100644 --- a/src/KafkaNetClient/ProtocolGateway.cs +++ b/src/KafkaNetClient/ProtocolGateway.cs @@ -44,9 +44,9 @@ public async Task SendProtocolRequest(IKafkaRequest request, string top int retryTime = 0; while (retryTime < _maxRetry) { - bool needToRefreshTopicMetadata; - ExceptionDispatchInfo exception = null; - string errorDetails; + bool needToRefreshTopicMetadata = false; + ExceptionDispatchInfo exceptionInfo = null; + string errorDetails = ""; try { @@ -63,7 +63,7 @@ public async Task SendProtocolRequest(IKafkaRequest request, string top return null; } - var error = (ErrorResponseCode)response.Error; + var error = (ErrorResponseCode) response.Error; if (error == ErrorResponseCode.NoError) { return response; @@ -75,15 +75,25 @@ public async Task SendProtocolRequest(IKafkaRequest request, string top } catch (ResponseTimeoutException ex) { - exception = ExceptionDispatchInfo.Capture(ex); - needToRefreshTopicMetadata = true; - errorDetails = ex.GetType().Name; + exceptionInfo = ExceptionDispatchInfo.Capture(ex); } catch (BrokerConnectionException ex) { - exception = ExceptionDispatchInfo.Capture(ex); + exceptionInfo = ExceptionDispatchInfo.Capture(ex); + } + catch (NoLeaderElectedForPartition ex) + { + exceptionInfo = ExceptionDispatchInfo.Capture(ex); + } + catch (LeaderNotFoundException ex)//the numbar partition of can be change + { + exceptionInfo = ExceptionDispatchInfo.Capture(ex); + } + + if (exceptionInfo != null) + { needToRefreshTopicMetadata = true; - errorDetails = ex.GetType().Name; + errorDetails = exceptionInfo.SourceException.GetType().Name; } retryTime++; @@ -99,9 +109,9 @@ public async Task SendProtocolRequest(IKafkaRequest request, string top _brokerRouter.Log.ErrorFormat("ProtocolGateway sending request failed"); // If an exception was thrown, we want to propagate it - if (exception != null) + if (exceptionInfo != null) { - exception.Throw(); + exceptionInfo.Throw(); } // Otherwise, the error was from Kafka, throwing application exception @@ -114,10 +124,10 @@ public async Task SendProtocolRequest(IKafkaRequest request, string top private static bool CanRecoverByRefreshMetadata(ErrorResponseCode error) { - return error == ErrorResponseCode.BrokerNotAvailable || - error == ErrorResponseCode.ConsumerCoordinatorNotAvailableCode || - error == ErrorResponseCode.LeaderNotAvailable || - error == ErrorResponseCode.NotLeaderForPartition; + return error == ErrorResponseCode.BrokerNotAvailable || + error == ErrorResponseCode.ConsumerCoordinatorNotAvailableCode || + error == ErrorResponseCode.LeaderNotAvailable || + error == ErrorResponseCode.NotLeaderForPartition; } public void Dispose() diff --git a/src/kafka-tests/Fakes/BrokerRouterProxy.cs b/src/kafka-tests/Fakes/BrokerRouterProxy.cs index 01f480a4..54215715 100644 --- a/src/kafka-tests/Fakes/BrokerRouterProxy.cs +++ b/src/kafka-tests/Fakes/BrokerRouterProxy.cs @@ -120,6 +120,7 @@ public static async Task CreateMetadataResponseWithMultipleBro }; } + public static async Task CreateMetadataResponseWithSingleBroker() { return new MetadataResponse @@ -164,6 +165,49 @@ public static async Task CreateMetadataResponseWithSingleBroke }; } + public static async Task CreateMetadataResponseWithNotEndToElectLeader() + { + return new MetadataResponse + { + CorrelationId = 1, + Brokers = new List + { + new Broker + { + Host = "localhost", + Port = 2, + BrokerId = 1 + }, + }, + Topics = new List + { + new Topic + { + ErrorCode = 0, + Name = TestTopic, + Partitions = new List + { + new Partition + { + ErrorCode = 0, + Isrs = new List {1}, + PartitionId = 0, + LeaderId = -1, + Replicas = new List {1}, + }, + new Partition + { + ErrorCode = 0, + Isrs = new List {1}, + PartitionId = 1, + LeaderId = 1, + Replicas = new List {1}, + } + } + } + } + }; + } public static async Task CreateMetaResponseWithException() { throw new Exception(); diff --git a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs index 48a7a31b..d9ede51c 100644 --- a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs @@ -195,6 +195,7 @@ await Task.Run((() => [TestCase(1000, 70)] [TestCase(30000, 550)] [TestCase(50000, 850)] + [TestCase(200000, 8050)] public async Task ConsumerShouldConsumeInSameOrderAsAsyncProduced_dataLoad(int numberOfMessage, int timeoutInMs) { int partition = 0; diff --git a/src/kafka-tests/Unit/BrokerRouterTests.cs b/src/kafka-tests/Unit/BrokerRouterTests.cs index b2fee60f..b70acd0b 100644 --- a/src/kafka-tests/Unit/BrokerRouterTests.cs +++ b/src/kafka-tests/Unit/BrokerRouterTests.cs @@ -134,6 +134,7 @@ public async Task BrokerRouteShouldReturnTopicFromCache() var result1 = router.GetTopicMetadataFromLocalCache(TestTopic); var result2 = router.GetTopicMetadataFromLocalCache(TestTopic); + Assert.AreEqual(1, router.GetAllTopicMetadataFromLocalCache().Count); Assert.That(routerProxy.BrokerConn0.MetadataRequestCallCount, Is.EqualTo(1)); Assert.That(result1.Count, Is.EqualTo(1)); Assert.That(result1[0].Name, Is.EqualTo(TestTopic)); @@ -141,6 +142,17 @@ public async Task BrokerRouteShouldReturnTopicFromCache() Assert.That(result2[0].Name, Is.EqualTo(TestTopic)); } + [Test, Repeat(IntegrationConfig.NumberOfRepeat)] + public async Task BrokerRouteShouldThrowNoLeaderElectedForPartition() + { + var routerProxy = new BrokerRouterProxy(_kernel); + routerProxy.MetadataResponse = BrokerRouterProxy.CreateMetadataResponseWithNotEndToElectLeader; + + var router = routerProxy.Create(); + Assert.Throws(async () =>await router.RefreshMissingTopicMetadata(TestTopic)); + Assert.AreEqual(0, router.GetAllTopicMetadataFromLocalCache().Count); + } + [Test, Repeat(IntegrationConfig.NumberOfRepeat)] public async Task BrokerRouteShouldReturnAllTopicsFromCache() { diff --git a/src/kafka-tests/Unit/ProtocolGatewayTest.cs b/src/kafka-tests/Unit/ProtocolGatewayTest.cs index fb629c93..8488eca2 100644 --- a/src/kafka-tests/Unit/ProtocolGatewayTest.cs +++ b/src/kafka-tests/Unit/ProtocolGatewayTest.cs @@ -58,7 +58,7 @@ public async Task ShouldTryToRefreshMataDataIfCanRecoverByRefreshMetadata(ErrorR Assert.That(routerProxy.BrokerConn0.MetadataRequestCallCount, Is.EqualTo(2)); Assert.That(routerProxy.BrokerConn0.FetchRequestCallCount, Is.EqualTo(2)); } - + [ExpectedException(typeof(FormatException))] [Test, Repeat(IntegrationConfig.NumberOfRepeat)] public async Task ShouldThrowFormatExceptionWhenTopicIsInvalid() @@ -72,14 +72,18 @@ public async Task ShouldThrowFormatExceptionWhenTopicIsInvalid() } [Test, Repeat(IntegrationConfig.NumberOfRepeat)] - public async Task ShouldTryToRefreshMataDataIfSocketException() + [TestCase(typeof(BrokerConnectionException))] + [TestCase(typeof(ResponseTimeoutException))] + [TestCase(typeof(NoLeaderElectedForPartition))] + [TestCase(typeof(LeaderNotFoundException))] + public async Task ShouldTryToRefreshMataDataIfOnExceptions(Type exceptionType) { var routerProxy = new BrokerRouterProxy(_kernel); routerProxy._cacheExpiration = TimeSpan.FromMilliseconds(10); var router = routerProxy.Create(); ProtocolGateway protocolGateway = new ProtocolGateway(router); - routerProxy.BrokerConn0.FetchResponseFunction = FailedInFirstMessageException(typeof(BrokerConnectionException), routerProxy._cacheExpiration); + routerProxy.BrokerConn0.FetchResponseFunction = FailedInFirstMessageException(exceptionType, routerProxy._cacheExpiration); routerProxy.BrokerConn0.MetadataResponseFunction = BrokerRouterProxy.CreateMetadataResponseWithMultipleBrokers; await protocolGateway.SendProtocolRequest(new FetchRequest(), BrokerRouterProxy.TestTopic, _partitionId); From 48a7f88fa884bb2886c3e2c365d3b2bfcbb027f9 Mon Sep 17 00:00:00 2001 From: Eran Ofer Date: Thu, 19 May 2016 17:50:12 +0300 Subject: [PATCH 2/2] update AssemblyVersion --- src/KafkaNetClient/Properties/AssemblyInfo.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/KafkaNetClient/Properties/AssemblyInfo.cs b/src/KafkaNetClient/Properties/AssemblyInfo.cs index ffe8ef8d..cbfae959 100644 --- a/src/KafkaNetClient/Properties/AssemblyInfo.cs +++ b/src/KafkaNetClient/Properties/AssemblyInfo.cs @@ -11,5 +11,5 @@ [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] [assembly: GuidAttribute("eb234ec0-d838-4abd-9224-479ca06f969d")] -[assembly: AssemblyVersion("1.0.1.0")] -[assembly: AssemblyFileVersion("1.0.1.0")] +[assembly: AssemblyVersion("1.0.2.0")] +[assembly: AssemblyFileVersion("1.0.2.0")]