Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batching journal for Postgres #32

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ akka.persistence{
}
}
```

### Batching journal

Since version 1.1.3 an alternative, experimental type of the journal has been released, known as batching journal. It's optimized for concurrent writes made by multiple persistent actors, thanks to the ability of batching multiple SQL operations to be executed within the same database connection. In some of those situations we've noticed over an order of magnitude in event write speed.

To use batching journal, simply change `akka.persistence.journal.sql-server.class` to *Akka.Persistence.SqlServer.Journal.BatchingSqlServerJournal, Akka.Persistence.SqlServer*.

Additionally to the existing settings, batching journal introduces few more:

- `isolation-level` to define isolation level for transactions used withing event reads/writes. Possible options: *unspecified* (default), *chaos*, *read-committed*, *read-uncommitted*, *repeatable-read*, *serializable* or *snapshot*.
- `max-concurrent-operations` is used to limit the maximum number of database connections used by this journal. You can use them in situations when you want to partition the same ADO.NET pool between multiple components. Current default: *64*.
- `max-batch-size` defines the maximum number of SQL operations, that are allowed to be executed using the same connection. When there are more operations, they will chunked into subsequent connections. Current default: *100*.
- `max-buffer-size` defines maximum buffer capacity for the requests send to a journal. Once buffer gets overflown, a journal will call `OnBufferOverflow` method. By default it will reject all incoming requests until the buffer space gets freed. You can inherit from `BatchingSqlServerJournal` and override that method to provide a custom backpressure strategy. Current default: *500 000*.

### Table Schema

PostgreSql persistence plugin defines a default table schema used for journal, snapshot store and metadate table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,49 @@
</Reference>
<Reference Include="Akka.TestKit.Xunit2, Version=1.1.2.29, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.TestKit.Xunit2.1.1.2\lib\net45\Akka.TestKit.Xunit2.dll</HintPath>
</Reference>
<Reference Include="Akka, Version=1.1.3.31, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.1.1.3\lib\net45\Akka.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence, Version=1.1.3.32, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.1.1.3.32-beta\lib\net45\Akka.Persistence.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence.Query, Version=1.1.3.32, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.Query.1.1.3.32-beta\lib\net45\Akka.Persistence.Query.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence.Query.Sql, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.Query.Sql.1.1.3.32-beta\lib\net45\Akka.Persistence.Query.Sql.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence.Sql.Common, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.Sql.Common.1.1.3.32-beta\lib\net45\Akka.Persistence.Sql.Common.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence.Sql.TestKit, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.Sql.TestKit.1.1.3.32-beta\lib\net45\Akka.Persistence.Sql.TestKit.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Persistence.TestKit, Version=1.1.3.32, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Persistence.TestKit.1.1.3.32-beta\lib\net45\Akka.Persistence.TestKit.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Streams, Version=1.1.3.32, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Streams.1.1.3.32-beta\lib\net45\Akka.Streams.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.Streams.TestKit, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.Streams.TestKit.1.1.3.32-beta\lib\net45\Akka.Streams.TestKit.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.TestKit, Version=1.1.3.31, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.TestKit.1.1.3\lib\net45\Akka.TestKit.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Akka.TestKit.Xunit2, Version=1.1.3.32, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Akka.TestKit.Xunit2.1.1.3\lib\net45\Akka.TestKit.Xunit2.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="FluentAssertions, Version=4.6.3.0, Culture=neutral, PublicKeyToken=33f2691a05b67b6a, processorArchitecture=MSIL">
Expand All @@ -99,12 +142,12 @@
<HintPath>..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Npgsql, Version=3.1.7.0, Culture=neutral, PublicKeyToken=5d8b90d52f46fda7, processorArchitecture=MSIL">
<HintPath>..\packages\Npgsql.3.1.7\lib\net45\Npgsql.dll</HintPath>
<Reference Include="Npgsql, Version=3.1.9.0, Culture=neutral, PublicKeyToken=5d8b90d52f46fda7, processorArchitecture=MSIL">
<HintPath>..\packages\Npgsql.3.1.9\lib\net45\Npgsql.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Reactive.Streams, Version=1.0.1.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Reactive.Streams.1.0.1\lib\net40\Reactive.Streams.dll</HintPath>
<Reference Include="Reactive.Streams, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Reactive.Streams.1.0.0-RC1\lib\portable-net45+netcore45\Reactive.Streams.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
Expand Down Expand Up @@ -144,6 +187,10 @@
<Compile Include="..\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Batching\BatchingPostgreSqlAllPersistenceIdsSpec.cs" />
<Compile Include="Batching\BatchingPostgreSqlEventsByPersistenceIdSpec.cs" />
<Compile Include="Batching\BatchingPostgreSqlEventsByTagSpec.cs" />
<Compile Include="Batching\BatchingPostgreSqlJournalSpec.cs" />
<Compile Include="DbUtils.cs" />
<Compile Include="Json\PostgreSqlJournalJsonSpec.cs" />
<Compile Include="Json\PostgreSqlSnapshotStoreJsonSpec.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//-----------------------------------------------------------------------
// <copyright file="BatchingPostgreSqlAllPersistenceIdsSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.Sql.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.PostgreSql.Tests.Batching
{
[Collection("PostgreSqlSpec")]
public class BatchingPostgreSqlAllPersistenceIdsSpec : AllPersistenceIdsSpec
{
public static Config SpecConfig => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql""
akka.persistence.journal.postgresql {{
class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
auto-initialize = on
connection-string-name = ""TestDb""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s");

public BatchingPostgreSqlAllPersistenceIdsSpec(ITestOutputHelper output) : base(SpecConfig, output)
{
DbUtils.Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//-----------------------------------------------------------------------
// <copyright file="BatchingPostgreSqlEventsByPersistenceIdSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.Sql.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.PostgreSql.Tests.Batching
{
[Collection("PostgreSqlSpec")]
public class BatchingPostgreSqlEventsByPersistenceIdSpec : EventsByPersistenceIdSpec
{
public static Config SpecConfig => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql""
akka.persistence.journal.postgresql {{
class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
auto-initialize = on
connection-string-name = ""TestDb""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s");

public BatchingPostgreSqlEventsByPersistenceIdSpec(ITestOutputHelper output) : base(SpecConfig, output)
{
DbUtils.Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//-----------------------------------------------------------------------
// <copyright file="BatchingPostgreSqlEventsByTagSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.Sql.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.PostgreSql.Tests.Batching
{
[Collection("PostgreSqlSpec")]
public class BatchingPostgreSqlEventsByTagSpec : EventsByTagSpec
{
public static Config SpecConfig => ConfigurationFactory.ParseString($@"
akka.loglevel = DEBUG
akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql""
akka.persistence.journal.postgresql {{
event-adapters {{
color-tagger = ""Akka.Persistence.Sql.TestKit.ColorTagger, Akka.Persistence.Sql.TestKit""
}}
event-adapter-bindings = {{
""System.String"" = color-tagger
}}
class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
auto-initialize = on
connection-string-name = ""TestDb""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s");

public BatchingPostgreSqlEventsByTagSpec(ITestOutputHelper output) : base(SpecConfig, output)
{
DbUtils.Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//-----------------------------------------------------------------------
// <copyright file="BatchingPostgreSqlJournalSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.TestKit.Journal;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.PostgreSql.Tests.Batching
{
[Collection("PostgreSqlSpec")]
public class BatchingPostgreSqlJournalSpec : JournalSpec
{
private static readonly Config SpecConfig;

static BatchingPostgreSqlJournalSpec()
{
var config = @"
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.postgresql""
postgresql {
class = ""Akka.Persistence.PostgreSql.Journal.BatchingPostgreSqlJournal, Akka.Persistence.PostgreSql""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
schema-name = public
auto-initialize = on
connection-string-name = ""TestDb""
}
}
}";

SpecConfig = ConfigurationFactory.ParseString(config);

//need to make sure db is created before the tests start
DbUtils.Initialize();
}

public BatchingPostgreSqlJournalSpec(ITestOutputHelper output)
: base(SpecConfig, "PostgreSqlJournalSpec", output: output)
{
Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}
}
}
6 changes: 0 additions & 6 deletions src/Akka.Persistence.PostgreSql.Tests/app.config
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,5 @@
<bindingRedirect oldVersion="0.0.0.0-4.6.3.0" newVersion="4.6.3.0" />
</dependentAssembly>
</assemblyBinding>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="Npgsql" publicKeyToken="5d8b90d52f46fda7" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-3.1.4.0" newVersion="3.1.4.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>
33 changes: 13 additions & 20 deletions src/Akka.Persistence.PostgreSql.Tests/packages.config
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Akka" version="1.1.2" targetFramework="net45" />
<package id="Akka.Persistence" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.Persistence.Query" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.Persistence.Query.Sql" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.Persistence.Sql.Common" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.Persistence.Sql.TestKit" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.Persistence.TestKit" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.Streams" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.Streams.TestKit" version="1.1.2.30-beta" targetFramework="net45" />
<package id="Akka.TestKit" version="1.1.2" targetFramework="net45" />
<package id="Akka.TestKit.Xunit2" version="1.1.2" targetFramework="net45" />
<package id="Akka" version="1.1.3" targetFramework="net45" />
<package id="Akka.Persistence" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.Persistence.Query" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.Persistence.Query.Sql" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.Persistence.Sql.Common" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.Persistence.Sql.TestKit" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.Persistence.TestKit" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.Streams" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.Streams.TestKit" version="1.1.3.32-beta" targetFramework="net45" />
<package id="Akka.TestKit" version="1.1.3" targetFramework="net45" />
<package id="Akka.TestKit.Xunit2" version="1.1.3" targetFramework="net45" />
<package id="FluentAssertions" version="4.6.3" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.555" targetFramework="net45" />
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net45" />
<package id="Npgsql" version="3.1.7" targetFramework="net45" />
<package id="Reactive.Streams" version="1.0.1" targetFramework="net45" />
<package id="System.Collections" version="4.0.0" targetFramework="net45" />
<package id="Npgsql" version="3.1.9" targetFramework="net45" />
<package id="Reactive.Streams" version="1.0.0-RC1" targetFramework="net45" />
<package id="System.Collections.Immutable" version="1.1.36" targetFramework="net45" />
<package id="System.Globalization" version="4.0.0" targetFramework="net45" />
<package id="System.Linq" version="4.0.0" targetFramework="net45" />
<package id="System.Resources.ResourceManager" version="4.0.0" targetFramework="net45" />
<package id="System.Runtime" version="4.0.0" targetFramework="net45" />
<package id="System.Runtime.Extensions" version="4.0.0" targetFramework="net45" />
<package id="System.Threading" version="4.0.0" targetFramework="net45" />
<package id="xunit" version="2.1.0" targetFramework="net45" />
<package id="xunit.abstractions" version="2.0.0" targetFramework="net45" />
<package id="xunit.assert" version="2.1.0" targetFramework="net45" />
Expand Down
6 changes: 6 additions & 0 deletions src/Akka.Persistence.PostgreSql.sln
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\README.md = ..\README.md
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmark", "Benchmark\Benchmark.csproj", "{72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -44,6 +46,10 @@ Global
{2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release|Any CPU.Build.0 = Release|Any CPU
{72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Debug|Any CPU.Build.0 = Debug|Any CPU
{72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Release|Any CPU.ActiveCfg = Release|Any CPU
{72F4C1B4-9D86-4222-A6D5-17AC91B9ED41}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Loading