diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs
index 60a5b758de6..f14582dbff5 100644
--- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs
+++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs
@@ -84,7 +84,9 @@ public ReplayFilterSettings(Config config)
case "repair-by-discard-old": mode = ReplayFilterMode.RepairByDiscardOld; break;
case "fail": mode = ReplayFilterMode.Fail; break;
case "warn": mode = ReplayFilterMode.Warn; break;
- default: throw new ConfigurationException($"Invalid replay-filter.mode [{replayModeString}], supported values [off, repair-by-discard-old, fail, warn]");
+ default:
+ throw new ConfigurationException(
+ $"Invalid replay-filter.mode [{replayModeString}], supported values [off, repair-by-discard-old, fail, warn]");
}
Mode = mode;
@@ -236,14 +238,16 @@ public abstract class BatchingSqlJournalSetup
///
/// The default serializer used when not type override matching is found
///
- [Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
+ [Obsolete(
+ message:
+ "This property should never be used for writes, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; }
///
/// The fully qualified name of the type that should be used as timestamp provider.
///
public string TimestampProviderTypeName { get; }
-
+
///
/// Isolation level of transactions used during read query execution.
///
@@ -278,19 +282,21 @@ protected BatchingSqlJournalSetup(Config config, QueryConfiguration namingConven
throw ConfigurationException.NullOrEmptyConfig();
var connectionString = config.GetString("connection-string", null);
+#if NETSTANDARD
if (string.IsNullOrWhiteSpace(connectionString))
{
connectionString = System.Configuration.ConfigurationManager
.ConnectionStrings[config.GetString("connection-string-name", "DefaultConnection")]?
.ConnectionString;
}
+#endif
if (string.IsNullOrWhiteSpace(connectionString))
throw new ConfigurationException("No connection string for Sql Event Journal was specified");
ReadIsolationLevel = namingConventions.ReadIsolationLevel;
WriteIsolationLevel = namingConventions.WriteIsolationLevel;
-
+
// backward compatibility
var level = config.GetString("isolation-level");
if (level is { })
@@ -357,8 +363,9 @@ protected BatchingSqlJournalSetup(
replayFilterSettings: replayFilterSettings,
namingConventions: namingConventions,
defaultSerializer: defaultSerializer)
- { }
-
+ {
+ }
+
///
/// Initializes a new instance of the class.
///
@@ -630,8 +637,9 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
{
Setup = setup;
- CanPublish = Persistence.Instance.Apply(Context.System).Settings.Internal.PublishPluginCommands;
- TimestampProvider = TimestampProviderProvider.GetTimestampProvider(setup.TimestampProviderTypeName, Context);
+ CanPublish = Persistence.Instance.Apply(Context.System).Settings.Internal.PublishPluginCommands;
+ TimestampProvider =
+ TimestampProviderProvider.GetTimestampProvider(setup.TimestampProviderTypeName, Context);
_remainingOperations = Setup.MaxConcurrentOperations;
_buffers = new[]
@@ -648,7 +656,7 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
maxFailures: Setup.CircuitBreakerSettings.MaxFailures,
callTimeout: Setup.CircuitBreakerSettings.CallTimeout,
resetTimeout: Setup.CircuitBreakerSettings.ResetTimeout);
-
+
_writeIsolationLevel = Setup.WriteIsolationLevel;
_readIsolationLevel = Setup.ReadIsolationLevel;
@@ -808,11 +816,12 @@ private void FailChunkExecution(ChunkExecutionFailure message)
var cause = message.Cause is AggregateException aggregateException
? aggregateException.Flatten().InnerExceptions.OfType().FirstOrDefault()
- ?? aggregateException.Flatten().InnerExceptions[0]
+ ?? aggregateException.Flatten().InnerExceptions[0]
: message.Cause;
- Log.Error(cause, "An error occurred during event batch processing. ChunkId: [{0}], batched requests: [{1}]", message.ChunkId, message.Requests.Length);
-
+ Log.Error(cause, "An error occurred during event batch processing. ChunkId: [{0}], batched requests: [{1}]",
+ message.ChunkId, message.Requests.Length);
+
foreach (var request in message.Requests)
{
switch (request)
@@ -826,7 +835,7 @@ private void FailChunkExecution(ChunkExecutionFailure message)
aRef.Tell(new WriteMessagesFailed(cause, atomicWriteCount));
foreach (var envelope in req.Messages)
{
- if (!(envelope is AtomicWrite write))
+ if (!(envelope is AtomicWrite write))
continue;
var writes = (IImmutableList)write.Payload;
@@ -835,19 +844,23 @@ private void FailChunkExecution(ChunkExecutionFailure message)
if (cause is DbException)
{
// database-related exceptions should result in failure
- aRef.Tell(new WriteMessageFailure(unadapted, cause, actorInstanceId), unadapted.Sender);
+ aRef.Tell(new WriteMessageFailure(unadapted, cause, actorInstanceId),
+ unadapted.Sender);
}
else
{
- aRef.Tell(new WriteMessageRejected(unadapted, cause, actorInstanceId), unadapted.Sender);
+ aRef.Tell(new WriteMessageRejected(unadapted, cause, actorInstanceId),
+ unadapted.Sender);
}
}
}
+
break;
}
case DeleteMessagesTo delete:
- delete.PersistentActor.Tell(new DeleteMessagesFailure(cause, delete.ToSequenceNr), ActorRefs.NoSender);
+ delete.PersistentActor.Tell(new DeleteMessagesFailure(cause, delete.ToSequenceNr),
+ ActorRefs.NoSender);
break;
case ReplayMessages replay:
@@ -869,7 +882,8 @@ private void FailChunkExecution(ChunkExecutionFailure message)
break;
default:
- Log.Error(cause, $"Batching failure not reported to original sender. Unknown batched persistence journal request type [{request.GetType()}].");
+ Log.Error(cause,
+ $"Batching failure not reported to original sender. Unknown batched persistence journal request type [{request.GetType()}].");
break;
}
}
@@ -916,19 +930,26 @@ protected virtual void OnBufferOverflow(IJournalMessage request)
{
case WriteMessages msg:
var atomicWriteCount = msg.Messages.OfType().Count();
- msg.PersistentActor.Tell(new WriteMessagesFailed(JournalBufferOverflowException.Instance, atomicWriteCount), ActorRefs.NoSender);
+ msg.PersistentActor.Tell(
+ new WriteMessagesFailed(JournalBufferOverflowException.Instance, atomicWriteCount),
+ ActorRefs.NoSender);
break;
case ReplayMessages msg:
- msg.PersistentActor.Tell(new ReplayMessagesFailure(JournalBufferOverflowException.Instance), ActorRefs.NoSender);
+ msg.PersistentActor.Tell(new ReplayMessagesFailure(JournalBufferOverflowException.Instance),
+ ActorRefs.NoSender);
break;
case DeleteMessagesTo msg:
- msg.PersistentActor.Tell(new DeleteMessagesFailure(JournalBufferOverflowException.Instance, msg.ToSequenceNr), ActorRefs.NoSender);
+ msg.PersistentActor.Tell(
+ new DeleteMessagesFailure(JournalBufferOverflowException.Instance, msg.ToSequenceNr),
+ ActorRefs.NoSender);
break;
case ReplayTaggedMessages msg:
- msg.ReplyTo.Tell(new ReplayMessagesFailure(JournalBufferOverflowException.Instance), ActorRefs.NoSender);
+ msg.ReplyTo.Tell(new ReplayMessagesFailure(JournalBufferOverflowException.Instance),
+ ActorRefs.NoSender);
break;
case ReplayAllEvents msg:
- msg.ReplyTo.Tell(new EventReplayFailure(JournalBufferOverflowException.Instance), ActorRefs.NoSender);
+ msg.ReplyTo.Tell(new EventReplayFailure(JournalBufferOverflowException.Instance),
+ ActorRefs.NoSender);
break;
}
}
@@ -956,7 +977,9 @@ private async Task ExecuteChunk(RequestChunk chunk, IActorContext
// In the grand scheme of thing, using a transaction in an all read batch operation
// should not hurt performance by much, because it is done only once at the start.
- using (var tx = connection.BeginTransaction(isWriteOperation ? _writeIsolationLevel : _readIsolationLevel))
+ using (var tx = connection.BeginTransaction(isWriteOperation
+ ? _writeIsolationLevel
+ : _readIsolationLevel))
using (var command = (TCommand)connection.CreateCommand())
{
command.CommandTimeout = (int)Setup.ConnectionTimeout.TotalSeconds;
@@ -971,7 +994,7 @@ private async Task ExecuteChunk(RequestChunk chunk, IActorContext
switch (req)
{
case WriteMessages msg:
- writeResults.Enqueue(await HandleWriteMessages(msg, command));
+ writeResults.Enqueue(await HandleWriteMessages(msg, command));
break;
case DeleteMessagesTo msg:
await HandleDeleteMessagesTo(msg, command);
@@ -993,6 +1016,7 @@ private async Task ExecuteChunk(RequestChunk chunk, IActorContext
break;
}
}
+
tx.Commit();
}
catch (Exception e1)
@@ -1005,6 +1029,7 @@ private async Task ExecuteChunk(RequestChunk chunk, IActorContext
{
throw new AggregateException(e2, e1);
}
+
throw;
}
finally
@@ -1091,7 +1116,8 @@ protected virtual async Task ReadHighestSequenceNr(TCommand command)
return highestSequenceNr;
}
- protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPersistenceIds message, TCommand command)
+ protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPersistenceIds message,
+ TCommand command)
{
long highestOrderingNumber = await ReadHighestSequenceNr(command);
@@ -1279,10 +1305,12 @@ private async Task HandleWriteMessages(WriteMessages req, T
tagBuilder.Append(tag).Append(';');
}
}
+
persistent = persistent.WithPayload(tagged.Payload);
}
- WriteEvent(command, persistent.WithTimestamp(TimestampProvider.GenerateTimestamp(persistent)), tagBuilder.ToString());
+ WriteEvent(command, persistent.WithTimestamp(TimestampProvider.GenerateTimestamp(persistent)),
+ tagBuilder.ToString());
await command.ExecuteNonQueryAsync();
@@ -1364,7 +1392,8 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
deserialized = _serialization.Deserialize((byte[])payload, serializerId, manifest);
}
- return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null, timestamp);
+ return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender,
+ null, timestamp);
}
///
@@ -1390,7 +1419,7 @@ protected void AddParameter(TCommand command, string paramName, DbType dbType, o
param.DbType = dbType;
PreAddParameterToCommand(command, param);
-
+
command.Parameters.Add(param);
}
@@ -1399,8 +1428,10 @@ protected void AddParameter(TCommand command, string paramName, DbType dbType, o
///
/// used to define a parameter in.
/// Parameter to customize
- protected virtual void PreAddParameterToCommand(TCommand command, DbParameter param) { }
-
+ protected virtual void PreAddParameterToCommand(TCommand command, DbParameter param)
+ {
+ }
+
///
/// Select the buffer that has the smallest id on its first item, retrieve a maximum Setup.MaxBatchSize
/// items from it, and return it as a chunk that needs to be batched
@@ -1418,28 +1449,31 @@ protected virtual void PreAddParameterToCommand(TCommand command, DbParameter pa
// We don't batch delete and writes in the same batch, reason being a database
// can be deadlocked if write and delete happens in the same transaction
var writeType = currentBuffer.Peek().request.GetType();
- while(currentBuffer.Count > 0 && currentBuffer.Peek().request.GetType() == writeType)
+ while (currentBuffer.Count > 0 && currentBuffer.Peek().request.GetType() == writeType)
{
operations.Add(currentBuffer.Dequeue().request);
if (operations.Count == Setup.MaxBatchSize)
break;
}
+
return (new RequestChunk(chunkId, operations.ToArray()), true);
}
-
- while(currentBuffer.Count > 0)
+
+ while (currentBuffer.Count > 0)
{
operations.Add(currentBuffer.Dequeue().request);
if (operations.Count == Setup.MaxBatchSize)
break;
}
+
return (new RequestChunk(chunkId, operations.ToArray()), false);
}
private void CompleteBatch(BatchComplete msg)
{
_remainingOperations++;
- Log.Debug("Completed batch (chunkId: {0}) of {1} operations in {2} milliseconds", msg.ChunkId, msg.OperationCount, msg.TimeSpent.TotalMilliseconds);
+ Log.Debug("Completed batch (chunkId: {0}) of {1} operations in {2} milliseconds", msg.ChunkId,
+ msg.OperationCount, msg.TimeSpent.TotalMilliseconds);
TryProcess();
}
@@ -1453,7 +1487,7 @@ private class WriteMessagesResult
public WriteMessagesResult(
WriteMessages request,
- IEnumerable tags,
+ IEnumerable tags,
IEnumerable persistenceIds)
{
_request = request;
@@ -1516,4 +1550,4 @@ protected JournalBufferOverflowException(SerializationInfo info, StreamingContex
{
}
}
-}
+}
\ No newline at end of file
diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs
index 7394eb56be5..2115c3ed539 100644
--- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs
+++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/SqlJournal.cs
@@ -76,11 +76,13 @@ protected override bool ReceivePluginInternal(object message)
{
case SelectCurrentPersistenceIds msg:
SelectAllPersistenceIdsAsync(msg.Offset)
- .PipeTo(msg.ReplyTo, success: h => new CurrentPersistenceIds(h.Ids, h.LastOrdering), failure: e => new Status.Failure(e));
+ .PipeTo(msg.ReplyTo, success: h => new CurrentPersistenceIds(h.Ids, h.LastOrdering),
+ failure: e => new Status.Failure(e));
return true;
case ReplayTaggedMessages replay:
ReplayTaggedMessagesAsync(replay)
- .PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h), failure: e => new ReplayMessagesFailure(e));
+ .PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h),
+ failure: e => new ReplayMessagesFailure(e));
return true;
case ReplayAllEvents replay:
ReplayAllEventsAsync(replay)
@@ -122,11 +124,13 @@ protected override async Task> WriteMessagesAsync(IEnu
else eventToTags.Add(p, ImmutableHashSet.Empty);
if (IsTagId(p.PersistenceId))
- throw new InvalidOperationException($"Persistence Id {p.PersistenceId} must not start with {QueryExecutor.Configuration.TagsColumnName}");
+ throw new InvalidOperationException(
+ $"Persistence Id {p.PersistenceId} must not start with {QueryExecutor.Configuration.TagsColumnName}");
}
var batch = new WriteJournalBatch(eventToTags);
- using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
+ using (var cancellationToken =
+ CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
await QueryExecutor.InsertBatchAsync(connection, cancellationToken.Token, batch);
}
}).ToArray();
@@ -149,15 +153,20 @@ protected virtual async Task ReplayTaggedMessagesAsync(ReplayTaggedMessage
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
- using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
+ using (var cancellationToken =
+ CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor
- .SelectByTagAsync(connection, cancellationToken.Token, replay.Tag, replay.FromOffset, replay.ToOffset, replay.Max, replayedTagged => {
- foreach(var adapted in AdaptFromJournal(replayedTagged.Persistent))
- {
- replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, replayedTagged.Tag, replayedTagged.Offset), ActorRefs.NoSender);
- }
- });
+ .SelectByTagAsync(connection, cancellationToken.Token, replay.Tag, replay.FromOffset,
+ replay.ToOffset, replay.Max, replayedTagged =>
+ {
+ foreach (var adapted in AdaptFromJournal(replayedTagged.Persistent))
+ {
+ replay.ReplyTo.Tell(
+ new ReplayedTaggedMessage(adapted, replayedTagged.Tag, replayedTagged.Offset),
+ ActorRefs.NoSender);
+ }
+ });
}
}
}
@@ -167,34 +176,41 @@ protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay)
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
- using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
+ using (var cancellationToken =
+ CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor
.SelectAllEventsAsync(
connection,
- cancellationToken.Token,
- replay.FromOffset,
+ cancellationToken.Token,
+ replay.FromOffset,
replay.ToOffset,
- replay.Max,
- replayedEvent => {
+ replay.Max,
+ replayedEvent =>
+ {
foreach (var adapted in AdaptFromJournal(replayedEvent.Persistent))
{
- replay.ReplyTo.Tell(new ReplayedEvent(adapted, replayedEvent.Offset), ActorRefs.NoSender);
+ replay.ReplyTo.Tell(new ReplayedEvent(adapted, replayedEvent.Offset),
+ ActorRefs.NoSender);
}
});
}
}
}
- protected virtual async Task<(IEnumerable Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
+ protected virtual async Task<(IEnumerable Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(
+ long offset)
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
- using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
+ using (var cancellationToken =
+ CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
- var lastOrdering = await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token);
- var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token, offset);
+ var lastOrdering =
+ await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token);
+ var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token,
+ offset);
return (ids, lastOrdering);
}
}
@@ -210,15 +226,18 @@ protected virtual async Task ReplayAllEventsAsync(ReplayAllEvents replay)
/// TBD
/// TBD
/// TBD
- public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max,
+ public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr,
+ long toSequenceNr, long max,
Action recoveryCallback)
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
- using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
+ using (var cancellationToken =
+ CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
- await QueryExecutor.SelectByPersistenceIdAsync(connection, cancellationToken.Token, persistenceId, fromSequenceNr, toSequenceNr, max, recoveryCallback);
+ await QueryExecutor.SelectByPersistenceIdAsync(connection, cancellationToken.Token, persistenceId,
+ fromSequenceNr, toSequenceNr, max, recoveryCallback);
}
}
}
@@ -257,7 +276,7 @@ protected bool WaitingForInitialization(object message)
return true;
case Status.Failure fail:
Log.Error(fail.Cause, "Failure during {0} initialization.", Self);
-
+
// trigger a restart so we have some hope of succeeding in the future even if initialization failed
throw new ApplicationException("Failed to initialize SQL Journal.", fail.Cause);
default:
@@ -268,7 +287,7 @@ protected bool WaitingForInitialization(object message)
private async Task