Skip to content

Commit

Permalink
fix #1080 keep track off timeout over multiple delegated calls into c…
Browse files Browse the repository at this point in the history
…luster

Conflicts:
	src/Elasticsearch.Net/Connection/Configuration/IConnectionConfigurationValues.cs
	src/Elasticsearch.Net/Connection/RequestHandlers/RequestHandlerBase.cs
	src/Tests/Elasticsearch.Net.Tests.Unit/Elasticsearch.Net.Tests.Unit.csproj
  • Loading branch information
Mpdreamz authored and gmarz committed Dec 9, 2014
1 parent d0bc85a commit 5c230c5
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class ConnectionConfiguration<T> : IConnectionConfigurationValues, IHideO
private int? _maxDeadTimeout;
int? IConnectionConfigurationValues.MaxDeadTimeout { get{ return _maxDeadTimeout; } }

private TimeSpan? _maxRetryTimeout;
TimeSpan? IConnectionConfigurationValues.MaxRetryTimeout { get{ return _maxRetryTimeout; } }

private string _proxyUsername;
string IConnectionConfigurationValues.ProxyUsername { get{ return _proxyUsername; } }

Expand Down Expand Up @@ -278,6 +281,19 @@ public T SetMaxDeadTimeout(int timeout)
this._maxDeadTimeout = timeout;
return (T) this;
}

/// <summary>
/// Limits the total runtime including retries separately from <see cref="Timeout"/>
/// <pre>
/// When not specified defaults to <see cref="Timeout"/> which itself defaults to 60seconds
/// </pre>
/// </summary>
public T SetMaxRetryTimeout(TimeSpan maxRetryTimeout)
{
this._maxRetryTimeout = maxRetryTimeout;
return (T) this;
}

/// <summary>
/// Semaphore asynchronous connections automatically by giving
/// it a maximum concurrent connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,36 @@

namespace Elasticsearch.Net.Connection
{
//TODO change timeouts to TimeSpans in 2.0?

public interface IConnectionConfigurationValues
{
IConnectionPool ConnectionPool { get; }

int MaximumAsyncConnections { get; }
int Timeout { get; }

/// <summary>
/// The timeout in milliseconds to use for ping calls that are issues to check whether a node is up or not.
/// </summary>
int? PingTimeout { get; }

int? DeadTimeout { get; }
int? MaxDeadTimeout { get; }
int? MaxRetries { get; }

/// <summary>
/// Limits the total runtime including retries separately from <see cref="Timeout"/>
/// <pre>
/// When not specified defaults to <see cref="Timeout"/> which itself defaults to 60seconds
/// </pre>
/// </summary>
TimeSpan? MaxRetryTimeout { get; }

/// <summary>
/// This signals that we do not want to send initial pings to unknown/previously dead nodes
/// and just send the call straightaway
/// </summary>
bool DisablePings { get; }
bool EnableCompressedResponses { get; }

Expand Down
7 changes: 7 additions & 0 deletions src/Elasticsearch.Net/Connection/ITransportDelegator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ internal interface ITransportDelegator

bool SniffingDisabled(IRequestConfiguration requestConfiguration);
bool SniffOnFaultDiscoveredMoreNodes(ITransportRequestState requestState, int retried, ElasticsearchResponse<Stream> streamResponse);

/// <summary>
/// Returns whether the current delegation over nodes took too long and we should quit.
/// if <see cref="ConnectionSettings.SetMaxRetryTimeout"/> is set we'll use that timeout otherwise we default to th value of
/// <see cref="ConnectionSettings.SetTimeout"/> which itself defaults to 60 seconds
/// </summary>
bool TookTooLongToRetry(ITransportRequestState requestState);

/// <summary>
/// Selects next node uri on request state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class RequestHandlerBase
{
protected const int BufferSize = 4096;
protected static readonly string MaxRetryExceptionMessage = "Failed after retrying {2} times: '{0} {1}'. {3}";
protected static readonly string TookTooLongExceptionMessage = "Retry timeout {4} was hit after retrying {2} times: '{0} {1}'. {3}";
protected static readonly string MaxRetryInnerMessage = "InnerException: {0}, InnerMessage: {1}, InnerStackTrace: {2}";

protected readonly IConnectionConfigurationValues _settings;
Expand Down Expand Up @@ -123,39 +124,59 @@ protected bool DoneProcessing<T>(

protected void ThrowMaxRetryExceptionWhenNeeded<T>(TransportRequestState<T> requestState, int maxRetries)
{
if (requestState.Retried < maxRetries) return;
var tookToLong = this._delegator.TookTooLongToRetry(requestState);

//not out of date and we havent depleted our retries, get the hell out of here
if (!tookToLong && requestState.Retried < maxRetries) return;

var innerExceptions = requestState.SeenExceptions.Where(e => e != null).ToList();
var innerException = !innerExceptions.HasAny()
? null
: (innerExceptions.Count() == 1)
? innerExceptions.First()
: new AggregateException(requestState.SeenExceptions);
var exceptionMessage = CreateMaxRetryExceptionMessage(requestState, innerException);

//When we are not using pooling we forcefully rethrow the exception
//and never wrap it in a maxretry exception
if (!requestState.UsingPooling && innerException != null)
throw innerException;

var exceptionMessage = tookToLong
? CreateTookTooLongExceptionMessage(requestState, innerException)
: CreateMaxRetryExceptionMessage(requestState, innerException);
throw new MaxRetryException(exceptionMessage, innerException);
}

protected string CreateInnerExceptionMessage<T>(TransportRequestState<T> requestState, Exception e)
{
if (e == null) return null;
var aggregate = e as AggregateException;
if (aggregate == null)
return "\r\n" + MaxRetryInnerMessage.F(e.GetType().Name, e.Message, e.StackTrace);
aggregate = aggregate.Flatten();
var innerExceptions = aggregate.InnerExceptions
.Select(ae => MaxRetryInnerMessage.F(ae.GetType().Name, ae.Message, ae.StackTrace))
.ToList();
return "\r\n" + string.Join("\r\n", innerExceptions);
}

protected string CreateMaxRetryExceptionMessage<T>(TransportRequestState<T> requestState, Exception e)
{
string innerException = null;
if (e != null)
{
var aggregate = e as AggregateException;
if (aggregate != null)
{
aggregate = aggregate.Flatten();
var innerExceptions = aggregate.InnerExceptions
.Select(ae => MaxRetryInnerMessage.F(ae.GetType().Name, ae.Message, ae.StackTrace))
.ToList();
innerException = "\r\n" + string.Join("\r\n", innerExceptions);
}
else
innerException = "\r\n" + MaxRetryInnerMessage.F(e.GetType().Name, e.Message, e.StackTrace);
}
string innerException = CreateInnerExceptionMessage(requestState, e);
var exceptionMessage = MaxRetryExceptionMessage
.F(requestState.Method, requestState.Path, requestState.Retried, innerException);
return exceptionMessage;
}

protected string CreateTookTooLongExceptionMessage<T>(TransportRequestState<T> requestState, Exception e)
{
string innerException = CreateInnerExceptionMessage(requestState, e);
var timeout = this._settings.MaxRetryTimeout.GetValueOrDefault(TimeSpan.FromMilliseconds(this._settings.Timeout));
var exceptionMessage = TookTooLongExceptionMessage
.F(requestState.Method, requestState.Path, requestState.Retried, innerException, timeout);
return exceptionMessage;
}

protected void OptionallyCloseResponseStreamAndSetSuccess<T>(
ITransportRequestState requestState,
ElasticsearchServerError error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public interface ITransportRequestState
Uri CreatePathOnCurrentNode(string path);
IRequestConfiguration RequestConfiguration { get; }
int Retried { get; }
DateTime StartedOn { get; }
bool SniffedOnConnectionFailure { get; set; }
int? Seed { get; set; }
Uri CurrentNode { get; set; }
Expand Down
24 changes: 22 additions & 2 deletions src/Elasticsearch.Net/Connection/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ bool ITransportDelegator.Ping(ITransportRequestState requestState)
using (response.Response)
return response.Success;
}
catch(ElasticsearchAuthenticationException)
catch (ElasticsearchAuthenticationException)
{
throw;
}
Expand Down Expand Up @@ -188,7 +188,7 @@ IList<Uri> ITransportDelegator.Sniff(ITransportRequestState ownerState = null)
}
if (response.HttpStatusCode.HasValue && response.HttpStatusCode == (int)HttpStatusCode.Unauthorized)
throw new ElasticsearchAuthenticationException(response);
if (response.Response == null)
if (response.Response == null)
return null;

using (response.Response)
Expand Down Expand Up @@ -246,6 +246,26 @@ void ITransportDelegator.SniffOnConnectionFailure(ITransportRequestState request

/* REQUEST STATE *** ********************************************/

/// <summary>
/// Returns whether the current delegation over nodes took too long and we should quit.
/// if <see cref="ConnectionSettings.SetMaxRetryTimeout"/> is set we'll use that timeout otherwise we default to th value of
/// <see cref="ConnectionSettings.SetTimeout"/> which itself defaults to 60 seconds
/// </summary>
bool ITransportDelegator.TookTooLongToRetry(ITransportRequestState requestState)
{
var timeout = this.Settings.MaxRetryTimeout.GetValueOrDefault(TimeSpan.FromMilliseconds(this.Settings.Timeout));
var startedOn = requestState.StartedOn;
var now = this._dateTimeProvider.Now();

//we apply a soft margin so that if a request timesout at 59 seconds when the maximum is 60
//we also abort.
var margin = (timeout.TotalMilliseconds / 100.0) * 98;
var marginTimeSpan = TimeSpan.FromMilliseconds(margin);
var timespanCall = (now - startedOn);
var tookToLong = timespanCall >= marginTimeSpan;
return tookToLong;
}

/// <summary>
/// Returns either the fixed maximum set on the connection configuration settings or the number of nodes
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void SniffOnConnectionFaultCausesSniffOn503()
Assert.Throws<MaxRetryException>(()=>client1.Info()); //info call 5

sniffCall.MustHaveHappened(Repeated.Exactly.Once);
nowCall.MustHaveHappened(Repeated.Exactly.Times(8));
nowCall.MustHaveHappened(Repeated.Exactly.Times(10));

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
<Compile Include="Exceptions\ExceptionsBubbleTests.cs" />
<Compile Include="Failover\Concurrent\ConcurrencyTestConnection.cs" />
<Compile Include="Failover\Retries\ClientExceptionRetryHandlingTests.cs" />
<Compile Include="Failover\Timeout\DontRetryAfterMaxRetryTimeoutTests.cs" />
<Compile Include="Failover\Timeout\DontRetryAfterDefaultTimeoutTests.cs" />
<Compile Include="Memory\Helpers\AsyncMemorySetup.cs" />
<Compile Include="Memory\Helpers\IMemorySetup.cs" />
<Compile Include="Memory\ResponseAsyncCodePathsMemoryTests.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Autofac;
using Autofac.Extras.FakeItEasy;
using Elasticsearch.Net.Connection;
using Elasticsearch.Net.Connection.Configuration;
using Elasticsearch.Net.ConnectionPool;
using Elasticsearch.Net.Exceptions;
using Elasticsearch.Net.Providers;
using Elasticsearch.Net.Tests.Unit.Stubs;
using FakeItEasy;
using FluentAssertions;
using NUnit.Framework;

namespace Elasticsearch.Net.Tests.Unit.Failover.Timeout
{
[TestFixture]
public class DontRetryAfterDefaultTimeoutTests
{
[Test]
public void FailEarlyIfTimeoutIsExhausted()
{
using (var fake = new AutoFake())
{
var dateTimeProvider = ProvideDateTimeProvider(fake);
var config = ProvideConfiguration(dateTimeProvider);
var connection = ProvideConnection(fake, config, dateTimeProvider);

var getCall = FakeCalls.GetSyncCall(fake);
var ok = FakeResponse.Ok(config);
var bad = FakeResponse.Bad(config);
getCall.ReturnsNextFromSequence(
bad, //info 1 - 9204
bad, //info 2 - 9203 DEAD
ok //info 2 retry - 9202
);

var seenNodes = new List<Uri>();
getCall.Invokes((Uri u, IRequestConfiguration o) => seenNodes.Add(u));

var pingCall = FakeCalls.PingAtConnectionLevel(fake);
pingCall.Returns(ok);

var client1 = fake.Resolve<ElasticsearchClient>();

//event though the third node should have returned ok, the first 2 calls took a minute
var e = Assert.Throws<MaxRetryException>(() => client1.Info());
e.Message.Should()
.StartWith("Retry timeout 00:01:00 was hit after retrying 1 times:");

IElasticsearchResponse response = null;
Assert.DoesNotThrow(() => response = client1.Info() );
response.Should().NotBeNull();
response.Success.Should().BeTrue();

}
}

[Test]
public void FailEarlyIfTimeoutIsExhausted_Async()
{
using (var fake = new AutoFake())
{
var dateTimeProvider = ProvideDateTimeProvider(fake);
var config = ProvideConfiguration(dateTimeProvider);
var connection = ProvideConnection(fake, config, dateTimeProvider);

var getCall = FakeCalls.GetCall(fake);
var ok = Task.FromResult(FakeResponse.Ok(config));
var bad = Task.FromResult(FakeResponse.Bad(config));
getCall.ReturnsNextFromSequence(
bad,
bad,
ok
);

var seenNodes = new List<Uri>();
getCall.Invokes((Uri u, IRequestConfiguration o) => seenNodes.Add(u));

var pingCall = FakeCalls.PingAtConnectionLevelAsync(fake);
pingCall.Returns(ok);

var client1 = fake.Resolve<ElasticsearchClient>();
//event though the third node should have returned ok, the first 2 calls took a minute
var e = Assert.Throws<MaxRetryException>(async () => await client1.InfoAsync());
e.Message.Should()
.StartWith("Retry timeout 00:01:00 was hit after retrying 1 times:");

IElasticsearchResponse response = null;
Assert.DoesNotThrow(async () => response = await client1.InfoAsync() );
response.Should().NotBeNull();
response.Success.Should().BeTrue();
}
}

private static IConnection ProvideConnection(AutoFake fake, ConnectionConfiguration config, IDateTimeProvider dateTimeProvider)
{
fake.Provide<IConnectionConfigurationValues>(config);
var param = new TypedParameter(typeof(IDateTimeProvider), dateTimeProvider);
var transport = fake.Provide<ITransport, Transport>(param);
var connection = fake.Resolve<IConnection>();
return connection;
}

private static ConnectionConfiguration ProvideConfiguration(IDateTimeProvider dateTimeProvider)
{
var connectionPool = new StaticConnectionPool(new[]
{
new Uri("http://localhost:9204"),
new Uri("http://localhost:9203"),
new Uri("http://localhost:9202"),
new Uri("http://localhost:9201")
}, randomizeOnStartup: false, dateTimeProvider: dateTimeProvider);
var config = new ConnectionConfiguration(connectionPool).EnableMetrics();
return config;
}

private static IDateTimeProvider ProvideDateTimeProvider(AutoFake fake)
{
var now = DateTime.UtcNow;
var dateTimeProvider = fake.Resolve<IDateTimeProvider>();
var nowCall = A.CallTo(() => dateTimeProvider.Now());
nowCall.ReturnsNextFromSequence(
now, //initital sniff now from constructor
now, //pool select next node
now.AddSeconds(30), //info 1 took to long?
now.AddSeconds(30), //pool select next node?
now.AddMinutes(1) //info 2 took to long?
);
A.CallTo(() => dateTimeProvider.AliveTime(A<Uri>._, A<int>._)).Returns(new DateTime());
//dead time will return a fixed timeout of 1 minute
A.CallTo(() => dateTimeProvider.DeadTime(A<Uri>._, A<int>._, A<int?>._, A<int?>._))
.Returns(DateTime.UtcNow.AddMinutes(1));
//make sure the transport layer uses a different datetimeprovider
fake.Provide<IDateTimeProvider>(new DateTimeProvider());
return dateTimeProvider;
}
}
}
Loading

0 comments on commit 5c230c5

Please sign in to comment.