Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added missing configuration. Updated akka. Added PublicApi annotations. #37

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,40 +1,41 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<AkkaVersion>1.5.18</AkkaVersion>
<AkkaVersion>1.5.19</AkkaVersion>
<EventStoreVersion>23.2.1</EventStoreVersion>
<NBenchVersion>1.2.2</NBenchVersion>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>15.9.0</TestSdkVersion>
<XunitVersion>2.7.1</XunitVersion>
<TestSdkVersion>17.9.0</TestSdkVersion>
</PropertyGroup>
<!-- Akka.NET Package Versions -->
<ItemGroup>
<PackageVersion Include="Akka.Hosting" Version="1.5.6.1" />
<PackageVersion Include="Akka.Persistence.Hosting" Version="1.5.18" />
<PackageVersion Include="NuGet.Frameworks" Version="6.9.1" />
<PackageVersion Include="Akka" Version="1.5.18" />
<PackageVersion Include="Akka.Persistence.Hosting" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Persistence" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
</ItemGroup>
<!-- EventStore Package Versions -->
<ItemGroup>
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="$(EventStoreVersion)"/>
<PackageVersion Include="EventStore.Client.Grpc.ProjectionManagement" Version="$(EventStoreVersion)"/>
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="$(EventStoreVersion)"/>
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="$(EventStoreVersion)" />
<PackageVersion Include="EventStore.Client.Grpc.ProjectionManagement" Version="$(EventStoreVersion)" />
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="$(EventStoreVersion)" />
</ItemGroup>
<!-- Testing Utilities -->
<ItemGroup>
<PackageVersion Include="Akka.Persistence.TCK" Version="1.5.18" />
<PackageVersion Include="Akka.Hosting.TestKit" Version="1.5.18" />
<PackageVersion Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageVersion Include="Akka.Hosting.TestKit" Version="$(AkkaVersion)" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageVersion Include="xunit" Version="2.7.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.7" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageVersion Include="xunit" Version="$(XunitVersion)" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.8">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="Docker.DotNet" Version="3.125.15" />
</ItemGroup>
<ItemGroup>
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
<PackageVersion Include="JetBrains.Annotations" Version="2023.3.0" />
<PackageVersion Include="NuGet.Frameworks" Version="6.9.1" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public void DefaultOptionsTest()
actualConfig.GetString("persistence-ids-stream-name").Should().Be(defaultConfig.GetString("persistence-ids-stream-name"));
actualConfig.GetString("persisted-events-stream-name").Should().Be(defaultConfig.GetString("persisted-events-stream-name"));
actualConfig.GetString("tenant").Should().Be(defaultConfig.GetString("tenant"));
actualConfig.GetString("materializer-dispatcher").Should()
.Be(defaultConfig.GetString("materializer-dispatcher"));
}

[Fact(DisplayName = "Custom Options should modify default config")]
Expand All @@ -52,7 +54,8 @@ public void ModifiedOptionsTest()
TaggedStreamNamePattern = "custom-tagged-[[TAG]]",
PersistedEventsStreamName = "persisted-events-custom",
PersistenceIdsStreamName = "persistence-ids-custom",
Tenant = "tenant"
Tenant = "tenant",
MaterializerDispatcher = "custom-dispatcher"
};

var fullConfig = opt.ToConfig();
Expand All @@ -71,5 +74,6 @@ public void ModifiedOptionsTest()
config.PersistedEventsStreamName.Should().Be("persisted-events-custom");
config.PersistenceIdsStreamName.Should().Be("persistence-ids-custom");
config.Tenant.Should().Be("tenant");
config.MaterializerDispatcher.Should().Be("custom-dispatcher");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public void DefaultOptionsTest()
actualConfig.GetString("adapter").Should().Be(defaultConfig.GetString("adapter"));
actualConfig.GetString("prefix").Should().Be(defaultConfig.GetString("prefix"));
actualConfig.GetString("tenant").Should().Be(defaultConfig.GetString("tenant"));
actualConfig.GetString("materializer-dispatcher").Should()
.Be(defaultConfig.GetString("materializer-dispatcher"));
}

[Fact(DisplayName = "Custom Options should modify default config")]
Expand All @@ -43,7 +45,8 @@ public void ModifiedOptionsTest()
ConnectionString = "a",
Adapter = "custom",
Prefix = "custom@",
Tenant = "tenant"
Tenant = "tenant",
MaterializerDispatcher = "custom-dispatcher"
};

var fullConfig = opt.ToConfig();
Expand All @@ -57,5 +60,6 @@ public void ModifiedOptionsTest()
config.Adapter.Should().Be("custom");
config.StreamPrefix.Should().Be("custom@");
config.Tenant.Should().Be("tenant");
config.MaterializerDispatcher.Should().Be("custom-dispatcher");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ public EventStoreJournalOptions() : this(true)
private static readonly Config Default = EventStorePersistence.DefaultJournalConfiguration;
private static readonly Config DefaultQuery = EventStorePersistence.DefaultQueryConfiguration;

public string? ConnectionString { get; set; }
public string? Adapter { get; set; }
public string? StreamPrefix { get; set; }
public string? TaggedStreamNamePattern { get; set; }
public string? PersistenceIdsStreamName { get; set; }
public string? PersistedEventsStreamName { get; set; }
public TimeSpan? QueryRefreshInterval { get; set; }
public string? Tenant { get; set; }
public string? ConnectionString { get; init; }
public string? Adapter { get; init; }
public string? StreamPrefix { get; init; }
public string? TaggedStreamNamePattern { get; init; }
public string? PersistenceIdsStreamName { get; init; }
public string? PersistedEventsStreamName { get; init; }
public TimeSpan? QueryRefreshInterval { get; init; }
public TimeSpan? QueryProjectionCatchupTimeout { get; init; }
public string? Tenant { get; init; }
public string? MaterializerDispatcher { get; init; }
public override string Identifier { get; set; } = identifier;
public Config DefaultQueryConfig => DefaultQuery.MoveTo(QueryPluginId);
protected override Config InternalDefaultConfig => Default;
public string QueryPluginId => $"akka.persistence.query.journal.{Identifier}";

private string QueryPluginId => $"akka.persistence.query.journal.{Identifier}";

protected override StringBuilder Build(StringBuilder sb)
{
Expand All @@ -50,6 +53,9 @@ protected override StringBuilder Build(StringBuilder sb)

if (!string.IsNullOrEmpty(PersistedEventsStreamName))
sb.AppendLine($"persisted-events-stream-name = {PersistedEventsStreamName.ToHocon()}");

if (!string.IsNullOrEmpty(MaterializerDispatcher))
sb.AppendLine($"materializer-dispatcher = {MaterializerDispatcher.ToHocon()}");

if (!string.IsNullOrEmpty(Tenant))
sb.AppendLine($"tenant = {Tenant.ToHocon()}");
Expand All @@ -62,6 +68,9 @@ protected override StringBuilder Build(StringBuilder sb)

if (QueryRefreshInterval != null)
sb.AppendLine($"refresh-interval = {QueryRefreshInterval.ToHocon()}");

if (QueryProjectionCatchupTimeout != null)
sb.AppendLine($"projection-catchup-timeout = {QueryProjectionCatchupTimeout.ToHocon()}");

sb.AppendLine("}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public EventStoreSnapshotOptions() : this(true)
public string? Adapter { get; set; }
public string? Prefix { get; set; }
public string? Tenant { get; set; }
public string? MaterializerDispatcher { get; set; }
public override string Identifier { get; set; } = identifier;
protected override Config InternalDefaultConfig => Default;

Expand All @@ -34,6 +35,9 @@ protected override StringBuilder Build(StringBuilder sb)
if (!string.IsNullOrEmpty(Prefix))
sb.AppendLine($"prefix = {Prefix.ToHocon()}");

if (!string.IsNullOrEmpty(MaterializerDispatcher))
sb.AppendLine($"materializer-dispatcher = {MaterializerDispatcher.ToHocon()}");

if (!string.IsNullOrEmpty(Tenant))
sb.AppendLine($"tenant = {Tenant.ToHocon()}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ namespace Akka.Persistence.EventStore.Hosting;

public class EventStoreTenantOptions(string? tenantStreamNamePattern)
{
public string? TenantStreamNamePattern => tenantStreamNamePattern;

public StringBuilder Build(StringBuilder sb)
private StringBuilder Build(StringBuilder sb)
{
sb.AppendLine($"{EventStorePersistence.TenantConfigPath} {{");

if (!string.IsNullOrEmpty(TenantStreamNamePattern))
sb.AppendLine($"tenant-stream-name-pattern = {TenantStreamNamePattern.ToHocon()}");
if (!string.IsNullOrEmpty(tenantStreamNamePattern))
sb.AppendLine($"tenant-stream-name-pattern = {tenantStreamNamePattern.ToHocon()}");

sb.AppendLine("}");

Expand Down
15 changes: 12 additions & 3 deletions src/Akka.Persistence.EventStore.Hosting/HostingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Akka.Hosting;
using Akka.Persistence.Hosting;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore.Hosting;

[PublicAPI]
public static class HostingExtensions
{
public static AkkaConfigurationBuilder WithEventStorePersistence(
Expand All @@ -20,7 +22,10 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
string? taggedJournalStreamPattern = null,
string? persistenceIdsStreamName = null,
string? persistedEventsStreamName = null,
string? tenantStreamNamePattern = null)
string? tenantStreamNamePattern = null,
string? materializerDispatcher = null,
TimeSpan? queryRefreshInterval = null,
TimeSpan? queryProjectionCatchupTimeout = null)
{
if (mode == PersistenceMode.SnapshotStore && journalBuilder is not null)
throw new Exception($"{nameof(journalBuilder)} can only be set when {nameof(mode)} is set to either {PersistenceMode.Both} or {PersistenceMode.Journal}");
Expand All @@ -37,7 +42,10 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
TaggedStreamNamePattern = taggedJournalStreamPattern,
PersistedEventsStreamName = persistedEventsStreamName,
PersistenceIdsStreamName = persistenceIdsStreamName,
Tenant = tenant
Tenant = tenant,
MaterializerDispatcher = materializerDispatcher,
QueryRefreshInterval = queryRefreshInterval,
QueryProjectionCatchupTimeout = queryProjectionCatchupTimeout
};

var adapters = new AkkaPersistenceJournalBuilder(journalOptions.Identifier, builder);
Expand All @@ -52,7 +60,8 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
AutoInitialize = autoInitialize,
Adapter = adapter,
Prefix = snapshotStreamPrefix,
Tenant = tenant
Tenant = tenant,
MaterializerDispatcher = materializerDispatcher
};

var tenantOptions = !string.IsNullOrEmpty(tenantStreamNamePattern)
Expand Down
34 changes: 29 additions & 5 deletions src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Docker.DotNet;
using System.Diagnostics;
using Docker.DotNet;
using Docker.DotNet.Models;
using Microsoft.Extensions.Configuration;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -93,8 +94,7 @@
{
"EVENTSTORE_RUN_PROJECTIONS=All",
"EVENTSTORE_MEM_DB=True",
"EVENTSTORE_INSECURE=True",
"EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=True"
"EVENTSTORE_INSECURE=True"
},
HostConfig = new HostConfig
{
Expand All @@ -120,8 +120,32 @@
new ContainerStartParameters());

ConnectionString = $"esdb://admin:changeit@localhost:{_httpPort}?tls=false&tlsVerifyCert=false";

await Task.Delay(5000);

await WaitForEventStoreToStart(TimeSpan.FromSeconds(5), _client);

async Task WaitForEventStoreToStart(TimeSpan timeout, IDockerClient dockerClient)
{
var logStream = await dockerClient.Containers.GetContainerLogsAsync(_eventStoreContainerName, new ContainerLogsParameters

Check warning on line 128 in src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs

View workflow job for this annotation

GitHub Actions / Test-ubuntu-latest

'IContainerOperations.GetContainerLogsAsync(string, ContainerLogsParameters, CancellationToken)' is obsolete: 'The stream returned by this method won't be demultiplexed properly if the container was created without a TTY. Use GetContainerLogsAsync(string, bool, ContainerLogsParameters, CancellationToken) instead'

Check warning on line 128 in src/Akka.Persistence.EventStore.Tests/DatabaseFixture.cs

View workflow job for this annotation

GitHub Actions / Test-ubuntu-latest

'IContainerOperations.GetContainerLogsAsync(string, ContainerLogsParameters, CancellationToken)' is obsolete: 'The stream returned by this method won't be demultiplexed properly if the container was created without a TTY. Use GetContainerLogsAsync(string, bool, ContainerLogsParameters, CancellationToken) instead'
{
Follow = true,
ShowStdout = true,
ShowStderr = true
});

using (var reader = new StreamReader(logStream))
{
var stopwatch = Stopwatch.StartNew();

while (stopwatch.Elapsed < timeout && await reader.ReadLineAsync() is { } line)
{
if (line.Contains("IS LEADER... SPARTA!")) break;
}

stopwatch.Stop();
}

await logStream.DisposeAsync();
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class = ""Akka.Persistence.EventStore.Snapshot.EventStoreSnapshotStore, Akka.Per
class = ""Akka.Persistence.EventStore.Query.EventStoreReadJournalProvider, Akka.Persistence.EventStore""
write-plugin = ""akka.persistence.journal.eventstore""
refresh-interval = 1s
projection-catchup-timeout = 1s
}}
akka.test.single-expect-default = 10s");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions"/>
<PackageReference Include="EventStore.Client.Grpc.ProjectionManagement"/>
<PackageReference Include="EventStore.Client.Grpc.Streams"/>
<PackageReference Include="JetBrains.Annotations" />
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
Expand Down
18 changes: 0 additions & 18 deletions src/Akka.Persistence.EventStore/Akka.Persistence.EventStore.nuspec

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ public EventStoreReadJournalSettings(Config config)

WritePlugin = config.GetString("write-plugin");
QueryRefreshInterval = config.GetTimeSpan("refresh-interval", TimeSpan.FromSeconds(5));
ProjectionCatchupTimeout = config.GetTimeSpan("projection-catchup-timeout", TimeSpan.FromMilliseconds(500));
}

public string WritePlugin { get; }
public TimeSpan QueryRefreshInterval { get; }
public TimeSpan ProjectionCatchupTimeout { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,4 @@ public interface ISettingsWithAdapter
string Adapter { get; }
string DefaultSerializer { get; }
public string Tenant { get; }
string StreamPrefix { get; }

string GetStreamName(string persistenceId, EventStoreTenantSettings tenantSettings);
}
2 changes: 2 additions & 0 deletions src/Akka.Persistence.EventStore/EventStorePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
using Akka.Configuration;
using Akka.Persistence.EventStore.Journal;
using Akka.Persistence.EventStore.Snapshot;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore;

[PublicAPI]
public class EventStorePersistence : IExtension
{
public const string JournalConfigPath = "akka.persistence.journal.eventstore";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Akka.Actor;
using JetBrains.Annotations;

namespace Akka.Persistence.EventStore;

/// <summary>
/// Extension Id provider for the EventStore Persistence extension.
/// </summary>
[PublicAPI]
public class EventStorePersistenceProvider : ExtensionIdProvider<EventStorePersistence>
{
/// <summary>
Expand Down
Loading