diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 198733b..20652a2 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,3 +1,9 @@
+#### 1.3.12 April 05 2019 ####
+Support for Akka.Persistence 1.3.12.
+Added support for Akka.Persistence.Query.
+Upgraded to MongoDb v2.7.0 driver (2.8.0 doesn't support .NET 4.5)
+Added support for configurable, binary serialization via Akka.NET.
+
#### 1.3.5 March 23 2018 ####
Support for Akka.Persistence 1.3.5.
diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
index 90bddeb..23851f5 100644
--- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
+++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
@@ -11,9 +11,9 @@
-
+
-
+
diff --git a/src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs b/src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs
new file mode 100644
index 0000000..debe52c
--- /dev/null
+++ b/src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs
@@ -0,0 +1,47 @@
+using Akka.Actor;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Akka.Persistence.MongoDb.Tests
+{
+ public class JournalTestActor : UntypedPersistentActor
+ {
+ public static Props Props(string persistenceId) => Actor.Props.Create(() => new JournalTestActor(persistenceId));
+
+ public sealed class DeleteCommand
+ {
+ public DeleteCommand(long toSequenceNr)
+ {
+ ToSequenceNr = toSequenceNr;
+ }
+
+ public long ToSequenceNr { get; }
+ }
+
+ public JournalTestActor(string persistenceId)
+ {
+ PersistenceId = persistenceId;
+ }
+
+ public override string PersistenceId { get; }
+
+ protected override void OnRecover(object message)
+ {
+ }
+
+ protected override void OnCommand(object message)
+ {
+ switch (message) {
+ case DeleteCommand delete:
+ DeleteMessages(delete.ToSequenceNr);
+ Sender.Tell($"{delete.ToSequenceNr}-deleted");
+ break;
+ case string cmd:
+ var sender = Sender;
+ Persist(cmd, e => sender.Tell($"{e}-done"));
+ break;
+ }
+ }
+ }
+}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs
new file mode 100644
index 0000000..aa2b358
--- /dev/null
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByPersistenceIdsSpec.cs
@@ -0,0 +1,67 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2016 Lightbend Inc.
+// Copyright (C) 2013-2016 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using Akka.Persistence.TCK.Journal;
+using Xunit;
+using Akka.Configuration;
+using Akka.Persistence.MongoDb.Query;
+using Akka.Persistence.Query;
+using Xunit.Abstractions;
+using Akka.Util.Internal;
+using System;
+using Akka.Actor;
+using Akka.Streams.TestKit;
+using System.Linq;
+using System.Diagnostics;
+using Akka.Streams.Dsl;
+
+namespace Akka.Persistence.MongoDb.Tests
+{
+ [Collection("MongoDbSpec")]
+ public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture
+ {
+ public static readonly AtomicCounter Counter = new AtomicCounter(0);
+ private readonly ITestOutputHelper _output;
+
+ public MongoDbCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
+ : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByPersistenceIdsSpec", output)
+ {
+ _output = output;
+ output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
+ ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier);
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 10s
+ akka.persistence {
+ publish-plugin-commands = on
+ journal {
+ plugin = ""akka.persistence.journal.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + id + @"""
+ auto-initialize = on
+ collection = ""EventJournal""
+ }
+ }
+ query {
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
+ refresh-interval = 1s
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+
+ }
+
+
+}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs
new file mode 100644
index 0000000..bf2586e
--- /dev/null
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs
@@ -0,0 +1,124 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2016 Lightbend Inc.
+// Copyright (C) 2013-2016 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using Akka.Persistence.TCK.Journal;
+using Xunit;
+using Akka.Configuration;
+using Akka.Persistence.MongoDb.Query;
+using Akka.Persistence.Query;
+using Xunit.Abstractions;
+using Akka.Util.Internal;
+using System;
+using Akka.Actor;
+using Akka.Streams.TestKit;
+using System.Linq;
+using System.Diagnostics;
+
+namespace Akka.Persistence.MongoDb.Tests
+{
+ [Collection("MongoDbSpec")]
+ public class MongoDbCurrentEventsByTagSpec : Akka.Persistence.TCK.Query.CurrentEventsByTagSpec, IClassFixture
+ {
+ public static readonly AtomicCounter Counter = new AtomicCounter(0);
+ private readonly ITestOutputHelper _output;
+
+ public MongoDbCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
+ : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output)
+ {
+ _output = output;
+ output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
+ ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier);
+
+ var x = Sys.ActorOf(TestActor.Props("x"));
+ x.Tell("warm-up");
+ ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 3s
+ akka.persistence {
+ publish-plugin-commands = on
+ journal {
+ plugin = ""akka.persistence.journal.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + id + @"""
+ auto-initialize = on
+ collection = ""EventJournal""
+ event-adapters {
+ color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
+ }
+ event-adapter-bindings = {
+ ""System.String"" = color-tagger
+ }
+ }
+ }
+ query {
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
+ refresh-interval = 1s
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+
+
+ //public override void ReadJournal_query_CurrentEventsByTag_should_find_existing_events()
+ //{
+ // var a = Sys.ActorOf(TestActor.Props("a"));
+ // a.Tell("warm-up");
+ // ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
+ //}
+
+
+ internal class TestActor : UntypedPersistentActor
+ {
+ public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId));
+
+ public sealed class DeleteCommand
+ {
+ public DeleteCommand(long toSequenceNr)
+ {
+ ToSequenceNr = toSequenceNr;
+ }
+
+ public long ToSequenceNr { get; }
+ }
+
+ public TestActor(string persistenceId)
+ {
+ PersistenceId = persistenceId;
+ }
+
+ public override string PersistenceId { get; }
+
+ protected override void OnRecover(object message)
+ {
+ }
+
+ protected override void OnCommand(object message)
+ {
+ switch (message) {
+ case DeleteCommand delete:
+ DeleteMessages(delete.ToSequenceNr);
+ Sender.Tell($"{delete.ToSequenceNr}-deleted");
+ break;
+ case string cmd:
+ var sender = Sender;
+ Persist(cmd, e => sender.Tell($"{e}-done"));
+ break;
+ }
+ }
+ }
+ }
+
+
+}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs
new file mode 100644
index 0000000..51b1d59
--- /dev/null
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs
@@ -0,0 +1,100 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2016 Lightbend Inc.
+// Copyright (C) 2013-2016 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using Akka.Persistence.TCK.Journal;
+using Xunit;
+using Akka.Configuration;
+using Akka.Persistence.MongoDb.Query;
+using Akka.Persistence.Query;
+using Xunit.Abstractions;
+using Akka.Util.Internal;
+using System;
+using Akka.Actor;
+using Akka.Streams.TestKit;
+using System.Linq;
+using System.Diagnostics;
+
+namespace Akka.Persistence.MongoDb.Tests
+{
+ [Collection("MongoDbSpec")]
+ public class MongoDbCurrentPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentPersistenceIdsSpec, IClassFixture
+ {
+ public static readonly AtomicCounter Counter = new AtomicCounter(0);
+ private readonly ITestOutputHelper _output;
+
+ public MongoDbCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
+ : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentPersistenceIdsSpec", output)
+ {
+ _output = output;
+ output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
+ ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier);
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 3s
+ akka.persistence {
+ publish-plugin-commands = on
+ journal {
+ plugin = ""akka.persistence.journal.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + id + @"""
+ auto-initialize = on
+ collection = ""EventJournal""
+ }
+ }
+ query {
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
+ refresh-interval = 1s
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+
+ public override void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete()
+ {
+ var queries = ReadJournal.AsInstanceOf();
+
+ Setup("a", 1);
+ Setup("b", 1);
+ Setup("c", 1);
+
+ var greenSrc = queries.CurrentPersistenceIds();
+ var probe = greenSrc.RunWith(this.SinkProbe(), Materializer);
+ var firstTwo = probe.Request(2).ExpectNextN(2);
+ Assert.Empty(firstTwo.Except(new[] { "a", "b", "c" }).ToArray());
+
+ var last = new[] { "a", "b", "c" }.Except(firstTwo).First();
+ Setup("d", 1);
+
+ probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
+ probe.Request(5)
+ .ExpectNext(last)
+ .ExpectComplete();
+ }
+
+ private IActorRef Setup(string persistenceId, int n)
+ {
+ var sw = Stopwatch.StartNew();
+ var pref = Sys.ActorOf(JournalTestActor.Props(persistenceId));
+ for (int i = 1; i <= n; i++) {
+ pref.Tell($"{persistenceId}-{i}");
+ ExpectMsg($"{persistenceId}-{i}-done", TimeSpan.FromSeconds(10), $"{persistenceId}-{i}-done");
+ }
+ _output.WriteLine(sw.ElapsedMilliseconds.ToString());
+ return pref;
+ }
+
+ }
+
+
+}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs
new file mode 100644
index 0000000..2763afe
--- /dev/null
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByPersistenceIdSpec.cs
@@ -0,0 +1,66 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2016 Lightbend Inc.
+// Copyright (C) 2013-2016 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using Akka.Persistence.TCK.Journal;
+using Xunit;
+using Akka.Configuration;
+using Akka.Persistence.MongoDb.Query;
+using Akka.Persistence.Query;
+using Xunit.Abstractions;
+using Akka.Util.Internal;
+using System;
+using Akka.Actor;
+using Akka.Streams.TestKit;
+using System.Linq;
+using System.Diagnostics;
+
+namespace Akka.Persistence.MongoDb.Tests
+{
+ [Collection("MongoDbSpec")]
+ public class MongoDbEventsByPersistenceIdSpec : Akka.Persistence.TCK.Query.EventsByPersistenceIdSpec, IClassFixture
+ {
+ public static readonly AtomicCounter Counter = new AtomicCounter(0);
+ private readonly ITestOutputHelper _output;
+
+ public MongoDbEventsByPersistenceIdSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
+ : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbEventsByPersistenceIdSpec", output)
+ {
+ _output = output;
+ output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
+ ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier);
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 3s
+ akka.persistence {
+ publish-plugin-commands = on
+ journal {
+ plugin = ""akka.persistence.journal.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + id + @"""
+ auto-initialize = on
+ collection = ""EventJournal""
+ }
+ }
+ query {
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
+ refresh-interval = 1s
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+
+ }
+
+
+}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs
new file mode 100644
index 0000000..6f1899b
--- /dev/null
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs
@@ -0,0 +1,117 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2016 Lightbend Inc.
+// Copyright (C) 2013-2016 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using Akka.Persistence.TCK.Journal;
+using Xunit;
+using Akka.Configuration;
+using Akka.Persistence.MongoDb.Query;
+using Akka.Persistence.Query;
+using Xunit.Abstractions;
+using Akka.Util.Internal;
+using System;
+using Akka.Actor;
+using Akka.Streams.TestKit;
+using System.Linq;
+using System.Diagnostics;
+
+namespace Akka.Persistence.MongoDb.Tests
+{
+ [Collection("MongoDbSpec")]
+ public class MongoDbEventsByTagSpec : Akka.Persistence.TCK.Query.EventsByTagSpec, IClassFixture
+ {
+ public static readonly AtomicCounter Counter = new AtomicCounter(0);
+ private readonly ITestOutputHelper _output;
+
+ public MongoDbEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
+ : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output)
+ {
+ _output = output;
+ output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
+ ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier);
+
+ var x = Sys.ActorOf(TestActor.Props("x"));
+ x.Tell("warm-up");
+ ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 10s
+ akka.persistence {
+ publish-plugin-commands = on
+ journal {
+ plugin = ""akka.persistence.journal.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + id + @"""
+ auto-initialize = on
+ collection = ""EventJournal""
+ event-adapters {
+ color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
+ }
+ event-adapter-bindings = {
+ ""System.String"" = color-tagger
+ }
+ }
+ }
+ query {
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
+ refresh-interval = 1s
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+
+
+
+ internal class TestActor : UntypedPersistentActor
+ {
+ public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId));
+
+ public sealed class DeleteCommand
+ {
+ public DeleteCommand(long toSequenceNr)
+ {
+ ToSequenceNr = toSequenceNr;
+ }
+
+ public long ToSequenceNr { get; }
+ }
+
+ public TestActor(string persistenceId)
+ {
+ PersistenceId = persistenceId;
+ }
+
+ public override string PersistenceId { get; }
+
+ protected override void OnRecover(object message)
+ {
+ }
+
+ protected override void OnCommand(object message)
+ {
+ switch (message) {
+ case DeleteCommand delete:
+ DeleteMessages(delete.ToSequenceNr);
+ Sender.Tell($"{delete.ToSequenceNr}-deleted");
+ break;
+ case string cmd:
+ var sender = Sender;
+ Persist(cmd, e => sender.Tell($"{e}-done"));
+ break;
+ }
+ }
+ }
+ }
+
+
+}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs
index 38f059a..876657b 100644
--- a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs
@@ -41,4 +41,36 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
return ConfigurationFactory.ParseString(specString);
}
}
+
+ [Collection("MongoDbSpec")]
+ public class MongoDbBinaryJournalSpec : JournalSpec, IClassFixture
+ {
+ protected override bool SupportsRejectingNonSerializableObjects { get; } = false;
+
+ public MongoDbBinaryJournalSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbJournalSpec")
+ {
+ Initialize();
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 3s
+ akka.persistence {
+ publish-plugin-commands = on
+ journal {
+ plugin = ""akka.persistence.journal.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + @"""
+ auto-initialize = on
+ collection = ""EventJournal""
+ stored-as = binary
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+ }
}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs
new file mode 100644
index 0000000..980374d
--- /dev/null
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs
@@ -0,0 +1,87 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2016 Lightbend Inc.
+// Copyright (C) 2013-2016 Akka.NET project
+//
+//-----------------------------------------------------------------------
+
+using Akka.Persistence.TCK.Journal;
+using Xunit;
+using Akka.Configuration;
+using Akka.Persistence.MongoDb.Query;
+using Akka.Persistence.Query;
+using Xunit.Abstractions;
+using Akka.Util.Internal;
+using System;
+using Akka.Actor;
+using Akka.Streams.TestKit;
+using System.Linq;
+using System.Diagnostics;
+
+namespace Akka.Persistence.MongoDb.Tests
+{
+ [Collection("MongoDbSpec")]
+ public class MongoDbPersistenceIdsSpec : Akka.Persistence.TCK.Query.PersistenceIdsSpec, IClassFixture
+ {
+ public static readonly AtomicCounter Counter = new AtomicCounter(0);
+ private readonly ITestOutputHelper _output;
+
+ public MongoDbPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
+ : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbPersistenceIdsSpec", output)
+ {
+ _output = output;
+ output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
+ ReadJournal = Sys.ReadJournalFor(MongoDbReadJournal.Identifier);
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 3s
+ akka.persistence {
+ publish-plugin-commands = on
+ journal {
+ plugin = ""akka.persistence.journal.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + id + @"""
+ auto-initialize = on
+ collection = ""EventJournal""
+ }
+ }
+ query {
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
+ refresh-interval = 1s
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+
+ [Fact]
+ public void ReadJournal_ConcurrentMessaging_should_work()
+ {
+ Enumerable.Range(1, 100).AsParallel().ForEach(_ => {
+ Setup(Guid.NewGuid().ToString(), 1);
+ Setup(Guid.NewGuid().ToString(), 1);
+ });
+ }
+
+ private IActorRef Setup(string persistenceId, int n)
+ {
+ var sw = Stopwatch.StartNew();
+ var pref = Sys.ActorOf(JournalTestActor.Props(persistenceId));
+ for (int i = 1; i <= n; i++) {
+ pref.Tell($"{persistenceId}-{i}");
+ ExpectMsg($"{persistenceId}-{i}-done", TimeSpan.FromSeconds(10), $"{persistenceId}-{i}-done");
+ }
+ _output.WriteLine(sw.ElapsedMilliseconds.ToString());
+ return pref;
+ }
+
+ }
+
+
+}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs
index ce5f864..51409ab 100644
--- a/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs
@@ -22,6 +22,7 @@ public void Mongo_JournalSettings_must_have_default_values()
mongoPersistence.JournalSettings.AutoInitialize.Should().BeFalse();
mongoPersistence.JournalSettings.Collection.Should().Be("EventJournal");
mongoPersistence.JournalSettings.MetadataCollection.Should().Be("Metadata");
+ mongoPersistence.JournalSettings.StoredAs.Should().BeOfType();
}
[Fact]
@@ -32,6 +33,7 @@ public void Mongo_SnapshotStoreSettingsSettings_must_have_default_values()
mongoPersistence.SnapshotStoreSettings.ConnectionString.Should().Be(string.Empty);
mongoPersistence.SnapshotStoreSettings.AutoInitialize.Should().BeFalse();
mongoPersistence.SnapshotStoreSettings.Collection.Should().Be("SnapshotStore");
+ mongoPersistence.SnapshotStoreSettings.StoredAs.Should().BeOfType();
}
}
}
diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs
index 87d9a4e..8109ca8 100644
--- a/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs
+++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbSnapshotStoreSpec.cs
@@ -39,4 +39,34 @@ class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persisten
return ConfigurationFactory.ParseString(specString);
}
}
+
+ [Collection("MongoDbSpec")]
+ public class MongoDbBinarySnapshotStoreSpec : SnapshotStoreSpec, IClassFixture
+ {
+ public MongoDbBinarySnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec")
+ {
+ Initialize();
+ }
+
+ private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
+ {
+ var specString = @"
+ akka.test.single-expect-default = 3s
+ akka.persistence {
+ publish-plugin-commands = on
+ snapshot-store {
+ plugin = ""akka.persistence.snapshot-store.mongodb""
+ mongodb {
+ class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
+ connection-string = """ + databaseFixture.ConnectionString + @"""
+ auto-initialize = on
+ collection = ""SnapshotStore""
+ stored-as = binary
+ }
+ }
+ }";
+
+ return ConfigurationFactory.ParseString(specString);
+ }
+ }
}
diff --git a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj
index d62c621..6ebc6f5 100644
--- a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj
+++ b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj
@@ -9,8 +9,8 @@
-
-
+
+
$(DefineConstants);RELEASE
diff --git a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs
index 3733428..1cb7b1b 100644
--- a/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs
+++ b/src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs
@@ -5,7 +5,9 @@
//
//-----------------------------------------------------------------------
+using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
+using System.Collections.Generic;
namespace Akka.Persistence.MongoDb.Journal
{
@@ -13,7 +15,7 @@ namespace Akka.Persistence.MongoDb.Journal
/// Class used for storing intermediate result of the
/// as BsonDocument into the MongoDB-Collection
///
- public class JournalEntry
+ public class JournalEntry
{
[BsonId]
public string Id { get; set; }
@@ -32,5 +34,14 @@ public class JournalEntry
[BsonElement("Manifest")]
public string Manifest { get; set; }
+
+ [BsonElement("Ordering")]
+ public BsonTimestamp Ordering { get; set; }
+
+ [BsonElement("Tags")]
+ public ICollection Tags { get; set; } = new HashSet();
+
+ [BsonElement("SerializerId")]
+ public int? SerializerId { get; set; }
}
}
diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
index ba97673..14f902c 100644
--- a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
+++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
@@ -6,12 +6,19 @@
//-----------------------------------------------------------------------
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence.Journal;
+using Akka.Persistence.MongoDb.Query;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using MongoDB.Bson;
+using Akka.Serialization;
+using Akka.Util;
using MongoDB.Driver;
namespace Akka.Persistence.MongoDb.Journal
@@ -22,13 +29,51 @@ namespace Akka.Persistence.MongoDb.Journal
public class MongoDbJournal : AsyncWriteJournal
{
private readonly MongoDbJournalSettings _settings;
+
private Lazy _mongoDatabase;
private Lazy> _journalCollection;
private Lazy> _metadataCollection;
+ private readonly HashSet _allPersistenceIds = new HashSet();
+ private readonly HashSet _allPersistenceIdSubscribers = new HashSet();
+ private readonly Dictionary> _tagSubscribers =
+ new Dictionary>();
+ private readonly Dictionary> _persistenceIdSubscribers
+ = new Dictionary>();
+
+ private readonly Func