From ea679d4914847d69ea1d0498c456a657b1289f4a Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 30 Nov 2016 14:44:10 +0100 Subject: [PATCH 1/3] batching postgres journal --- .../Akka.Persistence.PostgreSql.Tests.csproj | 8 +- ...BatchingPostgreSqlAllPersistenceIdsSpec.cs | 42 +++++ ...hingPostgreSqlEventsByPersistenceIdSpec.cs | 42 +++++ .../BatchingPostgreSqlEventsByTagSpec.cs | 48 +++++ .../Batching/BatchingPostgreSqlJournalSpec.cs | 56 ++++++ .../app.config | 6 - .../packages.config | 2 +- .../Akka.Persistence.PostgreSql.csproj | 5 +- .../Journal/BatchingPostgreSqlJournal.cs | 174 ++++++++++++++++++ .../packages.config | 2 +- 10 files changed, 373 insertions(+), 12 deletions(-) create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlAllPersistenceIdsSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByPersistenceIdSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByTagSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlJournalSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs diff --git a/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj b/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj index 456cb4a..5a97ea0 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj +++ b/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj @@ -99,8 +99,8 @@ ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll True - - ..\packages\Npgsql.3.1.7\lib\net45\Npgsql.dll + + ..\packages\Npgsql.3.1.9\lib\net45\Npgsql.dll True @@ -144,6 +144,10 @@ Properties\SharedAssemblyInfo.cs + + + + diff --git a/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlAllPersistenceIdsSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlAllPersistenceIdsSpec.cs new file mode 100644 index 0000000..7a53577 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlAllPersistenceIdsSpec.cs @@ -0,0 +1,42 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Typesafe Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.Sql.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Batching +{ + [Collection("PostgreSqlSpec")] + public class BatchingPostgreSqlAllPersistenceIdsSpec : AllPersistenceIdsSpec + { + public static Config SpecConfig => ConfigurationFactory.ParseString($@" + akka.loglevel = INFO + akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql"" + akka.persistence.journal.postgresql {{ + class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + auto-initialize = on + connection-string-name = ""TestDb"" + refresh-interval = 1s + }} + akka.test.single-expect-default = 10s"); + + public BatchingPostgreSqlAllPersistenceIdsSpec(ITestOutputHelper output) : base(SpecConfig, output) + { + DbUtils.Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByPersistenceIdSpec.cs new file mode 100644 index 0000000..b8aca10 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByPersistenceIdSpec.cs @@ -0,0 +1,42 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Typesafe Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.Sql.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Batching +{ + [Collection("PostgreSqlSpec")] + public class BatchingPostgreSqlEventsByPersistenceIdSpec : EventsByPersistenceIdSpec + { + public static Config SpecConfig => ConfigurationFactory.ParseString($@" + akka.loglevel = INFO + akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql"" + akka.persistence.journal.postgresql {{ + class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + auto-initialize = on + connection-string-name = ""TestDb"" + refresh-interval = 1s + }} + akka.test.single-expect-default = 10s"); + + public BatchingPostgreSqlEventsByPersistenceIdSpec(ITestOutputHelper output) : base(SpecConfig, output) + { + DbUtils.Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByTagSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByTagSpec.cs new file mode 100644 index 0000000..645fe97 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlEventsByTagSpec.cs @@ -0,0 +1,48 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Typesafe Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.Sql.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Batching +{ + [Collection("PostgreSqlSpec")] + public class BatchingPostgreSqlEventsByTagSpec : EventsByTagSpec + { + public static Config SpecConfig => ConfigurationFactory.ParseString($@" + akka.loglevel = DEBUG + akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql"" + akka.persistence.journal.postgresql {{ + event-adapters {{ + color-tagger = ""Akka.Persistence.Sql.TestKit.ColorTagger, Akka.Persistence.Sql.TestKit"" + }} + event-adapter-bindings = {{ + ""System.String"" = color-tagger + }} + class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + auto-initialize = on + connection-string-name = ""TestDb"" + refresh-interval = 1s + }} + akka.test.single-expect-default = 10s"); + + public BatchingPostgreSqlEventsByTagSpec(ITestOutputHelper output) : base(SpecConfig, output) + { + DbUtils.Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlJournalSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlJournalSpec.cs new file mode 100644 index 0000000..4092989 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Batching/BatchingPostgreSqlJournalSpec.cs @@ -0,0 +1,56 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Typesafe Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.TestKit.Journal; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Batching +{ + [Collection("PostgreSqlSpec")] + public class BatchingPostgreSqlJournalSpec : JournalSpec + { + private static readonly Config SpecConfig; + + static BatchingPostgreSqlJournalSpec() + { + var config = @" + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.postgresql"" + postgresql { + class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + schema-name = public + auto-initialize = on + connection-string-name = ""TestDb"" + } + } + }"; + + SpecConfig = ConfigurationFactory.ParseString(config); + + //need to make sure db is created before the tests start + DbUtils.Initialize(); + } + + public BatchingPostgreSqlJournalSpec(ITestOutputHelper output) + : base(SpecConfig, "PostgreSqlJournalSpec", output: output) + { + Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Tests/app.config b/src/Akka.Persistence.PostgreSql.Tests/app.config index 633d40e..da9a8c3 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/app.config +++ b/src/Akka.Persistence.PostgreSql.Tests/app.config @@ -27,11 +27,5 @@ - - - - - - diff --git a/src/Akka.Persistence.PostgreSql.Tests/packages.config b/src/Akka.Persistence.PostgreSql.Tests/packages.config index 43c18cc..9b555da 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/packages.config +++ b/src/Akka.Persistence.PostgreSql.Tests/packages.config @@ -14,7 +14,7 @@ - + diff --git a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj index c2b66ef..c1aae7c 100644 --- a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj +++ b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj @@ -57,8 +57,8 @@ ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll True - - ..\packages\Npgsql.3.1.7\lib\net45\Npgsql.dll + + ..\packages\Npgsql.3.1.9\lib\net45\Npgsql.dll True @@ -79,6 +79,7 @@ Properties\SharedAssemblyInfo.cs + diff --git a/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs b/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs new file mode 100644 index 0000000..13e952d --- /dev/null +++ b/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs @@ -0,0 +1,174 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Configuration; +using System.Data; +using System.Data.Common; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Sql.Common.Journal; +using Akka.Serialization; +using Akka.Util; +using Newtonsoft.Json; +using Npgsql; +using NpgsqlTypes; +using ConfigurationException = Akka.Configuration.ConfigurationException; + +namespace Akka.Persistence.PostgreSql.Journal +{ + public sealed class BatchingPostgresJournalSetup : BatchingSqlJournalSetup + { + public readonly StoredAsType StoredAs; + public readonly JsonSerializerSettings JsonSerializerSettings; + + public static BatchingPostgresJournalSetup Create(Config config) + { + if (config == null) throw new ArgumentNullException(nameof(config), "Postgres journal settings cannot be initialized, because required HOCON section couldn't been found"); + + var connectionString = config.GetString("connection-string"); + if (string.IsNullOrWhiteSpace(connectionString)) + { + connectionString = ConfigurationManager + .ConnectionStrings[config.GetString("connection-string-name", "DefaultConnection")] + .ConnectionString; + } + + if (string.IsNullOrWhiteSpace(connectionString)) + throw new ArgumentException("No connection string for Sql Event Journal was specified"); + + StoredAsType storedAs; + var storedAsString = config.GetString("stored-as", "JsonB"); + if (!Enum.TryParse(storedAsString, true, out storedAs)) + { + throw new ConfigurationException($"Value [{storedAsString}] of the 'stored-as' HOCON config key is not valid. Valid values: bytea, json, jsonb."); + } + + return new BatchingPostgresJournalSetup( + connectionString: connectionString, + maxConcurrentOperations: config.GetInt("max-concurrent-operations", 64), + maxBatchSize: config.GetInt("max-batch-size", 100), + autoInitialize: config.GetBoolean("auto-initialize", false), + connectionTimeout: config.GetTimeSpan("connection-timeout", TimeSpan.FromSeconds(30)), + circuitBreakerSettings: CircuitBreakerSettings.Create(config.GetConfig("circuit-breaker")), + namingConventions: new QueryConfiguration( + schemaName: config.GetString("schema-name", "public"), + journalEventsTableName: config.GetString("table-name", "event_journal"), + metaTableName: config.GetString("metadata-table-name", "metadata"), + persistenceIdColumnName: "persistence_id", + sequenceNrColumnName: "sequence_nr", + payloadColumnName: "payload", + manifestColumnName: "manifest", + timestampColumnName: "created_at", + isDeletedColumnName: "is_deleted", + tagsColumnName: "tags", + orderingColumnName: "ordering", + timeout: config.GetTimeSpan("connection-timeout", TimeSpan.FromSeconds(30))), + storedAs: storedAs, + jsonSerializerSettings: new JsonSerializerSettings + { + ContractResolver = new NewtonSoftJsonSerializer.AkkaContractResolver() + }); + } + + public BatchingPostgresJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, bool autoInitialize, + TimeSpan connectionTimeout, CircuitBreakerSettings circuitBreakerSettings, QueryConfiguration namingConventions, + StoredAsType storedAs, JsonSerializerSettings jsonSerializerSettings = null) + : base(connectionString, maxConcurrentOperations, maxBatchSize, autoInitialize, connectionTimeout, circuitBreakerSettings, namingConventions) + { + StoredAs = storedAs; + JsonSerializerSettings = jsonSerializerSettings; + } + } + + public class BatchingPostgreSqlJournal : BatchingSqlJournal + { + private readonly Func> serialize; + private readonly Func deserialize; + + public BatchingPostgreSqlJournal(Config config) : this(BatchingPostgresJournalSetup.Create(config)) + { + } + + public BatchingPostgreSqlJournal(BatchingPostgresJournalSetup setup) : base(setup) + { + var conventions = Setup.NamingConventions; + Initializers = ImmutableDictionary.CreateRange(new[] + { + new KeyValuePair("CreateJournalSql", $@" + CREATE TABLE IF NOT EXISTS {conventions.FullJournalTableName} ( + {conventions.OrderingColumnName} BIGSERIAL NOT NULL PRIMARY KEY, + {conventions.PersistenceIdColumnName} VARCHAR(255) NOT NULL, + {conventions.SequenceNrColumnName} BIGINT NOT NULL, + {conventions.IsDeletedColumnName} BOOLEAN NOT NULL, + {conventions.TimestampColumnName} BIGINT NOT NULL, + {conventions.ManifestColumnName} VARCHAR(500) NOT NULL, + {conventions.PayloadColumnName} {setup.StoredAs} NOT NULL, + {conventions.TagsColumnName} VARCHAR(100) NULL, + CONSTRAINT {conventions.JournalEventsTableName}_uq UNIQUE ({conventions.PersistenceIdColumnName}, {conventions.SequenceNrColumnName}) + );"), + new KeyValuePair("CreateMetadataSql", $@" + CREATE TABLE IF NOT EXISTS {conventions.FullMetaTableName} ( + {conventions.PersistenceIdColumnName} VARCHAR(255) NOT NULL, + {conventions.SequenceNrColumnName} BIGINT NOT NULL, + CONSTRAINT {conventions.MetaTableName}_pk PRIMARY KEY ({conventions.PersistenceIdColumnName}, {conventions.SequenceNrColumnName}) + );"), + }); + + switch (setup.StoredAs) + { + case StoredAsType.ByteA: + var serialization = Context.System.Serialization; + serialize = e => new KeyValuePair(NpgsqlDbType.Bytea, serialization.FindSerializerFor(e.Payload).ToBinary(e.Payload)); + deserialize = (type, serialized) => serialization.FindSerializerForType(type).FromBinary((byte[])serialized, type); + break; + case StoredAsType.JsonB: + serialize = e => new KeyValuePair(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(e.Payload, setup.JsonSerializerSettings)); + deserialize = (type, serialized) => JsonConvert.DeserializeObject((string)serialized, type, setup.JsonSerializerSettings); + break; + case StoredAsType.Json: + serialize = e => new KeyValuePair(NpgsqlDbType.Json, JsonConvert.SerializeObject(e.Payload, setup.JsonSerializerSettings)); + deserialize = (type, serialized) => JsonConvert.DeserializeObject((string)serialized, type, setup.JsonSerializerSettings); + break; + default: + throw new NotSupportedException($"{setup.StoredAs} is not supported Db type for a payload"); + } + } + + protected override NpgsqlConnection CreateConnection(string connectionString) => + new NpgsqlConnection(connectionString); + + protected override ImmutableDictionary Initializers { get; } + + protected override void WriteEvent(NpgsqlCommand command, IPersistentRepresentation persistent, string tags = "") + { + var payloadType = persistent.Payload.GetType(); + var manifest = string.IsNullOrEmpty(persistent.Manifest) + ? payloadType.TypeQualifiedName() + : persistent.Manifest; + var t = serialize(persistent); + + AddParameter(command, "@PersistenceId", DbType.String, persistent.PersistenceId); + AddParameter(command, "@SequenceNr", DbType.Int64, persistent.SequenceNr); + AddParameter(command, "@Timestamp", DbType.Int64, 0L); + AddParameter(command, "@IsDeleted", DbType.Boolean, false); + AddParameter(command, "@Manifest", DbType.String, manifest); + command.Parameters.Add(new NpgsqlParameter("@Payload", t.Key) { Value = t.Value }); + AddParameter(command, "@Tag", DbType.String, tags); + } + + protected override IPersistentRepresentation ReadEvent(DbDataReader reader) + { + var persistenceId = reader.GetString(PersistenceIdIndex); + var sequenceNr = reader.GetInt64(SequenceNrIndex); + var isDeleted = reader.GetBoolean(IsDeletedIndex); + var manifest = reader.GetString(ManifestIndex); + var payload = reader[PayloadIndex]; + + var type = Type.GetType(manifest, true); + var deserialized = deserialize(type, payload); + + var persistent = new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null); + return persistent; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql/packages.config b/src/Akka.Persistence.PostgreSql/packages.config index 5e60271..e86d3df 100644 --- a/src/Akka.Persistence.PostgreSql/packages.config +++ b/src/Akka.Persistence.PostgreSql/packages.config @@ -5,6 +5,6 @@ - + \ No newline at end of file From e6bafafae9c24caad69a0a6eac4b0d0afca1a7bf Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 30 Nov 2016 14:53:00 +0100 Subject: [PATCH 2/3] batching journal --- src/Akka.Persistence.PostgreSql.sln | 6 ++ .../Journal/BatchingPostgreSqlJournal.cs | 8 +- src/Benchmark/App.config | 9 ++ src/Benchmark/Benchmark.csproj | 76 +++++++++++++ src/Benchmark/Messages.cs | 70 ++++++++++++ src/Benchmark/PerformanceActor.cs | 87 +++++++++++++++ src/Benchmark/Program.cs | 101 ++++++++++++++++++ src/Benchmark/Properties/AssemblyInfo.cs | 36 +++++++ 8 files changed, 390 insertions(+), 3 deletions(-) create mode 100644 src/Benchmark/App.config create mode 100644 src/Benchmark/Benchmark.csproj create mode 100644 src/Benchmark/Messages.cs create mode 100644 src/Benchmark/PerformanceActor.cs create mode 100644 src/Benchmark/Program.cs create mode 100644 src/Benchmark/Properties/AssemblyInfo.cs diff --git a/src/Akka.Persistence.PostgreSql.sln b/src/Akka.Persistence.PostgreSql.sln index dce83e5..938472d 100644 --- a/src/Akka.Persistence.PostgreSql.sln +++ b/src/Akka.Persistence.PostgreSql.sln @@ -30,6 +30,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution ..\README.md = ..\README.md EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmark", "Benchmark\Benchmark.csproj", "{72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -44,6 +46,10 @@ Global {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Debug|Any CPU.Build.0 = Debug|Any CPU {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release|Any CPU.ActiveCfg = Release|Any CPU {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release|Any CPU.Build.0 = Release|Any CPU + {72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Debug|Any CPU.Build.0 = Debug|Any CPU + {72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Release|Any CPU.ActiveCfg = Release|Any CPU + {72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs b/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs index 13e952d..700c835 100644 --- a/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs +++ b/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs @@ -47,9 +47,11 @@ public static BatchingPostgresJournalSetup Create(Config config) connectionString: connectionString, maxConcurrentOperations: config.GetInt("max-concurrent-operations", 64), maxBatchSize: config.GetInt("max-batch-size", 100), + maxBufferSize: config.GetInt("max-buffer-size", 500000), autoInitialize: config.GetBoolean("auto-initialize", false), connectionTimeout: config.GetTimeSpan("connection-timeout", TimeSpan.FromSeconds(30)), circuitBreakerSettings: CircuitBreakerSettings.Create(config.GetConfig("circuit-breaker")), + replayFilterSettings: ReplayFilterSettings.Create(config.GetConfig("replay-filter")), namingConventions: new QueryConfiguration( schemaName: config.GetString("schema-name", "public"), journalEventsTableName: config.GetString("table-name", "event_journal"), @@ -70,10 +72,10 @@ public static BatchingPostgresJournalSetup Create(Config config) }); } - public BatchingPostgresJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, bool autoInitialize, - TimeSpan connectionTimeout, CircuitBreakerSettings circuitBreakerSettings, QueryConfiguration namingConventions, + public BatchingPostgresJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize, + TimeSpan connectionTimeout, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions, StoredAsType storedAs, JsonSerializerSettings jsonSerializerSettings = null) - : base(connectionString, maxConcurrentOperations, maxBatchSize, autoInitialize, connectionTimeout, circuitBreakerSettings, namingConventions) + : base(connectionString, maxConcurrentOperations, maxBatchSize, maxBufferSize, autoInitialize, connectionTimeout, circuitBreakerSettings, replayFilterSettings, namingConventions) { StoredAs = storedAs; JsonSerializerSettings = jsonSerializerSettings; diff --git a/src/Benchmark/App.config b/src/Benchmark/App.config new file mode 100644 index 0000000..5668215 --- /dev/null +++ b/src/Benchmark/App.config @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/src/Benchmark/Benchmark.csproj b/src/Benchmark/Benchmark.csproj new file mode 100644 index 0000000..339b4ae --- /dev/null +++ b/src/Benchmark/Benchmark.csproj @@ -0,0 +1,76 @@ + + + + + Debug + AnyCPU + {72F4C1B4-9D86-4222-A6D5-17AC91B9ED41} + Exe + Properties + Benchmark + Benchmark + v4.5 + 512 + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\..\akka.net\src\contrib\persistence\Akka.Persistence.Sql.Common\bin\Debug\Akka.dll + + + ..\..\..\akka.net\src\contrib\persistence\Akka.Persistence.Sql.Common\bin\Debug\Akka.Persistence.dll + + + ..\..\..\akka.net\src\contrib\persistence\Akka.Persistence.Sql.Common\bin\Debug\Akka.Persistence.Sql.Common.dll + + + + + + + + + + + + + + + + + + + + + + {4b89227b-5ad1-4061-816f-570067c3727f} + Akka.Persistence.PostgreSql + + + + + \ No newline at end of file diff --git a/src/Benchmark/Messages.cs b/src/Benchmark/Messages.cs new file mode 100644 index 0000000..a2970f6 --- /dev/null +++ b/src/Benchmark/Messages.cs @@ -0,0 +1,70 @@ +using System; +using System.Runtime.Serialization; + +namespace Benchmark +{ + [Serializable] + public sealed class StopMeasure + { + public static readonly StopMeasure Instance = new StopMeasure(); + + private StopMeasure() + { + } + } + + [Serializable] + public sealed class FailAt + { + public readonly long SequenceNr; + + public FailAt(long sequenceNr) + { + SequenceNr = sequenceNr; + } + } + + [Serializable] + public sealed class Measure + { + public readonly int MessagesCount; + + public Measure(int messagesCount) + { + MessagesCount = messagesCount; + } + + public DateTime StartedAt { get; private set; } + public DateTime StopedAt { get; private set; } + + public void StartMeasure() + { + StartedAt = DateTime.Now; + } + + public double StopMeasure() + { + StopedAt = DateTime.Now; + return MessagesCount / (StopedAt - StartedAt).TotalSeconds; + } + } + + public class PerformanceTestException : Exception + { + public PerformanceTestException() + { + } + + public PerformanceTestException(string message) : base(message) + { + } + + public PerformanceTestException(string message, Exception innerException) : base(message, innerException) + { + } + + protected PerformanceTestException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + } +} \ No newline at end of file diff --git a/src/Benchmark/PerformanceActor.cs b/src/Benchmark/PerformanceActor.cs new file mode 100644 index 0000000..16e525c --- /dev/null +++ b/src/Benchmark/PerformanceActor.cs @@ -0,0 +1,87 @@ +using Akka; +using Akka.Actor; +using Akka.Persistence; + +namespace Benchmark +{ + public sealed class Init + { + public static readonly Init Instance = new Init(); + private Init() { } + } + + public sealed class Finish + { + public static readonly Finish Instance = new Finish(); + private Finish() { } + } + public sealed class Done + { + public static readonly Done Instance = new Done(); + private Done() { } + } + public sealed class Finished + { + public readonly long State; + + public Finished(long state) + { + State = state; + } + } + + public sealed class Store + { + public readonly int Value; + + public Store(int value) + { + Value = value; + } + } + + public sealed class Stored + { + public readonly int Value; + + public Stored(int value) + { + Value = value; + } + } + + public class PerformanceTestActor : PersistentActor + { + private long state = 0L; + public PerformanceTestActor(string persistenceId) + { + PersistenceId = persistenceId; + } + + public sealed override string PersistenceId { get; } + + protected override bool ReceiveRecover(object message) => message.Match() + .With(s => state += s.Value) + .WasHandled; + + protected override bool ReceiveCommand(object message) => message.Match() + .With(store => + { + Persist(new Stored(store.Value), s => + { + state += s.Value; + }); + }) + .With(_ => + { + var sender = Sender; + Persist(new Stored(0), s => + { + state += s.Value; + sender.Tell(Done.Instance); + }); + }) + .With(_ => Sender.Tell(new Finished(state))) + .WasHandled; + } +} \ No newline at end of file diff --git a/src/Benchmark/Program.cs b/src/Benchmark/Program.cs new file mode 100644 index 0000000..b5a2ca3 --- /dev/null +++ b/src/Benchmark/Program.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Pattern; + +namespace Benchmark +{ + class Program + { + // if you want to benchmark your persistent storage provides, paste the configuration in string below + // by default we're checking against in-memory journal + private static Config config = ConfigurationFactory.ParseString(@" + akka { + #loglevel = DEBUG + suppress-json-serializer-warning = true + persistence.journal { + plugin = ""akka.persistence.journal.postgresql"" + postgresql { + class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + schema-name = public + auto-initialize = on + connection-string-name = ""TestDb"" + connection-timeout = 30s + refresh-interval = 1s + max-concurrent-operations = 64 + max-batch-size = 100 + circuit-breaker { + max-failures = 10 + call-timeout = 30s + reset-timeout = 30s + } + } + } + }"); + + public const int ActorCount = 10000; + public const int MessagesPerActor = 100; + + static void Main(string[] args) + { + using (var system = ActorSystem.Create("persistent-benchmark", config.WithFallback(ConfigurationFactory.Default()))) + { + Console.WriteLine("Performance benchmark starting..."); + + var stopwatch = new Stopwatch(); + + var actors = new IActorRef[ActorCount]; + for (int i = 0; i < ActorCount; i++) + { + var pid = "a-" + i; + actors[i] = system.ActorOf(Props.Create(() => new PerformanceTestActor(pid))); + } + + stopwatch.Start(); + + Task.WaitAll(actors.Select(a => a.Ask(Init.Instance)).Cast().ToArray()); + + stopwatch.Stop(); + + Console.WriteLine($"Initialized {ActorCount} eventsourced actors in {stopwatch.ElapsedMilliseconds / 1000.0} sec..."); + + stopwatch.Start(); + + for (int i = 0; i < MessagesPerActor; i++) + for (int j = 0; j < ActorCount; j++) + { + actors[j].Tell(new Store(1)); + } + + var finished = new Task[ActorCount]; + for (int i = 0; i < ActorCount; i++) + { + finished[i] = actors[i].Ask(Finish.Instance); + } + + Task.WaitAll(finished); + + stopwatch.Stop(); + var elapsed = stopwatch.ElapsedMilliseconds; + + Console.WriteLine($"{ActorCount} actors stored {MessagesPerActor} events each in {elapsed / 1000.0} sec. Average: {ActorCount * MessagesPerActor * 1000.0 / elapsed} events/sec"); + + foreach (Task task in finished) + { + if (!task.IsCompleted || task.Result.State != MessagesPerActor) + throw new IllegalStateException("Actor's state was invalid"); + } + } + + Console.WriteLine("Press Enter to exit..."); + Console.ReadLine(); + } + } +} diff --git a/src/Benchmark/Properties/AssemblyInfo.cs b/src/Benchmark/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..b9a5ade --- /dev/null +++ b/src/Benchmark/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Benchmark")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Benchmark")] +[assembly: AssemblyCopyright("Copyright © 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("72f4c1b4-9d86-4222-a6d5-17ac91b9ed41")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] From 705d15e2e19af0c1bd5b9de4e8c5520a098cec87 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 24 Jan 2017 10:37:18 +0100 Subject: [PATCH 3/3] updated to v1.1.3 with batching journal --- README.md | 14 ++++++ .../Akka.Persistence.PostgreSql.Tests.csproj | 47 ++++++++++++++++++- .../packages.config | 31 +++++------- .../Akka.Persistence.PostgreSql.csproj | 11 +++++ .../Journal/BatchingPostgreSqlJournal.cs | 24 +++++++--- .../packages.config | 6 +-- src/Benchmark/Benchmark.csproj | 28 ++++++++--- src/Benchmark/packages.config | 8 ++++ 8 files changed, 133 insertions(+), 36 deletions(-) create mode 100644 src/Benchmark/packages.config diff --git a/README.md b/README.md index 253244c..49f490d 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,20 @@ akka.persistence{ } } ``` + +### Batching journal + +Since version 1.1.3 an alternative, experimental type of the journal has been released, known as batching journal. It's optimized for concurrent writes made by multiple persistent actors, thanks to the ability of batching multiple SQL operations to be executed within the same database connection. In some of those situations we've noticed over an order of magnitude in event write speed. + +To use batching journal, simply change `akka.persistence.journal.sql-server.class` to *Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer*. + +Additionally to the existing settings, batching journal introduces few more: + +- `isolation-level` to define isolation level for transactions used withing event reads/writes. Possible options: *unspecified* (default), *chaos*, *read-committed*, *read-uncommitted*, *repeatable-read*, *serializable* or *snapshot*. +- `max-concurrent-operations` is used to limit the maximum number of database connections used by this journal. You can use them in situations when you want to partition the same ADO.NET pool between multiple components. Current default: *64*. +- `max-batch-size` defines the maximum number of SQL operations, that are allowed to be executed using the same connection. When there are more operations, they will chunked into subsequent connections. Current default: *100*. +- `max-buffer-size` defines maximum buffer capacity for the requests send to a journal. Once buffer gets overflown, a journal will call `OnBufferOverflow` method. By default it will reject all incoming requests until the buffer space gets freed. You can inherit from `BatchingSqlServerJournal` and override that method to provide a custom backpressure strategy. Current default: *500 000*. + ### Table Schema PostgreSql persistence plugin defines a default table schema used for journal, snapshot store and metadate table. diff --git a/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj b/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj index 5a97ea0..651f146 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj +++ b/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj @@ -77,6 +77,49 @@ ..\packages\Akka.TestKit.Xunit2.1.1.2\lib\net45\Akka.TestKit.Xunit2.dll + + + ..\packages\Akka.1.1.3\lib\net45\Akka.dll + True + + + ..\packages\Akka.Persistence.1.1.3.32-beta\lib\net45\Akka.Persistence.dll + True + + + ..\packages\Akka.Persistence.Query.1.1.3.32-beta\lib\net45\Akka.Persistence.Query.dll + True + + + ..\packages\Akka.Persistence.Query.Sql.1.1.3.32-beta\lib\net45\Akka.Persistence.Query.Sql.dll + True + + + ..\packages\Akka.Persistence.Sql.Common.1.1.3.32-beta\lib\net45\Akka.Persistence.Sql.Common.dll + True + + + ..\packages\Akka.Persistence.Sql.TestKit.1.1.3.32-beta\lib\net45\Akka.Persistence.Sql.TestKit.dll + True + + + ..\packages\Akka.Persistence.TestKit.1.1.3.32-beta\lib\net45\Akka.Persistence.TestKit.dll + True + + + ..\packages\Akka.Streams.1.1.3.32-beta\lib\net45\Akka.Streams.dll + True + + + ..\packages\Akka.Streams.TestKit.1.1.3.32-beta\lib\net45\Akka.Streams.TestKit.dll + True + + + ..\packages\Akka.TestKit.1.1.3\lib\net45\Akka.TestKit.dll + True + + + ..\packages\Akka.TestKit.Xunit2.1.1.3\lib\net45\Akka.TestKit.Xunit2.dll True @@ -103,8 +146,8 @@ ..\packages\Npgsql.3.1.9\lib\net45\Npgsql.dll True - - ..\packages\Reactive.Streams.1.0.1\lib\net40\Reactive.Streams.dll + + ..\packages\Reactive.Streams.1.0.0-RC1\lib\portable-net45+netcore45\Reactive.Streams.dll True diff --git a/src/Akka.Persistence.PostgreSql.Tests/packages.config b/src/Akka.Persistence.PostgreSql.Tests/packages.config index 9b555da..7b7b482 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/packages.config +++ b/src/Akka.Persistence.PostgreSql.Tests/packages.config @@ -1,29 +1,22 @@  - - - - - - - - - - - + + + + + + + + + + + - - + - - - - - - diff --git a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj index c1aae7c..d7a99c5 100644 --- a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj +++ b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj @@ -43,6 +43,17 @@ ..\packages\Akka.Persistence.Sql.Common.1.1.2.30-beta\lib\net45\Akka.Persistence.Sql.Common.dll + + + ..\packages\Akka.1.1.3\lib\net45\Akka.dll + True + + + ..\packages\Akka.Persistence.1.1.3.32-beta\lib\net45\Akka.Persistence.dll + True + + + ..\packages\Akka.Persistence.Sql.Common.1.1.3.32-beta\lib\net45\Akka.Persistence.Sql.Common.dll True diff --git a/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs b/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs index 700c835..421b24d 100644 --- a/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs +++ b/src/Akka.Persistence.PostgreSql/Journal/BatchingPostgreSqlJournal.cs @@ -43,6 +43,19 @@ public static BatchingPostgresJournalSetup Create(Config config) throw new ConfigurationException($"Value [{storedAsString}] of the 'stored-as' HOCON config key is not valid. Valid values: bytea, json, jsonb."); } + IsolationLevel level; + switch (config.GetString("isolation-level", "unspecified")) + { + case "chaos": level = IsolationLevel.Chaos; break; + case "read-committed": level = IsolationLevel.ReadCommitted; break; + case "read-uncommitted": level = IsolationLevel.ReadUncommitted; break; + case "repeatable-read": level = IsolationLevel.RepeatableRead; break; + case "serializable": level = IsolationLevel.Serializable; break; + case "snapshot": level = IsolationLevel.Snapshot; break; + case "unspecified": level = IsolationLevel.Unspecified; break; + default: throw new ArgumentException("Unknown isolation-level value. Should be one of: chaos | read-committed | read-uncommitted | repeatable-read | serializable | snapshot | unspecified"); + } + return new BatchingPostgresJournalSetup( connectionString: connectionString, maxConcurrentOperations: config.GetInt("max-concurrent-operations", 64), @@ -50,8 +63,9 @@ public static BatchingPostgresJournalSetup Create(Config config) maxBufferSize: config.GetInt("max-buffer-size", 500000), autoInitialize: config.GetBoolean("auto-initialize", false), connectionTimeout: config.GetTimeSpan("connection-timeout", TimeSpan.FromSeconds(30)), - circuitBreakerSettings: CircuitBreakerSettings.Create(config.GetConfig("circuit-breaker")), - replayFilterSettings: ReplayFilterSettings.Create(config.GetConfig("replay-filter")), + isolationLevel: level, + circuitBreakerSettings: new CircuitBreakerSettings(config.GetConfig("circuit-breaker")), + replayFilterSettings: new ReplayFilterSettings(config.GetConfig("replay-filter")), namingConventions: new QueryConfiguration( schemaName: config.GetString("schema-name", "public"), journalEventsTableName: config.GetString("table-name", "event_journal"), @@ -72,10 +86,8 @@ public static BatchingPostgresJournalSetup Create(Config config) }); } - public BatchingPostgresJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize, - TimeSpan connectionTimeout, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions, - StoredAsType storedAs, JsonSerializerSettings jsonSerializerSettings = null) - : base(connectionString, maxConcurrentOperations, maxBatchSize, maxBufferSize, autoInitialize, connectionTimeout, circuitBreakerSettings, replayFilterSettings, namingConventions) + public BatchingPostgresJournalSetup(string connectionString, int maxConcurrentOperations, int maxBatchSize, int maxBufferSize, bool autoInitialize, TimeSpan connectionTimeout, IsolationLevel isolationLevel, CircuitBreakerSettings circuitBreakerSettings, ReplayFilterSettings replayFilterSettings, QueryConfiguration namingConventions, StoredAsType storedAs, JsonSerializerSettings jsonSerializerSettings) + : base(connectionString, maxConcurrentOperations, maxBatchSize, maxBufferSize, autoInitialize, connectionTimeout, isolationLevel, circuitBreakerSettings, replayFilterSettings, namingConventions) { StoredAs = storedAs; JsonSerializerSettings = jsonSerializerSettings; diff --git a/src/Akka.Persistence.PostgreSql/packages.config b/src/Akka.Persistence.PostgreSql/packages.config index e86d3df..c71262a 100644 --- a/src/Akka.Persistence.PostgreSql/packages.config +++ b/src/Akka.Persistence.PostgreSql/packages.config @@ -1,8 +1,8 @@  - - - + + + diff --git a/src/Benchmark/Benchmark.csproj b/src/Benchmark/Benchmark.csproj index 339b4ae..0e80e03 100644 --- a/src/Benchmark/Benchmark.csproj +++ b/src/Benchmark/Benchmark.csproj @@ -32,16 +32,31 @@ 4 - - ..\..\..\akka.net\src\contrib\persistence\Akka.Persistence.Sql.Common\bin\Debug\Akka.dll + + ..\packages\Akka.1.1.3\lib\net45\Akka.dll + True - - ..\..\..\akka.net\src\contrib\persistence\Akka.Persistence.Sql.Common\bin\Debug\Akka.Persistence.dll + + ..\packages\Akka.Persistence.1.1.3.32-beta\lib\net45\Akka.Persistence.dll + True - - ..\..\..\akka.net\src\contrib\persistence\Akka.Persistence.Sql.Common\bin\Debug\Akka.Persistence.Sql.Common.dll + + ..\packages\Google.ProtocolBuffers.2.4.1.555\lib\net40\Google.ProtocolBuffers.dll + True + + + ..\packages\Google.ProtocolBuffers.2.4.1.555\lib\net40\Google.ProtocolBuffers.Serialization.dll + True + + + ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + True + + ..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll + True + @@ -58,6 +73,7 @@ + diff --git a/src/Benchmark/packages.config b/src/Benchmark/packages.config new file mode 100644 index 0000000..bf08920 --- /dev/null +++ b/src/Benchmark/packages.config @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file