Skip to content

Commit

Permalink
Merge pull request #1 from IharBury/fix-dispose-deadlock
Browse files Browse the repository at this point in the history
Fixed potential deadlock when a subscription is disposed from a transaction handler
  • Loading branch information
dennisdoomen authored Sep 14, 2017
2 parents 84a85fd + 688a49d commit 1872c0a
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 36 deletions.
42 changes: 32 additions & 10 deletions Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class PollingEventStoreAdapter : IDisposable
/// </summary>
private readonly LruCache<long, Transaction> transactionCacheByPreviousCheckpoint;

private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private CheckpointRequestTimestamp lastSuccessfulPollingRequestWithoutResults;

/// <summary>
Expand Down Expand Up @@ -173,11 +172,14 @@ private async Task<Page> LoadNextPageSequentially(long previousCheckpoint, strin
{
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();

if (isDisposed)
{
return new Page(previousCheckpoint, new Transaction[0]);
#if DEBUG
LogProvider.GetLogger(typeof(PollingEventStoreAdapter)).Debug(() =>
$"Page loading for subscription {subscriptionId} cancelled because the adapter is disposed.");
#endif

throw new OperationCanceledException();
}

CheckpointRequestTimestamp effectiveLastExistingCheckpointRequest =
Expand Down Expand Up @@ -246,8 +248,22 @@ private Task<Page> TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(lo
$"for a page after checkpoint {previousCheckpoint}.");
#endif

// Ignore result.
Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId);
if (isDisposed)
{
#if DEBUG
LogProvider.GetLogger(typeof(PollingEventStoreAdapter))
.Debug(() => $"The loader {loader.Id} is cancelled because the adapter is disposed.");
#endif

// If the adapter is disposed before the current task is set, we cancel the task
// so we do not touch the event store.
taskCompletionSource.SetCanceled();
}
else
{
// Ignore result.
Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId);
}
}
else
{
Expand Down Expand Up @@ -394,17 +410,23 @@ public void Dispose()
{
isDisposed = true;

cancellationTokenSource.Cancel();

foreach (Subscription subscription in subscriptions.ToArray())
{
subscription.Complete();
}

// New loading tasks are no longer started at this point.
// After the current loading task is finished, the event store is no longer used and can be disposed.
Task loaderToWaitFor = Volatile.Read(ref currentLoader);
loaderToWaitFor?.Wait();

cancellationTokenSource.Dispose();
try
{
loaderToWaitFor?.Wait();
}
catch (AggregateException)
{
// Ignore.
}

(eventStore as IDisposable)?.Dispose();
}
Expand Down
53 changes: 29 additions & 24 deletions Src/LiquidProjections.PollingEventStore/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,43 +162,48 @@ public void Complete()

public void Dispose()
{
bool isDisposing;

lock (syncRoot)
{
isDisposing = !isDisposed;

if (isDisposing)
if (!isDisposed)
{
isDisposed = true;
}
}

if (isDisposing)
{
if (cancellationTokenSource != null)
{
// Wait for the task asynchronously.
Task.Run(() =>
{
if (cancellationTokenSource != null)
{
#if DEBUG
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} is being stopped.");
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} is being stopped.");
#endif
if (!cancellationTokenSource.IsCancellationRequested)
{
cancellationTokenSource.Cancel();
}
if (!cancellationTokenSource.IsCancellationRequested)
{
cancellationTokenSource.Cancel();
}
Task?.Wait();
cancellationTokenSource.Dispose();
}
try
{
Task?.Wait();
}
catch (AggregateException)
{
// Ignore.
}
lock (eventStoreAdapter.subscriptionLock)
{
eventStoreAdapter.subscriptions.Remove(this);
}
cancellationTokenSource.Dispose();
}
lock (eventStoreAdapter.subscriptionLock)
{
eventStoreAdapter.subscriptions.Remove(this);
}
#if DEBUG
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} has been stopped.");
LogProvider.GetLogger(typeof(Subscription)).Debug(() => $"Subscription {Id} has been stopped.");
#endif
});
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<Compile Include="TransactionBuilder.cs" />
<Compile Include="Event.cs" />
<Compile Include="EventEnvelopeBuilder.cs" />
<Compile Include="NEventStoreAdapterSpecs.cs" />
<Compile Include="PollingEventStoreAdapterSpecs.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TaskExtensions.cs" />
<Compile Include="TestDataBuilder.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace LiquidProjections.PollingEventStore.Specs
{
namespace EventStoreClientSpecs
namespace PollingEventStoreAdapterSpecs
{
public class When_the_persistency_engine_is_temporarily_unavailable : GivenSubject<CreateSubscription>
{
Expand Down Expand Up @@ -418,5 +418,48 @@ public void Then_the_second_subscription_should_not_hang()
}
}
}

public class When_the_subscriber_cancels_the_subscription_from_inside_its_transaction_handler :
GivenSubject<PollingEventStoreAdapter>
{
private readonly TimeSpan pollingInterval = 500.Milliseconds();
private readonly DateTime utcNow = DateTime.UtcNow;
private readonly ManualResetEventSlim disposed = new ManualResetEventSlim();

public When_the_subscriber_cancels_the_subscription_from_inside_its_transaction_handler()
{
Given(() =>
{
UseThe(new TransactionBuilder().WithCheckpoint(123).Build());
UseThe(A.Fake<IPassiveEventStore>());
A.CallTo(() => The<IPassiveEventStore>().GetFrom(A<long?>.Ignored)).Returns(new[] { The<Transaction>() });
WithSubject(_ => new PollingEventStoreAdapter(The<IPassiveEventStore>(), 11, pollingInterval, 100,
() => utcNow));
});

When(() => Subject.Subscribe(null,
new Subscriber
{
HandleTransactions = (transactions, info) =>
{
info.Subscription.Dispose();
disposed.Set();
return Task.FromResult(0);
}
},
"someId"));
}

[Fact]
public void Then_it_should_cancel_the_subscription_asynchronously()
{
if (!disposed.Wait(TimeSpan.FromSeconds(10)))
{
throw new InvalidOperationException("The subscription was not disposed in 10 seconds.");
}
}
}
}
}

0 comments on commit 1872c0a

Please sign in to comment.