From 688a49d9355993b06f88968637989bea97ee7b46 Mon Sep 17 00:00:00 2001 From: Ihar Bury Date: Thu, 14 Sep 2017 08:17:31 +0200 Subject: [PATCH] Fixed potential corner case exceptions when disposing --- .../PollingEventStoreAdapter.cs | 42 ++++++++++++++----- .../Subscription.cs | 10 ++++- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs index aa8366e..f36f108 100644 --- a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs +++ b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs @@ -32,7 +32,6 @@ public class PollingEventStoreAdapter : IDisposable /// private readonly LruCache transactionCacheByPreviousCheckpoint; - private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); private CheckpointRequestTimestamp lastSuccessfulPollingRequestWithoutResults; /// @@ -173,11 +172,14 @@ private async Task LoadNextPageSequentially(long previousCheckpoint, strin { while (true) { - cancellationTokenSource.Token.ThrowIfCancellationRequested(); - if (isDisposed) { - return new Page(previousCheckpoint, new Transaction[0]); +#if DEBUG + LogProvider.GetLogger(typeof(PollingEventStoreAdapter)).Debug(() => + $"Page loading for subscription {subscriptionId} cancelled because the adapter is disposed."); +#endif + + throw new OperationCanceledException(); } CheckpointRequestTimestamp effectiveLastExistingCheckpointRequest = @@ -246,8 +248,22 @@ private Task TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(lo $"for a page after checkpoint {previousCheckpoint}."); #endif - // Ignore result. - Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId); + if (isDisposed) + { +#if DEBUG + LogProvider.GetLogger(typeof(PollingEventStoreAdapter)) + .Debug(() => $"The loader {loader.Id} is cancelled because the adapter is disposed."); +#endif + + // If the adapter is disposed before the current task is set, we cancel the task + // so we do not touch the event store. + taskCompletionSource.SetCanceled(); + } + else + { + // Ignore result. + Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId); + } } else { @@ -394,17 +410,23 @@ public void Dispose() { isDisposed = true; - cancellationTokenSource.Cancel(); - foreach (Subscription subscription in subscriptions.ToArray()) { subscription.Complete(); } + // New loading tasks are no longer started at this point. + // After the current loading task is finished, the event store is no longer used and can be disposed. Task loaderToWaitFor = Volatile.Read(ref currentLoader); - loaderToWaitFor?.Wait(); - cancellationTokenSource.Dispose(); + try + { + loaderToWaitFor?.Wait(); + } + catch (AggregateException) + { + // Ignore. + } (eventStore as IDisposable)?.Dispose(); } diff --git a/Src/LiquidProjections.PollingEventStore/Subscription.cs b/Src/LiquidProjections.PollingEventStore/Subscription.cs index 8fc343b..622472b 100644 --- a/Src/LiquidProjections.PollingEventStore/Subscription.cs +++ b/Src/LiquidProjections.PollingEventStore/Subscription.cs @@ -182,7 +182,15 @@ public void Dispose() cancellationTokenSource.Cancel(); } - Task?.Wait(); + try + { + Task?.Wait(); + } + catch (AggregateException) + { + // Ignore. + } + cancellationTokenSource.Dispose(); }