From b4034b3561a63f222ba092f23d510766a3f2c86f Mon Sep 17 00:00:00 2001 From: Dennis Doomen Date: Thu, 14 Jun 2018 10:44:34 +0200 Subject: [PATCH 1/2] Improved naming of variables and concepts --- .../Page.cs | 6 +- .../PollingEventStoreAdapter.cs | 133 +++++++++--------- .../Subscription.cs | 44 +++--- 3 files changed, 92 insertions(+), 91 deletions(-) diff --git a/Src/LiquidProjections.PollingEventStore/Page.cs b/Src/LiquidProjections.PollingEventStore/Page.cs index b25ad38..87854ac 100644 --- a/Src/LiquidProjections.PollingEventStore/Page.cs +++ b/Src/LiquidProjections.PollingEventStore/Page.cs @@ -4,16 +4,16 @@ namespace LiquidProjections.PollingEventStore { internal sealed class Page { - public Page(long previousCheckpoint, IReadOnlyList transactions) + public Page(long precedingCheckpoint, IReadOnlyList transactions) { - PreviousCheckpoint = previousCheckpoint; + PrecedingCheckpoint = precedingCheckpoint; Transactions = transactions; } /// /// Gets the checkpoint as it was requested when loading this page. /// - public long PreviousCheckpoint { get; } + public long PrecedingCheckpoint { get; } public IReadOnlyList Transactions { get; } diff --git a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs index cefaa1d..7929e5d 100644 --- a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs +++ b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs @@ -25,20 +25,19 @@ class PollingEventStoreAdapter : IDisposable { private readonly TimeSpan pollInterval; private readonly int maxPageSize; - private readonly Func getUtcNow; private readonly LogMessage logger; private readonly IPassiveEventStore eventStore; - internal readonly HashSet subscriptions = new HashSet(); + internal readonly HashSet Subscriptions = new HashSet(); private volatile bool isDisposed; - internal readonly object subscriptionLock = new object(); - private Task currentLoader; + internal readonly object SubscriptionLock = new object(); + private Task currentPreloader; /// - /// Stores cached transactions by the checkpoint of their previous transaction. + /// Stores cached transactions by the checkpoint of their PRECEDING transaction. This ensures that + /// gaps in checkpoints will not result in cache misses. /// - private readonly LruCache transactionCacheByPreviousCheckpoint; - private CheckpointRequestTimestamp lastSuccessfulPollingRequestWithoutResults; + private readonly LruCache cachedTransactionsByPrecedingCheckpoint; /// /// Creates an adapter that observes an implementation of and efficiently handles @@ -59,20 +58,18 @@ class PollingEventStoreAdapter : IDisposable /// The size of the page of transactions the adapter should load from the event store for every query. /// /// - /// Provides the current date and time in UTC. + /// Obsolete. Only kept in there to prevent breaking changes. /// - public PollingEventStoreAdapter(IPassiveEventStore eventStore, int cacheSize, TimeSpan pollInterval, int maxPageSize, - Func getUtcNow, LogMessage logger = null) + public PollingEventStoreAdapter(IPassiveEventStore eventStore, int cacheSize, TimeSpan pollInterval, int maxPageSize, Func getUtcNow, LogMessage logger = null) { this.eventStore = eventStore; this.pollInterval = pollInterval; this.maxPageSize = maxPageSize; - this.getUtcNow = getUtcNow; this.logger = logger ?? (_ => {}); if (cacheSize > 0) { - transactionCacheByPreviousCheckpoint = new LruCache(cacheSize); + cachedTransactionsByPrecedingCheckpoint = new LruCache(cacheSize); } } @@ -85,15 +82,15 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscribe Subscription subscription; - lock (subscriptionLock) + lock (SubscriptionLock) { if (isDisposed) { throw new ObjectDisposedException(typeof(PollingEventStoreAdapter).FullName); } - subscription = new Subscription(this, lastProcessedCheckpoint ?? 0, subscriber, subscriptionId, logger); - subscriptions.Add(subscription); + subscription = new Subscription(this, lastProcessedCheckpoint ?? 0, subscriber, subscriptionId, pollInterval, logger); + Subscriptions.Add(subscription); } subscription.Start(); @@ -113,7 +110,9 @@ internal async Task GetNextPage(long lastProcessedCheckpoint, string subsc return pageFromCache; } - Page loadedPage = await LoadNextPageSequentially(lastProcessedCheckpoint, subscriptionId).ConfigureAwait(false); + + Page loadedPage = await LoadNextPage(lastProcessedCheckpoint, subscriptionId).ConfigureAwait(false); + if (loadedPage.Transactions.Count == maxPageSize) { StartPreloadingNextPage(loadedPage.LastCheckpoint, subscriptionId); @@ -122,21 +121,19 @@ internal async Task GetNextPage(long lastProcessedCheckpoint, string subsc return loadedPage; } - private Page TryGetNextPageFromCache(long previousCheckpoint, string subscriptionId) + private Page TryGetNextPageFromCache(long precedingCheckpoint, string subscriptionId) { - Transaction cachedNextTransaction; - - if ((transactionCacheByPreviousCheckpoint != null) && transactionCacheByPreviousCheckpoint.TryGet(previousCheckpoint, out cachedNextTransaction)) + if ((cachedTransactionsByPrecedingCheckpoint != null) && cachedTransactionsByPrecedingCheckpoint.TryGet(precedingCheckpoint, out Transaction cachedTransaction)) { - var resultPage = new List(maxPageSize) { cachedNextTransaction }; + var resultPage = new List(maxPageSize) { cachedTransaction }; while (resultPage.Count < maxPageSize) { - long lastCheckpoint = cachedNextTransaction.Checkpoint; + long lastCheckpoint = cachedTransaction.Checkpoint; - if (transactionCacheByPreviousCheckpoint.TryGet(lastCheckpoint, out cachedNextTransaction)) + if (cachedTransactionsByPrecedingCheckpoint.TryGet(lastCheckpoint, out cachedTransaction)) { - resultPage.Add(cachedNextTransaction); + resultPage.Add(cachedTransaction); } else { @@ -152,7 +149,7 @@ private Page TryGetNextPageFromCache(long previousCheckpoint, string subscriptio $"to checkpoint {resultPage.Last().Checkpoint} in the cache."); #endif - return new Page(previousCheckpoint, resultPage); + return new Page(precedingCheckpoint, resultPage); } #if LIQUIDPROJECTIONS_DIAGNOSTICS @@ -160,22 +157,22 @@ private Page TryGetNextPageFromCache(long previousCheckpoint, string subscriptio $"Subscription {subscriptionId} has not found the next transaction in the cache."); #endif - return new Page(previousCheckpoint, new Transaction[0]); + return new Page(precedingCheckpoint, new Transaction[0]); } - private void StartPreloadingNextPage(long previousCheckpoint, string subscriptionId) + private void StartPreloadingNextPage(long precedingCheckpoint, string subscriptionId) { #if LIQUIDPROJECTIONS_DIAGNOSTICS logger(() => $"Subscription {subscriptionId} has started preloading transactions " + - $"after checkpoint {previousCheckpoint}."); + $"after checkpoint {precedingCheckpoint}."); #endif // Ignore result. - Task _ = LoadNextPageSequentially(previousCheckpoint, subscriptionId); + Task _ = LoadNextPage(precedingCheckpoint, subscriptionId); } - private async Task LoadNextPageSequentially(long previousCheckpoint, string subscriptionId) + private async Task LoadNextPage(long precedingCheckpoint, string subscriptionId) { while (true) { @@ -215,43 +212,43 @@ private async Task LoadNextPageSequentially(long previousCheckpoint, strin .ConfigureAwait(false); if (candidatePage.PreviousCheckpoint == previousCheckpoint) + if (candidatePage.PrecedingCheckpoint == precedingCheckpoint) { return candidatePage; } } } - private Task TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(long previousCheckpoint, - string subscriptionId) + private Task TryLoadNextOrWaitForPreLoadingToFinish(long precedingCheckpoint, string subscriptionId) { if (isDisposed) { - return Task.FromResult(new Page(previousCheckpoint, new Transaction[0])); + return Task.FromResult(new Page(precedingCheckpoint, new Transaction[0])); } - TaskCompletionSource taskCompletionSource = null; - bool isTaskOwner = false; - Task loader = Volatile.Read(ref currentLoader); + TaskCompletionSource ourPreloader = null; + bool isOurPreloader = false; + Task loader = Volatile.Read(ref currentPreloader); try { if (loader == null) { - taskCompletionSource = new TaskCompletionSource(); - Task oldLoader = Interlocked.CompareExchange(ref currentLoader, taskCompletionSource.Task, null); - isTaskOwner = oldLoader == null; - loader = isTaskOwner ? taskCompletionSource.Task : oldLoader; + ourPreloader = new TaskCompletionSource(); + Task oldLoader = Interlocked.CompareExchange(ref currentPreloader, ourPreloader.Task, null); + isOurPreloader = (oldLoader == null); + loader = isOurPreloader ? ourPreloader.Task : oldLoader; } return loader; } finally { - if (isTaskOwner) + if (isOurPreloader) { #if LIQUIDPROJECTIONS_DIAGNOSTICS logger(() => $"Subscription {subscriptionId} created a loader {loader.Id} " + - $"for a page after checkpoint {previousCheckpoint}."); + $"for a page after checkpoint {precedingCheckpoint}."); #endif if (isDisposed) @@ -262,12 +259,12 @@ private Task TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(lo // If the adapter is disposed before the current task is set, we cancel the task // so we do not touch the event store. - taskCompletionSource.SetCanceled(); + ourPreloader.SetCanceled(); } else { // Ignore result. - Task _ = TryLoadNextPageAndMakeLoaderComplete(previousCheckpoint, taskCompletionSource, subscriptionId); + Task _ = TryLoadNextPageAndMakeLoaderComplete(precedingCheckpoint, ourPreloader, subscriptionId); } } else @@ -279,7 +276,7 @@ private Task TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(lo } } - private async Task TryLoadNextPageAndMakeLoaderComplete(long previousCheckpoint, + private async Task TryLoadNextPageAndMakeLoaderComplete(long precedingCheckpoint, TaskCompletionSource loaderCompletionSource, string subscriptionId) { Page nextPage; @@ -288,7 +285,7 @@ private async Task TryLoadNextPageAndMakeLoaderComplete(long previousCheckpoint, { try { - nextPage = await TryLoadNextPage(previousCheckpoint, subscriptionId).ConfigureAwait(false); + nextPage = await LoadAndCachePage(precedingCheckpoint, subscriptionId).ConfigureAwait(false); } finally { @@ -296,7 +293,7 @@ private async Task TryLoadNextPageAndMakeLoaderComplete(long previousCheckpoint, logger(() => $"Loader for subscription {subscriptionId} is no longer the current one."); #endif - Volatile.Write(ref currentLoader, null); + Volatile.Write(ref currentPreloader, null); } } catch (Exception exception) @@ -316,12 +313,12 @@ private async Task TryLoadNextPageAndMakeLoaderComplete(long previousCheckpoint, loaderCompletionSource.SetResult(nextPage); } - private async Task TryLoadNextPage(long previousCheckpoint, string subscriptionId) + private async Task LoadAndCachePage(long precedingCheckpoint, string subscriptionId) { // Maybe it's just loaded to cache. try { - Page cachedPage = TryGetNextPageFromCache(previousCheckpoint, subscriptionId); + Page cachedPage = TryGetNextPageFromCache(precedingCheckpoint, subscriptionId); if (cachedPage.Transactions.Count > 0) { #if LIQUIDPROJECTIONS_DIAGNOSTICS @@ -333,29 +330,28 @@ private async Task TryLoadNextPage(long previousCheckpoint, string subscri catch (Exception exception) { logger(() => - $"Failed getting transactions after checkpoint {previousCheckpoint} from the cache: " + exception); + $"Failed getting transactions after checkpoint {precedingCheckpoint} from the cache: " + exception); } - DateTime timeOfRequestUtc = getUtcNow(); List transactions; try { transactions = await Task .Run(() => eventStore - .GetFrom((previousCheckpoint == 0) ? (long?)null : previousCheckpoint) + .GetFrom((precedingCheckpoint == 0) ? (long?)null : precedingCheckpoint) .Take(maxPageSize) .ToList()) .ConfigureAwait(false); } catch (Exception exception) { - logger(() => $"Failed loading transactions after checkpoint {previousCheckpoint} from NEventStore: " + + logger(() => $"Failed loading transactions after checkpoint {precedingCheckpoint} from NEventStore: " + exception); throw; } - + if (transactions.Count > 0) { #if LIQUIDPROJECTIONS_DIAGNOSTICS @@ -364,16 +360,22 @@ private async Task TryLoadNextPage(long previousCheckpoint, string subscri $"from checkpoint {transactions.First().Checkpoint} to checkpoint {transactions.Last().Checkpoint}."); #endif - if (transactionCacheByPreviousCheckpoint != null) + if (transactions.First().Checkpoint <= precedingCheckpoint) + { + throw new InvalidOperationException( + $"The event store returned a transaction with checkpoint {transactions.First().Checkpoint} that is supposed to be higher than the requested {precedingCheckpoint}"); + } + + if (cachedTransactionsByPrecedingCheckpoint != null) { /* Add to cache in reverse order to prevent other projectors from requesting already loaded transactions which are not added to cache yet. */ for (int index = transactions.Count - 1; index > 0; index--) { - transactionCacheByPreviousCheckpoint.Set(transactions[index - 1].Checkpoint, transactions[index]); + cachedTransactionsByPrecedingCheckpoint.Set(transactions[index - 1].Checkpoint, transactions[index]); } - transactionCacheByPreviousCheckpoint.Set(previousCheckpoint, transactions[0]); + cachedTransactionsByPrecedingCheckpoint.Set(precedingCheckpoint, transactions[0]); #if LIQUIDPROJECTIONS_DIAGNOSTICS logger(() => @@ -389,31 +391,27 @@ from requesting already loaded transactions which are not added to cache yet. */ $"Loader for subscription {subscriptionId} has discovered " + $"that there are no new transactions yet. Next request for the new transactions will be delayed."); #endif - - Volatile.Write( - ref lastSuccessfulPollingRequestWithoutResults, - new CheckpointRequestTimestamp(previousCheckpoint, timeOfRequestUtc)); } - return new Page(previousCheckpoint, transactions); + return new Page(precedingCheckpoint, transactions); } public void Dispose() { - lock (subscriptionLock) + lock (SubscriptionLock) { if (!isDisposed) { isDisposed = true; - foreach (Subscription subscription in subscriptions.ToArray()) + foreach (Subscription subscription in Subscriptions.ToArray()) { subscription.Complete(); } // New loading tasks are no longer started at this point. // After the current loading task is finished, the event store is no longer used and can be disposed. - Task loaderToWaitFor = Volatile.Read(ref currentLoader); + Task loaderToWaitFor = Volatile.Read(ref currentPreloader); try { @@ -447,8 +445,9 @@ interface IPassiveEventStore /// /// The implementation is allowed to return just a limited subset of items at a time. /// It is up to the implementation to decide how many items should be returned at a time. - /// The only requirement is that the implementation should return at least one , + /// The only requirement is that the implementation should return at least one /// if there are any transactions having checkpoint () bigger than given one. + /// However, the checkpoint of the first transaction must be larger than 0. /// /// /// Determines the value of the , next to which s should be loaded from the storage. @@ -464,5 +463,5 @@ interface IPassiveEventStore #else internal #endif - delegate void LogMessage(Func message); + delegate void LogMessage(Func messageFunc); } diff --git a/Src/LiquidProjections.PollingEventStore/Subscription.cs b/Src/LiquidProjections.PollingEventStore/Subscription.cs index 193bbaa..23b2ffe 100644 --- a/Src/LiquidProjections.PollingEventStore/Subscription.cs +++ b/Src/LiquidProjections.PollingEventStore/Subscription.cs @@ -14,14 +14,16 @@ internal sealed class Subscription : IDisposable private bool isDisposed; private long lastProcessedCheckpoint; private readonly Subscriber subscriber; + private readonly TimeSpan pollInterval; private readonly LogMessage logger; public Subscription(PollingEventStoreAdapter eventStoreAdapter, long lastProcessedCheckpoint, - Subscriber subscriber, string subscriptionId, LogMessage logger) + Subscriber subscriber, string subscriptionId, TimeSpan pollInterval, LogMessage logger) { this.eventStoreAdapter = eventStoreAdapter; this.lastProcessedCheckpoint = lastProcessedCheckpoint; this.subscriber = subscriber; + this.pollInterval = pollInterval; this.logger = logger; Id = subscriptionId; } @@ -57,23 +59,23 @@ public void Start() }; Task = Task.Run(async () => + { + try { - try - { - await RunAsync(info).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - // Do nothing. - } - catch (Exception exception) - { - logger(() => - "NEventStore polling task has failed. Event subscription has been cancelled: " + - exception); - } - }, - CancellationToken.None); + await RunAsync(info).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Do nothing. + } + catch (Exception exception) + { + logger(() => + "NEventStore polling task has failed. Event subscription has been cancelled: " + + exception); + } + }, + CancellationToken.None); } } @@ -183,11 +185,11 @@ public void Dispose() } } - lock (eventStoreAdapter.subscriptionLock) + lock (eventStoreAdapter.SubscriptionLock) { - eventStoreAdapter.subscriptions.Remove(this); + eventStoreAdapter.Subscriptions.Remove(this); } - + if (Task == null) { FinishDisposing(); @@ -204,7 +206,7 @@ public void Dispose() private void FinishDisposing() { cancellationTokenSource?.Dispose(); - + #if LIQUIDPROJECTIONS_DIAGNOSTICS logger(() => $"Subscription {Id} has been stopped."); #endif From e46aa5b56ae1ac6e38ad276fadf18bcc320f2c2d Mon Sep 17 00:00:00 2001 From: Dennis Doomen Date: Thu, 14 Jun 2018 10:48:46 +0200 Subject: [PATCH 2/2] Ensures that the fastest completed subscriber doesn't block the others When the fastest subscriber finished processing all transactions, it ended up in a polling loop where no other subscriber would ever receive transactions anymore --- .../CheckpointRequestTimestamp.cs | 16 ------------ .../PollingEventStoreAdapter.cs | 26 +------------------ .../Subscription.cs | 4 +++ .../PollingEventStoreAdapterSpecs.cs | 12 ++++----- .../Properties/AssemblyInfo.cs | 1 - .../TransactionBuilder.cs | 17 ------------ 6 files changed, 10 insertions(+), 66 deletions(-) delete mode 100644 Src/LiquidProjections.PollingEventStore/CheckpointRequestTimestamp.cs diff --git a/Src/LiquidProjections.PollingEventStore/CheckpointRequestTimestamp.cs b/Src/LiquidProjections.PollingEventStore/CheckpointRequestTimestamp.cs deleted file mode 100644 index ccdff18..0000000 --- a/Src/LiquidProjections.PollingEventStore/CheckpointRequestTimestamp.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; - -namespace LiquidProjections.PollingEventStore -{ - internal sealed class CheckpointRequestTimestamp - { - public CheckpointRequestTimestamp(long previousCheckpoint, DateTime dateTimeUtc) - { - PreviousCheckpoint = previousCheckpoint; - DateTimeUtc = dateTimeUtc; - } - - public long PreviousCheckpoint { get; } - public DateTime DateTimeUtc { get; } - } -} \ No newline at end of file diff --git a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs index 7929e5d..a6986ef 100644 --- a/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs +++ b/Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs @@ -36,7 +36,6 @@ class PollingEventStoreAdapter : IDisposable /// Stores cached transactions by the checkpoint of their PRECEDING transaction. This ensures that /// gaps in checkpoints will not result in cache misses. /// - private CheckpointRequestTimestamp lastSuccessfulPollingRequestWithoutResults; private readonly LruCache cachedTransactionsByPrecedingCheckpoint; /// @@ -186,32 +185,9 @@ private async Task LoadNextPage(long precedingCheckpoint, string subscript throw new OperationCanceledException(); } - CheckpointRequestTimestamp effectiveLastExistingCheckpointRequest = - Volatile.Read(ref lastSuccessfulPollingRequestWithoutResults); - - if ((effectiveLastExistingCheckpointRequest != null) && - (effectiveLastExistingCheckpointRequest.PreviousCheckpoint == previousCheckpoint)) - { - TimeSpan timeAfterPreviousRequest = getUtcNow() - effectiveLastExistingCheckpointRequest.DateTimeUtc; - if (timeAfterPreviousRequest < pollInterval) - { - TimeSpan delay = pollInterval - timeAfterPreviousRequest; - -#if LIQUIDPROJECTIONS_DIAGNOSTICS - logger(() => - $"Subscription {subscriptionId} is waiting " + - $"for {delay} before checking for new transactions."); -#endif - - await Task.Delay(delay).ConfigureAwait(false); - } - } - - Page candidatePage = await TryLoadNextPageSequentiallyOrWaitForCurrentLoadingToFinish(previousCheckpoint, - subscriptionId) + Page candidatePage = await TryLoadNextOrWaitForPreLoadingToFinish(precedingCheckpoint, subscriptionId) .ConfigureAwait(false); - if (candidatePage.PreviousCheckpoint == previousCheckpoint) if (candidatePage.PrecedingCheckpoint == precedingCheckpoint) { return candidatePage; diff --git a/Src/LiquidProjections.PollingEventStore/Subscription.cs b/Src/LiquidProjections.PollingEventStore/Subscription.cs index 23b2ffe..c82fd24 100644 --- a/Src/LiquidProjections.PollingEventStore/Subscription.cs +++ b/Src/LiquidProjections.PollingEventStore/Subscription.cs @@ -122,6 +122,10 @@ private async Task RunAsync(SubscriptionInfo info) lastProcessedCheckpoint = page.LastCheckpoint; } + else + { + await Task.Delay(pollInterval); + } } } } diff --git a/Tests/LiquidProjections.PollingEventStore.Specs/PollingEventStoreAdapterSpecs.cs b/Tests/LiquidProjections.PollingEventStore.Specs/PollingEventStoreAdapterSpecs.cs index 462b9f1..119cf9a 100644 --- a/Tests/LiquidProjections.PollingEventStore.Specs/PollingEventStoreAdapterSpecs.cs +++ b/Tests/LiquidProjections.PollingEventStore.Specs/PollingEventStoreAdapterSpecs.cs @@ -58,7 +58,7 @@ public async Task Then_it_should_recover_automatically_after_its_polling_interva } while (actualTransaction == null); - actualTransaction.Id.Should().Be(The().Id.ToString()); + actualTransaction.Id.Should().Be(The().Id); } } @@ -99,9 +99,9 @@ public When_a_commit_is_persisted() public async Task Then_it_should_convert_the_commit_details_to_a_transaction() { Transaction actualTransaction = await transactionHandledSource.Task.TimeoutAfter(30.Seconds()); - ; + var transaction = The(); - actualTransaction.Id.Should().Be(transaction.Id.ToString()); + actualTransaction.Id.Should().Be(transaction.Id); actualTransaction.Checkpoint.Should().Be(transaction.Checkpoint); actualTransaction.TimeStampUtc.Should().Be(transaction.TimeStampUtc); actualTransaction.StreamId.Should().Be(transaction.StreamId); @@ -338,8 +338,6 @@ public class GivenSubject { private readonly TimeSpan pollingInterval = 500.Milliseconds(); - private readonly DateTime utcNow = DateTime.UtcNow; - private IPassiveEventStore eventStore; private readonly ManualResetEventSlim aSubscriptionStartedLoading = new ManualResetEventSlim(); private readonly ManualResetEventSlim secondSubscriptionCreated = new ManualResetEventSlim(); private readonly ManualResetEventSlim secondSubscriptionReceivedTheTransaction = new ManualResetEventSlim(); @@ -349,7 +347,7 @@ public class { Given(() => { - eventStore = A.Fake(); + IPassiveEventStore eventStore = A.Fake(); A.CallTo(() => eventStore.GetFrom(A.Ignored)).ReturnsLazily(call => { string checkpointString = call.GetArgument(0); @@ -370,7 +368,7 @@ public class return checkpoint > 0 ? new Transaction[0] - : new Transaction[] {new TransactionBuilder().WithCheckpoint(1).Build()}; + : new[] {new TransactionBuilder().WithCheckpoint(1).Build()}; }); var adapter = new PollingEventStoreAdapter(eventStore, 11, pollingInterval, 100, () => DateTime.UtcNow); diff --git a/Tests/LiquidProjections.PollingEventStore.Specs/Properties/AssemblyInfo.cs b/Tests/LiquidProjections.PollingEventStore.Specs/Properties/AssemblyInfo.cs index 3795965..bde6984 100644 --- a/Tests/LiquidProjections.PollingEventStore.Specs/Properties/AssemblyInfo.cs +++ b/Tests/LiquidProjections.PollingEventStore.Specs/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following diff --git a/Tests/LiquidProjections.PollingEventStore.Specs/TransactionBuilder.cs b/Tests/LiquidProjections.PollingEventStore.Specs/TransactionBuilder.cs index 0d59e50..e3b727a 100644 --- a/Tests/LiquidProjections.PollingEventStore.Specs/TransactionBuilder.cs +++ b/Tests/LiquidProjections.PollingEventStore.Specs/TransactionBuilder.cs @@ -39,17 +39,6 @@ public IEnumerable BuildAsEnumerable() return new[] { Build() }; } - public TransactionBuilder WithStreamId(string streamId) - { - this.streamId = streamId; - return this; - } - - public TransactionBuilder WithEvents(params Event[] events) - { - return events.Select(WithEvent).Last(); - } - public TransactionBuilder WithEvent(Event @event) { var eventMessage = new EventEnvelopeBuilder().WithBody(@event).Build(); @@ -59,12 +48,6 @@ public TransactionBuilder WithEvent(Event @event) return this; } - public TransactionBuilder WithEvent(EventEnvelopeBuilder eventEnvelopeBuilder) - { - events.Add(eventEnvelopeBuilder.Build()); - return this; - } - public TransactionBuilder At(DateTime timeStamp) { this.timeStamp = timeStamp;