From 8cd17e408283f40e5cd3d0276ca881580f01656b Mon Sep 17 00:00:00 2001 From: Aleksandr Evdokimov Date: Wed, 3 Apr 2019 22:50:31 +0500 Subject: [PATCH 1/5] Add binary payload format (#42) * Add binary payload format * Add binary format tests --- .../MongoDbJournalSpec.cs | 32 ++++++++++ .../MongoDbSettingsSpec.cs | 2 + .../MongoDbSnapshotStoreSpec.cs | 30 +++++++++ .../Journal/JournalEntry.cs | 5 +- .../Journal/MongoDbJournal.cs | 62 ++++++++++++++++++- .../MongoDbSettings.cs | 18 +++++- .../SerializationResult.cs | 16 +++++ .../Snapshot/MongoDbSnapshotStore.cs | 61 ++++++++++++++++-- .../Snapshot/SnapshotEntry.cs | 6 ++ src/Akka.Persistence.MongoDb/reference.conf | 6 ++ 10 files changed, 227 insertions(+), 11 deletions(-) create mode 100644 src/Akka.Persistence.MongoDb/SerializationResult.cs diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs index 38f059a..876657b 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs @@ -41,4 +41,36 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong return ConfigurationFactory.ParseString(specString); } } + + [Collection("MongoDbSpec")] + public class MongoDbBinaryJournalSpec : JournalSpec, IClassFixture + { + protected override bool SupportsRejectingNonSerializableObjects { get; } = false; + + public MongoDbBinaryJournalSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbJournalSpec") + { + Initialize(); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + @""" + auto-initialize = on + collection = ""EventJournal"" + stored-as = binary + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + } } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs index ce5f864..51409ab 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs @@ -22,6 +22,7 @@ public void Mongo_JournalSettings_must_have_default_values() mongoPersistence.JournalSettings.AutoInitialize.Should().BeFalse(); mongoPersistence.JournalSettings.Collection.Should().Be("EventJournal"); mongoPersistence.JournalSettings.MetadataCollection.Should().Be("Metadata"); + mongoPersistence.JournalSettings.StoredAs.Should().BeOfType(); } [Fact] @@ -32,6 +33,7 @@ public void Mongo_SnapshotStoreSettingsSettings_must_have_default_values() mongoPersistence.SnapshotStoreSettings.ConnectionString.Should().Be(string.Empty); mongoPersistence.SnapshotStoreSettings.AutoInitialize.Should().BeFalse(); mongoPersistence.SnapshotStoreSettings.Collection.Should().Be("SnapshotStore"); + mongoPersistence.SnapshotStoreSettings.StoredAs.Should().BeOfType(); } } } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs index 87d9a4e..8109ca8 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs @@ -39,4 
+39,34 @@ class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persisten return ConfigurationFactory.ParseString(specString); } } + + [Collection("MongoDbSpec")] + public class MongoDbBinarySnapshotStoreSpec : SnapshotStoreSpec, IClassFixture + { + public MongoDbBinarySnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec") + { + Initialize(); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + snapshot-store { + plugin = ""akka.persistence.snapshot-store.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + @""" + auto-initialize = on + collection = ""SnapshotStore"" + stored-as = binary + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + } } diff --git a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs index 3733428..3f0069e 100644 --- a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs +++ b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs @@ -13,7 +13,7 @@ namespace Akka.Persistence.MongoDb.Journal /// Class used for storing intermediate result of the /// as BsonDocument into the MongoDB-Collection /// - public class JournalEntry + public class JournalEntry { [BsonId] public string Id { get; set; } @@ -32,5 +32,8 @@ public class JournalEntry [BsonElement("Manifest")] public string Manifest { get; set; } + + [BsonElement("SerializerId")] + public int? SerializerId { get; set; } } } diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs index ba97673..38465cb 100644 --- a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs +++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs @@ -12,6 +12,8 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Persistence.Journal; +using Akka.Serialization; +using Akka.Util; using MongoDB.Driver; namespace Akka.Persistence.MongoDb.Journal @@ -22,13 +24,43 @@ namespace Akka.Persistence.MongoDb.Journal public class MongoDbJournal : AsyncWriteJournal { private readonly MongoDbJournalSettings _settings; + private Lazy _mongoDatabase; private Lazy> _journalCollection; private Lazy> _metadataCollection; + private readonly Func _serialize; + private readonly Func _deserialize; + public MongoDbJournal() { _settings = MongoDbPersistence.Get(Context.System).JournalSettings; + + var serialization = Context.System.Serialization; + switch (_settings.StoredAs) + { + case StoredAsType.Binary: + _serialize = representation => + { + var serializer = serialization.FindSerializerFor(representation.Payload); + return new SerializationResult(serializer.ToBinary(representation.Payload), serializer); + }; + _deserialize = (type, serialized, manifest, serializerId) => + { + if (serializerId.HasValue) + { + return serialization.Deserialize((byte[]) serialized, serializerId.Value, manifest); + } + + var deserializer = serialization.FindSerializerForType(type); + return deserializer.FromBinary((byte[])serialized, type); + }; + break; + default: + _serialize = representation => new SerializationResult(representation.Payload, null); + _deserialize = (type, serialized, manifest, serializerId) => serialized; + break; + } } protected override void PreStart() @@ 
-147,20 +179,44 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque private JournalEntry ToJournalEntry(IPersistentRepresentation message) { + var serializationResult = _serialize(message); + var serializer = serializationResult.Serializer; + var hasSerializer = serializer != null; + + var manifest = ""; + if (hasSerializer && serializer is SerializerWithStringManifest) + manifest = ((SerializerWithStringManifest)serializer).Manifest(message.Payload); + else if (hasSerializer && serializer.IncludeManifest) + manifest = message.GetType().TypeQualifiedName(); + else + manifest = string.IsNullOrEmpty(message.Manifest) + ? message.GetType().TypeQualifiedName() + : message.Manifest; + return new JournalEntry { Id = message.PersistenceId + "_" + message.SequenceNr, IsDeleted = message.IsDeleted, - Payload = message.Payload, + Payload = serializationResult.Payload, PersistenceId = message.PersistenceId, SequenceNr = message.SequenceNr, - Manifest = message.Manifest + Manifest = manifest, + SerializerId = serializer?.Identifier }; } private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sender) { - return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender); + int? serializerId = null; + Type type = null; + if (!entry.SerializerId.HasValue) + type = Type.GetType(entry.Manifest, true); + else + serializerId = entry.SerializerId; + + var deserialized = _deserialize(type, entry.Payload, entry.Manifest, serializerId); + + return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender); } private async Task SetHighSequenceId(IList messages) diff --git a/src/Akka.Persistence.MongoDb/MongoDbSettings.cs b/src/Akka.Persistence.MongoDb/MongoDbSettings.cs index cce522f..f725537 100644 --- a/src/Akka.Persistence.MongoDb/MongoDbSettings.cs +++ b/src/Akka.Persistence.MongoDb/MongoDbSettings.cs @@ -10,6 +10,12 @@ namespace Akka.Persistence.MongoDb { + public enum StoredAsType + { + Object, + Binary + } + /// /// Settings for the MongoDB persistence implementation, parsed from HOCON configuration. /// @@ -30,15 +36,24 @@ public abstract class MongoDbSettings /// public string Collection { get; private set; } + /// + /// Specifies data type for payload column. + /// + public StoredAsType StoredAs { get; private set; } + protected MongoDbSettings(Config config) { ConnectionString = config.GetString("connection-string"); Collection = config.GetString("collection"); AutoInitialize = config.GetBoolean("auto-initialize"); + + StoredAs = StoredAsType.Object; + + if (Enum.TryParse(config.GetString("stored-as"), true, out StoredAsType storedAs)) + StoredAs = storedAs; } } - /// /// Settings for the MongoDB journal implementation, parsed from HOCON configuration. /// @@ -56,7 +71,6 @@ public MongoDbJournalSettings(Config config) : base(config) } } - /// /// Settings for the MongoDB snapshot implementation, parsed from HOCON configuration. 
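
For reference, this is roughly how an application would opt into the new binary payload format once this patch is applied. The HOCON keys mirror the reference.conf additions and the spec configurations in this patch; the connection string, collection names and system name below are placeholders, and the snippet is an illustrative sketch rather than part of the change itself.

using Akka.Actor;
using Akka.Configuration;

public static class BinaryStorageSetup
{
    public static ActorSystem Create()
    {
        // stored-as = binary makes the plugin run payloads through Akka serialization
        // and store byte arrays, instead of letting the MongoDB driver map objects to BSON.
        var config = ConfigurationFactory.ParseString(@"
            akka.persistence.journal.plugin = ""akka.persistence.journal.mongodb""
            akka.persistence.journal.mongodb {
                connection-string = ""mongodb://localhost:27017/akka""   # placeholder
                collection = ""EventJournal""
                auto-initialize = on
                stored-as = binary
            }
            akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.mongodb""
            akka.persistence.snapshot-store.mongodb {
                connection-string = ""mongodb://localhost:27017/akka""   # placeholder
                collection = ""SnapshotStore""
                stored-as = binary
            }");

        return ActorSystem.Create("example-system", config);
    }
}
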
/// diff --git a/src/Akka.Persistence.MongoDb/SerializationResult.cs b/src/Akka.Persistence.MongoDb/SerializationResult.cs new file mode 100644 index 0000000..f779644 --- /dev/null +++ b/src/Akka.Persistence.MongoDb/SerializationResult.cs @@ -0,0 +1,16 @@ +using Akka.Serialization; + +namespace Akka.Persistence.MongoDb +{ + internal class SerializationResult + { + public SerializationResult(object payload, Serializer serializer) + { + Payload = payload; + Serializer = serializer; + } + + public object Payload { get; } + public Serializer Serializer { get; } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs b/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs index 8e702a5..d8a28cc 100644 --- a/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs +++ b/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs @@ -8,6 +8,8 @@ using System; using System.Threading.Tasks; using Akka.Persistence.Snapshot; +using Akka.Serialization; +using Akka.Util; using MongoDB.Driver; namespace Akka.Persistence.MongoDb.Snapshot @@ -18,11 +20,38 @@ namespace Akka.Persistence.MongoDb.Snapshot public class MongoDbSnapshotStore : SnapshotStore { private readonly MongoDbSnapshotSettings _settings; + private Lazy> _snapshotCollection; + private readonly Func _serialize; + private readonly Func _deserialize; + public MongoDbSnapshotStore() { _settings = MongoDbPersistence.Get(Context.System).SnapshotStoreSettings; + + var serialization = Context.System.Serialization; + switch (_settings.StoredAs) + { + case StoredAsType.Binary: + _serialize = o => + { + var serializer = serialization.FindSerializerFor(o); + return new SerializationResult(serializer.ToBinary(o), serializer); + }; + _deserialize = (type, serialized, manifest, serializerId) => + { + if (serializerId.HasValue) + return serialization.Deserialize((byte[]) serialized, serializerId.Value, manifest); + var deserializer = serialization.FindSerializerForType(type); + return deserializer.FromBinary((byte[]) serialized, type); + }; + break; + default: + _serialize = o => new SerializationResult(o, null); + _deserialize = (type, serialized, manifest, serializerId) => serialized; + break; + } } protected override void PreStart() @@ -117,21 +146,43 @@ private static FilterDefinition CreateRangeFilter(string persiste return filter; } - private static SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot) + private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot) { + var serializationResult = _serialize(snapshot); + var serializer = serializationResult.Serializer; + var hasSerializer = serializer != null; + + var manifest = ""; + if (hasSerializer && serializer is SerializerWithStringManifest) + manifest = ((SerializerWithStringManifest)serializer).Manifest(snapshot); + else if (hasSerializer && serializer.IncludeManifest) + manifest = snapshot.GetType().TypeQualifiedName(); + else + manifest = snapshot.GetType().TypeQualifiedName(); + return new SnapshotEntry { Id = metadata.PersistenceId + "_" + metadata.SequenceNr, PersistenceId = metadata.PersistenceId, SequenceNr = metadata.SequenceNr, - Snapshot = snapshot, - Timestamp = metadata.Timestamp.Ticks + Snapshot = serializationResult.Payload, + Timestamp = metadata.Timestamp.Ticks, + Manifest = manifest, + SerializerId = serializer?.Identifier }; } - private static SelectedSnapshot ToSelectedSnapshot(SnapshotEntry entry) + private SelectedSnapshot ToSelectedSnapshot(SnapshotEntry entry) { - 
return new SelectedSnapshot(new SnapshotMetadata(entry.PersistenceId, entry.SequenceNr, new DateTime(entry.Timestamp)), entry.Snapshot); + Type type = null; + + if (!string.IsNullOrEmpty(entry.Manifest)) + type = Type.GetType(entry.Manifest, throwOnError: true); + + var snapshot = _deserialize(type, entry.Snapshot, entry.Manifest, entry.SerializerId); + + return new SelectedSnapshot( + new SnapshotMetadata(entry.PersistenceId, entry.SequenceNr, new DateTime(entry.Timestamp)), snapshot); } } } diff --git a/src/Akka.Persistence.MongoDb/Snapshot/SnapshotEntry.cs b/src/Akka.Persistence.MongoDb/Snapshot/SnapshotEntry.cs index 421b73e..30e484e 100644 --- a/src/Akka.Persistence.MongoDb/Snapshot/SnapshotEntry.cs +++ b/src/Akka.Persistence.MongoDb/Snapshot/SnapshotEntry.cs @@ -28,5 +28,11 @@ public class SnapshotEntry [BsonElement("Snapshot")] public object Snapshot { get; set; } + + [BsonElement("Manifest")] + public string Manifest { get; set; } + + [BsonElement("SerializerId")] + public int? SerializerId { get; set; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb/reference.conf b/src/Akka.Persistence.MongoDb/reference.conf index 81e8b97..07c5f80 100644 --- a/src/Akka.Persistence.MongoDb/reference.conf +++ b/src/Akka.Persistence.MongoDb/reference.conf @@ -18,6 +18,9 @@ # metadata collection metadata-collection = "Metadata" + + # MongoDb type for payload field. Allowed: object, binary, default : object + stored-as = object } } @@ -37,6 +40,9 @@ # MongoDb collection corresponding with persistent snapshot store collection = "SnapshotStore" + + # MongoDb type for payload field. Allowed: object, binary, default : object + stored-as = object } } } \ No newline at end of file From 4c9ead5e44f00a9cdf9b57a3a80d2e19315244f6 Mon Sep 17 00:00:00 2001 From: Peter Huang Date: Thu, 4 Apr 2019 05:04:47 +1100 Subject: [PATCH 2/5] Updated to Akka.NET 1.3.11 (#48) * Updated to Akka.NET 1.3.11 * Fixed up xunit reference to use common.props --- .../Akka.Persistence.MongoDb.Tests.csproj | 2 +- src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj | 2 +- src/common.props | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj index 90bddeb..f72317a 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj +++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj index d62c621..86593b9 100644 --- a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj +++ b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/common.props b/src/common.props index 041fe55..9a65d85 100644 --- a/src/common.props +++ b/src/common.props @@ -1,4 +1,4 @@ - + Copyright © 2013-2018 Akka.NET Project Akka.NET Contrib @@ -12,7 +12,7 @@ $(NoWarn);CS1591 - 2.3.0 + 2.3.1 15.3.0 \ No newline at end of file From e72b425663d971a211a921b6fe2567e150ac031a Mon Sep 17 00:00:00 2001 From: Peter Huang Date: Fri, 5 Apr 2019 01:38:23 +1100 Subject: [PATCH 3/5] Implemented Persistence Query API (#49) * Progressed #19 Completed AllPersistenceIds and CurrentPersistenceIds Although test ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete seems a bit presumptuous about the order, given 
docs say "The returned event stream is unordered and you can expect different order for multiple executions of the query." * Progressed #19 Implemented EventsByPersistenceId and tests * Progressed #19 Completed EventsByTag and passed specs * Left out unsubscription code for tagged query Fixed intermittent unit test fails on slow machine * Changed to using BsonTimestamp after looking at the Scala implementation. Fixed some unit test timeouts --- .../Akka.Persistence.MongoDb.Tests.csproj | 2 +- .../JournalTestActor.cs | 47 ++++ ...ongoDbCurrentEventsByPersistenceIdsSpec.cs | 67 +++++ .../MongoDbCurrentEventsByTagSpec.cs | 124 +++++++++ .../MongoDbCurrentPersistenceIdsSpec.cs | 100 +++++++ .../MongoDbEventsByPersistenceIdSpec.cs | 66 +++++ .../MongoDbEventsByTagSpec.cs | 117 ++++++++ .../MongoDbPersistenceIdsSpec.cs | 87 ++++++ .../Akka.Persistence.MongoDb.csproj | 4 +- .../Journal/JournalEntry.cs | 8 + .../Journal/MongoDbJournal.cs | 217 ++++++++++++++- .../Query/AllPersistenceIdsPublisher.cs | 62 +++++ .../Query/DeliveryBuffer.cs | 61 +++++ .../Query/EventByPersistenceIdPublisher.cs | 208 ++++++++++++++ .../Query/EventsByTagPublisher.cs | 200 ++++++++++++++ .../Query/Messages.cs | 257 ++++++++++++++++++ .../Query/MongoDbReadJournal.cs | 196 +++++++++++++ .../Query/MongoDbReadJournalProvider.cs | 32 +++ .../Query/SubscriptionDroppedException.cs | 23 ++ src/Akka.Persistence.MongoDb/reference.conf | 22 ++ 20 files changed, 1886 insertions(+), 14 deletions(-) create mode 100644 src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs create mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs create mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs create mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs create mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs create mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs create mode 100644 src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/AllPersistenceIdsPublisher.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/DeliveryBuffer.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/Messages.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/MongoDbReadJournal.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/MongoDbReadJournalProvider.cs create mode 100644 src/Akka.Persistence.MongoDb/Query/SubscriptionDroppedException.cs diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj index f72317a..c13966c 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj +++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs b/src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs new file mode 100644 index 0000000..debe52c --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs @@ -0,0 +1,47 @@ +using Akka.Actor; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Akka.Persistence.MongoDb.Tests +{ + public class JournalTestActor : UntypedPersistentActor + { + public 
static Props Props(string persistenceId) => Actor.Props.Create(() => new JournalTestActor(persistenceId)); + + public sealed class DeleteCommand + { + public DeleteCommand(long toSequenceNr) + { + ToSequenceNr = toSequenceNr; + } + + public long ToSequenceNr { get; } + } + + public JournalTestActor(string persistenceId) + { + PersistenceId = persistenceId; + } + + public override string PersistenceId { get; } + + protected override void OnRecover(object message) + { + } + + protected override void OnCommand(object message) + { + switch (message) { + case DeleteCommand delete: + DeleteMessages(delete.ToSequenceNr); + Sender.Tell($"{delete.ToSequenceNr}-deleted"); + break; + case string cmd: + var sender = Sender; + Persist(cmd, e => sender.Tell($"{e}-done")); + break; + } + } + } +} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs new file mode 100644 index 0000000..aa2b358 --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs @@ -0,0 +1,67 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Persistence.TCK.Journal; +using Xunit; +using Akka.Configuration; +using Akka.Persistence.MongoDb.Query; +using Akka.Persistence.Query; +using Xunit.Abstractions; +using Akka.Util.Internal; +using System; +using Akka.Actor; +using Akka.Streams.TestKit; +using System.Linq; +using System.Diagnostics; +using Akka.Streams.Dsl; + +namespace Akka.Persistence.MongoDb.Tests +{ + [Collection("MongoDbSpec")] + public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByPersistenceIdsSpec", output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 10s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + id + @""" + auto-initialize = on + collection = ""EventJournal"" + } + } + query { + mongodb { + class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" + refresh-interval = 1s + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + + } + + +} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs new file mode 100644 index 0000000..bf2586e --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs @@ -0,0 +1,124 @@ +//----------------------------------------------------------------------- +// 
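
The spec above obtains the read journal through the TestKit with ReadJournalFor and MongoDbReadJournal.Identifier. Outside the TestKit, the equivalent wiring looks roughly like the sketch below; it assumes the read journal exposes the standard Akka.Persistence.Query surface that these TCK specs exercise, and the persistence id is a placeholder.

using System;
using Akka.Actor;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class ReadJournalUsageSketch
{
    public static void DumpEvents(ActorSystem system, string persistenceId)
    {
        var readJournal = PersistenceQuery.Get(system)
            .ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
        var materializer = ActorMaterializer.Create(system);

        // Finite query: completes after all events currently stored in the EventJournal
        // collection for this persistence id have been delivered.
        readJournal.CurrentEventsByPersistenceId(persistenceId, 0L, long.MaxValue)
            .RunForeach(
                env => Console.WriteLine($"{env.PersistenceId} #{env.SequenceNr}: {env.Event}"),
                materializer);

        // The live variant, EventsByPersistenceId, stays open and keeps polling the
        // journal according to the query plugin's refresh-interval setting.
    }
}
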
+// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Persistence.TCK.Journal; +using Xunit; +using Akka.Configuration; +using Akka.Persistence.MongoDb.Query; +using Akka.Persistence.Query; +using Xunit.Abstractions; +using Akka.Util.Internal; +using System; +using Akka.Actor; +using Akka.Streams.TestKit; +using System.Linq; +using System.Diagnostics; + +namespace Akka.Persistence.MongoDb.Tests +{ + [Collection("MongoDbSpec")] + public class MongoDbCurrentEventsByTagSpec : Akka.Persistence.TCK.Query.CurrentEventsByTagSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); + + var x = Sys.ActorOf(TestActor.Props("x")); + x.Tell("warm-up"); + ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10)); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + id + @""" + auto-initialize = on + collection = ""EventJournal"" + event-adapters { + color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"" + } + event-adapter-bindings = { + ""System.String"" = color-tagger + } + } + } + query { + mongodb { + class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" + refresh-interval = 1s + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + + + //public override void ReadJournal_query_CurrentEventsByTag_should_find_existing_events() + //{ + // var a = Sys.ActorOf(TestActor.Props("a")); + // a.Tell("warm-up"); + // ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10)); + //} + + + internal class TestActor : UntypedPersistentActor + { + public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId)); + + public sealed class DeleteCommand + { + public DeleteCommand(long toSequenceNr) + { + ToSequenceNr = toSequenceNr; + } + + public long ToSequenceNr { get; } + } + + public TestActor(string persistenceId) + { + PersistenceId = persistenceId; + } + + public override string PersistenceId { get; } + + protected override void OnRecover(object message) + { + } + + protected override void OnCommand(object message) + { + switch (message) { + case DeleteCommand delete: + DeleteMessages(delete.ToSequenceNr); + Sender.Tell($"{delete.ToSequenceNr}-deleted"); + break; + case string cmd: + var sender = Sender; + Persist(cmd, e => sender.Tell($"{e}-done")); + break; + } + } + } + } + + +} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs new file mode 100644 index 0000000..51b1d59 --- /dev/null +++ 
b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs @@ -0,0 +1,100 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Persistence.TCK.Journal; +using Xunit; +using Akka.Configuration; +using Akka.Persistence.MongoDb.Query; +using Akka.Persistence.Query; +using Xunit.Abstractions; +using Akka.Util.Internal; +using System; +using Akka.Actor; +using Akka.Streams.TestKit; +using System.Linq; +using System.Diagnostics; + +namespace Akka.Persistence.MongoDb.Tests +{ + [Collection("MongoDbSpec")] + public class MongoDbCurrentPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentPersistenceIdsSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentPersistenceIdsSpec", output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + id + @""" + auto-initialize = on + collection = ""EventJournal"" + } + } + query { + mongodb { + class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" + refresh-interval = 1s + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + + public override void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete() + { + var queries = ReadJournal.AsInstanceOf(); + + Setup("a", 1); + Setup("b", 1); + Setup("c", 1); + + var greenSrc = queries.CurrentPersistenceIds(); + var probe = greenSrc.RunWith(this.SinkProbe(), Materializer); + var firstTwo = probe.Request(2).ExpectNextN(2); + Assert.Empty(firstTwo.Except(new[] { "a", "b", "c" }).ToArray()); + + var last = new[] { "a", "b", "c" }.Except(firstTwo).First(); + Setup("d", 1); + + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + probe.Request(5) + .ExpectNext(last) + .ExpectComplete(); + } + + private IActorRef Setup(string persistenceId, int n) + { + var sw = Stopwatch.StartNew(); + var pref = Sys.ActorOf(JournalTestActor.Props(persistenceId)); + for (int i = 1; i <= n; i++) { + pref.Tell($"{persistenceId}-{i}"); + ExpectMsg($"{persistenceId}-{i}-done", TimeSpan.FromSeconds(10), $"{persistenceId}-{i}-done"); + } + _output.WriteLine(sw.ElapsedMilliseconds.ToString()); + return pref; + } + + } + + +} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs new file mode 100644 index 0000000..2763afe --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs @@ -0,0 +1,66 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 
Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Persistence.TCK.Journal; +using Xunit; +using Akka.Configuration; +using Akka.Persistence.MongoDb.Query; +using Akka.Persistence.Query; +using Xunit.Abstractions; +using Akka.Util.Internal; +using System; +using Akka.Actor; +using Akka.Streams.TestKit; +using System.Linq; +using System.Diagnostics; + +namespace Akka.Persistence.MongoDb.Tests +{ + [Collection("MongoDbSpec")] + public class MongoDbEventsByPersistenceIdSpec : Akka.Persistence.TCK.Query.EventsByPersistenceIdSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbEventsByPersistenceIdSpec", output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + id + @""" + auto-initialize = on + collection = ""EventJournal"" + } + } + query { + mongodb { + class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" + refresh-interval = 1s + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + + } + + +} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs new file mode 100644 index 0000000..6f1899b --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs @@ -0,0 +1,117 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. 
+// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Persistence.TCK.Journal; +using Xunit; +using Akka.Configuration; +using Akka.Persistence.MongoDb.Query; +using Akka.Persistence.Query; +using Xunit.Abstractions; +using Akka.Util.Internal; +using System; +using Akka.Actor; +using Akka.Streams.TestKit; +using System.Linq; +using System.Diagnostics; + +namespace Akka.Persistence.MongoDb.Tests +{ + [Collection("MongoDbSpec")] + public class MongoDbEventsByTagSpec : Akka.Persistence.TCK.Query.EventsByTagSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); + + var x = Sys.ActorOf(TestActor.Props("x")); + x.Tell("warm-up"); + ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10)); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 10s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + id + @""" + auto-initialize = on + collection = ""EventJournal"" + event-adapters { + color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"" + } + event-adapter-bindings = { + ""System.String"" = color-tagger + } + } + } + query { + mongodb { + class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" + refresh-interval = 1s + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + + + + internal class TestActor : UntypedPersistentActor + { + public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId)); + + public sealed class DeleteCommand + { + public DeleteCommand(long toSequenceNr) + { + ToSequenceNr = toSequenceNr; + } + + public long ToSequenceNr { get; } + } + + public TestActor(string persistenceId) + { + PersistenceId = persistenceId; + } + + public override string PersistenceId { get; } + + protected override void OnRecover(object message) + { + } + + protected override void OnCommand(object message) + { + switch (message) { + case DeleteCommand delete: + DeleteMessages(delete.ToSequenceNr); + Sender.Tell($"{delete.ToSequenceNr}-deleted"); + break; + case string cmd: + var sender = Sender; + Persist(cmd, e => sender.Tell($"{e}-done")); + break; + } + } + } + } + + +} diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs new file mode 100644 index 0000000..980374d --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs @@ -0,0 +1,87 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. 
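
The tag-based specs rely on the TCK's ColorFruitTagger to wrap events in Tagged before they reach the journal, which is what populates the new Tags field on JournalEntry. An application would do the same with its own write event adapter; the adapter and assembly names below (ImportantTagger, MyApp) are hypothetical, sketched only to show the shape.

using Akka.Persistence.Journal;

// Hypothetical adapter: tags every string event containing "important" so that
// EventsByTag("important", ...) can find it later.
public sealed class ImportantTagger : IWriteEventAdapter
{
    public string Manifest(object evt) => string.Empty;

    public object ToJournal(object evt)
    {
        if (evt is string s && s.Contains("important"))
            return new Tagged(s, new[] { "important" });   // stored in the journal entry's Tags field
        return evt;
    }
}

// Matching configuration, mirroring the event-adapters block used in the specs above
// ("MyApp" is a placeholder assembly name):
//
//   akka.persistence.journal.mongodb {
//     event-adapters {
//       important-tagger = "MyApp.ImportantTagger, MyApp"
//     }
//     event-adapter-bindings {
//       "System.String" = important-tagger
//     }
//   }
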
+// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Persistence.TCK.Journal; +using Xunit; +using Akka.Configuration; +using Akka.Persistence.MongoDb.Query; +using Akka.Persistence.Query; +using Xunit.Abstractions; +using Akka.Util.Internal; +using System; +using Akka.Actor; +using Akka.Streams.TestKit; +using System.Linq; +using System.Diagnostics; + +namespace Akka.Persistence.MongoDb.Tests +{ + [Collection("MongoDbSpec")] + public class MongoDbPersistenceIdsSpec : Akka.Persistence.TCK.Query.PersistenceIdsSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbPersistenceIdsSpec", output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + id + @""" + auto-initialize = on + collection = ""EventJournal"" + } + } + query { + mongodb { + class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb"" + refresh-interval = 1s + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + + [Fact] + public void ReadJournal_ConcurrentMessaging_should_work() + { + Enumerable.Range(1, 100).AsParallel().ForEach(_ => { + Setup(Guid.NewGuid().ToString(), 1); + Setup(Guid.NewGuid().ToString(), 1); + }); + } + + private IActorRef Setup(string persistenceId, int n) + { + var sw = Stopwatch.StartNew(); + var pref = Sys.ActorOf(JournalTestActor.Props(persistenceId)); + for (int i = 1; i <= n; i++) { + pref.Tell($"{persistenceId}-{i}"); + ExpectMsg($"{persistenceId}-{i}-done", TimeSpan.FromSeconds(10), $"{persistenceId}-{i}-done"); + } + _output.WriteLine(sw.ElapsedMilliseconds.ToString()); + return pref; + } + + } + + +} diff --git a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj index 86593b9..fc66ca5 100644 --- a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj +++ b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj @@ -9,8 +9,10 @@ + + + - $(DefineConstants);RELEASE diff --git a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs index 3f0069e..1cb7b1b 100644 --- a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs +++ b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs @@ -5,7 +5,9 @@ // //----------------------------------------------------------------------- +using MongoDB.Bson; using MongoDB.Bson.Serialization.Attributes; +using System.Collections.Generic; namespace Akka.Persistence.MongoDb.Journal { @@ -33,6 +35,12 @@ public class JournalEntry [BsonElement("Manifest")] public string Manifest { get; set; } + [BsonElement("Ordering")] + public BsonTimestamp Ordering { get; set; } + + [BsonElement("Tags")] + public ICollection 
Tags { get; set; } = new HashSet(); + [BsonElement("SerializerId")] public int? SerializerId { get; set; } } diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs index 38465cb..14f902c 100644 --- a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs +++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs @@ -6,12 +6,17 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; using System.Threading.Tasks; using Akka.Actor; using Akka.Persistence.Journal; +using Akka.Persistence.MongoDb.Query; +using Akka.Streams; +using Akka.Streams.Dsl; +using MongoDB.Bson; using Akka.Serialization; using Akka.Util; using MongoDB.Driver; @@ -29,9 +34,17 @@ public class MongoDbJournal : AsyncWriteJournal private Lazy> _journalCollection; private Lazy> _metadataCollection; - private readonly Func _serialize; + private readonly HashSet _allPersistenceIds = new HashSet(); + private readonly HashSet _allPersistenceIdSubscribers = new HashSet(); + private readonly Dictionary> _tagSubscribers = + new Dictionary>(); + private readonly Dictionary> _persistenceIdSubscribers + = new Dictionary>(); + + private readonly Func _serialize; private readonly Func _deserialize; + public MongoDbJournal() { _settings = MongoDbPersistence.Get(Context.System).JournalSettings; @@ -42,8 +55,8 @@ public MongoDbJournal() case StoredAsType.Binary: _serialize = representation => { - var serializer = serialization.FindSerializerFor(representation.Payload); - return new SerializationResult(serializer.ToBinary(representation.Payload), serializer); + var serializer = serialization.FindSerializerFor(representation); + return new SerializationResult(serializer.ToBinary(representation), serializer); }; _deserialize = (type, serialized, manifest, serializerId) => { @@ -57,7 +70,7 @@ public MongoDbJournal() }; break; default: - _serialize = representation => new SerializationResult(representation.Payload, null); + _serialize = representation => new SerializationResult(representation, null); _deserialize = (type, serialized, manifest, serializerId) => serialized; break; } @@ -86,6 +99,10 @@ protected override void PreStart() .Ascending(entry => entry.PersistenceId) .Descending(entry => entry.SequenceNr)) .Wait(); + + collection.Indexes.CreateOne( + Builders.IndexKeys + .Ascending(entry => entry.Ordering)); } return collection; @@ -109,6 +126,8 @@ protected override void PreStart() public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action recoveryCallback) { + NotifyNewPersistenceIdAdded(persistenceId); + // Limit allows only integer var limitValue = max >= int.MaxValue ? int.MaxValue : (int)max; @@ -131,14 +150,55 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per .Limit(limitValue) .ToListAsync(); - collections.ForEach(doc => - { + collections.ForEach(doc => { recoveryCallback(ToPersistenceRepresentation(doc, context.Sender)); }); } + /// + /// Replays all events with given tag withing provided boundaries from current database. + /// + /// TBD + /// TBD + private async Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) + { + // Limit allows only integer + var limitValue = replay.Max >= int.MaxValue ? 
int.MaxValue : (int)replay.Max; + var fromSequenceNr = replay.FromOffset; + var toSequenceNr = replay.ToOffset; + var tag = replay.Tag; + + // Do not replay messages if limit equal zero + if (limitValue == 0) return 0; + + var builder = Builders.Filter; + var filter = builder.AnyEq(x => x.Tags, tag); + if (fromSequenceNr > 0) + filter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr)); + if (toSequenceNr != long.MaxValue) + filter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr)); + + var sort = Builders.Sort.Ascending(x => x.Ordering); + + long maxOrderingId = 0; + await _journalCollection.Value + .Find(filter) + .Sort(sort) + .Limit(limitValue) + .ForEachAsync(entry => { + var persistent = new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, ActorRefs.NoSender, null); + foreach (var adapted in AdaptFromJournal(persistent)) + replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value), ActorRefs.NoSender); + maxOrderingId = entry.Ordering.Value; + }); + + return maxOrderingId; + } + public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { + NotifyNewPersistenceIdAdded(persistenceId); + var builder = Builders.Filter; var filter = builder.Eq(x => x.PersistenceId, persistenceId); @@ -149,25 +209,38 @@ public override async Task ReadHighestSequenceNrAsync(string persistenceId protected override async Task> WriteMessagesAsync(IEnumerable messages) { + var allTags = new HashSet(); var messageList = messages.ToList(); - var writeTasks = messageList.Select(async message => - { + + var writeTasks = messageList.Select(async message => { var persistentMessages = ((IImmutableList)message.Payload).ToArray(); var journalEntries = persistentMessages.Select(ToJournalEntry).ToList(); await _journalCollection.Value.InsertManyAsync(journalEntries); + + NotifyNewPersistenceIdAdded(message.PersistenceId); }); await SetHighSequenceId(messageList); - return await Task> + var result = await Task> .Factory .ContinueWhenAll(writeTasks.ToArray(), tasks => tasks.Select(t => t.IsFaulted ? 
TryUnwrapException(t.Exception) : null).ToImmutableList()); + + if (HasTagSubscribers && allTags.Count != 0) { + foreach (var tag in allTags) { + NotifyTagChange(tag); + } + } + + return result; } protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { + NotifyNewPersistenceIdAdded(persistenceId); + var builder = Builders.Filter; var filter = builder.Eq(x => x.PersistenceId, persistenceId); @@ -179,13 +252,17 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque private JournalEntry ToJournalEntry(IPersistentRepresentation message) { - var serializationResult = _serialize(message); + object payload = message.Payload; + if (message.Payload is Tagged tagged) + payload = tagged.Payload; + + var serializationResult = _serialize(payload); var serializer = serializationResult.Serializer; var hasSerializer = serializer != null; var manifest = ""; - if (hasSerializer && serializer is SerializerWithStringManifest) - manifest = ((SerializerWithStringManifest)serializer).Manifest(message.Payload); + if (hasSerializer && serializer is SerializerWithStringManifest stringManifest) + manifest = stringManifest.Manifest(message.Payload); else if (hasSerializer && serializer.IncludeManifest) manifest = message.GetType().TypeQualifiedName(); else @@ -196,11 +273,13 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message) return new JournalEntry { Id = message.PersistenceId + "_" + message.SequenceNr, + Ordering = new BsonTimestamp(0), // Auto-populates with timestamp IsDeleted = message.IsDeleted, Payload = serializationResult.Payload, PersistenceId = message.PersistenceId, SequenceNr = message.SequenceNr, Manifest = manifest, + Tags = tagged.Tags?.ToList(), SerializerId = serializer?.Identifier }; } @@ -235,5 +314,119 @@ private async Task SetHighSequenceId(IList messages) await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new UpdateOptions() { IsUpsert = true }); } + + protected override bool ReceivePluginInternal(object message) + { + return message.Match() + .With(replay => { + ReplayTaggedMessagesAsync(replay) + .PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h), failure: e => new ReplayMessagesFailure(e)); + }) + .With(subscribe => { + AddPersistenceIdSubscriber(Sender, subscribe.PersistenceId); + Context.Watch(Sender); + }) + .With(subscribe => { + AddAllPersistenceIdSubscriber(Sender); + Context.Watch(Sender); + }) + .With(subscribe => { + AddTagSubscriber(Sender, subscribe.Tag); + Context.Watch(Sender); + }) + .With(terminated => RemoveSubscriber(terminated.ActorRef)) + .WasHandled; + } + + private void AddAllPersistenceIdSubscriber(IActorRef subscriber) + { + lock (_allPersistenceIdSubscribers) { + _allPersistenceIdSubscribers.Add(subscriber); + } + subscriber.Tell(new CurrentPersistenceIds(GetAllPersistenceIds())); + } + + private void AddTagSubscriber(IActorRef subscriber, string tag) + { + if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) { + subscriptions = new HashSet(); + _tagSubscribers.Add(tag, subscriptions); + } + + subscriptions.Add(subscriber); + } + + private IEnumerable GetAllPersistenceIds() + { + return _journalCollection.Value.AsQueryable() + .Select(je => je.PersistenceId) + .Distinct() + .ToList(); + } + + private void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId) + { + if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) { + subscriptions = new HashSet(); + _persistenceIdSubscribers.Add(persistenceId, 
subscriptions); + } + + subscriptions.Add(subscriber); + } + + private void RemoveSubscriber(IActorRef subscriber) + { + var pidSubscriptions = _persistenceIdSubscribers.Values.Where(x => x.Contains(subscriber)); + foreach (var subscription in pidSubscriptions) + subscription.Remove(subscriber); + + var tagSubscriptions = _tagSubscribers.Values.Where(x => x.Contains(subscriber)); + foreach (var subscription in tagSubscriptions) + subscription.Remove(subscriber); + + _allPersistenceIdSubscribers.Remove(subscriber); + } + + protected bool HasAllPersistenceIdSubscribers => _allPersistenceIdSubscribers.Count != 0; + protected bool HasTagSubscribers => _tagSubscribers.Count != 0; + protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0; + + private void NotifyNewPersistenceIdAdded(string persistenceId) + { + var isNew = TryAddPersistenceId(persistenceId); + if (isNew && HasAllPersistenceIdSubscribers) { + var added = new PersistenceIdAdded(persistenceId); + foreach (var subscriber in _allPersistenceIdSubscribers) + subscriber.Tell(added); + } + } + + private bool TryAddPersistenceId(string persistenceId) + { + lock (_allPersistenceIds) { + return _allPersistenceIds.Add(persistenceId); + } + } + + private void NotifyPersistenceIdChange(string persistenceId) + { + if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscribers)) { + var changed = new EventAppended(persistenceId); + foreach (var subscriber in subscribers) + subscriber.Tell(changed); + } + } + + private void NotifyTagChange(string tag) + { + if (_tagSubscribers.TryGetValue(tag, out var subscribers)) { + var changed = new TaggedEventAppended(tag); + foreach (var subscriber in subscribers) + subscriber.Tell(changed); + } + } + } + + } diff --git a/src/Akka.Persistence.MongoDb/Query/AllPersistenceIdsPublisher.cs b/src/Akka.Persistence.MongoDb/Query/AllPersistenceIdsPublisher.cs new file mode 100644 index 0000000..2c004a4 --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/AllPersistenceIdsPublisher.cs @@ -0,0 +1,62 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. 
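
ReplayTaggedMessagesAsync above keys tagged replay on the new Ordering field (a BsonTimestamp) rather than on per-persistence-id sequence numbers, and that value is what reaches consumers as the envelope offset. A consumer that wants to resume a tag stream can therefore store the last offset it saw and feed it back in, roughly as sketched below; the tag name is a placeholder and the sketch assumes the read journal implements the standard Offset-based EventsByTag query exercised by the specs.

using System;
using Akka.Actor;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class TagStreamSketch
{
    public static void FollowTag(ActorSystem system, long lastSeenOrdering)
    {
        var queries = PersistenceQuery.Get(system)
            .ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
        var materializer = ActorMaterializer.Create(system);

        // Resume from the last Ordering value we processed (0 means "from the beginning").
        queries.EventsByTag("important", new Sequence(lastSeenOrdering))
            .RunForeach(env =>
            {
                // env.Offset is a Sequence wrapping the journal entry's Ordering value;
                // persist it somewhere durable to resume from this point after a restart.
                var ordering = env.Offset is Sequence seq ? seq.Value : 0L;
                Console.WriteLine($"[{ordering}] {env.PersistenceId}: {env.Event}");
            }, materializer);
    }
}
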
+// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Streams.Actors; + +namespace Akka.Persistence.MongoDb.Query +{ + internal sealed class AllPersistenceIdsPublisher : ActorPublisher + { + public static Props Props(bool liveQuery, string writeJournalPluginId) + { + return Actor.Props.Create(() => new AllPersistenceIdsPublisher(liveQuery, writeJournalPluginId)); + } + + private readonly bool _liveQuery; + private readonly IActorRef _journalRef; + + private readonly DeliveryBuffer _buffer; + + public AllPersistenceIdsPublisher(bool liveQuery, string writeJournalPluginId) + { + _liveQuery = liveQuery; + _buffer = new DeliveryBuffer(OnNext); + _journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); + } + + protected override bool Receive(object message) => message.Match() + .With(_ => { + _journalRef.Tell(SubscribeAllPersistenceIds.Instance); + Become(Active); + }) + .With(_ => Context.Stop(Self)) + .WasHandled; + + private bool Active(object message) => message.Match() + .With(current => { + _buffer.AddRange(current.AllPersistenceIds); + _buffer.DeliverBuffer(TotalDemand); + + if (!_liveQuery && _buffer.IsEmpty) + OnCompleteThenStop(); + }) + .With(added => { + if (_liveQuery) { + _buffer.Add(added.PersistenceId); + _buffer.DeliverBuffer(TotalDemand); + } + }) + .With(_ => { + _buffer.DeliverBuffer(TotalDemand); + if (!_liveQuery && _buffer.IsEmpty) + OnCompleteThenStop(); + }) + .With(_ => Context.Stop(Self)) + .WasHandled; + } +} diff --git a/src/Akka.Persistence.MongoDb/Query/DeliveryBuffer.cs b/src/Akka.Persistence.MongoDb/Query/DeliveryBuffer.cs new file mode 100644 index 0000000..8a53436 --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/DeliveryBuffer.cs @@ -0,0 +1,61 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. 
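
AllPersistenceIdsPublisher above is an ActorPublisher, not a Source by itself; the MongoDbReadJournal added in this patch (not visible in this excerpt) is what turns these publisher actors into Akka.Streams sources. A plausible shape for that wiring, shown only as a sketch and not as the actual implementation, is:

using Akka;
using Akka.Actor;
using Akka.Streams.Dsl;

namespace Akka.Persistence.MongoDb.Query
{
    internal static class PublisherWiringSketch
    {
        // liveQuery = true keeps the stream open and forwards PersistenceIdAdded notifications;
        // liveQuery = false completes once the current set of persistence ids has been delivered.
        public static Source<string, NotUsed> PersistenceIds(bool liveQuery, string writeJournalPluginId) =>
            Source.ActorPublisher<string>(AllPersistenceIdsPublisher.Props(liveQuery, writeJournalPluginId))
                .MapMaterializedValue(_ => NotUsed.Instance)
                .Named(liveQuery ? "AllPersistenceIds" : "CurrentPersistenceIds");
    }
}
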
+// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace Akka.Persistence.MongoDb.Query +{ + internal class DeliveryBuffer + { + public ImmutableArray Buffer { get; private set; } = ImmutableArray.Empty; + public bool IsEmpty => Buffer.IsEmpty; + public int Length => Buffer.Length; + + private readonly Action _onNext; + + public DeliveryBuffer(Action onNext) + { + _onNext = onNext; + } + + public void Add(T element) + { + Buffer = Buffer.Add(element); + } + public void AddRange(IEnumerable elements) + { + Buffer = Buffer.AddRange(elements); + } + + public void DeliverBuffer(long demand) + { + if (!Buffer.IsEmpty && demand > 0) { + var totalDemand = Math.Min((int)demand, Buffer.Length); + if (Buffer.Length == 1) { + // optimize for this common case + _onNext(Buffer[0]); + Buffer = ImmutableArray.Empty; + } + else if (demand <= int.MaxValue) { + for (var i = 0; i < totalDemand; i++) + _onNext(Buffer[i]); + + Buffer = Buffer.RemoveRange(0, totalDemand); + } + else { + foreach (var element in Buffer) + _onNext(element); + + Buffer = ImmutableArray.Empty; + } + } + } + + } +} diff --git a/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs b/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs new file mode 100644 index 0000000..c463fff --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/EventByPersistenceIdPublisher.cs @@ -0,0 +1,208 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Event; +using Akka.Persistence.Query; +using Akka.Streams.Actors; + +namespace Akka.Persistence.MongoDb.Query +{ + internal static class EventsByPersistenceIdPublisher + { + [Serializable] + public sealed class Continue + { + public static readonly Continue Instance = new Continue(); + + private Continue() + { + } + } + + public static Props Props(string persistenceId, long fromSequenceNr, long toSequenceNr, TimeSpan? refreshDuration, int maxBufferSize, string writeJournalPluginId) + { + return refreshDuration.HasValue + ? Actor.Props.Create(() => new LiveEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId, refreshDuration.Value)) + : Actor.Props.Create(() => new CurrentEventsByPersistenceIdPublisher(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId)); + } + } + + internal abstract class AbstractEventsByPersistenceIdPublisher : ActorPublisher + { + private ILoggingAdapter _log; + + protected DeliveryBuffer Buffer; + protected readonly IActorRef JournalRef; + protected long CurrentSequenceNr; + + protected AbstractEventsByPersistenceIdPublisher(string persistenceId, long fromSequenceNr, long toSequenceNr, int maxBufferSize, string writeJournalPluginId) + { + PersistenceId = persistenceId; + CurrentSequenceNr = FromSequenceNr = fromSequenceNr; + ToSequenceNr = toSequenceNr; + MaxBufferSize = maxBufferSize; + WriteJournalPluginId = writeJournalPluginId; + Buffer = new DeliveryBuffer(OnNext); + + JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); + } + + protected ILoggingAdapter Log => _log ?? 
(_log = Context.GetLogger()); + protected string PersistenceId { get; } + protected long FromSequenceNr { get; } + protected long ToSequenceNr { get; set; } + protected int MaxBufferSize { get; } + protected string WriteJournalPluginId { get; } + + protected bool IsTimeForReplay => (Buffer.IsEmpty || Buffer.Length <= MaxBufferSize / 2) && (CurrentSequenceNr <= ToSequenceNr); + + protected abstract void ReceiveInitialRequest(); + protected abstract void ReceiveIdleRequest(); + protected abstract void ReceiveRecoverySuccess(long highestSequenceNr); + + protected override bool Receive(object message) + { + return Init(message); + } + + protected bool Init(object message) + { + return message.Match() + .With(() => { }) + .With(_ => ReceiveInitialRequest()) + .With(_ => Context.Stop(Self)) + .WasHandled; + } + + protected bool Idle(object message) + { + return message.Match() + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(_ => ReceiveIdleRequest()) + .With(_ => Context.Stop(Self)) + .WasHandled; + } + + protected void Replay() + { + var limit = MaxBufferSize - Buffer.Length; + Log.Debug("request replay for persistenceId [{0}] from [{1}] to [{2}] limit [{3}]", PersistenceId, CurrentSequenceNr, ToSequenceNr, limit); + JournalRef.Tell(new ReplayMessages(CurrentSequenceNr, ToSequenceNr, limit, PersistenceId, Self)); + Context.Become(Replaying(limit)); + } + + protected Receive Replaying(int limit) + { + return message => message.Match() + .With(replayed => { + var seqNr = replayed.Persistent.SequenceNr; + Buffer.Add(new EventEnvelope( + offset: new Sequence(seqNr), + persistenceId: PersistenceId, + sequenceNr: seqNr, + @event: replayed.Persistent.Payload)); + CurrentSequenceNr = seqNr + 1; + Buffer.DeliverBuffer(TotalDemand); + }) + .With(success => { + Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId, CurrentSequenceNr); + ReceiveRecoverySuccess(success.HighestSequenceNr); + }) + .With(failure => { + Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId, failure.Cause.Message); + Buffer.DeliverBuffer(TotalDemand); + OnErrorThenStop(failure.Cause); + }) + .With(_ => Buffer.DeliverBuffer(TotalDemand)) + .With(() => { }) // skip during replay + .With(() => { }) // skip during replay + .With(_ => Context.Stop(Self)) + .WasHandled; + } + } + + internal sealed class LiveEventsByPersistenceIdPublisher : AbstractEventsByPersistenceIdPublisher + { + private readonly ICancelable _tickCancelable; + + public LiveEventsByPersistenceIdPublisher(string persistenceId, long fromSequenceNr, long toSequenceNr, int maxBufferSize, string writeJournalPluginId, TimeSpan refreshInterval) + : base(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId) + { + _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByPersistenceIdPublisher.Continue.Instance, Self); + } + + protected override void PostStop() + { + _tickCancelable.Cancel(); + base.PostStop(); + } + + protected override void ReceiveInitialRequest() + { + JournalRef.Tell(new SubscribePersistenceId(PersistenceId)); + Replay(); + } + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentSequenceNr > ToSequenceNr) + OnCompleteThenStop(); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && 
CurrentSequenceNr > ToSequenceNr) + OnCompleteThenStop(); + + Context.Become(Idle); + } + } + + internal sealed class CurrentEventsByPersistenceIdPublisher : AbstractEventsByPersistenceIdPublisher + { + public CurrentEventsByPersistenceIdPublisher(string persistenceId, long fromSequenceNr, long toSequenceNr, int maxBufferSize, string writeJournalPluginId) + : base(persistenceId, fromSequenceNr, toSequenceNr, maxBufferSize, writeJournalPluginId) + { + } + + protected override void ReceiveInitialRequest() + { + Replay(); + } + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentSequenceNr > ToSequenceNr) + OnCompleteThenStop(); + else + Self.Tell(EventsByPersistenceIdPublisher.Continue.Instance); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + if (highestSequenceNr < ToSequenceNr) + ToSequenceNr = highestSequenceNr; + if (Buffer.IsEmpty && (CurrentSequenceNr > ToSequenceNr || CurrentSequenceNr == FromSequenceNr)) + OnCompleteThenStop(); + else + Self.Tell(EventsByPersistenceIdPublisher.Continue.Instance); + + Context.Become(Idle); + } + } +} diff --git a/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs b/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs new file mode 100644 index 0000000..4607e84 --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs @@ -0,0 +1,200 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Event; +using Akka.Persistence.Query; +using Akka.Streams.Actors; + +namespace Akka.Persistence.MongoDb.Query +{ + internal static class EventsByTagPublisher + { + [Serializable] + public sealed class Continue + { + public static readonly Continue Instance = new Continue(); + + private Continue() + { + } + } + + public static Props Props(string tag, long fromOffset, long toOffset, TimeSpan? refreshInterval, int maxBufferSize, string writeJournalPluginId) + { + return refreshInterval.HasValue + ? Actor.Props.Create(() => new LiveEventsByTagPublisher(tag, fromOffset, toOffset, refreshInterval.Value, maxBufferSize, writeJournalPluginId)) + : Actor.Props.Create(() => new CurrentEventsByTagPublisher(tag, fromOffset, toOffset, maxBufferSize, writeJournalPluginId)); + } + } + + internal abstract class AbstractEventsByTagPublisher : ActorPublisher + { + private ILoggingAdapter _log; + + protected readonly DeliveryBuffer Buffer; + protected readonly IActorRef JournalRef; + protected long CurrentOffset; + protected AbstractEventsByTagPublisher(string tag, long fromOffset, int maxBufferSize, string writeJournalPluginId) + { + Tag = tag; + CurrentOffset = FromOffset = fromOffset; + MaxBufferSize = maxBufferSize; + WriteJournalPluginId = writeJournalPluginId; + Buffer = new DeliveryBuffer(OnNext); + JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId); + } + + protected ILoggingAdapter Log => _log ?? 
(_log = Context.GetLogger()); + protected string Tag { get; } + protected long FromOffset { get; } + protected abstract long ToOffset { get; } + protected int MaxBufferSize { get; } + protected string WriteJournalPluginId { get; } + + protected bool IsTimeForReplay => (Buffer.IsEmpty || Buffer.Length <= MaxBufferSize / 2) && (CurrentOffset <= ToOffset); + + protected abstract void ReceiveInitialRequest(); + protected abstract void ReceiveIdleRequest(); + protected abstract void ReceiveRecoverySuccess(long highestSequenceNr); + + protected override bool Receive(object message) => message.Match() + .With(_ => ReceiveInitialRequest()) + .With(() => { }) + .With(_ => Context.Stop(Self)) + .WasHandled; + + protected bool Idle(object message) => message.Match() + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(() => { + if (IsTimeForReplay) Replay(); + }) + .With(ReceiveIdleRequest) + .With(() => Context.Stop(Self)) + .WasHandled; + + protected void Replay() + { + var limit = MaxBufferSize - Buffer.Length; + Log.Debug("request replay for tag [{0}] from [{1}] to [{2}] limit [{3}]", Tag, CurrentOffset, ToOffset, limit); + JournalRef.Tell(new ReplayTaggedMessages(CurrentOffset, ToOffset, limit, Tag, Self)); + Context.Become(Replaying(limit)); + } + + protected Receive Replaying(int limit) + { + return message => message.Match() + .With(replayed => { + Buffer.Add(new EventEnvelope( + offset: new Sequence(replayed.Offset), + persistenceId: replayed.Persistent.PersistenceId, + sequenceNr: replayed.Persistent.SequenceNr, + @event: replayed.Persistent.Payload)); + + CurrentOffset = replayed.Offset; + Buffer.DeliverBuffer(TotalDemand); + }) + .With(success => { + Log.Debug("replay completed for tag [{0}], currOffset [{1}]", Tag, CurrentOffset); + ReceiveRecoverySuccess(success.HighestSequenceNr); + }) + .With(failure => { + Log.Debug("replay failed for tag [{0}], due to [{1}]", Tag, failure.Cause.Message); + Buffer.DeliverBuffer(TotalDemand); + OnErrorThenStop(failure.Cause); + }) + .With(_ => Buffer.DeliverBuffer(TotalDemand)) + .With(() => { }) + .With(() => { }) + .With(() => Context.Stop(Self)) + .WasHandled; + } + } + + internal sealed class LiveEventsByTagPublisher : AbstractEventsByTagPublisher + { + private readonly ICancelable _tickCancelable; + public LiveEventsByTagPublisher(string tag, long fromOffset, long toOffset, TimeSpan refreshInterval, int maxBufferSize, string writeJournalPluginId) + : base(tag, fromOffset, maxBufferSize, writeJournalPluginId) + { + ToOffset = toOffset; + _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByTagPublisher.Continue.Instance, Self); + } + + protected override long ToOffset { get; } + + protected override void PostStop() + { + _tickCancelable.Cancel(); + base.PostStop(); + } + + protected override void ReceiveInitialRequest() + { + JournalRef.Tell(new SubscribeTag(Tag)); + Replay(); + } + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + + Context.Become(Idle); + } + } + + internal sealed class CurrentEventsByTagPublisher : AbstractEventsByTagPublisher + { + public CurrentEventsByTagPublisher(string tag, long fromOffset, long toOffset, int maxBufferSize, string 
writeJournalPluginId) + : base(tag, fromOffset, maxBufferSize, writeJournalPluginId) + { + _toOffset = toOffset; + } + + private long _toOffset; + protected override long ToOffset => _toOffset; + protected override void ReceiveInitialRequest() + { + Replay(); + } + + protected override void ReceiveIdleRequest() + { + Buffer.DeliverBuffer(TotalDemand); + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + else + Self.Tell(EventsByTagPublisher.Continue.Instance); + } + + protected override void ReceiveRecoverySuccess(long highestSequenceNr) + { + Buffer.DeliverBuffer(TotalDemand); + if (highestSequenceNr < ToOffset) + _toOffset = highestSequenceNr; + + if (Buffer.IsEmpty && CurrentOffset > ToOffset) + OnCompleteThenStop(); + else + Self.Tell(EventsByTagPublisher.Continue.Instance); + + Context.Become(Idle); + } + } +} diff --git a/src/Akka.Persistence.MongoDb/Query/Messages.cs b/src/Akka.Persistence.MongoDb/Query/Messages.cs new file mode 100644 index 0000000..db9ddfe --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/Messages.cs @@ -0,0 +1,257 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. +// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Event; +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Akka.Persistence.MongoDb.Query +{ + /// + /// TBD + /// + public interface ISubscriptionCommand { } + + /// + /// Subscribe the `sender` to changes (appended events) for a specific `persistenceId`. + /// Used by query-side. The journal will send messages to + /// the subscriber when has been called. + /// + public sealed class SubscribePersistenceId : ISubscriptionCommand + { + /// + /// TBD + /// + public readonly string PersistenceId; + + /// + /// TBD + /// + /// TBD + public SubscribePersistenceId(string persistenceId) + { + PersistenceId = persistenceId; + } + } + + /// + /// TBD + /// + public sealed class EventAppended : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly string PersistenceId; + + /// + /// TBD + /// + /// TBD + public EventAppended(string persistenceId) + { + PersistenceId = persistenceId; + } + } + + /// + /// Subscribe the `sender` to current and new persistenceIds. + /// Used by query-side. The journal will send one to the + /// subscriber followed by messages when new persistenceIds + /// are created. 
+ /// + public sealed class SubscribeAllPersistenceIds : ISubscriptionCommand + { + /// + /// TBD + /// + public static readonly SubscribeAllPersistenceIds Instance = new SubscribeAllPersistenceIds(); + private SubscribeAllPersistenceIds() { } + } + + /// + /// TBD + /// + public sealed class CurrentPersistenceIds : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly IEnumerable AllPersistenceIds; + + /// + /// TBD + /// + /// TBD + public CurrentPersistenceIds(IEnumerable allPersistenceIds) + { + AllPersistenceIds = allPersistenceIds.ToImmutableHashSet(); + } + } + + /// + /// TBD + /// + + public sealed class PersistenceIdAdded : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly string PersistenceId; + + /// + /// TBD + /// + /// TBD + public PersistenceIdAdded(string persistenceId) + { + PersistenceId = persistenceId; + } + } + + /// + /// Subscribe the `sender` to changes (appended events) for a specific `tag`. + /// Used by query-side. The journal will send messages to + /// the subscriber when `asyncWriteMessages` has been called. + /// Events are tagged by wrapping in + /// via an . + /// + public sealed class SubscribeTag : ISubscriptionCommand + { + /// + /// TBD + /// + public readonly string Tag; + + /// + /// TBD + /// + /// TBD + public SubscribeTag(string tag) + { + Tag = tag; + } + } + + /// + /// TBD + /// + public sealed class TaggedEventAppended : IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly string Tag; + + /// + /// TBD + /// + /// TBD + public TaggedEventAppended(string tag) + { + Tag = tag; + } + } + + /// + /// TBD + /// + public sealed class ReplayTaggedMessages : IJournalRequest + { + /// + /// TBD + /// + public readonly long FromOffset; + /// + /// TBD + /// + public readonly long ToOffset; + /// + /// TBD + /// + public readonly long Max; + /// + /// TBD + /// + public readonly string Tag; + /// + /// TBD + /// + public readonly IActorRef ReplyTo; + + /// + /// Initializes a new instance of the class. + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// + /// This exception is thrown for a number of reasons. These include the following: + ///
+        /// • The specified fromOffset is less than zero.
+        /// • The specified toOffset is less than or equal to zero.
+        /// • The specified max is less than or equal to zero.
+ /// + /// This exception is thrown when the specified is null or empty. + /// + public ReplayTaggedMessages(long fromOffset, long toOffset, long max, string tag, IActorRef replyTo) + { + if (fromOffset < 0) throw new ArgumentException("From offset may not be a negative number", nameof(fromOffset)); + if (toOffset <= 0) throw new ArgumentException("To offset must be a positive number", nameof(toOffset)); + if (max <= 0) throw new ArgumentException("Maximum number of replayed messages must be a positive number", nameof(max)); + if (string.IsNullOrEmpty(tag)) throw new ArgumentNullException(nameof(tag), "Replay tagged messages require a tag value to be provided"); + + FromOffset = fromOffset; + ToOffset = toOffset; + Max = max; + Tag = tag; + ReplyTo = replyTo; + } + } + + /// + /// TBD + /// + public sealed class ReplayedTaggedMessage : INoSerializationVerificationNeeded, IDeadLetterSuppression + { + /// + /// TBD + /// + public readonly IPersistentRepresentation Persistent; + /// + /// TBD + /// + public readonly string Tag; + /// + /// TBD + /// + public readonly long Offset; + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + public ReplayedTaggedMessage(IPersistentRepresentation persistent, string tag, long offset) + { + Persistent = persistent; + Tag = tag; + Offset = offset; + } + } +} diff --git a/src/Akka.Persistence.MongoDb/Query/MongoDbReadJournal.cs b/src/Akka.Persistence.MongoDb/Query/MongoDbReadJournal.cs new file mode 100644 index 0000000..c436556 --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/MongoDbReadJournal.cs @@ -0,0 +1,196 @@ +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Journal; +using Akka.Persistence.Query; +using Akka.Streams.Dsl; +using MongoDB.Driver; + +namespace Akka.Persistence.MongoDb.Query +{ + public class MongoDbReadJournal : IReadJournal, + IPersistenceIdsQuery, + ICurrentPersistenceIdsQuery, + IEventsByPersistenceIdQuery, + ICurrentEventsByPersistenceIdQuery, + IEventsByTagQuery, + ICurrentEventsByTagQuery + { + /// + /// HOCON identifier + /// + public const string Identifier = "akka.persistence.query.mongodb"; + + private readonly TimeSpan _refreshInterval; + private readonly string _writeJournalPluginId; + private readonly int _maxBufferSize; + + /// + public MongoDbReadJournal(ExtendedActorSystem system, Config config) + { + _refreshInterval = config.GetTimeSpan("refresh-interval"); + _writeJournalPluginId = config.GetString("write-plugin"); + _maxBufferSize = config.GetInt("max-buffer-size"); + } + + /// + /// Returns a default query configuration for akka persistence SQLite-based journals and snapshot stores. + /// + /// + public static Config DefaultConfiguration() + { + return ConfigurationFactory.FromResource( + "Akka.Persistence.MongoDb.reference.conf"); + } + + /// + /// + /// is used for retrieving all `persistenceIds` of all + /// persistent actors. + /// + /// The returned event stream is unordered and you can expect different order for multiple + /// executions of the query. + /// + /// The stream is not completed when it reaches the end of the currently used `persistenceIds`, + /// but it continues to push new `persistenceIds` when new persistent actors are created. + /// Corresponding query that is completed when it reaches the end of the currently + /// currently used `persistenceIds` is provided by . + /// + /// The SQL write journal is notifying the query side as soon as new `persistenceIds` are + /// created and there is no periodic polling or batching involved in this query. 
+ /// + /// The stream is completed with failure if there is a failure in executing the query in the + /// backend journal. + /// + /// + public Source PersistenceIds() => + Source.ActorPublisher(AllPersistenceIdsPublisher.Props(true, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("AllPersistenceIds") as Source; + + /// + /// Same type of query as but the stream + /// is completed immediately when it reaches the end of the "result set". Persistent + /// actors that are created after the query is completed are not included in the stream. + /// + public Source CurrentPersistenceIds() => + Source.ActorPublisher(AllPersistenceIdsPublisher.Props(false, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("CurrentPersistenceIds") as Source; + + /// + /// is used for retrieving events for a specific + /// identified by . + /// + /// You can retrieve a subset of all events by specifying and + /// or use `0L` and respectively to retrieve all events. Note that + /// the corresponding sequence number of each event is provided in the + /// , which makes it possible to resume the + /// stream at a later point from a given sequence number. + /// + /// The returned event stream is ordered by sequence number, i.e. the same order as the + /// persisted the events. The same prefix of stream elements (in same order) + /// are returned for multiple executions of the query, except for when events have been deleted. + /// + /// The stream is not completed when it reaches the end of the currently stored events, + /// but it continues to push new events when new events are persisted. + /// Corresponding query that is completed when it reaches the end of the currently + /// stored events is provided by . + /// + /// The SQLite write journal is notifying the query side as soon as events are persisted, but for + /// efficiency reasons the query side retrieves the events in batches that sometimes can + /// be delayed up to the configured `refresh-interval`. + /// + /// The stream is completed with failure if there is a failure in executing the query in the + /// backend journal. + /// + public Source EventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) => + Source.ActorPublisher(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, _refreshInterval, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("EventsByPersistenceId-" + persistenceId) as Source; + + /// + /// Same type of query as but the event stream + /// is completed immediately when it reaches the end of the "result set". Events that are + /// stored after the query is completed are not included in the event stream. + /// + public Source CurrentEventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) => + Source.ActorPublisher(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, null, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named("CurrentEventsByPersistenceId-" + persistenceId) as Source; + + /// + /// is used for retrieving events that were marked with + /// a given tag, e.g. all events of an Aggregate Root type. + /// + /// To tag events you create an that wraps the events + /// in a with the given `tags`. + /// + /// You can use to retrieve all events with a given tag or retrieve a subset of all + /// events by specifying a . 
The `offset` corresponds to an ordered sequence number for + /// the specific tag. Note that the corresponding offset of each event is provided in the + /// , which makes it possible to resume the + /// stream at a later point from a given offset. + /// + /// The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included + /// in the returned stream.This means that you can use the offset that is returned in + /// as the `offset` parameter in a subsequent query. + /// + /// In addition to the the also provides `persistenceId` and `sequenceNr` + /// for each event. The `sequenceNr` is the sequence number for the persistent actor with the + /// `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique + /// identifier for the event. + /// + /// The returned event stream is ordered by the offset (tag sequence number), which corresponds + /// to the same order as the write journal stored the events. The same stream elements (in same order) + /// are returned for multiple executions of the query. Deleted events are not deleted from the + /// tagged event stream. + /// + /// The stream is not completed when it reaches the end of the currently stored events, + /// but it continues to push new events when new events are persisted. + /// Corresponding query that is completed when it reaches the end of the currently + /// stored events is provided by . + /// + /// The SQL write journal is notifying the query side as soon as tagged events are persisted, but for + /// efficiency reasons the query side retrieves the events in batches that sometimes can + /// be delayed up to the configured `refresh-interval`. + /// + /// The stream is completed with failure if there is a failure in executing the query in the + /// backend journal. + /// + public Source EventsByTag(string tag, Offset offset = null) + { + offset = offset ?? new Sequence(0L); + switch (offset) { + case Sequence seq: + return Source.ActorPublisher(EventsByTagPublisher.Props(tag, seq.Value, long.MaxValue, _refreshInterval, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named($"EventsByTag-{tag}"); + case NoOffset _: + return EventsByTag(tag, new Sequence(0L)); + default: + throw new ArgumentException($"SqlReadJournal does not support {offset.GetType().Name} offsets"); + } + } + + /// + /// Same type of query as but the event stream + /// is completed immediately when it reaches the end of the "result set". Events that are + /// stored after the query is completed are not included in the event stream. + /// + public Source CurrentEventsByTag(string tag, Offset offset = null) + { + offset = offset ?? 
new Sequence(0L); + switch (offset) { + case Sequence seq: + return Source.ActorPublisher(EventsByTagPublisher.Props(tag, seq.Value, long.MaxValue, null, _maxBufferSize, _writeJournalPluginId)) + .MapMaterializedValue(_ => NotUsed.Instance) + .Named($"CurrentEventsByTag-{tag}"); + case NoOffset _: + return CurrentEventsByTag(tag, new Sequence(0L)); + default: + throw new ArgumentException($"SqlReadJournal does not support {offset.GetType().Name} offsets"); + } + } + } +} diff --git a/src/Akka.Persistence.MongoDb/Query/MongoDbReadJournalProvider.cs b/src/Akka.Persistence.MongoDb/Query/MongoDbReadJournalProvider.cs new file mode 100644 index 0000000..82fd060 --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/MongoDbReadJournalProvider.cs @@ -0,0 +1,32 @@ +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Query; + +namespace Akka.Persistence.MongoDb.Query +{ + public class MongoDbReadJournalProvider : IReadJournalProvider + { + private readonly ExtendedActorSystem _system; + private readonly Config _config; + + /// + /// + /// + /// instance of actor system at which read journal should be started + /// + public MongoDbReadJournalProvider(ExtendedActorSystem system, Config config) + { + _system = system; + _config = config; + } + + /// + /// Returns instance of EventStoreReadJournal + /// + /// + public IReadJournal GetReadJournal() + { + return new MongoDbReadJournal(_system, _config); + } + } +} diff --git a/src/Akka.Persistence.MongoDb/Query/SubscriptionDroppedException.cs b/src/Akka.Persistence.MongoDb/Query/SubscriptionDroppedException.cs new file mode 100644 index 0000000..f2932f7 --- /dev/null +++ b/src/Akka.Persistence.MongoDb/Query/SubscriptionDroppedException.cs @@ -0,0 +1,23 @@ +using Akka.Event; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Akka.Persistence.MongoDb.Query +{ + public class SubscriptionDroppedException : Exception, IDeadLetterSuppression + { + + public SubscriptionDroppedException() : this("Unknown error", null) + { + + } + + public SubscriptionDroppedException(string message, Exception inner) : base(message, inner) + { + + } + } +} diff --git a/src/Akka.Persistence.MongoDb/reference.conf b/src/Akka.Persistence.MongoDb/reference.conf index 07c5f80..f0e3a68 100644 --- a/src/Akka.Persistence.MongoDb/reference.conf +++ b/src/Akka.Persistence.MongoDb/reference.conf @@ -45,4 +45,26 @@ stored-as = object } } + + query { + mongodb { + # Implementation class of the EventStore ReadJournalProvider + class = "Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb" + + # Absolute path to the write journal plugin configuration entry that this + # query journal will connect to. + # If undefined (or "") it will connect to the default journal as specified by the + # akka.persistence.journal.plugin property. + write-plugin = "" + + # The SQL write journal is notifying the query side as soon as things + # are persisted, but for efficiency reasons the query side retrieves the events + # in batches that sometimes can be delayed up to the configured `refresh-interval`. + refresh-interval = 3s + + # How many events to fetch in one query (replay) and keep buffered until they + # are delivered downstreams. 
+ max-buffer-size = 500 + } + } } \ No newline at end of file From 4e73bce7ff02994ba965797c335616e7d6a4bd9d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 4 Apr 2019 14:17:39 -0500 Subject: [PATCH 4/5] upgraded to Akka.NET v1.3.12 (#54) --- .../Akka.Persistence.MongoDb.Tests.csproj | 2 +- src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj | 4 +--- src/common.props | 1 + 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj index c13966c..23851f5 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj +++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj index fc66ca5..6ebc6f5 100644 --- a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj +++ b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj @@ -9,10 +9,8 @@ - - + - $(DefineConstants);RELEASE diff --git a/src/common.props b/src/common.props index 9a65d85..1e0ada3 100644 --- a/src/common.props +++ b/src/common.props @@ -14,5 +14,6 @@ 2.3.1 15.3.0 + 1.3.12
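Editor's note on the query feature added earlier in this series: the EventsByTag / CurrentEventsByTag streams only observe events that the write side has wrapped in Tagged, typically via an IWriteEventAdapter registered against the MongoDB journal. The sketch below is an illustration only and is not part of any commit in this series; the adapter class, event type, and HOCON binding names are invented for the example.

// Hypothetical tagging adapter -- illustration only, not part of this patch series.
using System.Collections.Generic;
using Akka.Persistence.Journal;

namespace MyApp
{
    // Placeholder domain event used only for this example.
    public sealed class OrderPlaced
    {
        public OrderPlaced(string orderId) { OrderId = orderId; }
        public string OrderId { get; }
    }

    // Wraps matching events in Tagged so EventsByTag("order", ...) can replay them.
    public sealed class OrderTaggingAdapter : IWriteEventAdapter
    {
        public string Manifest(object evt) => string.Empty;

        public object ToJournal(object evt) =>
            evt is OrderPlaced ? new Tagged(evt, new[] { "order" }) : evt;
    }

    // Registered under the journal's config, e.g. (names are placeholders):
    //   akka.persistence.journal.mongodb.event-adapters.order-tagging =
    //     "MyApp.OrderTaggingAdapter, MyApp"
    //   akka.persistence.journal.mongodb.event-adapter-bindings."MyApp.OrderPlaced, MyApp" = order-tagging
}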
\ No newline at end of file From ece60c1ec97d5ad444cc2ef05247e06e3bbbda07 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 5 Apr 2019 17:49:57 -0500 Subject: [PATCH 5/5] added v1.3.12 release notes (#55) --- RELEASE_NOTES.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 198733b..20652a2 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,9 @@ +#### 1.3.12 April 05 2019 #### +Support for Akka.Persistence 1.3.12. +Added support for Akka.Persistence.Query. +Upgraded to MongoDb v2.7.0 driver (2.8.0 doesn't support .NET 4.5) +Added support for configurable, binary serialization via Akka.NET. + #### 1.3.5 March 23 2018 #### Support for Akka.Persistence 1.3.5.
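Editor's note: the notes above mention two opt-in pieces, the binary payload format and the new Akka.Persistence.Query surface. The following end-to-end sketch shows how a consumer might wire both; it is a hedged illustration under stated assumptions, not code from these commits. The connection string, database, system name, and persistence id are placeholders, and `stored-as = binary` assumes the journal's stored-as switch (which defaults to object in reference.conf) accepts that value per the binary-serialization support listed in the notes.

// Hedged usage sketch only -- not part of this patch series.
using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Akka.Streams;
using Akka.Streams.Dsl;

namespace MyApp
{
    public static class ReadJournalSample
    {
        public static void Main()
        {
            // Placeholder connection string; stored-as = binary opts into the binary payload format.
            var config = ConfigurationFactory.ParseString(@"
                akka.persistence.journal.plugin = ""akka.persistence.journal.mongodb""
                akka.persistence.journal.mongodb {
                    connection-string = ""mongodb://localhost:27017/akkanet""
                    auto-initialize = on
                    stored-as = binary
                }")
                // Fall back to the plugin defaults (including the akka.persistence.query.mongodb section).
                .WithFallback(MongoDbReadJournal.DefaultConfiguration());

            using (var system = ActorSystem.Create("sample", config))
            using (var materializer = ActorMaterializer.Create(system))
            {
                // Resolve the read journal via its HOCON identifier.
                var queries = PersistenceQuery.Get(system)
                    .ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);

                // Live stream of persistence ids; stays open until the system terminates.
                queries.PersistenceIds()
                    .RunForeach(id => Console.WriteLine($"persistenceId: {id}"), materializer);

                // Finite replay of one actor's events in sequence-number order.
                queries.CurrentEventsByPersistenceId("sample-actor", 0L, long.MaxValue)
                    .RunForeach(e => Console.WriteLine($"{e.SequenceNr}: {e.Event}"), materializer);

                Console.ReadLine();
            }
        }
    }
}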