Skip to content

Commit

Permalink
Check the cache to preempt any queued requests. (#17)
Browse files Browse the repository at this point in the history
Just before the adapter is going to issue a request to the underlying event store, it will check to see if a majority of the requested page is already in the cache.
  • Loading branch information
dennisdoomen authored Jul 4, 2018
1 parent a0e6537 commit a907a7b
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 107 deletions.
7 changes: 6 additions & 1 deletion Samples/SampleSubscriber/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static void Main(string[] args)
// var adapter = new PollingEventStoreAdapter(new PassiveEventStore(), 50000, 5.Seconds(), 1000, () => DateTime.UtcNow, messageFunc => Console.WriteLine(messageFunc()));
var adapter = new PollingEventStoreAdapter(new PassiveEventStore(), 50000, TimeSpan.FromSeconds(5), 1000, () => DateTime.UtcNow);

int maxSubscribers = 10;
int maxSubscribers = 50;

for (int id = 0; id < maxSubscribers; id++)
{
Expand Down Expand Up @@ -54,9 +54,12 @@ static void Main(string[] args)
internal class PassiveEventStore : IPassiveEventStore
{
private const long MaxCheckpoint = 100000;
private int nrRequests = 0;

public IEnumerable<Transaction> GetFrom(long? previousCheckpoint)
{
Interlocked.Increment(ref nrRequests);

previousCheckpoint = previousCheckpoint ?? 0;
if (previousCheckpoint > MaxCheckpoint)
{
Expand All @@ -81,6 +84,8 @@ public IEnumerable<Transaction> GetFrom(long? previousCheckpoint)
}

Thread.Sleep(1000);

Console.WriteLine($"Number of event store requests: {nrRequests}");

return transactions;
}
Expand Down
138 changes: 91 additions & 47 deletions Src/LiquidProjections.PollingEventStore/PollingEventStoreAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,14 @@ internal void Unsubscribe(Subscription subscription)
internal async Task<Page> GetNextPage(long precedingCheckpoint, string subscriptionId,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Page pageFromCache = TryGetNextPageFromCache(precedingCheckpoint, subscriptionId, cancellationToken);
if (!pageFromCache.IsEmpty)
{
if (pageFromCache.Transactions.Count < maxPageSize)
{
Task _ = RequestPageLoad(subscriptionId, pageFromCache.LastCheckpoint, cancellationToken);
}

return pageFromCache;
}

Expand Down Expand Up @@ -193,7 +196,6 @@ private Page TryGetNextPageFromCache(long precedingCheckpoint, string subscripti
}
else
{
Task _ = RequestPageLoad(subscriptionId, lastCheckpoint, cancellationToken);
break;
}
}
Expand All @@ -218,7 +220,7 @@ private Task<Page> RequestPageLoad(string subscriptionId, long precedingCheckpoi
PrecedingCheckpoint = precedingCheckpoint,
CancellationToken = cancellationToken
};

pendingRequests.Enqueue(loadRequest);
requestQueued.TrySetResult(true);

Expand All @@ -235,31 +237,7 @@ private void ProcessPendingRequestsAsync()
{
await WaitForPendingRequest();
// Keep the request on the queue, so the subscriber can wait the result of preloading the next page.
if (pendingRequests.TryPeek(out LoadRequest request))
{
try
{
Page page = await LoadPage(request.PrecedingCheckpoint, request.SubscriptionId, request.CancellationToken);
if (!page.IsEmpty)
{
CachePage(page, request.SubscriptionId);
}
request.SetResult(page);
}
catch
{
// Ignore the exception and have the subscriber try again if it wants to.
request.SetResult(null);
}
finally
{
// By now, the result is available in the cache, so subscribers that were too late to await
// the preload can benefit from it as well.
pendingRequests.TryDequeue(out LoadRequest _);
}
}
await ProcessPendingRequest();
}
}
catch (OperationCanceledException)
Expand All @@ -283,6 +261,72 @@ private async Task WaitForPendingRequest()
}
}

private async Task ProcessPendingRequest()
{
// Keep the request on the queue, so successive subscribers for the same page can "await"
// the result of the existing request
if (pendingRequests.TryPeek(out LoadRequest request))
{
try
{
if (IsMajorityCached(request.PrecedingCheckpoint))
{
request.SetResult(TryGetNextPageFromCache(request.PrecedingCheckpoint, request.SubscriptionId,
request.CancellationToken));
}
else
{
Page page = await LoadPage(request.PrecedingCheckpoint, request.SubscriptionId,
request.CancellationToken);

if (!page.IsEmpty)
{
CachePage(page, request.SubscriptionId);
}

request.SetResult(page);
}
}
catch
{
// Ignore the exception and have the subscriber try again if it wants to.
request.SetResult(null);
}
finally
{
// By now, the page should be available in the cache, so subscribers that were too late to "await"
// the current request can still get it from the cache.
pendingRequests.TryDequeue(out LoadRequest _);
}
}
}

/// <summary>
/// Determines whether a majority of the requested page is already cached.
/// </summary>
private bool IsMajorityCached(long requestPrecedingCheckpoint)
{
double fillFactor = 0.5;
int threshold = (int) (maxPageSize * fillFactor);
int actual = 0;

if (cachedTransactionsByPrecedingCheckpoint != null)
{
while (cachedTransactionsByPrecedingCheckpoint.TryGet(requestPrecedingCheckpoint, out Transaction transaction) &&
(actual <= threshold))
{
actual++;
requestPrecedingCheckpoint = transaction.Checkpoint;
}

return actual > threshold;
}
else
{
return false;
}
}

private async Task<Page> LoadPage(long precedingCheckpoint, string subscriptionId, CancellationToken cancellationToken)
{
List<Transaction> transactions;
Expand Down Expand Up @@ -350,33 +394,33 @@ private bool IsCachingEnabled

public void Dispose()
{
lock (subscriptionLock)
if (!disposingCancellationTokenSource.IsCancellationRequested)
{
if (!disposingCancellationTokenSource.IsCancellationRequested)
{
disposingCancellationTokenSource.Cancel();
disposingCancellationTokenSource.Cancel();

lock (subscriptionLock)
{
foreach (Subscription subscription in subscriptions)
{
subscription.Complete();
}
}

// New loading tasks are no longer started at this point.
// After the current loading task is finished, the event store is no longer used and can be disposed.
Task taskToWaitFor = Volatile.Read(ref loader);
// New loading tasks are no longer started at this point.
// After the current loading task is finished, the event store is no longer used and can be disposed.
Task taskToWaitFor = Volatile.Read(ref loader);

try
{
taskToWaitFor?.Wait();
}
catch (AggregateException)
{
// Ignore.
}

// ReSharper disable once SuspiciousTypeConversion.Global
(eventStore as IDisposable)?.Dispose();
try
{
taskToWaitFor?.Wait();
}
catch (AggregateException)
{
// Ignore.
}

// ReSharper disable once SuspiciousTypeConversion.Global
(eventStore as IDisposable)?.Dispose();
}
}
}
Expand Down
Loading

0 comments on commit a907a7b

Please sign in to comment.