Skip to content

Commit

Permalink
expose NpgsqlDataSource along with connection string - close event-dr…
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera committed Jul 15, 2024
1 parent eadba73 commit c097047
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 51 deletions.

This file was deleted.

23 changes: 12 additions & 11 deletions src/Blumchen.DependencyInjection/Workers/Worker.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
using System.Collections.Concurrent;
using System.Text.Json.Serialization;
using Blumchen.Configuration;
using Blumchen.Serialization;
using Blumchen.Subscriptions;
using Blumchen.Subscriptions.Management;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Npgsql;
using Polly;


namespace Blumchen.Workers;

public abstract class Worker<T>(
DatabaseOptions databaseOptions,
NpgsqlDataSource dataSource,
string connectionString,
IHandler<T> handler,
JsonSerializerContext jsonSerializerContext,
IErrorProcessor errorProcessor,
Expand All @@ -21,9 +22,8 @@ public abstract class Worker<T>(
PublicationManagement.PublicationSetupOptions publicationSetupOptions,
ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions,
Func<TableDescriptorBuilder,TableDescriptorBuilder> tableDescriptorBuilder,
ILoggerFactory loggerFactory): BackgroundService where T : class
ILogger logger): BackgroundService where T : class
{
private readonly ILogger<Worker<T>> _logger = loggerFactory.CreateLogger<Worker<T>>();
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>";
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase);
private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters)
Expand All @@ -33,9 +33,9 @@ static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled)
{
(LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters),
(LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters),
(_, _) => (_, __, ___) => { }
(_, _) => (_, _, _) => { }
};
LoggingActions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand All @@ -45,21 +45,22 @@ await pipeline.ExecuteAsync(async token =>
await using var subscription = new Subscription();
await using var cursor = subscription.Subscribe(builder =>
builder
.ConnectionString(databaseOptions.ConnectionString)
.DataSource(dataSource)
.ConnectionString(connectionString)
.WithTable(tableDescriptorBuilder)
.WithErrorProcessor(errorProcessor)
.Handles<T, IHandler<T>>(handler)
.NamingPolicy(namingPolicy)
.JsonContext(jsonSerializerContext)
.WithPublicationOptions(publicationSetupOptions)
.WithReplicationOptions(replicationSlotSetupOptions)
, ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token);
Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName);
, ct: token).GetAsyncEnumerator(token);
Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested)
Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);
Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);

}, stoppingToken).ConfigureAwait(false);
Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions src/Blumchen/Serialization/ITypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ internal sealed class JsonTypeResolver(
internal void WhiteList(Type type)
{
var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName);
_typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type);
_typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo);
_typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (_,_) =>typeInfo.Type);
_typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,_)=> typeInfo);
}

public (string, JsonTypeInfo) Resolve(Type type) =>
Expand Down
10 changes: 7 additions & 3 deletions src/Blumchen/Subscriptions/ISubscriptionOptions.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
using Blumchen.Subscriptions.Replication;
using JetBrains.Annotations;
using Npgsql;
using static Blumchen.Subscriptions.Management.PublicationManagement;
using static Blumchen.Subscriptions.Management.ReplicationSlotManagement;

namespace Blumchen.Subscriptions;

internal interface ISubscriptionOptions
{
[UsedImplicitly] string ConnectionString { get; }
[UsedImplicitly] NpgsqlDataSource DataSource { get; }
[UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; }
IReplicationDataMapper DataMapper { get; }
[UsedImplicitly] PublicationSetupOptions PublicationOptions { get; }
[UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; }
[UsedImplicitly] IErrorProcessor ErrorProcessor { get; }

void Deconstruct(
out string connectionString,
out NpgsqlDataSource dataSource,
out NpgsqlConnectionStringBuilder connectionStringBuilder,
out PublicationSetupOptions publicationSetupOptions,
out ReplicationSlotSetupOptions replicationSlotSetupOptions,
out IErrorProcessor errorProcessor,
Expand All @@ -23,7 +26,8 @@ void Deconstruct(
}

internal record SubscriptionOptions(
string ConnectionString,
NpgsqlDataSource DataSource,
NpgsqlConnectionStringBuilder ConnectionStringBuilder,
PublicationSetupOptions PublicationOptions,
ReplicationSlotSetupOptions ReplicationOptions,
IErrorProcessor ErrorProcessor,
Expand Down
11 changes: 3 additions & 8 deletions src/Blumchen/Subscriptions/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,17 @@ public enum CreateStyle
private ISubscriptionOptions? _options;
public async IAsyncEnumerable<IEnvelope> Subscribe(
Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder> builder,
ILoggerFactory? loggerFactory = null,
[EnumeratorCancellation] CancellationToken ct = default
)
{
_options = builder(_builder).Build();
var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options;
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
dataSourceBuilder.UseLoggerFactory(loggerFactory);

var dataSource = dataSourceBuilder.Build();
var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options;

await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false);

_connection = new LogicalReplicationConnection(connectionString);
_connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString);
await _connection.Open(ct).ConfigureAwait(false);


await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false);
var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false);

Expand Down
21 changes: 16 additions & 5 deletions src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
using Blumchen.Subscriptions.Management;
using Blumchen.Subscriptions.Replication;
using JetBrains.Annotations;
using Npgsql;
using System.Text.Json.Serialization;

namespace Blumchen.Subscriptions;

public sealed class SubscriptionOptionsBuilder
{
private static string? _connectionString;
private static NpgsqlConnectionStringBuilder? _connectionStringBuilder;
private static NpgsqlDataSource? _dataSource;
private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions;
private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions;
private static IReplicationDataMapper? _dataMapper;
Expand All @@ -22,7 +24,7 @@ public sealed class SubscriptionOptionsBuilder

static SubscriptionOptionsBuilder()
{
_connectionString = null;
_connectionStringBuilder = default;
_publicationSetupOptions = new();
_replicationSlotSetupOptions = default;
_dataMapper = default;
Expand All @@ -40,7 +42,14 @@ public SubscriptionOptionsBuilder WithTable(
[UsedImplicitly]
public SubscriptionOptionsBuilder ConnectionString(string connectionString)
{
_connectionString = connectionString;
_connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString);
return this;
}

[UsedImplicitly]
public SubscriptionOptionsBuilder DataSource(NpgsqlDataSource dataSource)
{
_dataSource = dataSource;
return this;
}

Expand Down Expand Up @@ -91,7 +100,8 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce
internal ISubscriptionOptions Build()
{
_messageTable ??= TableDescriptorBuilder.Build();
ArgumentNullException.ThrowIfNull(_connectionString);
ArgumentNullException.ThrowIfNull(_connectionStringBuilder);
ArgumentNullException.ThrowIfNull(_dataSource);
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);

var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy);
Expand All @@ -104,7 +114,8 @@ internal ISubscriptionOptions Build()
if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer());

return new SubscriptionOptions(
_connectionString,
_dataSource,
_connectionStringBuilder,
_publicationSetupOptions,
_replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(),
_errorProcessor ?? new ConsoleOutErrorProcessor(),
Expand Down
6 changes: 5 additions & 1 deletion src/Subscriber/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Blumchen.Subscriptions;
using Commons;
using Microsoft.Extensions.Logging;
using Npgsql;
using Subscriber;

#pragma warning disable CS8601 // Possible null reference assignment.
Expand All @@ -20,8 +21,11 @@

try
{
var dataSourceBuilder = new NpgsqlDataSourceBuilder(Settings.ConnectionString)
.UseLoggerFactory(LoggerFactory.Create(builder => builder.AddConsole()));
var cursor = subscription.Subscribe(
builder => builder
.DataSource(dataSourceBuilder.Build())
.ConnectionString(Settings.ConnectionString)
.WithTable(options => options
.Id("id")
Expand All @@ -31,7 +35,7 @@
.NamingPolicy(new AttributeNamingPolicy())
.JsonContext(SourceGenerationContext.Default)
.Handles<UserCreatedContract, Consumer>(consumer)
.Handles<UserDeletedContract, Consumer>(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct
.Handles<UserDeletedContract, Consumer>(consumer), ct:ct
).GetAsyncEnumerator(ct);
await using var cursor1 = cursor.ConfigureAwait(false);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested);
Expand Down
24 changes: 13 additions & 11 deletions src/SubscriberWorker/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Text.Json.Serialization;
using Blumchen.Configuration;
using Blumchen.Serialization;
using Blumchen.Subscriptions;
using Blumchen.Workers;
Expand All @@ -10,6 +9,7 @@
using Polly.Retry;
using Polly;
using SubscriberWorker;
using Npgsql;


#pragma warning disable CS8601 // Possible null reference assignment.
Expand All @@ -29,19 +29,21 @@
.AddSingleton<IHandler<UserCreatedContract>, Handler<UserCreatedContract>>()
.AddBlumchen<SubscriberWorker<UserDeletedContract>, UserDeletedContract>()
.AddSingleton<IHandler<UserDeletedContract>, Handler<UserDeletedContract>>()

.AddSingleton(Settings.ConnectionString)
.AddTransient(sp =>
new NpgsqlDataSourceBuilder(Settings.ConnectionString)
.UseLoggerFactory(sp.GetRequiredService<ILoggerFactory>()).Build())
.AddSingleton<INamingPolicy, AttributeNamingPolicy>()
.AddSingleton<IErrorProcessor, ConsoleOutErrorProcessor>()
.AddSingleton<JsonSerializerContext, SourceGenerationContext>()
.AddSingleton(new DatabaseOptions(Settings.ConnectionString))
.AddResiliencePipeline("default",(pipelineBuilder,context) =>
.AddResiliencePipeline("default", (pipelineBuilder, _) =>
pipelineBuilder
.AddRetry(new RetryStrategyOptions
{
BackoffType = DelayBackoffType.Constant,
Delay = TimeSpan.FromSeconds(5),
MaxRetryAttempts = int.MaxValue
}).Build())
.AddRetry(new RetryStrategyOptions
{
BackoffType = DelayBackoffType.Constant,
Delay = TimeSpan.FromSeconds(5),
MaxRetryAttempts = int.MaxValue
}).Build())
.AddLogging(loggingBuilder =>
{
loggingBuilder
Expand All @@ -51,7 +53,7 @@
.AddFilter("Blumchen", LogLevel.Debug)
.AddFilter("SubscriberWorker", LogLevel.Debug)
.AddSimpleConsole();
});
}).AddSingleton<ILogger>(sp => sp.GetRequiredService<ILoggerFactory>().CreateLogger<ILogger>());

await builder
.Build()
Expand Down
12 changes: 7 additions & 5 deletions src/SubscriberWorker/SubscriberWorker.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
using System.Text.Json.Serialization;
using Blumchen.Configuration;
using Blumchen.Serialization;
using Blumchen.Subscriptions;
using Blumchen.Subscriptions.Management;
using Blumchen.Workers;
using Microsoft.Extensions.Logging;
using Npgsql;
using Polly.Registry;
// ReSharper disable ClassNeverInstantiated.Global

namespace SubscriberWorker;
public class SubscriberWorker<T>(
DatabaseOptions databaseOptions,
NpgsqlDataSource dataSource,
string connectionString,
IHandler<T> handler,
JsonSerializerContext jsonSerializerContext,
ResiliencePipelineProvider<string> pipelineProvider,
INamingPolicy namingPolicy,
IErrorProcessor errorProcessor,
ILoggerFactory loggerFactory
): Worker<T>(databaseOptions
ILogger logger
): Worker<T>(dataSource
, connectionString
, handler
, jsonSerializerContext
, errorProcessor
Expand All @@ -26,4 +28,4 @@ ILoggerFactory loggerFactory
, new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub")
, new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot")
, tableDescriptorBuilder => tableDescriptorBuilder.UseDefaults()
, loggerFactory) where T : class;
, logger) where T : class;
1 change: 1 addition & 0 deletions src/Tests/DatabaseFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri
var consumer = new TestHandler<T>(log, jsonTypeInfo);
var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder()
.WithErrorProcessor(new TestOutErrorProcessor(Output))
.DataSource(new NpgsqlDataSourceBuilder(connectionString).Build())
.ConnectionString(connectionString)
.JsonContext(info)
.NamingPolicy(namingPolicy)
Expand Down
2 changes: 1 addition & 1 deletion src/Tests/When_Subscription_Already_Exists.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ await MessageAppender.AppendAsync(

var subscription = new Subscription();
await using var subscription1 = subscription.ConfigureAwait(false);
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
{
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async Task Read_from_table_using_named_transaction_snapshot()
SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine);
var subscription = new Subscription();
await using var subscription1 = subscription.ConfigureAwait(false);
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
{
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task Read_from_table_using_named_transaction_snapshot()
var subscription = new Subscription();
await using var subscription1 = subscription.ConfigureAwait(false);

await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
{
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
return;
Expand Down

0 comments on commit c097047

Please sign in to comment.