diff --git a/LiquidProjections.PollingEventStore.sln b/LiquidProjections.PollingEventStore.sln index 5912494..163e281 100644 --- a/LiquidProjections.PollingEventStore.sln +++ b/LiquidProjections.PollingEventStore.sln @@ -12,6 +12,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiquidProjections.PollingEventStore.Specs", "Tests\LiquidProjections.PollingEventStore.Specs\LiquidProjections.PollingEventStore.Specs.csproj", "{10FD8033-4296-4E33-A295-C046B0A1565B}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SampleSubscriber", "Samples\SampleSubscriber\SampleSubscriber.csproj", "{00122333-DFAD-4496-867A-C8422AAF0EDC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -26,6 +28,10 @@ Global {10FD8033-4296-4E33-A295-C046B0A1565B}.Debug|Any CPU.Build.0 = Debug|Any CPU {10FD8033-4296-4E33-A295-C046B0A1565B}.Release|Any CPU.ActiveCfg = Release|Any CPU {10FD8033-4296-4E33-A295-C046B0A1565B}.Release|Any CPU.Build.0 = Release|Any CPU + {00122333-DFAD-4496-867A-C8422AAF0EDC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {00122333-DFAD-4496-867A-C8422AAF0EDC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {00122333-DFAD-4496-867A-C8422AAF0EDC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {00122333-DFAD-4496-867A-C8422AAF0EDC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Samples/SampleSubscriber/Program.cs b/Samples/SampleSubscriber/Program.cs new file mode 100644 index 0000000..c4499d0 --- /dev/null +++ b/Samples/SampleSubscriber/Program.cs @@ -0,0 +1,83 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using LiquidProjections; +using LiquidProjections.Abstractions; +using LiquidProjections.PollingEventStore; + +namespace SampleSubscriber +{ + class Program + { + static void Main(string[] args) + { +// var adapter = new PollingEventStoreAdapter(new PassiveEventStore(), 0, 5.Seconds(), 1000, () => DateTime.UtcNow, messageFunc => Console.WriteLine(messageFunc())); + var adapter = new PollingEventStoreAdapter(new PassiveEventStore(), 0, 5.Seconds(), 1000, () => DateTime.UtcNow); + + int maxSubscrbiers = 5; + + for (int id = 0; id < maxSubscrbiers; id++) + { + int localId = id; + adapter.Subscribe(0, new Subscriber + { + HandleTransactions = (transactions, info) => + { + Console.WriteLine( + $"Subscriber {info.Id} received transactions {transactions.First().Checkpoint} to {transactions.Last().Checkpoint} on thead {Thread.CurrentThread.ManagedThreadId}"); + + Thread.Sleep(500); + + return Task.FromResult(0); + }, + NoSuchCheckpoint = info => Task.FromResult(0) + }, id.ToString()); + + Console.WriteLine($"Started subscriber {localId}"); + } + + Console.WriteLine("Press a key to shutdown"); + Console.ReadLine(); + + adapter.Dispose(); + } + } + + internal class PassiveEventStore : IPassiveEventStore + { + private const long MaxCheckpoint = 100000; + + public IEnumerable GetFrom(long? previousCheckpoint) + { + previousCheckpoint = previousCheckpoint ?? 0; + if (previousCheckpoint > MaxCheckpoint) + { + return new List(); + } + + var transactions = new List(); + + long firstCheckpoint = previousCheckpoint.Value + 1; + + for (long checkpoint = firstCheckpoint; checkpoint < firstCheckpoint + 1000 && checkpoint < MaxCheckpoint; checkpoint++) + { + transactions.Add(new Transaction + { + Checkpoint = checkpoint, + Events = new List(), + Headers = new Dictionary(), + Id = Guid.NewGuid().ToString(), + StreamId = Guid.NewGuid().ToString(), + TimeStampUtc = DateTime.UtcNow + }); + } + + Thread.Sleep(1000); + + return transactions; + } + } +} \ No newline at end of file diff --git a/Samples/SampleSubscriber/SampleSubscriber.csproj b/Samples/SampleSubscriber/SampleSubscriber.csproj new file mode 100644 index 0000000..5e839a2 --- /dev/null +++ b/Samples/SampleSubscriber/SampleSubscriber.csproj @@ -0,0 +1,17 @@ + + + Exe + netcoreapp2.0 + + + + + + + ..\..\packages\FluentAssertions.4.19.3\lib\net45\FluentAssertions.Core.dll + + + + + + \ No newline at end of file