Skip to content

Commit

Permalink
Merge pull request #225 from EventStore/discovery-delay
Browse files Browse the repository at this point in the history
Add a delay if producing the ChannelInfo throws an exception
  • Loading branch information
hayley-jean authored Dec 13, 2022
2 parents ba8d821 + ee0906b commit 8ec1805
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 39 deletions.
1 change: 1 addition & 0 deletions src/EventStore.Client/EventStoreClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ protected EventStoreClientBase(EventStoreClientSettings? settings,
_channelInfoProvider = new SharingProvider<ReconnectionRequired, ChannelInfo>(
factory: (endPoint, onBroken) =>
GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token),
factoryRetryDelay: Settings.ConnectivitySettings.DiscoveryInterval,
initialInput: ReconnectionRequired.Rediscover.Instance,
loggerFactory: Settings.LoggerFactory);
}
Expand Down
6 changes: 5 additions & 1 deletion src/EventStore.Client/SharingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ public SharingProvider(ILoggerFactory? loggerFactory) {

internal class SharingProvider<TInput, TOutput> : SharingProvider {
private readonly Func<TInput, Action<TInput>, Task<TOutput>> _factory;
private readonly TimeSpan _factoryRetryDelay;
private readonly TInput _initialInput;
private TaskCompletionSource<TOutput> _currentBox;

public SharingProvider(
Func<TInput, Action<TInput>, Task<TOutput>> factory,
TimeSpan factoryRetryDelay,
TInput initialInput,
ILoggerFactory? loggerFactory = null) : base(loggerFactory) {

_factory = factory;
_factoryRetryDelay = factoryRetryDelay;
_initialInput = initialInput;
_currentBox = new(TaskCreationOptions.RunContinuationsAsynchronously);
_ = FillBoxAsync(_currentBox, input: initialInput);
Expand Down Expand Up @@ -87,7 +90,8 @@ private async Task FillBoxAsync(TaskCompletionSource<TOutput> box, TInput input)
Log.LogDebug("{type} produced!", typeof(TOutput).Name);
} catch (Exception ex) {
await Task.Yield(); // avoid risk of stack overflow
Log.LogDebug(ex, "{type} production failed", typeof(TOutput).Name);
Log.LogDebug(ex, "{type} production failed. Retrying in {delay}", typeof(TOutput).Name, _factoryRetryDelay);
await Task.Delay(_factoryRetryDelay).ConfigureAwait(false);
box.TrySetException(ex);
OnBroken(box, _initialInput);
}
Expand Down
70 changes: 33 additions & 37 deletions test/EventStore.Client.Streams.Tests/reconnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ public reconnection(Fixture fixture) {
public async Task when_the_connection_is_lost() {
var streamName = _fixture.GetStreamName();
var eventCount = 512;
var tcs = new TaskCompletionSource<object?>();
var signal = new TaskCompletionSource<object?>();
var events = new List<ResolvedEvent>();
var resubscribe = new TaskCompletionSource<StreamSubscription>();
var receivedAllEvents = new TaskCompletionSource();
var serverRestarted = new TaskCompletionSource();
var receivedEvents = new List<ResolvedEvent>();
var resubscribed = new TaskCompletionSource<StreamSubscription>();

using var _ = await _fixture.Client.SubscribeToStreamAsync(streamName, FromStream.Start,
EventAppeared, subscriptionDropped: SubscriptionDropped)
Expand All @@ -34,17 +34,17 @@ await _fixture.Client
await Task.Delay(TimeSpan.FromSeconds(2));

await _fixture.TestServer.StartAsync().WithTimeout();
signal.SetResult(null);
serverRestarted.SetResult();

await resubscribe.Task.WithTimeout(TimeSpan.FromSeconds(10));
await resubscribed.Task.WithTimeout(TimeSpan.FromSeconds(10));

await tcs.Task.WithTimeout(TimeSpan.FromSeconds(10));
await receivedAllEvents.Task.WithTimeout(TimeSpan.FromSeconds(10));

async Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) {
await signal.Task;
events.Add(e);
if (events.Count == eventCount) {
tcs.TrySetResult(null);
await serverRestarted.Task;
receivedEvents.Add(e);
if (receivedEvents.Count == eventCount) {
receivedAllEvents.TrySetResult();
}
}

Expand All @@ -53,33 +53,29 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason,
return;
}

if (ex is not RpcException {
Status: {
StatusCode: StatusCode.Unavailable
}
}) {
tcs.TrySetException(ex);
if (ex is not RpcException { Status.StatusCode: StatusCode.Unavailable }) {
receivedAllEvents.TrySetException(ex);
} else {
Resubscribe();

void Resubscribe() {
var task = _fixture.Client.SubscribeToStreamAsync(streamName,
FromStream.After(events[^1].OriginalEventNumber),
EventAppeared, subscriptionDropped: SubscriptionDropped);
task.ContinueWith(_ => resubscribe.SetResult(_.Result),
TaskContinuationOptions.OnlyOnRanToCompletion);
task.ContinueWith(_ => {
var ex = _.Exception!.GetBaseException();
if (ex is RpcException {
StatusCode: StatusCode.DeadlineExceeded
}) {
Task.Delay(200).ContinueWith(_ => Resubscribe());
} else {
resubscribe.SetException(_.Exception!.GetBaseException());
}
},
TaskContinuationOptions.OnlyOnFaulted);
var _ = ResubscribeAsync();
}
}

async Task ResubscribeAsync() {
try {
var sub = await _fixture.Client.SubscribeToStreamAsync(
streamName,
FromStream.After(receivedEvents[^1].OriginalEventNumber),
EventAppeared,
subscriptionDropped: SubscriptionDropped);
resubscribed.SetResult(sub);
} catch (Exception ex) {
ex = ex.GetBaseException();

if (ex is RpcException) {
await Task.Delay(200);
var _ = ResubscribeAsync();
} else {
resubscribed.SetException(ex);
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion test/EventStore.Client.Tests/SharingProviderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class SharingProviderTests {
public async Task CanGetCurrent() {
var sut = new SharingProvider<int, int>(
factory: async (x, _) => x + 1,
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: 5);
Assert.Equal(6, await sut.CurrentAsync);
}
Expand All @@ -19,6 +20,7 @@ public async Task CanReset() {
var count = 0;
var sut = new SharingProvider<bool, int>(
factory: async (_, _) => count++,
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: true);

Assert.Equal(0, await sut.CurrentAsync);
Expand All @@ -35,6 +37,7 @@ public async Task CanReturnBroken() {
onBroken = f;
return count++;
},
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: true);

Assert.Equal(0, await sut.CurrentAsync);
Expand All @@ -55,6 +58,7 @@ public async Task CanReturnSameBoxTwice() {
onBroken = f;
return count++;
},
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: true);

Assert.Equal(0, await sut.CurrentAsync);
Expand All @@ -80,6 +84,7 @@ public async Task CanReturnPendingBox() {
await trigger.WaitAsync();
return count;
},
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: true);


Expand Down Expand Up @@ -110,6 +115,7 @@ public async Task CanReturnPendingBox() {
public async Task FactoryCanThrow() {
var sut = new SharingProvider<int, int>(
factory: (x, _) => throw new Exception($"input {x}"),
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: 0);

// exception propagated to consumer
Expand All @@ -131,6 +137,7 @@ public async Task FactoryCanCallOnBrokenSynchronously() {
onBroken(5);
return x;
},
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: 0);

// onBroken was called but it didn't do anything
Expand All @@ -147,6 +154,7 @@ public async Task FactoryCanCallOnBrokenSynchronouslyAndThrow() {
}
return x;
},
factoryRetryDelay: TimeSpan.FromSeconds(0),
initialInput: 0);

var ex = await Assert.ThrowsAsync<Exception>(async () => {
Expand Down Expand Up @@ -184,7 +192,7 @@ async Task<int> Factory(int input, Action<int> onBroken) {
}
}

var sut = new SharingProvider<int, int>(Factory, 0);
var sut = new SharingProvider<int, int>(Factory, TimeSpan.FromSeconds(0), 0);

// got an item (0)
completeConstruction.Release();
Expand Down

0 comments on commit 8ec1805

Please sign in to comment.