Skip to content

Commit

Permalink
Merge pull request #12 from dennisdoomen/FixConcurreny
Browse files Browse the repository at this point in the history
 Ensures that the fastest completed subscriber doesn't block the others
  • Loading branch information
dennisdoomen authored Jun 15, 2018
2 parents d0aaae9 + e46aa5b commit 85b8b7f
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 157 deletions.

This file was deleted.

6 changes: 3 additions & 3 deletions Src/LiquidProjections.PollingEventStore/Page.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ namespace LiquidProjections.PollingEventStore
{
internal sealed class Page
{
public Page(long previousCheckpoint, IReadOnlyList<Transaction> transactions)
public Page(long precedingCheckpoint, IReadOnlyList<Transaction> transactions)
{
PreviousCheckpoint = previousCheckpoint;
PrecedingCheckpoint = precedingCheckpoint;
Transactions = transactions;
}

/// <summary>
/// Gets the checkpoint as it was requested when loading this page.
/// </summary>
public long PreviousCheckpoint { get; }
public long PrecedingCheckpoint { get; }

public IReadOnlyList<Transaction> Transactions { get; }

Expand Down
159 changes: 67 additions & 92 deletions Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs

Large diffs are not rendered by default.

48 changes: 27 additions & 21 deletions Src/LiquidProjections.PollingEventStore/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ internal sealed class Subscription : IDisposable
private bool isDisposed;
private long lastProcessedCheckpoint;
private readonly Subscriber subscriber;
private readonly TimeSpan pollInterval;
private readonly LogMessage logger;

public Subscription(PollingEventStoreAdapter eventStoreAdapter, long lastProcessedCheckpoint,
Subscriber subscriber, string subscriptionId, LogMessage logger)
Subscriber subscriber, string subscriptionId, TimeSpan pollInterval, LogMessage logger)
{
this.eventStoreAdapter = eventStoreAdapter;
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.subscriber = subscriber;
this.pollInterval = pollInterval;
this.logger = logger;
Id = subscriptionId;
}
Expand Down Expand Up @@ -57,23 +59,23 @@ public void Start()
};

Task = Task.Run(async () =>
{
try
{
await RunAsync(info).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Do nothing.
}
catch (Exception exception)
{
try
{
await RunAsync(info).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Do nothing.
}
catch (Exception exception)
{
logger(() =>
"NEventStore polling task has failed. Event subscription has been cancelled: " +
exception);
}
},
CancellationToken.None);
logger(() =>
"NEventStore polling task has failed. Event subscription has been cancelled: " +
exception);
}
},
CancellationToken.None);
}
}

Expand Down Expand Up @@ -120,6 +122,10 @@ private async Task RunAsync(SubscriptionInfo info)

lastProcessedCheckpoint = page.LastCheckpoint;
}
else
{
await Task.Delay(pollInterval);
}
}
}
}
Expand Down Expand Up @@ -183,11 +189,11 @@ public void Dispose()
}
}

lock (eventStoreAdapter.subscriptionLock)
lock (eventStoreAdapter.SubscriptionLock)
{
eventStoreAdapter.subscriptions.Remove(this);
eventStoreAdapter.Subscriptions.Remove(this);
}

if (Task == null)
{
FinishDisposing();
Expand All @@ -204,7 +210,7 @@ public void Dispose()
private void FinishDisposing()
{
cancellationTokenSource?.Dispose();

#if LIQUIDPROJECTIONS_DIAGNOSTICS
logger(() => $"Subscription {Id} has been stopped.");
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async Task Then_it_should_recover_automatically_after_its_polling_interva
}
while (actualTransaction == null);

actualTransaction.Id.Should().Be(The<Transaction>().Id.ToString());
actualTransaction.Id.Should().Be(The<Transaction>().Id);
}
}

Expand Down Expand Up @@ -99,9 +99,9 @@ public When_a_commit_is_persisted()
public async Task Then_it_should_convert_the_commit_details_to_a_transaction()
{
Transaction actualTransaction = await transactionHandledSource.Task.TimeoutAfter(30.Seconds());
;

var transaction = The<Transaction>();
actualTransaction.Id.Should().Be(transaction.Id.ToString());
actualTransaction.Id.Should().Be(transaction.Id);
actualTransaction.Checkpoint.Should().Be(transaction.Checkpoint);
actualTransaction.TimeStampUtc.Should().Be(transaction.TimeStampUtc);
actualTransaction.StreamId.Should().Be(transaction.StreamId);
Expand Down Expand Up @@ -338,8 +338,6 @@ public class
GivenSubject<CreateSubscription>
{
private readonly TimeSpan pollingInterval = 500.Milliseconds();
private readonly DateTime utcNow = DateTime.UtcNow;
private IPassiveEventStore eventStore;
private readonly ManualResetEventSlim aSubscriptionStartedLoading = new ManualResetEventSlim();
private readonly ManualResetEventSlim secondSubscriptionCreated = new ManualResetEventSlim();
private readonly ManualResetEventSlim secondSubscriptionReceivedTheTransaction = new ManualResetEventSlim();
Expand All @@ -349,7 +347,7 @@ public class
{
Given(() =>
{
eventStore = A.Fake<IPassiveEventStore>();
IPassiveEventStore eventStore = A.Fake<IPassiveEventStore>();
A.CallTo(() => eventStore.GetFrom(A<long?>.Ignored)).ReturnsLazily(call =>
{
string checkpointString = call.GetArgument<string>(0);
Expand All @@ -370,7 +368,7 @@ public class
return checkpoint > 0
? new Transaction[0]
: new Transaction[] {new TransactionBuilder().WithCheckpoint(1).Build()};
: new[] {new TransactionBuilder().WithCheckpoint(1).Build()};
});
var adapter = new PollingEventStoreAdapter(eventStore, 11, pollingInterval, 100, () => DateTime.UtcNow);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,6 @@ public IEnumerable<Transaction> BuildAsEnumerable()
return new[] { Build() };
}

public TransactionBuilder WithStreamId(string streamId)
{
this.streamId = streamId;
return this;
}

public TransactionBuilder WithEvents(params Event[] events)
{
return events.Select(WithEvent).Last();
}

public TransactionBuilder WithEvent(Event @event)
{
var eventMessage = new EventEnvelopeBuilder().WithBody(@event).Build();
Expand All @@ -59,12 +48,6 @@ public TransactionBuilder WithEvent(Event @event)
return this;
}

public TransactionBuilder WithEvent(EventEnvelopeBuilder eventEnvelopeBuilder)
{
events.Add(eventEnvelopeBuilder.Build());
return this;
}

public TransactionBuilder At(DateTime timeStamp)
{
this.timeStamp = timeStamp;
Expand Down

0 comments on commit 85b8b7f

Please sign in to comment.