diff --git a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs index aa8366e..f36f108 100644 --- a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs +++ b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs @@ -32,7 +32,6 @@ public class PollingEventStoreAdapter : IDisposable /// private readonly LruCache transactionCacheByPreviousCheckpoint; - private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); private CheckpointRequestTimestamp lastSuccessfulPollingRequestWithoutResults; /// @@ -173,11 +172,14 @@ private async Task LoadNextPageSequentially(long previousCheckpoint, strin { while (true) { - cancellationTokenSource.Token.ThrowIfCancellationRequested(); - if (isDisposed) { - return new Page(previousCheckpoint, new Transaction[0]); +#if DEBUG + LogProvider.GetLogger(typeof(PollingEventStoreAdapter)).Debug(() => + $"Page loading for subscription {subscriptionId} cancelled because the adapter is disposed."); +#endif + + throw new OperationCanceledException(); } CheckpointRequestTimestamp effectiveLastExistingCheckpointRequest = @@ -246,8 +248,22 @@ private Task TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(lo $"for a page after checkpoint {previousCheckpoint}."); #endif - // Ignore result. - Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId); + if (isDisposed) + { +#if DEBUG + LogProvider.GetLogger(typeof(PollingEventStoreAdapter)) + .Debug(() => $"The loader {loader.Id} is cancelled because the adapter is disposed."); +#endif + + // If the adapter is disposed before the current task is set, we cancel the task + // so we do not touch the event store. + taskCompletionSource.SetCanceled(); + } + else + { + // Ignore result. + Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId); + } } else { @@ -394,17 +410,23 @@ public void Dispose() { isDisposed = true; - cancellationTokenSource.Cancel(); - foreach (Subscription subscription in subscriptions.ToArray()) { subscription.Complete(); } + // New loading tasks are no longer started at this point. + // After the current loading task is finished, the event store is no longer used and can be disposed. Task loaderToWaitFor = Volatile.Read(ref currentLoader); - loaderToWaitFor?.Wait(); - cancellationTokenSource.Dispose(); + try + { + loaderToWaitFor?.Wait(); + } + catch (AggregateException) + { + // Ignore. + } (eventStore as IDisposable)?.Dispose(); } diff --git a/Src/LiquidProjections.PollingEventStore/Subscription.cs b/Src/LiquidProjections.PollingEventStore/Subscription.cs index b146b3d..622472b 100644 --- a/Src/LiquidProjections.PollingEventStore/Subscription.cs +++ b/Src/LiquidProjections.PollingEventStore/Subscription.cs @@ -162,43 +162,48 @@ public void Complete() public void Dispose() { - bool isDisposing; - lock (syncRoot) { - isDisposing = !isDisposed; - - if (isDisposing) + if (!isDisposed) { isDisposed = true; - } - } - if (isDisposing) - { - if (cancellationTokenSource != null) - { + // Wait for the task asynchronously. + Task.Run(() => + { + if (cancellationTokenSource != null) + { #if DEBUG - LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} is being stopped."); + LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} is being stopped."); #endif - if (!cancellationTokenSource.IsCancellationRequested) - { - cancellationTokenSource.Cancel(); - } + if (!cancellationTokenSource.IsCancellationRequested) + { + cancellationTokenSource.Cancel(); + } - Task?.Wait(); - cancellationTokenSource.Dispose(); - } + try + { + Task?.Wait(); + } + catch (AggregateException) + { + // Ignore. + } - lock (eventStoreAdapter.subscriptionLock) - { - eventStoreAdapter.subscriptions.Remove(this); - } + cancellationTokenSource.Dispose(); + } + + lock (eventStoreAdapter.subscriptionLock) + { + eventStoreAdapter.subscriptions.Remove(this); + } #if DEBUG - LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} has been stopped."); + LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} has been stopped."); #endif + }); + } } } } diff --git a/Tests/LiquidProjections.PollingEventStore.Specs/LiquidProjections.PollingEventStore.Specs.csproj b/Tests/LiquidProjections.PollingEventStore.Specs/LiquidProjections.PollingEventStore.Specs.csproj index 4a36466..493d87b 100644 --- a/Tests/LiquidProjections.PollingEventStore.Specs/LiquidProjections.PollingEventStore.Specs.csproj +++ b/Tests/LiquidProjections.PollingEventStore.Specs/LiquidProjections.PollingEventStore.Specs.csproj @@ -70,7 +70,7 @@ - + diff --git a/Tests/LiquidProjections.PollingEventStore.Specs/NEventStoreAdapterSpecs.cs b/Tests/LiquidProjections.PollingEventStore.Specs/PollingEventStoreAdapterSpecs.cs similarity index 88% rename from Tests/LiquidProjections.PollingEventStore.Specs/NEventStoreAdapterSpecs.cs rename to Tests/LiquidProjections.PollingEventStore.Specs/PollingEventStoreAdapterSpecs.cs index 978b992..63a38ac 100644 --- a/Tests/LiquidProjections.PollingEventStore.Specs/NEventStoreAdapterSpecs.cs +++ b/Tests/LiquidProjections.PollingEventStore.Specs/PollingEventStoreAdapterSpecs.cs @@ -13,7 +13,7 @@ namespace LiquidProjections.PollingEventStore.Specs { - namespace EventStoreClientSpecs + namespace PollingEventStoreAdapterSpecs { public class When_the_persistency_engine_is_temporarily_unavailable : GivenSubject { @@ -418,5 +418,48 @@ public void Then_the_second_subscription_should_not_hang() } } } + + public class When_the_subscriber_cancels_the_subscription_from_inside_its_transaction_handler : + GivenSubject + { + private readonly TimeSpan pollingInterval = 500.Milliseconds(); + private readonly DateTime utcNow = DateTime.UtcNow; + private readonly ManualResetEventSlim disposed = new ManualResetEventSlim(); + + public When_the_subscriber_cancels_the_subscription_from_inside_its_transaction_handler() + { + Given(() => + { + UseThe(new TransactionBuilder().WithCheckpoint(123).Build()); + + UseThe(A.Fake()); + A.CallTo(() => The().GetFrom(A.Ignored)).Returns(new[] { The() }); + + WithSubject(_ => new PollingEventStoreAdapter(The(), 11, pollingInterval, 100, + () => utcNow)); + }); + + When(() => Subject.Subscribe(null, + new Subscriber + { + HandleTransactions = (transactions, info) => + { + info.Subscription.Dispose(); + disposed.Set(); + return Task.FromResult(0); + } + }, + "someId")); + } + + [Fact] + public void Then_it_should_cancel_the_subscription_asynchronously() + { + if (!disposed.Wait(TimeSpan.FromSeconds(10))) + { + throw new InvalidOperationException("The subscription was not disposed in 10 seconds."); + } + } + } } } \ No newline at end of file