Skip to content

Commit

Permalink
Fixed potential corner case exceptions when disposing
Browse files Browse the repository at this point in the history
  • Loading branch information
IharBury committed Sep 14, 2017
1 parent 0c293fc commit 688a49d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
42 changes: 32 additions & 10 deletions Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class PollingEventStoreAdapter : IDisposable
/// </summary>
private readonly LruCache<long, Transaction> transactionCacheByPreviousCheckpoint;

private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private CheckpointRequestTimestamp lastSuccessfulPollingRequestWithoutResults;

/// <summary>
Expand Down Expand Up @@ -173,11 +172,14 @@ private async Task<Page> LoadNextPageSequentially(long previousCheckpoint, strin
{
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();

if (isDisposed)
{
return new Page(previousCheckpoint, new Transaction[0]);
#if DEBUG
LogProvider.GetLogger(typeof(PollingEventStoreAdapter)).Debug(() =>
$"Page loading for subscription {subscriptionId} cancelled because the adapter is disposed.");
#endif

throw new OperationCanceledException();
}

CheckpointRequestTimestamp effectiveLastExistingCheckpointRequest =
Expand Down Expand Up @@ -246,8 +248,22 @@ private Task<Page> TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(lo
$"for a page after checkpoint {previousCheckpoint}.");
#endif

// Ignore result.
Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId);
if (isDisposed)
{
#if DEBUG
LogProvider.GetLogger(typeof(PollingEventStoreAdapter))
.Debug(() => $"The loader {loader.Id} is cancelled because the adapter is disposed.");
#endif

// If the adapter is disposed before the current task is set, we cancel the task
// so we do not touch the event store.
taskCompletionSource.SetCanceled();
}
else
{
// Ignore result.
Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId);
}
}
else
{
Expand Down Expand Up @@ -394,17 +410,23 @@ public void Dispose()
{
isDisposed = true;

cancellationTokenSource.Cancel();

foreach (Subscription subscription in subscriptions.ToArray())
{
subscription.Complete();
}

// New loading tasks are no longer started at this point.
// After the current loading task is finished, the event store is no longer used and can be disposed.
Task loaderToWaitFor = Volatile.Read(ref currentLoader);
loaderToWaitFor?.Wait();

cancellationTokenSource.Dispose();
try
{
loaderToWaitFor?.Wait();
}
catch (AggregateException)
{
// Ignore.
}

(eventStore as IDisposable)?.Dispose();
}
Expand Down
10 changes: 9 additions & 1 deletion Src/LiquidProjections.PollingEventStore/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,15 @@ public void Dispose()
cancellationTokenSource.Cancel();
}
Task?.Wait();
try
{
Task?.Wait();
}
catch (AggregateException)
{
// Ignore.
}
cancellationTokenSource.Dispose();
}
Expand Down

0 comments on commit 688a49d

Please sign in to comment.