Skip to content

Commit

Permalink
Added a sample program to test concurrent subscribers (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisdoomen authored Jun 14, 2018
1 parent 84bf4e2 commit d0aaae9
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 0 deletions.
6 changes: 6 additions & 0 deletions LiquidProjections.PollingEventStore.sln
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiquidProjections.PollingEventStore.Specs", "Tests\LiquidProjections.PollingEventStore.Specs\LiquidProjections.PollingEventStore.Specs.csproj", "{10FD8033-4296-4E33-A295-C046B0A1565B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SampleSubscriber", "Samples\SampleSubscriber\SampleSubscriber.csproj", "{00122333-DFAD-4496-867A-C8422AAF0EDC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -26,6 +28,10 @@ Global
{10FD8033-4296-4E33-A295-C046B0A1565B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{10FD8033-4296-4E33-A295-C046B0A1565B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{10FD8033-4296-4E33-A295-C046B0A1565B}.Release|Any CPU.Build.0 = Release|Any CPU
{00122333-DFAD-4496-867A-C8422AAF0EDC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{00122333-DFAD-4496-867A-C8422AAF0EDC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{00122333-DFAD-4496-867A-C8422AAF0EDC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{00122333-DFAD-4496-867A-C8422AAF0EDC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
83 changes: 83 additions & 0 deletions Samples/SampleSubscriber/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using LiquidProjections;
using LiquidProjections.Abstractions;
using LiquidProjections.PollingEventStore;

namespace SampleSubscriber
{
class Program
{
static void Main(string[] args)
{
// var adapter = new PollingEventStoreAdapter(new PassiveEventStore(), 0, 5.Seconds(), 1000, () => DateTime.UtcNow, messageFunc => Console.WriteLine(messageFunc()));
var adapter = new PollingEventStoreAdapter(new PassiveEventStore(), 0, 5.Seconds(), 1000, () => DateTime.UtcNow);

int maxSubscrbiers = 5;

for (int id = 0; id < maxSubscrbiers; id++)
{
int localId = id;
adapter.Subscribe(0, new Subscriber
{
HandleTransactions = (transactions, info) =>
{
Console.WriteLine(
$"Subscriber {info.Id} received transactions {transactions.First().Checkpoint} to {transactions.Last().Checkpoint} on thead {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(500);
return Task.FromResult(0);
},
NoSuchCheckpoint = info => Task.FromResult(0)
}, id.ToString());

Console.WriteLine($"Started subscriber {localId}");
}

Console.WriteLine("Press a key to shutdown");
Console.ReadLine();

adapter.Dispose();
}
}

internal class PassiveEventStore : IPassiveEventStore
{
private const long MaxCheckpoint = 100000;

public IEnumerable<Transaction> GetFrom(long? previousCheckpoint)
{
previousCheckpoint = previousCheckpoint ?? 0;
if (previousCheckpoint > MaxCheckpoint)
{
return new List<Transaction>();
}

var transactions = new List<Transaction>();

long firstCheckpoint = previousCheckpoint.Value + 1;

for (long checkpoint = firstCheckpoint; checkpoint < firstCheckpoint + 1000 && checkpoint < MaxCheckpoint; checkpoint++)
{
transactions.Add(new Transaction
{
Checkpoint = checkpoint,
Events = new List<EventEnvelope>(),
Headers = new Dictionary<string, object>(),
Id = Guid.NewGuid().ToString(),
StreamId = Guid.NewGuid().ToString(),
TimeStampUtc = DateTime.UtcNow
});
}

Thread.Sleep(1000);

return transactions;
}
}
}
17 changes: 17 additions & 0 deletions Samples/SampleSubscriber/SampleSubscriber.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\Src\LiquidProjections.PollingEventStore\LiquidProjections.PollingEventStore.csproj" />
</ItemGroup>
<ItemGroup>
<Reference Include="FluentAssertions.Core, Version=4.19.3.0, Culture=neutral, PublicKeyToken=33f2691a05b67b6a">
<HintPath>..\..\packages\FluentAssertions.4.19.3\lib\net45\FluentAssertions.Core.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.3.2" />
</ItemGroup>
</Project>

0 comments on commit d0aaae9

Please sign in to comment.