Skip to content

Commit

Permalink
Persistent and Scheduled / Deferred / Recurring Jobs (#143)
Browse files Browse the repository at this point in the history
* added some extension methods

* Finished JobScheduler and JobRunner

* message without the ack objects

* all good but need job manager for now

* added latest lang features
  • Loading branch information
Lutando authored Aug 7, 2019
1 parent fd0a776 commit c603f62
Show file tree
Hide file tree
Showing 53 changed files with 2,238 additions and 46 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- IExecute interface as an alternative way to add command handlers to the aggregate.
- JobScheduler and JobRunner actor types for scheduling jobs.
- receive timeouts for aggregate roots.

### Changed
- AggregateRoot and AggregateSaga logging members are using Eventsourced.Log member.

## [0.4.6] - 2019-07-27

Expand Down
29 changes: 27 additions & 2 deletions THIRD-PARTY-NOTICES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ bring it to our attention by posting an issue.
The attached notices are provided for information only.


License notice for Akka
1 - License notice for Akka.NET
--------------------------------
Copyright (c) .NET Foundation and Contributors
All Rights Reserved
Expand All @@ -22,4 +22,29 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
--------------------------------

2 - License notice for Cronos
--------------------------------
The MIT License (MIT)

Copyright (c) 2017 Sergey Odinokov

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<PropertyGroup>
<ProjectGuid>{F70444F1-9EBE-4101-9152-BF9A98A97D4E}</ProjectGuid>
<SonarQubeExclude>true</SonarQubeExclude>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<PropertyGroup>
Expand Down
1 change: 1 addition & 0 deletions src/Akkatecture.TestFixture/Akkatecture.TestFixture.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<PropertyGroup>
<ProjectGuid>{0CB7A8CC-E3CD-481C-81C9-BA830DC32361}</ProjectGuid>
<RootNamespace>Akkatecture.TestFixture</RootNamespace>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<PropertyGroup>
Expand Down
10 changes: 6 additions & 4 deletions src/Akkatecture/Aggregates/AggregateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ public abstract class AggregateManager<TAggregate, TIdentity, TCommand> : Receiv
protected ILoggingAdapter Logger { get; set; }
protected Func<DeadLetter, bool> DeadLetterHandler => Handle;
public AggregateManagerSettings Settings { get; }
public string Name { get; }

protected AggregateManager()
{
Logger = Context.GetLogger();
Settings = new AggregateManagerSettings(Context.System.Settings.Config);

Name = GetType().PrettyPrint();
Receive<Terminated>(Terminate);

if(Settings.AutoDispatchOnReceive)
Expand All @@ -60,7 +61,7 @@ protected AggregateManager()

protected virtual bool Dispatch(TCommand command)
{
Logger.Info("{0} received {1}", GetType().PrettyPrint(),command.GetType().PrettyPrint());
Logger.Info("AggregateManager of Type={0}; has received a command of Type={1}", Name, command.GetType().PrettyPrint());

var aggregateRef = FindOrCreate(command.AggregateId);

Expand All @@ -72,7 +73,7 @@ protected virtual bool Dispatch(TCommand command)

protected virtual bool ReDispatch(TCommand command)
{
Logger.Info("{0} as dead letter {1}",GetType().PrettyPrint(), command.GetType().PrettyPrint());
Logger.Info("AggregateManager of Type={0}; is ReDispatching deadletter of Type={1}", Name, command.GetType().PrettyPrint());

var aggregateRef = FindOrCreate(command.AggregateId);

Expand Down Expand Up @@ -123,13 +124,14 @@ protected virtual IActorRef CreateAggregate(TIdentity aggregateId)

protected override SupervisorStrategy SupervisorStrategy()
{
var logger = Logger;
return new OneForOneStrategy(
maxNrOfRetries: 3,
withinTimeMilliseconds: 3000,
localOnlyDecider: x =>
{

Logger.Warning("{0} will supervise Exception={1} to be decided as {2}.",GetType().PrettyPrint(), x.ToString(), Directive.Restart);
logger.Warning("AggregateManager of Type={0}; will supervise Exception={1} to be decided as {2}.",Name, x.ToString(), Directive.Restart);
return Directive.Restart;
});
}
Expand Down
2 changes: 2 additions & 0 deletions src/Akkatecture/Aggregates/AggregateManagerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using Akka.Configuration;
using Akkatecture.Configuration;

Expand All @@ -38,6 +39,7 @@ public AggregateManagerSettings(Config config)

HandleDeadLetters = aggregateManagerConfig.GetBoolean("handle-deadletters");
AutoDispatchOnReceive = aggregateManagerConfig.GetBoolean("auto-dispatch-on-receive");

}
}
}
56 changes: 36 additions & 20 deletions src/Akkatecture/Aggregates/AggregateRoot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ public abstract class AggregateRoot<TAggregate, TIdentity, TAggregateState> : Re
private CircularBuffer<ISourceId> _previousSourceIds = new CircularBuffer<ISourceId>(100);
private ICommand<TAggregate, TIdentity> PinnedCommand { get; set; }
private object PinnedReply { get; set; }

protected ILoggingAdapter Logger { get; }
private readonly IEventDefinitionService _eventDefinitionService;
private readonly ISnapshotDefinitionService _snapshotDefinitionService;
private ISnapshotStrategy SnapshotStrategy { get; set; } = SnapshotNeverStrategy.Instance;
Expand All @@ -69,10 +67,12 @@ public abstract class AggregateRoot<TAggregate, TIdentity, TAggregateState> : Re

protected AggregateRoot(TIdentity id)
{
Logger = Context.GetLogger();

Settings = new AggregateRootSettings(Context.System.Settings.Config);

if (id == null)
throw new ArgumentNullException(nameof(id));

if ((this as TAggregate) == null)
{
throw new InvalidOperationException(
Expand All @@ -87,23 +87,23 @@ protected AggregateRoot(TIdentity id)
}
catch(Exception exception)
{
Logger.Error(exception,"Unable to activate AggregateState of Type={0} for AggregateRoot of Name={1}",typeof(TAggregateState).PrettyPrint(), Name);
Context.GetLogger().Error(exception,"Unable to activate AggregateState of Type={0} for AggregateRoot of Name={1}.",typeof(TAggregateState).PrettyPrint(), Name);
}

}

PinnedCommand = null;
_eventDefinitionService = new EventDefinitionService(Logger);
_snapshotDefinitionService = new SnapshotDefinitionService(Logger);
_eventDefinitionService = new EventDefinitionService(Context.GetLogger());
_snapshotDefinitionService = new SnapshotDefinitionService(Context.GetLogger());
Id = id;
PersistenceId = id.Value;
SetSourceIdHistory(100);

if (Settings.UseDefaultSnapshotRecover)
{
Recover<SnapshotOffer>(Recover);
}


Command<SaveSnapshotSuccess>(SnapshotStatus);
Command<SaveSnapshotFailure>(SnapshotStatus);

Expand All @@ -114,6 +114,8 @@ protected AggregateRoot(TIdentity id)
}

InitReceives();
SetReceiveTimeout(Settings.SetReceiveTimeout);
Command<ReceiveTimeout>(Timeout);

}

Expand Down Expand Up @@ -241,7 +243,7 @@ public virtual CommittedEvent<TAggregate, TIdentity, TAggregateEvent> From<TAggr
}
protected virtual IAggregateSnapshot<TAggregate, TIdentity> CreateSnapshot()
{
Logger.Warning("Aggregate of Name={0}, and Id={1}; attempted to create a snapshot, override the {2}() method to get snapshotting to function.", Name, Id, nameof(CreateSnapshot));
Log.Warning("Aggregate of Name={0}, and Id={1}; attempted to create a snapshot, override the {2}() method to get snapshotting to function.", Name, Id, nameof(CreateSnapshot));
return null;
}

Expand All @@ -251,7 +253,7 @@ protected void ApplyCommittedEvent<TAggregateEvent>(ICommittedEvent<TAggregate,
var applyMethods = GetEventApplyMethods(committedEvent.AggregateEvent);
applyMethods(committedEvent.AggregateEvent);

Logger.Info("Aggregate of Name={0}, and Id={1}; committed and applied an AggregateEvent of Type={2}", Name, Id, typeof(TAggregateEvent).PrettyPrint());
Log.Info("Aggregate of Name={0}, and Id={1}; committed and applied an AggregateEvent of Type={2}.", Name, Id, typeof(TAggregateEvent).PrettyPrint());

Version++;

Expand Down Expand Up @@ -303,14 +305,14 @@ private void ApplyObjectCommittedEvent(object committedEvent)
}
catch (Exception exception)
{
Logger.Error(exception, "Aggregate of Name={0}, and Id={1}; tried to invoke Method={2} with object Type={3} .",Name, Id, nameof(ApplyCommittedEvent), committedEvent.GetType().PrettyPrint());
Log.Error(exception, "Aggregate of Name={0}, and Id={1}; tried to invoke Method={2} with object Type={3}.",Name, Id, nameof(ApplyCommittedEvent), committedEvent.GetType().PrettyPrint());
}
}

protected virtual void Publish<TEvent>(TEvent aggregateEvent)
{
Context.System.EventStream.Publish(aggregateEvent);
Logger.Info("Aggregate of Name={0}, and Id={1}; published DomainEvent of Type={2}.",Name, Id, typeof(TEvent).PrettyPrint());
Log.Info("Aggregate of Name={0}, and Id={1}; published DomainEvent of Type={2}.",Name, Id, typeof(TEvent).PrettyPrint());
}

protected override bool AroundReceive(Receive receive, object message)
Expand Down Expand Up @@ -351,7 +353,7 @@ protected virtual void ReplyIfAvailable()

protected override void Unhandled(object message)
{
Logger.Warning("Aggregate of Name={0}, and Id={1}; has received an unhandled message of Type={2}.",Name, Id, message.GetType().PrettyPrint());
Log.Warning("Aggregate of Name={0}, and Id={1}; has received an unhandled message of Type={2}.",Name, Id, message.GetType().PrettyPrint());
base.Unhandled(message);
}

Expand Down Expand Up @@ -412,12 +414,12 @@ protected virtual bool Recover(ICommittedEvent<TAggregate, TIdentity, IAggregate
{
try
{
Logger.Debug("Aggregate of Name={0}, Id={1}, and Version={2}, is recovering with CommittedEvent of Type={3}.", Name, Id, Version, committedEvent.GetType().PrettyPrint());
Log.Debug("Aggregate of Name={0}, Id={1}, and Version={2}, is recovering with CommittedEvent of Type={3}.", Name, Id, Version, committedEvent.GetType().PrettyPrint());
ApplyEvent(committedEvent.AggregateEvent);
}
catch(Exception exception)
{
Logger.Error(exception,"Aggregate of Name={0}, Id={1}; while recovering with event of Type={2} caused an exception.", Name, Id, committedEvent.GetType().PrettyPrint());
Log.Error(exception,"Aggregate of Name={0}, Id={1}; while recovering with event of Type={2} caused an exception.", Name, Id, committedEvent.GetType().PrettyPrint());
return false;
}

Expand All @@ -428,13 +430,13 @@ protected virtual bool Recover(SnapshotOffer aggregateSnapshotOffer)
{
try
{
Logger.Debug("Aggregate of Name={0}, and Id={1}; has received a SnapshotOffer of Type={2}.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint());
Log.Debug("Aggregate of Name={0}, and Id={1}; has received a SnapshotOffer of Type={2}.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint());
var comittedSnapshot = aggregateSnapshotOffer.Snapshot as CommittedSnapshot<TAggregate,TIdentity, IAggregateSnapshot<TAggregate, TIdentity>>;
HydrateSnapshot(comittedSnapshot.AggregateSnapshot, aggregateSnapshotOffer.Metadata.SequenceNr);
}
catch (Exception exception)
{
Logger.Error(exception,"Aggregate of Name={0}, Id={1}; recovering with snapshot of Type={2} caused an exception.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint());
Log.Error(exception,"Aggregate of Name={0}, Id={1}; recovering with snapshot of Type={2} caused an exception.", Name, Id, aggregateSnapshotOffer.Snapshot.GetType().PrettyPrint());

return false;
}
Expand All @@ -451,21 +453,21 @@ protected virtual void SetSnapshotStrategy(ISnapshotStrategy snapshotStrategy)
}
protected virtual bool SnapshotStatus(SaveSnapshotSuccess snapshotSuccess)
{
Logger.Debug("Aggregate of Name={0}, and Id={1}; saved a snapshot at Version={2}.", Name, Id, snapshotSuccess.Metadata.SequenceNr);
Log.Debug("Aggregate of Name={0}, and Id={1}; saved a snapshot at Version={2}.", Name, Id, snapshotSuccess.Metadata.SequenceNr);
DeleteSnapshots(new SnapshotSelectionCriteria(snapshotSuccess.Metadata.SequenceNr-1));
return true;
}

protected virtual bool SnapshotStatus(SaveSnapshotFailure snapshotFailure)
{
Logger.Error(snapshotFailure.Cause,"Aggregate of Name={0}, and Id={1}; failed to save snapshot at Version={2}.", Name, Id, snapshotFailure.Metadata.SequenceNr);
Log.Error(snapshotFailure.Cause,"Aggregate of Name={0}, and Id={1}; failed to save snapshot at Version={2}.", Name, Id, snapshotFailure.Metadata.SequenceNr);
return true;
}


protected virtual bool Recover(RecoveryCompleted recoveryCompleted)
{
Logger.Debug("Aggregate of Name={0}, and Id={1}; has completed recovering from it's event journal at Version={2}.", Name, Id, Version);
Log.Debug("Aggregate of Name={0}, and Id={1}; has completed recovering from it's event journal at Version={2}.", Name, Id, Version);
return true;
}

Expand All @@ -474,6 +476,20 @@ public override string ToString()
return $"{GetType().PrettyPrint()} v{Version}";
}

public bool Timeout(ReceiveTimeout message)
{
Log.Debug("Aggregate of Name={0}, and Id={1}; has received a timeout message and will stop.", Name, Id);
Context.Stop(Self);
return true;
}

public override void AroundPreRestart(Exception cause, object message)
{
Log.Error(cause, "Aggregate of Name={0}, and Id={1}; has experienced an error and will now restart", Name, Id);
base.AroundPreRestart(cause, message);
}


protected void Command<TCommand, TCommandHandler>(Predicate<TCommand> shouldHandle = null)
where TCommand : ICommand<TAggregate, TIdentity>
where TCommandHandler : CommandHandler<TAggregate, TIdentity, TCommand>
Expand All @@ -485,7 +501,7 @@ protected void Command<TCommand, TCommandHandler>(Predicate<TCommand> shouldHand
}
catch (Exception exception)
{
Logger.Error(exception,"Unable to activate CommandHandler of Type={0} for Aggregate of Type={1}.",typeof(TCommandHandler).PrettyPrint(), typeof(TAggregate).PrettyPrint());
Log.Error(exception,"Unable to activate CommandHandler of Type={0} for Aggregate of Type={1}.",typeof(TCommandHandler).PrettyPrint(), typeof(TAggregate).PrettyPrint());
}

}
Expand Down
8 changes: 6 additions & 2 deletions src/Akkatecture/Aggregates/AggregateRootSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,27 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using Akka.Configuration;
using Akkatecture.Configuration;

namespace Akkatecture.Aggregates
{
public class AggregateRootSettings
{
private static readonly string _section = "akkatecture.aggregate-root";
public readonly bool UseDefaultEventRecover;
public readonly bool UseDefaultSnapshotRecover;
public readonly TimeSpan SetReceiveTimeout;

public AggregateRootSettings(Config config)
{
var aggregateRootConfig = config.WithFallback(AkkatectureDefaultSettings.DefaultConfig());
aggregateRootConfig = aggregateRootConfig.GetConfig("akkatecture.aggregate-root");
var aggregateRootConfig = config.GetConfig(_section);
aggregateRootConfig = aggregateRootConfig ?? AkkatectureDefaultSettings.DefaultConfig().GetConfig(_section);

UseDefaultEventRecover = aggregateRootConfig.GetBoolean("use-default-event-recover");
UseDefaultSnapshotRecover = aggregateRootConfig.GetBoolean("use-default-snapshot-recover");
SetReceiveTimeout = aggregateRootConfig.GetTimeSpan("set-receive-timeout");
}
}
}
2 changes: 2 additions & 0 deletions src/Akkatecture/Akkatecture.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<PropertyGroup>
<ProjectGuid>{294B5F75-4BF1-4CD1-AA98-1369A4E4ACDE}</ProjectGuid>
<RootNamespace>Akkatecture</RootNamespace>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<PropertyGroup>
Expand All @@ -26,6 +27,7 @@
<ItemGroup>
<PackageReference Include="Akka" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.Persistence" Version="$(AkkaPersistenceVersion)" />
<PackageReference Include="Cronos" Version="0.7.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="$(SourceLinkGithubVersion)" PrivateAssets="All" />
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
Expand Down
Loading

0 comments on commit c603f62

Please sign in to comment.