diff --git a/docker-compose.yml b/docker-compose.yml index fa2eee0..4d0e26f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,8 @@ services: - "wal_level=logical" - "-c" - "wal_compression=on" + - "-c" + - "max_slot_wal_keep_size=1" pgadmin: container_name: pgadmin_container image: dpage/pgadmin4 diff --git a/src/Blumchen/DependencyInjection/Worker.cs b/src/Blumchen/DependencyInjection/Worker.cs index bd5cba5..d4d52b5 100644 --- a/src/Blumchen/DependencyInjection/Worker.cs +++ b/src/Blumchen/DependencyInjection/Worker.cs @@ -32,16 +32,16 @@ static Action LoggerAction(LogLevel ll, bool enabled) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await options.ResiliencePipeline.ExecuteAsync(async token => + await options.OuterPipeline.ExecuteAsync(async token => + await options.InnerPipeline.ExecuteAsync(async ct => { await using var subscription = new Subscription(); - await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct: token) - .GetAsyncEnumerator(token); - Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName); - while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested) + await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct) + .GetAsyncEnumerator(ct); + Notify(logger, LogLevel.Information, "{WorkerName} started", WorkerName); + while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested) Notify(logger, LogLevel.Trace, "{cursor.Current} processed", cursor.Current); - - }, stoppingToken).ConfigureAwait(false); + }, token).ConfigureAwait(false), stoppingToken).ConfigureAwait(false); Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName); } diff --git a/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs index 750878a..b3ca9d5 100644 --- a/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs +++ b/src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs @@ -1,37 +1,67 @@ using Blumchen.Subscriber; +using Blumchen.Subscriptions.Management; +using Npgsql; +using Npgsql.Replication; using Polly; namespace Blumchen.DependencyInjection; -public record WorkerOptions(ResiliencePipeline ResiliencePipeline, ISubscriberOptions SubscriberOptions); +public record WorkerOptions( + ISubscriberOptions SubscriberOptions, + ResiliencePipeline OuterPipeline, + ResiliencePipeline InnerPipeline); public interface IWorkerOptionsBuilder { IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline); IWorkerOptionsBuilder Subscription(Func? builder); WorkerOptions Build(); + IWorkerOptionsBuilder EnableSubscriptionAutoHeal(); } internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder { - private ResiliencePipeline? _resiliencePipeline = default; + private ResiliencePipeline? _outerPipeline = default; + private Func? _innerPipelineFn = default; private Func? _builder; public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline) { - _resiliencePipeline = resiliencePipeline; + _outerPipeline = resiliencePipeline; return this; }public IWorkerOptionsBuilder Subscription(Func? builder) { _builder = builder; - return this; + return this; } public WorkerOptions Build() { - ArgumentNullException.ThrowIfNull(_resiliencePipeline); + ArgumentNullException.ThrowIfNull(_outerPipeline); ArgumentNullException.ThrowIfNull(_builder); - return new(_resiliencePipeline, _builder(new OptionsBuilder()).Build()); + var subscriberOptions = _builder(new OptionsBuilder()).Build(); + return new(subscriberOptions, _outerPipeline, + _innerPipelineFn?.Invoke(subscriberOptions.ReplicationOptions.SlotName,subscriberOptions.ConnectionStringBuilder.ConnectionString) ?? + ResiliencePipeline.Empty + ); + } + + public IWorkerOptionsBuilder EnableSubscriptionAutoHeal() + { + _innerPipelineFn = (replicationSlotName, connectionString) => new ResiliencePipelineBuilder().AddRetry(new() + { + ShouldHandle = + new PredicateBuilder().Handle(exception => + exception.SqlState.Equals("55000", StringComparison.OrdinalIgnoreCase)), + MaxRetryAttempts = int.MaxValue, + OnRetry = async args => + { + await using var conn = new LogicalReplicationConnection(connectionString); + await conn.Open(args.Context.CancellationToken); + await conn.ReCreate(replicationSlotName, args.Context.CancellationToken).ConfigureAwait(false); + }, + }).Build(); + return this; } } diff --git a/src/Blumchen/Subscriber/OptionsBuilder.cs b/src/Blumchen/Subscriber/OptionsBuilder.cs index a83e141..a5d0a2d 100644 --- a/src/Blumchen/Subscriber/OptionsBuilder.cs +++ b/src/Blumchen/Subscriber/OptionsBuilder.cs @@ -149,7 +149,7 @@ internal ISubscriberOptions Build() if (_typeRegistry.Count > 0) { - Ensure.NotNull(_namingPolicy, $"{nameof(NamingPolicy)}"); + Ensure.NotNull(_namingPolicy, $"{nameof(NamingPolicy)}"); if (_jsonSerializerContext != null) { var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index f95fb3a..180e2c6 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -28,37 +28,15 @@ CancellationToken ct { (Subscription.CreateStyle.Never,_) => new None(), (Subscription.CreateStyle.WhenNotExists,true) => new AlreadyExists(), - (Subscription.CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct).ConfigureAwait(false), - (Subscription.CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct).ConfigureAwait(false), - (Subscription.CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.WhenNotExists,false) => await connection.Create(slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.AlwaysRecreate,true) => await connection.ReCreate(slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.AlwaysRecreate, false) => await connection.Create(slotName, ct).ConfigureAwait(false), _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) }; - static async Task ReCreate( - LogicalReplicationConnection connection, - string slotName, - CancellationToken ct) - { - await connection.DropReplicationSlot(slotName, true, ct).ConfigureAwait(false); - return await Create(connection, slotName, ct).ConfigureAwait(false); - } - - static async Task Create( - LogicalReplicationConnection connection, - string slotName, - CancellationToken ct) - { - var result = await connection.CreatePgOutputReplicationSlot( - slotName, - slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, - cancellationToken: ct - ).ConfigureAwait(false); - - return new Created(result.SnapshotName!, result.ConsistentPoint); - } } - + public record ReplicationSlotOptions( string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot", Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, @@ -74,3 +52,29 @@ public record AlreadyExists: CreateReplicationSlotResult; public record Created(string SnapshotName, NpgsqlLogSequenceNumber LogSequenceNumber): CreateReplicationSlotResult; } } + +public static class LogicalReplicationConnectionExtensions +{ + internal static async Task Create( + this LogicalReplicationConnection connection, + string slotName, + CancellationToken ct) + { + var result = await connection.CreatePgOutputReplicationSlot( + slotName, + slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, + cancellationToken: ct + ).ConfigureAwait(false); + + return new Created(result.SnapshotName!, result.ConsistentPoint); + } + + public static async Task ReCreate( + this LogicalReplicationConnection connection, + string slotName, + CancellationToken ct) + { + await connection.DropReplicationSlot(slotName, true, ct).ConfigureAwait(false); + return await connection.Create(slotName, ct).ConfigureAwait(false); + } +} diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 4b5a43d..80c79cf 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -70,7 +70,7 @@ internal async IAsyncEnumerable Subscribe( await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false); var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct) .ConfigureAwait(false); - IReplicationDataMapper replicationDataMapper = new ReplicationDataMapper(registry); + var replicationDataMapper = new ReplicationDataMapper(registry); PgOutputReplicationSlot slot; if (result is not Created created) diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index 4fa5b27..1cedf67 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -60,7 +60,7 @@ await using var cursor1 = cursor.ConfigureAwait(false); while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested) if(logger.IsEnabled(LogLevel.Trace)) - logger.LogTrace($"{cursor.Current} processed"); + logger.LogTrace("{message} processed", cursor.Current); } catch (Exception e) { diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs index dee6dca..93ec808 100644 --- a/src/SubscriberWorker/Program.cs +++ b/src/SubscriberWorker/Program.cs @@ -76,6 +76,7 @@ ) .ResiliencyPipeline( provider.GetRequiredService>().GetPipeline("default")) + .EnableSubscriptionAutoHeal() ) .AddBlumchen((provider, workerOptions) => workerOptions @@ -94,6 +95,7 @@ )) .ResiliencyPipeline( provider.GetRequiredService>().GetPipeline("default")) + .EnableSubscriptionAutoHeal() ); await builder