Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option to resolve store when handling a command #216

Merged
merged 10 commits into from
Aug 22, 2023
4 changes: 3 additions & 1 deletion .github/workflows/code_quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ jobs:
with:
fetch-depth: 0
- name: 'Qodana Scan'
uses: JetBrains/[email protected]
uses: JetBrains/[email protected]
with:
pr-mode: false
env:
QODANA_TOKEN: ${{ secrets.QODANA_TOKEN }}
8 changes: 4 additions & 4 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
<PackageVersion Include="RabbitMQ.Client" Version="6.5.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.1.1" />
<PackageVersion Include="NEST" Version="7.17.5" />
<PackageVersion Include="Polly" Version="7.2.3" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
<PackageVersion Include="Polly" Version="7.2.4" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="StackExchange.Redis" Version="2.6.90" />
<PackageVersion Include="Testcontainers.EventStoreDb" Version="3.4.0" />
<PackageVersion Include="Testcontainers.Kafka" Version="3.4.0" />
Expand All @@ -55,7 +55,7 @@
</ItemGroup>
<ItemGroup Label="References for packable projects">
<PackageVersion Include="MinVer" Version="4.3.0" PrivateAssets="All" />
<PackageVersion Include="JetBrains.Annotations" Version="2022.3.1" PrivateAssets="All" />
<PackageVersion Include="JetBrains.Annotations" Version="2023.2.0" PrivateAssets="All" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
<ItemGroup Label="References for test projects">
Expand All @@ -73,7 +73,7 @@
<PackageVersion Include="RestSharp" Version="110.2.0" />
<PackageVersion Include="Hypothesist" Version="2.1.55" />
<PackageVersion Include="NodaTime" Version="3.1.9" />
<PackageVersion Include="NodaTime.Serialization.SystemTextJson" Version="1.0.0" />
<PackageVersion Include="NodaTime.Serialization.SystemTextJson" Version="1.1.0" />
<PackageVersion Include="MicroElements.AutoFixture.NodaTime" Version="1.0.0" />
<PackageVersion Include="MongoDb.Bson.NodaTime" Version="3.0.0" />
<PackageVersion Include="Testcontainers" Version="3.4.0" />
Expand Down
2 changes: 1 addition & 1 deletion src/Benchmarks/Benchmarks/GapDetectionBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class GapDetectionBenchmarks {
public void Setup() {
_store = new NoOpCheckpointStore();

_store.CheckpointStored += (sender, checkpoint) => Console.WriteLine(checkpoint);
_store.CheckpointStored += (_, checkpoint) => Console.WriteLine(checkpoint);

var numbers = Enumerable.Range(1, 1000).ToList();
numbers.RemoveAll(x => x % 10 == 0);
Expand Down
1 change: 1 addition & 0 deletions src/Benchmarks/Benchmarks/TypeMapBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
KeyValuePair<string, Type>[] _types = null!;

[Params(5, 20, 100)]
// ReSharper disable once UnusedAutoPropertyAccessor.Global
public int TypesCount { get; set; }

[GlobalSetup]
Expand Down Expand Up @@ -44,7 +45,7 @@
[Benchmark]
public void GetTypes() {
for (var i = 0; i < GetCount; i++) {
var type = _v1.GetType(_keysToFind[i]);

Check warning on line 48 in src/Benchmarks/Benchmarks/TypeMapBenchmark.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Unused local variable

Local variable 'type' is never used
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

public abstract class CommandHandlerBuilder<TAggregate, TState, TId>
where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
internal abstract RegisteredHandler<TAggregate, TId> Build();
}

/// <summary>
/// Builds a command handler for a specific command type. You would not need to instantiate this class directly,
/// use <see cref="CommandService{TAggregate,TState,TId}.On{TCommand}" /> function.
/// </summary>
/// <param name="store">Default aggregate store instance for the command service</param>

Check warning on line 15 in src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

XML comment has a 'param' tag for 'Parameter', but there is no parameter by that name

Cannot resolve parameter 'store'

Check warning on line 15 in src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

XML comment has a 'param' tag for 'Parameter', but there is no parameter by that name

Cannot resolve parameter 'store'
/// <typeparam name="TCommand">Command type</typeparam>
/// <typeparam name="TAggregate">Aggregate type</typeparam>
/// <typeparam name="TState">State of the aggregate type</typeparam>
/// <typeparam name="TId">Identity of the aggregate type</typeparam>
public class CommandHandlerBuilder<TCommand, TAggregate, TState, TId>(IAggregateStore? store) : CommandHandlerBuilder<TAggregate, TState, TId>
where TCommand : class
where TAggregate : Aggregate<TState>, new()
where TState : State<TState>, new()
where TId : Id {
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate>? _action;
ResolveStore<TCommand>? _resolveStore;
ExpectedState _expectedState = ExpectedState.Any;

/// <summary>
/// Set the expected aggregate state for the command handler.
/// If the aggregate won't be in the expected state, the command handler will return an error.
/// The default is <see cref="ExpectedState.Any" />.
/// </summary>
/// <param name="expectedState">Expected aggregate state</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> InState(ExpectedState expectedState) {
_expectedState = expectedState;

return this;
}

/// <summary>
/// Defines how the aggregate id is extracted from the command.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command.</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> GetId(GetIdFromCommand<TId, TCommand> getId) {
_getId = getId.AsGetId();

return this;
}

/// <summary>
/// Defines how the aggregate id is extracted from the command, asynchronously.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command.</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> GetIdAsync(GetIdFromCommandAsync<TId, TCommand> getId) {
_getId = getId.AsGetId();

return this;
}

/// <summary>
/// Defines how the aggregate is acted upon by the command.
/// </summary>
/// <param name="action">A function that executes an operation on an aggregate</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> Act(ActOnAggregate<TAggregate, TCommand> action) {
_action = action.AsAct();

return this;
}

/// <summary>
/// Defines how the aggregate is acted upon by the command, asynchronously.
/// </summary>
/// <param name="action">A function that executes an asynchronous operation on an aggregate</param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> ActAsync(ActOnAggregateAsync<TAggregate, TCommand> action) {
_action = action.AsAct();

return this;
}

/// <summary>
/// Defines how the aggregate store is resolved from the command. It is optional. If not defined, the default
/// aggregate store of the command service will be used.
/// </summary>
/// <param name="resolveStore"></param>
/// <returns></returns>
public CommandHandlerBuilder<TCommand, TAggregate, TState, TId> ResolveStore(ResolveStore<TCommand>? resolveStore) {

Check notice on line 93 in src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Method return value is never used (non-private accessibility)

Method 'ResolveStore' return value is never used

Check notice on line 93 in src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Method return value is never used (non-private accessibility)

Method 'ResolveStore' return value is never used
_resolveStore = resolveStore;

return this;
}

internal override RegisteredHandler<TAggregate, TId> Build() {
return new RegisteredHandler<TAggregate, TId>(
_expectedState,
Ensure.NotNull(_getId, $"Function to get the aggregate id from {typeof(TCommand).Name} is not defined"),
Ensure.NotNull(_action, $"Function to act on the aggregate for command {typeof(TCommand).Name} is not defined"),
(_resolveStore ?? DefaultResolve()).AsResolveStore()
);
}

ResolveStore<TCommand> DefaultResolve() {
ArgumentNullException.ThrowIfNull(store, nameof(store));

return _ => store;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Reflection;

namespace Eventuous;

using static Diagnostics.ApplicationEventSource;

public delegate Task ActOnAggregateAsync<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command, CancellationToken cancellationToken)
where TAggregate : Aggregate;

public delegate void ActOnAggregate<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command) where TAggregate : Aggregate;

delegate ValueTask<T> HandleUntypedCommand<T>(T aggregate, object command, CancellationToken cancellationToken) where T : Aggregate;

public delegate Task<TId> GetIdFromCommandAsync<TId, in TCommand>(TCommand command, CancellationToken cancellationToken) where TId : Id where TCommand : class;

public delegate TId GetIdFromCommand<out TId, in TCommand>(TCommand command) where TId : Id where TCommand : class;

delegate ValueTask<TId> GetIdFromUntypedCommand<TId>(object command, CancellationToken cancellationToken) where TId : Id;

public delegate IAggregateStore ResolveStore<in TCommand>(TCommand command) where TCommand : class;

delegate IAggregateStore ResolveStoreFromCommand(object command);

record RegisteredHandler<T, TId>(
ExpectedState ExpectedState,
GetIdFromUntypedCommand<TId> GetId,
HandleUntypedCommand<T> Handler,
ResolveStoreFromCommand ResolveStore
) where T : Aggregate where TId : Id;

class HandlersMap<TAggregate, TId> where TAggregate : Aggregate where TId : Id {
readonly TypeMap<RegisteredHandler<TAggregate, TId>> _typeMap = new();

static readonly MethodInfo AddHandlerInternalMethod =
typeof(HandlersMap<TAggregate, TId>).GetMethod(nameof(AddHandlerInternal), BindingFlags.NonPublic | BindingFlags.Instance)!;

internal void AddHandlerInternal<TCommand>(RegisteredHandler<TAggregate, TId> handler) {
try {
_typeMap.Add<TCommand>(handler);
Log.CommandHandlerRegistered<TCommand>();
} catch (Exceptions.DuplicateTypeException<TCommand>) {
Log.CommandHandlerAlreadyRegistered<TCommand>();

throw new Exceptions.CommandHandlerAlreadyRegistered<TCommand>();
}
}

internal void AddHandlerUntyped(Type command, RegisteredHandler<TAggregate, TId> handler)
=> AddHandlerInternalMethod.MakeGenericMethod(command).Invoke(this, new object?[] { handler });

public bool TryGet<TCommand>([NotNullWhen(true)] out RegisteredHandler<TAggregate, TId>? handler) => _typeMap.TryGetValue<TCommand>(out handler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

static class CommandHandlingDelegateExtensions {
public static GetIdFromUntypedCommand<TId> AsGetId<TId, TCommand>(this GetIdFromCommandAsync<TId, TCommand> getId) where TId : Id where TCommand : class
=> async (cmd, ct) => await getId((TCommand)cmd, ct);

public static GetIdFromUntypedCommand<TId> AsGetId<TId, TCommand>(this GetIdFromCommand<TId, TCommand> getId) where TId : Id where TCommand : class
=> (cmd, _) => ValueTask.FromResult(getId((TCommand)cmd));

public static HandleUntypedCommand<TAggregate> AsAct<TAggregate, TCommand>(this ActOnAggregateAsync<TAggregate, TCommand> act) where TAggregate : Aggregate
=> async (aggregate, cmd, ct) => {
await act(aggregate, (TCommand)cmd, ct).NoContext();

return aggregate;
};

public static HandleUntypedCommand<TAggregate> AsAct<TAggregate, TCommand>(this ActOnAggregate<TAggregate, TCommand> act) where TAggregate : Aggregate
=> (aggregate, cmd, _) => {
act(aggregate, (TCommand)cmd);

return ValueTask.FromResult(aggregate);
};

public static ResolveStoreFromCommand AsResolveStore<TCommand>(this ResolveStore<TCommand> resolveStore) where TCommand : class
=> cmd => resolveStore((TCommand)cmd);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

public abstract partial class CommandService<TAggregate, TState, TId> {
/// <summary>
/// Register an asynchronous handler for a command, which is expected to create a new aggregate instance.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.New).GetId(...).ActAsync(...).ResolveStore(...) instead")]
protected void OnNewAsync<TCommand>(

Check notice on line 15 in src/Core/src/Eventuous.Application/AggregateService/CommandService.Async.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Type member is never used (non-private accessibility)

Method 'OnNewAsync' is never used

Check notice on line 15 in src/Core/src/Eventuous.Application/AggregateService/CommandService.Async.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Type member is never used (non-private accessibility)

Method 'OnNewAsync' is never used
GetIdFromCommand<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
=> On<TCommand>().InState(ExpectedState.New).GetId(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Existing).GetId(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnExistingAsync<TCommand>(
GetIdFromCommand<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
=> On<TCommand>().InState(ExpectedState.Existing).GetId(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an existing aggregate instance.
/// </summary>
/// <param name="getId">Asynchronous function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Existing).GetIdAsync(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnExistingAsync<TCommand>(
GetIdFromCommandAsync<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
// => _handlers.AddHandler(ExpectedState.Existing, getId, action, resolveStore ?? DefaultResolve<TCommand>());
=> On<TCommand>().InState(ExpectedState.Existing).GetIdAsync(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance.
/// </summary>
/// <param name="getId">A function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Any).GetId(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnAnyAsync<TCommand>(
GetIdFromCommand<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
// => _handlers.AddHandler(ExpectedState.Any, getId, action, resolveStore ?? DefaultResolve<TCommand>());
=> On<TCommand>().InState(ExpectedState.Any).GetId(getId).ActAsync(action).ResolveStore(resolveStore);

/// <summary>
/// Register an asynchronous handler for a command, which is expected to use an a new or an existing aggregate instance.
/// </summary>
/// <param name="getId">Asynchronous function to get the aggregate id from the command</param>
/// <param name="action">Asynchronous action to be performed on the aggregate, given the aggregate instance and the command</param>
/// <param name="resolveStore">Resolve aggregate store from the command</param>
/// <typeparam name="TCommand">Command type</typeparam>
[Obsolete("Use On<TCommand>().InState(ExpectedState.Any).GetIdAsync(...).ActAsync(...).ResolveStore(...) instead")]
[PublicAPI]
protected void OnAnyAsync<TCommand>(
GetIdFromCommandAsync<TId, TCommand> getId,
ActOnAggregateAsync<TAggregate, TCommand> action,
ResolveStore<TCommand>? resolveStore = null
) where TCommand : class
// => _handlers.AddHandler(ExpectedState.Any, getId, action, resolveStore ?? DefaultResolve<TCommand>());
=> On<TCommand>().InState(ExpectedState.Any).GetIdAsync(getId).ActAsync(action).ResolveStore(resolveStore);
}
Loading
Loading