Skip to content

Commit

Permalink
Merge pull request #56 from AkkaNetContrib/dev
Browse files Browse the repository at this point in the history
v1.3.12 Release
  • Loading branch information
Aaronontheweb authored Apr 5, 2019
2 parents d9572c3 + ece60c1 commit b128c2b
Show file tree
Hide file tree
Showing 29 changed files with 2,115 additions and 22 deletions.
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
#### 1.3.12 April 05 2019 ####
Support for Akka.Persistence 1.3.12.
Added support for Akka.Persistence.Query.
Upgraded to MongoDb v2.7.0 driver (2.8.0 doesn't support .NET 4.5)
Added support for configurable, binary serialization via Akka.NET.


#### 1.3.5 March 23 2018 ####
Support for Akka.Persistence 1.3.5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)" />
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<DotNetCliToolReference Include="dotnet-xunit" Version="$(XunitVersion)" />
<PackageReference Include="Akka.Persistence.TCK" Version="1.3.4-beta" />
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="4.19.4" />
<PackageReference Include="Mongo2Go" Version="2.2.1" />
<PackageReference Include="Mongo2Go" Version="2.2.8" />
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
</ItemGroup>

Expand Down
47 changes: 47 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/JournalTestActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using Akka.Actor;
using System;
using System.Collections.Generic;
using System.Text;

namespace Akka.Persistence.MongoDb.Tests
{
public class JournalTestActor : UntypedPersistentActor
{
public static Props Props(string persistenceId) => Actor.Props.Create(() => new JournalTestActor(persistenceId));

public sealed class DeleteCommand
{
public DeleteCommand(long toSequenceNr)
{
ToSequenceNr = toSequenceNr;
}

public long ToSequenceNr { get; }
}

public JournalTestActor(string persistenceId)
{
PersistenceId = persistenceId;
}

public override string PersistenceId { get; }

protected override void OnRecover(object message)
{
}

protected override void OnCommand(object message)
{
switch (message) {
case DeleteCommand delete:
DeleteMessages(delete.ToSequenceNr);
Sender.Tell($"{delete.ToSequenceNr}-deleted");
break;
case string cmd:
var sender = Sender;
Persist(cmd, e => sender.Tell($"{e}-done"));
break;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//-----------------------------------------------------------------------
// <copyright file="MongoDbJournalSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Persistence.TCK.Journal;
using Xunit;
using Akka.Configuration;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Xunit.Abstractions;
using Akka.Util.Internal;
using System;
using Akka.Actor;
using Akka.Streams.TestKit;
using System.Linq;
using System.Diagnostics;
using Akka.Streams.Dsl;

namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;

public MongoDbCurrentEventsByPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByPersistenceIdsSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}

private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";

return ConfigurationFactory.ParseString(specString);
}

}


}
124 changes: 124 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentEventsByTagSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//-----------------------------------------------------------------------
// <copyright file="MongoDbJournalSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Persistence.TCK.Journal;
using Xunit;
using Akka.Configuration;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Xunit.Abstractions;
using Akka.Util.Internal;
using System;
using Akka.Actor;
using Akka.Streams.TestKit;
using System.Linq;
using System.Diagnostics;

namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByTagSpec : Akka.Persistence.TCK.Query.CurrentEventsByTagSpec, IClassFixture<DatabaseFixture>
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;

public MongoDbCurrentEventsByTagSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentEventsByTagSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);

var x = Sys.ActorOf(TestActor.Props("x"));
x.Tell("warm-up");
ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
}

private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
auto-initialize = on
collection = ""EventJournal""
event-adapters {
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
}
event-adapter-bindings = {
""System.String"" = color-tagger
}
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";

return ConfigurationFactory.ParseString(specString);
}


//public override void ReadJournal_query_CurrentEventsByTag_should_find_existing_events()
//{
// var a = Sys.ActorOf(TestActor.Props("a"));
// a.Tell("warm-up");
// ExpectMsg("warm-up-done", TimeSpan.FromSeconds(10));
//}


internal class TestActor : UntypedPersistentActor
{
public static Props Props(string persistenceId) => Actor.Props.Create(() => new TestActor(persistenceId));

public sealed class DeleteCommand
{
public DeleteCommand(long toSequenceNr)
{
ToSequenceNr = toSequenceNr;
}

public long ToSequenceNr { get; }
}

public TestActor(string persistenceId)
{
PersistenceId = persistenceId;
}

public override string PersistenceId { get; }

protected override void OnRecover(object message)
{
}

protected override void OnCommand(object message)
{
switch (message) {
case DeleteCommand delete:
DeleteMessages(delete.ToSequenceNr);
Sender.Tell($"{delete.ToSequenceNr}-deleted");
break;
case string cmd:
var sender = Sender;
Persist(cmd, e => sender.Tell($"{e}-done"));
break;
}
}
}
}


}
100 changes: 100 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentPersistenceIdsSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//-----------------------------------------------------------------------
// <copyright file="MongoDbJournalSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Persistence.TCK.Journal;
using Xunit;
using Akka.Configuration;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Xunit.Abstractions;
using Akka.Util.Internal;
using System;
using Akka.Actor;
using Akka.Streams.TestKit;
using System.Linq;
using System.Diagnostics;

namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentPersistenceIdsSpec, IClassFixture<DatabaseFixture>
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;

public MongoDbCurrentPersistenceIdsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbCurrentPersistenceIdsSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}

private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";

return ConfigurationFactory.ParseString(specString);
}

public override void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete()
{
var queries = ReadJournal.AsInstanceOf<ICurrentPersistenceIdsQuery>();

Setup("a", 1);
Setup("b", 1);
Setup("c", 1);

var greenSrc = queries.CurrentPersistenceIds();
var probe = greenSrc.RunWith(this.SinkProbe<string>(), Materializer);
var firstTwo = probe.Request(2).ExpectNextN(2);
Assert.Empty(firstTwo.Except(new[] { "a", "b", "c" }).ToArray());

var last = new[] { "a", "b", "c" }.Except(firstTwo).First();
Setup("d", 1);

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
probe.Request(5)
.ExpectNext(last)
.ExpectComplete();
}

private IActorRef Setup(string persistenceId, int n)
{
var sw = Stopwatch.StartNew();
var pref = Sys.ActorOf(JournalTestActor.Props(persistenceId));
for (int i = 1; i <= n; i++) {
pref.Tell($"{persistenceId}-{i}");
ExpectMsg($"{persistenceId}-{i}-done", TimeSpan.FromSeconds(10), $"{persistenceId}-{i}-done");
}
_output.WriteLine(sw.ElapsedMilliseconds.ToString());
return pref;
}

}


}
Loading

0 comments on commit b128c2b

Please sign in to comment.