Skip to content

Commit

Permalink
Merge pull request #17 from gigya/feature/bug_NoLeaderElectedForParti…
Browse files Browse the repository at this point in the history
…tion

Fix bug in manual consumer regarding leader -1
  • Loading branch information
eran committed May 19, 2016
2 parents e115153 + 48a7f88 commit 5681464
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 21 deletions.
10 changes: 9 additions & 1 deletion src/KafkaNetClient/BrokerRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private BrokerRoute GetCachedRoute(string topic, Partition partition)
var route = TryGetRouteFromCache(topic, partition);
if (route != null) return route;

throw new LeaderNotFoundException(string.Format("Lead broker cannot be found for parition: {0}, leader: {1}", partition.PartitionId, partition.LeaderId));
throw new LeaderNotFoundException(string.Format("Lead broker cannot be found for partition: {0}, leader: {1}", partition.PartitionId, partition.LeaderId));
}

private BrokerRoute TryGetRouteFromCache(string topic, Partition partition)
Expand All @@ -269,6 +269,14 @@ private BrokerRoute TryGetRouteFromCache(string topic, Partition partition)

private void UpdateInternalMetadataCache(MetadataResponse metadata)
{
var noLeaderElectedForPartition =
metadata.Topics.Select(x => new {topic = x.Name, partition = x.Partitions.FirstOrDefault(i => i.LeaderId == -1)})
.FirstOrDefault(x => x.partition != null);

if (noLeaderElectedForPartition != null)
throw new NoLeaderElectedForPartition(string.Format("topic:{0} partition:{1}",
noLeaderElectedForPartition.topic, noLeaderElectedForPartition.partition));

//resolve each broker
var brokerEndpoints = metadata.Brokers.Select(broker => new
{
Expand Down
11 changes: 11 additions & 0 deletions src/KafkaNetClient/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,17 @@ await Task.Run(() =>
needToRefreshMetadata = true;
_options.Log.ErrorFormat(ex.Message);
}
catch (NoLeaderElectedForPartition ex)
{
needToRefreshMetadata = true;
_options.Log.ErrorFormat(ex.Message);
}
catch (LeaderNotFoundException ex)//the numbar partition of can be change
{
needToRefreshMetadata = true;
_options.Log.ErrorFormat(ex.Message);
}
catch (TaskCanceledException ex)
{
//TODO :LOG
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaNetClient/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
[assembly: AssemblyCulture("")]
[assembly: ComVisible(false)]
[assembly: GuidAttribute("eb234ec0-d838-4abd-9224-479ca06f969d")]
[assembly: AssemblyVersion("1.0.1.0")]
[assembly: AssemblyFileVersion("1.0.1.0")]
[assembly: AssemblyVersion("1.0.2.0")]
[assembly: AssemblyFileVersion("1.0.2.0")]
20 changes: 20 additions & 0 deletions src/KafkaNetClient/Protocol/Protocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,26 @@ public InvalidPartitionException(string message, Exception innerException)
}
}

[Serializable]
public class NoLeaderElectedForPartition : ApplicationException
{
public NoLeaderElectedForPartition(string message, params object[] args)
: base(string.Format(message, args))
{
}

public NoLeaderElectedForPartition(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}

public NoLeaderElectedForPartition(string message, Exception innerException)
: base(message, innerException)
{
}
}


[Serializable]
public class BrokerException : ApplicationException
{
Expand Down
40 changes: 25 additions & 15 deletions src/KafkaNetClient/ProtocolGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top
int retryTime = 0;
while (retryTime < _maxRetry)
{
bool needToRefreshTopicMetadata;
ExceptionDispatchInfo exception = null;
string errorDetails;
bool needToRefreshTopicMetadata = false;
ExceptionDispatchInfo exceptionInfo = null;
string errorDetails = "";

try
{
Expand All @@ -63,7 +63,7 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top
return null;
}

var error = (ErrorResponseCode)response.Error;
var error = (ErrorResponseCode) response.Error;
if (error == ErrorResponseCode.NoError)
{
return response;
Expand All @@ -75,15 +75,25 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top
}
catch (ResponseTimeoutException ex)
{
exception = ExceptionDispatchInfo.Capture(ex);
needToRefreshTopicMetadata = true;
errorDetails = ex.GetType().Name;
exceptionInfo = ExceptionDispatchInfo.Capture(ex);
}
catch (BrokerConnectionException ex)
{
exception = ExceptionDispatchInfo.Capture(ex);
exceptionInfo = ExceptionDispatchInfo.Capture(ex);
}
catch (NoLeaderElectedForPartition ex)
{
exceptionInfo = ExceptionDispatchInfo.Capture(ex);
}
catch (LeaderNotFoundException ex)//the numbar partition of can be change
{
exceptionInfo = ExceptionDispatchInfo.Capture(ex);
}

if (exceptionInfo != null)
{
needToRefreshTopicMetadata = true;
errorDetails = ex.GetType().Name;
errorDetails = exceptionInfo.SourceException.GetType().Name;
}

retryTime++;
Expand All @@ -99,9 +109,9 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top
_brokerRouter.Log.ErrorFormat("ProtocolGateway sending request failed");

// If an exception was thrown, we want to propagate it
if (exception != null)
if (exceptionInfo != null)
{
exception.Throw();
exceptionInfo.Throw();
}

// Otherwise, the error was from Kafka, throwing application exception
Expand All @@ -114,10 +124,10 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top

private static bool CanRecoverByRefreshMetadata(ErrorResponseCode error)
{
return error == ErrorResponseCode.BrokerNotAvailable ||
error == ErrorResponseCode.ConsumerCoordinatorNotAvailableCode ||
error == ErrorResponseCode.LeaderNotAvailable ||
error == ErrorResponseCode.NotLeaderForPartition;
return error == ErrorResponseCode.BrokerNotAvailable ||
error == ErrorResponseCode.ConsumerCoordinatorNotAvailableCode ||
error == ErrorResponseCode.LeaderNotAvailable ||
error == ErrorResponseCode.NotLeaderForPartition;
}

public void Dispose()
Expand Down
44 changes: 44 additions & 0 deletions src/kafka-tests/Fakes/BrokerRouterProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public static async Task<MetadataResponse> CreateMetadataResponseWithMultipleBro
};
}


public static async Task<MetadataResponse> CreateMetadataResponseWithSingleBroker()
{
return new MetadataResponse
Expand Down Expand Up @@ -164,6 +165,49 @@ public static async Task<MetadataResponse> CreateMetadataResponseWithSingleBroke
};
}

public static async Task<MetadataResponse> CreateMetadataResponseWithNotEndToElectLeader()
{
return new MetadataResponse
{
CorrelationId = 1,
Brokers = new List<Broker>
{
new Broker
{
Host = "localhost",
Port = 2,
BrokerId = 1
},
},
Topics = new List<Topic>
{
new Topic
{
ErrorCode = 0,
Name = TestTopic,
Partitions = new List<Partition>
{
new Partition
{
ErrorCode = 0,
Isrs = new List<int> {1},
PartitionId = 0,
LeaderId = -1,
Replicas = new List<int> {1},
},
new Partition
{
ErrorCode = 0,
Isrs = new List<int> {1},
PartitionId = 1,
LeaderId = 1,
Replicas = new List<int> {1},
}
}
}
}
};
}
public static async Task<MetadataResponse> CreateMetaResponseWithException()
{
throw new Exception();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ await Task.Run((() =>
[TestCase(1000, 70)]
[TestCase(30000, 550)]
[TestCase(50000, 850)]
[TestCase(200000, 8050)]
public async Task ConsumerShouldConsumeInSameOrderAsAsyncProduced_dataLoad(int numberOfMessage, int timeoutInMs)
{
int partition = 0;
Expand Down
12 changes: 12 additions & 0 deletions src/kafka-tests/Unit/BrokerRouterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,25 @@ public async Task BrokerRouteShouldReturnTopicFromCache()
var result1 = router.GetTopicMetadataFromLocalCache(TestTopic);
var result2 = router.GetTopicMetadataFromLocalCache(TestTopic);

Assert.AreEqual(1, router.GetAllTopicMetadataFromLocalCache().Count);
Assert.That(routerProxy.BrokerConn0.MetadataRequestCallCount, Is.EqualTo(1));
Assert.That(result1.Count, Is.EqualTo(1));
Assert.That(result1[0].Name, Is.EqualTo(TestTopic));
Assert.That(result2.Count, Is.EqualTo(1));
Assert.That(result2[0].Name, Is.EqualTo(TestTopic));
}

[Test, Repeat(IntegrationConfig.NumberOfRepeat)]
public async Task BrokerRouteShouldThrowNoLeaderElectedForPartition()
{
var routerProxy = new BrokerRouterProxy(_kernel);
routerProxy.MetadataResponse = BrokerRouterProxy.CreateMetadataResponseWithNotEndToElectLeader;

var router = routerProxy.Create();
Assert.Throws<NoLeaderElectedForPartition>(async () =>await router.RefreshMissingTopicMetadata(TestTopic));
Assert.AreEqual(0, router.GetAllTopicMetadataFromLocalCache().Count);
}

[Test, Repeat(IntegrationConfig.NumberOfRepeat)]
public async Task BrokerRouteShouldReturnAllTopicsFromCache()
{
Expand Down
10 changes: 7 additions & 3 deletions src/kafka-tests/Unit/ProtocolGatewayTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async Task ShouldTryToRefreshMataDataIfCanRecoverByRefreshMetadata(ErrorR
Assert.That(routerProxy.BrokerConn0.MetadataRequestCallCount, Is.EqualTo(2));
Assert.That(routerProxy.BrokerConn0.FetchRequestCallCount, Is.EqualTo(2));
}

[ExpectedException(typeof(FormatException))]
[Test, Repeat(IntegrationConfig.NumberOfRepeat)]
public async Task ShouldThrowFormatExceptionWhenTopicIsInvalid()
Expand All @@ -72,14 +72,18 @@ public async Task ShouldThrowFormatExceptionWhenTopicIsInvalid()
}

[Test, Repeat(IntegrationConfig.NumberOfRepeat)]
public async Task ShouldTryToRefreshMataDataIfSocketException()
[TestCase(typeof(BrokerConnectionException))]
[TestCase(typeof(ResponseTimeoutException))]
[TestCase(typeof(NoLeaderElectedForPartition))]
[TestCase(typeof(LeaderNotFoundException))]
public async Task ShouldTryToRefreshMataDataIfOnExceptions(Type exceptionType)
{
var routerProxy = new BrokerRouterProxy(_kernel);
routerProxy._cacheExpiration = TimeSpan.FromMilliseconds(10);
var router = routerProxy.Create();
ProtocolGateway protocolGateway = new ProtocolGateway(router);

routerProxy.BrokerConn0.FetchResponseFunction = FailedInFirstMessageException(typeof(BrokerConnectionException), routerProxy._cacheExpiration);
routerProxy.BrokerConn0.FetchResponseFunction = FailedInFirstMessageException(exceptionType, routerProxy._cacheExpiration);
routerProxy.BrokerConn0.MetadataResponseFunction = BrokerRouterProxy.CreateMetadataResponseWithMultipleBrokers;

await protocolGateway.SendProtocolRequest(new FetchRequest(), BrokerRouterProxy.TestTopic, _partitionId);
Expand Down

0 comments on commit 5681464

Please sign in to comment.