Skip to content

Commit

Permalink
Remove System.ConfigurationManager support from .NET 6+ (#7456)
Browse files Browse the repository at this point in the history
* Remove `System.ConfigurationManager` support

* completed removal of `System.ConfigurationManager`

* added API approvals
  • Loading branch information
Aaronontheweb authored Jan 13, 2025
1 parent 241a1c9 commit 447b550
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 133 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ protected override bool ReceivePluginInternal(object message)
{
case SelectCurrentPersistenceIds msg:
SelectAllPersistenceIdsAsync(msg.Offset)
.PipeTo(msg.ReplyTo, success: h => new CurrentPersistenceIds(h.Ids, h.LastOrdering), failure: e => new Status.Failure(e));
.PipeTo(msg.ReplyTo, success: h => new CurrentPersistenceIds(h.Ids, h.LastOrdering),
failure: e => new Status.Failure(e));
return true;
case ReplayTaggedMessages replay:
ReplayTaggedMessagesAsync(replay)
.PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h), failure: e => new ReplayMessagesFailure(e));
.PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h),
failure: e => new ReplayMessagesFailure(e));
return true;
case ReplayAllEvents replay:
ReplayAllEventsAsync(replay)
Expand Down Expand Up @@ -122,11 +124,13 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
else eventToTags.Add(p, ImmutableHashSet<string>.Empty);

if (IsTagId(p.PersistenceId))
throw new InvalidOperationException($"Persistence Id {p.PersistenceId} must not start with {QueryExecutor.Configuration.TagsColumnName}");
throw new InvalidOperationException(
$"Persistence Id {p.PersistenceId} must not start with {QueryExecutor.Configuration.TagsColumnName}");
}

var batch = new WriteJournalBatch(eventToTags);
using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
await QueryExecutor.InsertBatchAsync(connection, cancellationToken.Token, batch);
}
}).ToArray();
Expand All @@ -149,15 +153,20 @@ protected virtual async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessage
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using(var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor
.SelectByTagAsync(connection, cancellationToken.Token, replay.Tag, replay.FromOffset, replay.ToOffset, replay.Max, replayedTagged => {
foreach(var adapted in AdaptFromJournal(replayedTagged.Persistent))
{
replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, replayedTagged.Tag, replayedTagged.Offset), ActorRefs.NoSender);
}
});
.SelectByTagAsync(connection, cancellationToken.Token, replay.Tag, replay.FromOffset,
replay.ToOffset, replay.Max, replayedTagged =>
{
foreach (var adapted in AdaptFromJournal(replayedTagged.Persistent))
{
replay.ReplyTo.Tell(
new ReplayedTaggedMessage(adapted, replayedTagged.Tag, replayedTagged.Offset),
ActorRefs.NoSender);
}
});
}
}
}
Expand All @@ -167,34 +176,41 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor
.SelectAllEventsAsync(
connection,
cancellationToken.Token,
replay.FromOffset,
cancellationToken.Token,
replay.FromOffset,
replay.ToOffset,
replay.Max,
replayedEvent => {
replay.Max,
replayedEvent =>
{
foreach (var adapted in AdaptFromJournal(replayedEvent.Persistent))
{
replay.ReplyTo.Tell(new ReplayedEvent(adapted, replayedEvent.Offset), ActorRefs.NoSender);
replay.ReplyTo.Tell(new ReplayedEvent(adapted, replayedEvent.Offset),
ActorRefs.NoSender);
}
});
}
}
}

protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(
long offset)
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
var lastOrdering = await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token);
var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token, offset);
var lastOrdering =
await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token);
var ids = await QueryExecutor.SelectAllPersistenceIdsAsync(connection, cancellationToken.Token,
offset);
return (ids, lastOrdering);
}
}
Expand All @@ -210,15 +226,18 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
/// <param name="max">TBD</param>
/// <param name="recoveryCallback">TBD</param>
/// <returns>TBD</returns>
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max,
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr,
long toSequenceNr, long max,
Action<IPersistentRepresentation> recoveryCallback)
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await QueryExecutor.SelectByPersistenceIdAsync(connection, cancellationToken.Token, persistenceId, fromSequenceNr, toSequenceNr, max, recoveryCallback);
await QueryExecutor.SelectByPersistenceIdAsync(connection, cancellationToken.Token, persistenceId,
fromSequenceNr, toSequenceNr, max, recoveryCallback);
}
}
}
Expand Down Expand Up @@ -257,7 +276,7 @@ protected bool WaitingForInitialization(object message)
return true;
case Status.Failure fail:
Log.Error(fail.Cause, "Failure during {0} initialization.", Self);

// trigger a restart so we have some hope of succeeding in the future even if initialization failed
throw new ApplicationException("Failed to initialize SQL Journal.", fail.Cause);
default:
Expand All @@ -268,15 +287,16 @@ protected bool WaitingForInitialization(object message)

private async Task<object> Initialize()
{
if (!Settings.AutoInitialize)
if (!Settings.AutoInitialize)
return new Status.Success(NotUsed.Instance);

try
{
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await QueryExecutor.CreateTablesAsync(connection, cancellationToken.Token);
}
Expand All @@ -286,6 +306,7 @@ private async Task<object> Initialize()
{
return new Status.Failure(e);
}

return new Status.Success(NotUsed.Instance);
}

Expand Down Expand Up @@ -328,9 +349,11 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await QueryExecutor.DeleteBatchAsync(connection, cancellationToken.Token, persistenceId, toSequenceNr);
await QueryExecutor.DeleteBatchAsync(connection, cancellationToken.Token, persistenceId,
toSequenceNr);
}
}
}
Expand All @@ -346,9 +369,11 @@ public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId
using (var connection = CreateDbConnection())
{
await connection.OpenAsync();
using (var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var cancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
return await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token, persistenceId);
return await QueryExecutor.SelectHighestSequenceNrAsync(connection, cancellationToken.Token,
persistenceId);
}
}
}
Expand All @@ -361,15 +386,18 @@ protected virtual string GetConnectionString()
{
var connectionString = Settings.ConnectionString;

#if NETSTANDARD
if (string.IsNullOrEmpty(connectionString))
{
connectionString = System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
connectionString =
System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
}
#endif

return connectionString;
}

protected ITimestampProvider GetTimestampProvider(string typeName) =>
TimestampProviderProvider.GetTimestampProvider(typeName, Context);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ namespace Akka.Persistence.Sql.Common.Snapshot
public abstract class SqlSnapshotStore : SnapshotStore, IWithUnboundedStash
{
#region messages

private sealed class Initialized
{
public static readonly Initialized Instance = new Initialized();
private Initialized() { }

private Initialized()
{
}
}

#endregion

/// <summary>
Expand All @@ -57,6 +60,7 @@ protected SqlSnapshotStore(Config config)
/// TBD
/// </summary>
protected ILoggingAdapter Log => _log ?? (_log ?? Context.GetLogger());

private ILoggingAdapter _log;

/// <summary>
Expand Down Expand Up @@ -114,7 +118,8 @@ private async Task<object> Initialize()
try
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
await QueryExecutor.CreateTableAsync(connection, nestedCancellationTokenSource.Token);
Expand All @@ -129,15 +134,15 @@ private async Task<object> Initialize()

private bool WaitingForInitialization(object message)
{
switch(message)
switch (message)
{
case Initialized _:
UnbecomeStacked();
Stash.UnstashAll();
return true;
case Status.Failure msg:
Log.Error(msg.Cause, "Error during snapshot store initialization");

// trigger a restart so we have some hope of succeeding in the future even if initialization failed
throw new ApplicationException("Failed to initialize SQL SnapshotStore.", msg.Cause);
default:
Expand All @@ -154,10 +159,13 @@ protected virtual string GetConnectionString()
{
var connectionString = Settings.ConnectionString;

#if NETSTANDARD
if (string.IsNullOrEmpty(connectionString))
{
connectionString = System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
connectionString =
System.Configuration.ConfigurationManager.ConnectionStrings[Settings.ConnectionStringName].ConnectionString;
}
#endif

return connectionString;
}
Expand All @@ -168,13 +176,16 @@ protected virtual string GetConnectionString()
/// <param name="persistenceId">TBD</param>
/// <param name="criteria">TBD</param>
/// <returns>TBD</returns>
protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
SnapshotSelectionCriteria criteria)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
return await QueryExecutor.SelectSnapshotAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
return await QueryExecutor.SelectSnapshotAsync(connection, nestedCancellationTokenSource.Token,
persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
}
}

Expand All @@ -187,7 +198,8 @@ protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
await QueryExecutor.InsertAsync(connection, nestedCancellationTokenSource.Token, snapshot, metadata);
Expand All @@ -202,11 +214,13 @@ protected override async Task SaveAsync(SnapshotMetadata metadata, object snapsh
protected override async Task DeleteAsync(SnapshotMetadata metadata)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
DateTime? timestamp = metadata.Timestamp != DateTime.MinValue ? metadata.Timestamp : default(DateTime?);
await QueryExecutor.DeleteAsync(connection, nestedCancellationTokenSource.Token, metadata.PersistenceId, metadata.SequenceNr, timestamp);
await QueryExecutor.DeleteAsync(connection, nestedCancellationTokenSource.Token, metadata.PersistenceId,
metadata.SequenceNr, timestamp);
}
}

Expand All @@ -219,11 +233,13 @@ protected override async Task DeleteAsync(SnapshotMetadata metadata)
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
using (var connection = CreateDbConnection())
using (var nestedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
using (var nestedCancellationTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(nestedCancellationTokenSource.Token);
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId,
criteria.MaxSequenceNr, criteria.MaxTimeStamp);
}
}
}
}
}
Loading

0 comments on commit 447b550

Please sign in to comment.