From 040273a139b654c420f53123943342cc1e2afa42 Mon Sep 17 00:00:00 2001 From: giordanol Date: Sat, 22 Jun 2024 11:12:03 +0200 Subject: [PATCH 01/19] Enforce provided whitelist at publication level --- src/Blumchen/Database/Run.cs | 27 +++--- src/Blumchen/Serialization/ITypeResolver.cs | 2 + .../Subscriptions/ISubscriptionOptions.cs | 2 +- .../Management/PublicationManagement.cs | 86 ++++++++++++++----- .../Management/ReplicationSlotManagement.cs | 14 +-- .../Replication/ReplicationDataMapper.cs | 15 ++-- .../SnapshotReader/SnapshotReader.cs | 2 +- src/Blumchen/Subscriptions/Subscription.cs | 29 ++++--- .../SubscriptionOptionsBuilder.cs | 16 ++-- src/Blumchen/Table/MessageAppender.cs | 15 ++-- src/Publisher/Contracts.cs | 9 +- src/Publisher/Program.cs | 15 ++-- src/Subscriber/Contracts.cs | 9 +- src/Subscriber/Program.cs | 8 +- src/Tests/DatabaseFixture.cs | 8 +- ...n_First_Subscription_And_Table_Is_Empty.cs | 5 +- src/Tests/When_Subscription_Already_Exists.cs | 17 ++-- ...n_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 5 +- 18 files changed, 177 insertions(+), 107 deletions(-) diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index 75dd716..51ecfef 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -16,7 +16,7 @@ public static async Task Execute( CancellationToken ct) { await using var command = dataSource.CreateCommand(sql); - await command.ExecuteNonQueryAsync(ct); + await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, string tableName, CancellationToken ct) @@ -28,7 +28,7 @@ message_type VARCHAR(250) NOT NULL, data JSONB NOT NULL ); "; - await dataSource.Execute(sql, ct); + await dataSource.Execute(sql, ct).ConfigureAwait(false); } public static async Task Exists( @@ -38,12 +38,13 @@ public static async Task Exists( object[] parameters, CancellationToken ct) { - await using var command = dataSource.CreateCommand( + var command = dataSource.CreateCommand( $"SELECT EXISTS(SELECT 1 FROM {table} WHERE {where})" ); + await using var command1 = command.ConfigureAwait(false); foreach (var parameter in parameters) command.Parameters.AddWithValue(parameter); - return ((await command.ExecuteScalarAsync(ct)) as bool?) == true; + return ((await command.ExecuteScalarAsync(ct).ConfigureAwait(false)) as bool?) == true; } internal static async IAsyncEnumerable QueryTransactionSnapshot( @@ -53,16 +54,20 @@ internal static async IAsyncEnumerable QueryTransactionSnapshot( IReplicationDataMapper dataMapper, [EnumeratorCancellation] CancellationToken ct) { - await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.RepeatableRead, ct); + var transaction = await connection.BeginTransactionAsync(IsolationLevel.RepeatableRead, ct).ConfigureAwait(false); + await using var transaction1 = transaction.ConfigureAwait(false); - await using var command = + var command = new NpgsqlCommand($"SET TRANSACTION SNAPSHOT '{snapshotName}';", connection, transaction); - await command.ExecuteScalarAsync(ct); + await using var command1 = command.ConfigureAwait(false); + await command.ExecuteScalarAsync(ct).ConfigureAwait(false); - await using var cmd = new NpgsqlCommand($"SELECT * FROM {tableName}", connection, transaction); - await using var reader = await cmd.ExecuteReaderAsync(ct); + var cmd = new NpgsqlCommand($"SELECT * FROM {tableName}", connection, transaction); + await using var cmd1 = cmd.ConfigureAwait(false); + var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); + await using var reader1 = reader.ConfigureAwait(false); - while (await reader.ReadAsync(ct)) - yield return await dataMapper.ReadFromSnapshot(reader, ct); + while (await reader.ReadAsync(ct).ConfigureAwait(false)) + yield return await dataMapper.ReadFromSnapshot(reader, ct).ConfigureAwait(false); } } diff --git a/src/Blumchen/Serialization/ITypeResolver.cs b/src/Blumchen/Serialization/ITypeResolver.cs index 6c6866b..a738116 100644 --- a/src/Blumchen/Serialization/ITypeResolver.cs +++ b/src/Blumchen/Serialization/ITypeResolver.cs @@ -7,6 +7,7 @@ namespace Blumchen.Serialization; public interface ITypeResolver { + ISet RegisteredTypes { get; } Type Resolve(string value); (string, JsonTypeInfo) Resolve(Type type); JsonSerializerContext SerializationContext { get; } @@ -17,6 +18,7 @@ public class TypeResolver(JsonSerializerContext serializationContext, INamingPol public JsonSerializerContext SerializationContext { get; } = serializationContext; private static readonly ConcurrentDictionary TypeDictionary = []; private static readonly ConcurrentDictionary TypeInfoDictionary = []; + public ISet RegisteredTypes { get; } = TypeDictionary.Keys.ToHashSet(); public TypeResolver WhiteList() where T:class { diff --git a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs index 24ea481..1be177c 100644 --- a/src/Blumchen/Subscriptions/ISubscriptionOptions.cs +++ b/src/Blumchen/Subscriptions/ISubscriptionOptions.cs @@ -9,7 +9,7 @@ internal interface ISubscriptionOptions { [UsedImplicitly] string ConnectionString { get; } IReplicationDataMapper DataMapper { get; } - PublicationSetupOptions PublicationOptions { get; } + [UsedImplicitly] PublicationSetupOptions PublicationOptions { get; } [UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; } [UsedImplicitly] IErrorProcessor ErrorProcessor { get; } diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index 822c075..63ff6fe 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -1,4 +1,5 @@ using Blumchen.Database; +using Blumchen.Serialization; using Npgsql; #pragma warning disable CA2208 @@ -12,36 +13,41 @@ public static class PublicationManagement { public static async Task SetupPublication( this NpgsqlDataSource dataSource, - PublicationSetupOptions options, + PublicationSetupOptions setupOptions, CancellationToken ct ) { - var (publicationName, tableName, createStyle, shouldReAddTablesIfWereRecreated) = options; + var (publicationName, tableName, createStyle, shouldReAddTablesIfWereRecreated, typeResolver) = setupOptions; return createStyle switch { CreateStyle.Never => new None(), - CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableName, ct), - CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct) => await Refresh(dataSource, publicationName, tableName, shouldReAddTablesIfWereRecreated, ct), - CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableName, ct), - _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) + CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), + CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableName, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false), + CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), + _ => throw new ArgumentOutOfRangeException(nameof(setupOptions.CreateStyle)) }; static async Task ReCreate( NpgsqlDataSource dataSource, string publicationName, - string tableName, CancellationToken ct) - { - await dataSource.DropPublication(publicationName, ct); - return await Create(dataSource, publicationName, tableName, ct); + string tableName, + ITypeResolver? typeResolver, + CancellationToken ct + ) { + await dataSource.DropPublication(publicationName, ct).ConfigureAwait(false); + return await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false); } - static async Task Create( - NpgsqlDataSource dataSource, + static async Task Create(NpgsqlDataSource dataSource, string publicationName, - string tableName, CancellationToken ct) - { - await dataSource.CreatePublication(publicationName, tableName, ct); + string tableName, + ITypeResolver? typeResolver, + CancellationToken ct + ) { + await dataSource.CreatePublication(publicationName, tableName, + typeResolver?.RegisteredTypes ?? Enumerable.Empty().ToHashSet(), ct).ConfigureAwait(false); + return new Created(); } @@ -49,10 +55,10 @@ static async Task Refresh(NpgsqlDataSource dataSource, string publicationName, string tableName, bool shouldReAddTablesIfWereRecreated, - CancellationToken ct) - { + CancellationToken ct + ) { if(shouldReAddTablesIfWereRecreated) - await dataSource.RefreshPublicationTables(publicationName, tableName, ct); + await dataSource.RefreshPublicationTables(publicationName, tableName, ct).ConfigureAwait(false); return new AlreadyExists(); } } @@ -61,9 +67,31 @@ private static Task CreatePublication( this NpgsqlDataSource dataSource, string publicationName, string tableName, + ISet eventTypes, CancellationToken ct - ) => - dataSource.Execute($"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", ct); + ) { + return eventTypes.Count switch + { + 0 => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", + ct + ), + _ => Execute(dataSource, $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WHERE ({PublicationFilter(eventTypes)}) WITH (publish = 'insert');", + ct + ) + }; + static string PublicationFilter(ICollection input) => string.Join(" OR ", input.Select(s => $"message_type = '{s}'")); + } + + private static async Task Execute( + this NpgsqlDataSource dataSource, + string sql, + CancellationToken ct + ) + { + var command = dataSource.CreateCommand(sql); + await using (command.ConfigureAwait(false)) + await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); + } private static Task DropPublication( this NpgsqlDataSource dataSource, @@ -114,12 +142,28 @@ public record Created: SetupPublicationResult; } public sealed record PublicationSetupOptions( - string PublicationName = "pub", + string PublicationName = PublicationSetupOptions.DefaultPublicationName, string TableName = PublicationSetupOptions.DefaultTableName, CreateStyle CreateStyle = CreateStyle.WhenNotExists, bool ShouldReAddTablesIfWereRecreated = false ) { internal const string DefaultTableName = "outbox"; + internal const string DefaultPublicationName = "pub"; + public ITypeResolver? TypeResolver { get; internal init; } = default; + + public void Deconstruct( + out string publicationName, + out string tableName, + out CreateStyle createStyle, + out bool reAddTablesIfWereRecreated, + out ITypeResolver? typeResolver) + { + publicationName = PublicationName; + tableName = TableName; + createStyle = CreateStyle.WhenNotExists; + reAddTablesIfWereRecreated = ShouldReAddTablesIfWereRecreated; + typeResolver = TypeResolver; + } } } diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index 030ef5a..53aeede 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -19,13 +19,13 @@ CancellationToken ct { var (slotName, createStyle, _) = options; - return (createStyle, await dataSource.ReplicationSlotExists(slotName, ct)) switch + return (createStyle, await dataSource.ReplicationSlotExists(slotName, ct).ConfigureAwait(false)) switch { (CreateStyle.Never,_) => new None(), (CreateStyle.WhenNotExists,true) => new AlreadyExists(), - (CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct), - (CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct), - (CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct), + (CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct).ConfigureAwait(false), + (CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct).ConfigureAwait(false), + (CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct).ConfigureAwait(false), _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) }; @@ -35,8 +35,8 @@ static async Task ReCreate( string slotName, CancellationToken ct) { - await connection.DropReplicationSlot(slotName, true, ct); - return await Create(connection, slotName, ct); + await connection.DropReplicationSlot(slotName, true, ct).ConfigureAwait(false); + return await Create(connection, slotName, ct).ConfigureAwait(false); } static async Task Create( @@ -48,7 +48,7 @@ static async Task Create( slotName, slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, cancellationToken: ct - ); + ).ConfigureAwait(false); return new Created(result.SnapshotName!, result.ConsistentPoint); } diff --git a/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs b/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs index 6c8ba21..5ebfb15 100644 --- a/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs +++ b/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs @@ -14,7 +14,7 @@ public async Task ReadFromReplication(InsertMessage insertMessage, Ca var id = string.Empty; var columnNumber = 0; var typeName = string.Empty; - await foreach (var column in insertMessage.NewRow) + await foreach (var column in insertMessage.NewRow.ConfigureAwait(false)) { try { @@ -22,20 +22,20 @@ public async Task ReadFromReplication(InsertMessage insertMessage, Ca { case 0: id = column.Kind == TupleDataKind.BinaryValue - ? (await column.Get(ct)).ToString() - : await column.Get(ct); + ? (await column.Get(ct).ConfigureAwait(false)).ToString() + : await column.Get(ct).ConfigureAwait(false); break; case 1: using (var textReader = column.GetTextReader()) { - typeName = await textReader.ReadToEndAsync(ct); + typeName = await textReader.ReadToEndAsync(ct).ConfigureAwait(false); break; } case 2 when column.GetDataTypeName().Equals("jsonb", StringComparison.OrdinalIgnoreCase): { var type = resolver.Resolve(typeName); ArgumentNullException.ThrowIfNull(type, typeName); - return new OkEnvelope(await JsonSerialization.FromJsonAsync(type, column.GetStream(), resolver.SerializationContext, ct)); + return new OkEnvelope(await JsonSerialization.FromJsonAsync(type, column.GetStream(), resolver.SerializationContext, ct).ConfigureAwait(false)); } } } @@ -57,8 +57,9 @@ public async Task ReadFromSnapshot(NpgsqlDataReader reader, Cancellat var eventTypeName = reader.GetString(1); var eventType = resolver.Resolve(eventTypeName); ArgumentNullException.ThrowIfNull(eventType, eventTypeName); - await using var stream = await reader.GetStreamAsync(2, ct); - return new OkEnvelope(await JsonSerialization.FromJsonAsync(eventType, stream, resolver.SerializationContext, ct)); + var stream = await reader.GetStreamAsync(2, ct).ConfigureAwait(false); + await using var stream1 = stream.ConfigureAwait(false); + return new OkEnvelope(await JsonSerialization.FromJsonAsync(eventType, stream, resolver.SerializationContext, ct).ConfigureAwait(false)); } catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException or JsonException) { diff --git a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs index 35eec50..51c8f67 100644 --- a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs +++ b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs @@ -17,7 +17,7 @@ internal static async IAsyncEnumerable GetRowsFromSnapshot( [EnumeratorCancellation] CancellationToken ct = default ) { - await foreach (var @event in connection.QueryTransactionSnapshot(snapshotName, tableName, dataMapper, ct)) + await foreach (var @event in connection.QueryTransactionSnapshot(snapshotName, tableName, dataMapper, ct).ConfigureAwait(false)) yield return @event; } } diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 62aad1e..ea3cd17 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -48,14 +48,14 @@ public async IAsyncEnumerable Subscribe( dataSourceBuilder.UseLoggerFactory(loggerFactory); var dataSource = dataSourceBuilder.Build(); - await dataSource.EnsureTableExists(publicationSetupOptions.TableName, ct); + await dataSource.EnsureTableExists(publicationSetupOptions.TableName, ct).ConfigureAwait(false); _connection = new LogicalReplicationConnection(connectionString); - await _connection.Open(ct); + await _connection.Open(ct).ConfigureAwait(false); - await dataSource.SetupPublication(publicationSetupOptions, ct); - var result = await dataSource.SetupReplicationSlot(_connection, slotSetupOptions, ct); + await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false); + var result = await dataSource.SetupReplicationSlot(_connection, slotSetupOptions, ct).ConfigureAwait(false); PgOutputReplicationSlot slot; @@ -72,25 +72,25 @@ public async IAsyncEnumerable Subscribe( ) ); - await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct)) - await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct)) + await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct).ConfigureAwait(false)) + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) yield return subscribe; } await foreach (var message in _connection.StartReplication(slot, - new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, slotSetupOptions.Binary), ct)) + new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, slotSetupOptions.Binary), ct).ConfigureAwait(false)) { if (message is InsertMessage insertMessage) { - var envelope = await replicationDataMapper.ReadFromReplication(insertMessage, ct); - await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct)) + var envelope = await replicationDataMapper.ReadFromReplication(insertMessage, ct).ConfigureAwait(false); + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false)) yield return subscribe; } // Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually // so that Npgsql can inform the server which WAL files can be removed/recycled. _connection.SetReplicationStatus(message.WalEnd); - await _connection.SendStatusUpdate(ct); + await _connection.SendStatusUpdate(ct).ConfigureAwait(false); } } @@ -103,7 +103,7 @@ IErrorProcessor errorProcessor switch (envelope) { case KoEnvelope error: - await errorProcessor.Process(error.Error); + await errorProcessor.Process(error.Error).ConfigureAwait(false); yield break; case OkEnvelope okEnvelope: { @@ -148,14 +148,15 @@ private static async IAsyncEnumerable ReadExistingRowsFromSnapshot( [EnumeratorCancellation] CancellationToken ct = default ) { - await using var connection = await dataSource.OpenConnectionAsync(ct); + var connection = await dataSource.OpenConnectionAsync(ct).ConfigureAwait(false); + await using var connection1 = connection.ConfigureAwait(false); await foreach (var row in connection.GetRowsFromSnapshot( snapshotName, options.PublicationOptions.TableName, options.DataMapper, - ct)) + ct).ConfigureAwait(false)) yield return row; } - public async ValueTask DisposeAsync() => await _connection!.DisposeAsync(); + public async ValueTask DisposeAsync() => await _connection!.DisposeAsync().ConfigureAwait(false); } diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 570c935..7810211 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -9,27 +9,30 @@ namespace Blumchen.Subscriptions; public sealed class SubscriptionOptionsBuilder { private static string? _connectionString; - private static PublicationManagement.PublicationSetupOptions? _publicationSetupOptions; + private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _slotOptions; private static IReplicationDataMapper? _dataMapper; static SubscriptionOptionsBuilder() { _connectionString = null; - _publicationSetupOptions = default; + _publicationSetupOptions = new(); _slotOptions = default; _dataMapper = default; } + [UsedImplicitly] public SubscriptionOptionsBuilder ConnectionString(string connectionString) { _connectionString = connectionString; return this; } + [UsedImplicitly] public SubscriptionOptionsBuilder TypeResolver(ITypeResolver resolver) { _dataMapper = new ReplicationDataMapper(resolver); + _publicationSetupOptions = _publicationSetupOptions with{ TypeResolver = resolver }; return this; } @@ -41,9 +44,10 @@ public SubscriptionOptionsBuilder WithMapper(IReplicationDataMapper dataMapper) } [UsedImplicitly] - public SubscriptionOptionsBuilder WithPublicationOptions(PublicationManagement.PublicationSetupOptions publicationSetupOptions) + public SubscriptionOptionsBuilder WithPublicationOptions(PublicationManagement.PublicationSetupOptions publicationOptions) { - _publicationSetupOptions = publicationSetupOptions; + _publicationSetupOptions = + publicationOptions with { TypeResolver = _publicationSetupOptions.TypeResolver }; return this; } @@ -57,6 +61,7 @@ public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManageme private readonly Dictionary _registry = []; private IErrorProcessor? _errorProcessor; + [UsedImplicitly] public SubscriptionOptionsBuilder Consumes(TU consumer) where T : class where TU : class, IConsumes { @@ -64,6 +69,7 @@ public SubscriptionOptionsBuilder Consumes(TU consumer) where T : class return this; } + [UsedImplicitly] public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProcessor) { _errorProcessor = errorProcessor; @@ -78,7 +84,7 @@ internal ISubscriptionOptions Build() return new SubscriptionOptions( _connectionString, - _publicationSetupOptions ?? new PublicationManagement.PublicationSetupOptions(), + _publicationSetupOptions, _slotOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), _errorProcessor ?? new ConsoleOutErrorProcessor(), _dataMapper, diff --git a/src/Blumchen/Table/MessageAppender.cs b/src/Blumchen/Table/MessageAppender.cs index 29d82f0..7364529 100644 --- a/src/Blumchen/Table/MessageAppender.cs +++ b/src/Blumchen/Table/MessageAppender.cs @@ -14,11 +14,12 @@ public static async Task AppendAsync(string tableName, T @event, ITypeResolve var (typeName, jsonTypeInfo) = resolver.Resolve(type); var data = JsonSerialization.ToJson(@event, jsonTypeInfo); - await using var connection = new NpgsqlConnection(connectionString); - await connection.OpenAsync(ct); + var connection = new NpgsqlConnection(connectionString); + await using var connection1 = connection.ConfigureAwait(false); + await connection.OpenAsync(ct).ConfigureAwait(false); var command = connection.CreateCommand(); command.CommandText = $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')"; - await command.ExecuteNonQueryAsync(ct); + await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } public static async Task AppendAsync(string tableName, T @input, ITypeResolver resolver, NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken ct) @@ -29,10 +30,10 @@ public static async Task AppendAsync(string tableName, T @input, ITypeResolve case null: throw new ArgumentNullException(nameof(@input)); case IEnumerable inputs: - await AppendBatchAsyncOfT(tableName, inputs, resolver, connection, transaction, ct); + await AppendBatchAsyncOfT(tableName, inputs, resolver, connection, transaction, ct).ConfigureAwait(false); break; default: - await AppendAsyncOfT(tableName, input, resolver, connection, transaction, ct); + await AppendAsyncOfT(tableName, input, resolver, connection, transaction, ct).ConfigureAwait(false); break; } } @@ -57,7 +58,7 @@ string tableName $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')"; batch.BatchCommands.Add(batchCommand); } - await batch.ExecuteNonQueryAsync(ct); + await batch.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } private static async Task AppendAsyncOfT( @@ -76,6 +77,6 @@ string tableName connection, transaction ); - await command.ExecuteNonQueryAsync(ct); + await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } } diff --git a/src/Publisher/Contracts.cs b/src/Publisher/Contracts.cs index ebbdd94..cd8fc4e 100644 --- a/src/Publisher/Contracts.cs +++ b/src/Publisher/Contracts.cs @@ -25,11 +25,12 @@ internal partial class SourceGenerationContext: JsonSerializerContext internal class PublisherTypesResolver: ITypeResolver { - private readonly TypeResolver _inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) + private static readonly TypeResolver Inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) .WhiteList() .WhiteList(); - public Type Resolve(string value) => _inner.Resolve(value); - public (string, JsonTypeInfo) Resolve(Type type) => _inner.Resolve(type); - public JsonSerializerContext SerializationContext => _inner.SerializationContext; + public ISet RegisteredTypes { get; } = Inner.RegisteredTypes; + public Type Resolve(string value) => Inner.Resolve(value); + public (string, JsonTypeInfo) Resolve(Type type) => Inner.Resolve(type); + public JsonSerializerContext SerializationContext => Inner.SerializationContext; } diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index 98459b1..f1da3d9 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -19,8 +19,9 @@ var cts = new CancellationTokenSource(); var ct = cts.Token; - await using var connection = new NpgsqlConnection(Settings.ConnectionString); - await connection.OpenAsync(ct); + var connection = new NpgsqlConnection(Settings.ConnectionString); + await using var connection1 = connection.ConfigureAwait(false); + await connection.OpenAsync(ct).ConfigureAwait(false); //use a command for each message { var @events = Enumerable.Range(0, result).Select(i => @@ -29,24 +30,24 @@ : new UserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString())); foreach (var @event in @events) { - var transaction = await connection.BeginTransactionAsync(ct); + var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false); try { switch (@event) { case UserCreated c: - await MessageAppender.AppendAsync("outbox", c, resolver, connection, transaction, ct); + await MessageAppender.AppendAsync("outbox", c, resolver, connection, transaction, ct).ConfigureAwait(false); break; case UserDeleted d: - await MessageAppender.AppendAsync("outbox", d, resolver, connection, transaction, ct); + await MessageAppender.AppendAsync("outbox", d, resolver, connection, transaction, ct).ConfigureAwait(false); break; } - await transaction.CommitAsync(ct); + await transaction.CommitAsync(ct).ConfigureAwait(false); } catch (Exception e) { - await transaction.RollbackAsync(ct); + await transaction.RollbackAsync(ct).ConfigureAwait(false); Console.WriteLine(e); throw; } diff --git a/src/Subscriber/Contracts.cs b/src/Subscriber/Contracts.cs index 5c7d2d5..7827526 100644 --- a/src/Subscriber/Contracts.cs +++ b/src/Subscriber/Contracts.cs @@ -25,12 +25,13 @@ internal partial class SourceGenerationContext: JsonSerializerContext internal class SubscriberTypesResolver: ITypeResolver { - private readonly TypeResolver _inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) + private static readonly TypeResolver Inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) .WhiteList() .WhiteList(); - public Type Resolve(string value) => _inner.Resolve(value); - public (string, JsonTypeInfo) Resolve(Type type) => _inner.Resolve(type); - public JsonSerializerContext SerializationContext => _inner.SerializationContext; + public ISet RegisteredTypes { get; } = Inner.RegisteredTypes; + public Type Resolve(string value) => Inner.Resolve(value); + public (string, JsonTypeInfo) Resolve(Type type) => Inner.Resolve(type); + public JsonSerializerContext SerializationContext => Inner.SerializationContext; } } diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index 52f91fc..0b5380b 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -15,18 +15,20 @@ var ct = cancellationTokenSource.Token; var consumer = new Consumer(); -await using var subscription = new Subscription(); +var subscription = new Subscription(); +await using var subscription1 = subscription.ConfigureAwait(false); try { - await using var cursor = subscription.Subscribe( + var cursor = subscription.Subscribe( builder => builder .ConnectionString(Settings.ConnectionString) .TypeResolver(new SubscriberTypesResolver()) .Consumes(consumer) .Consumes(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct ).GetAsyncEnumerator(ct); - while (await cursor.MoveNextAsync() && !ct.IsCancellationRequested); + await using var cursor1 = cursor.ConfigureAwait(false); + while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested); } catch (Exception e) { diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index 9c075b1..5cd8c54 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -22,7 +22,7 @@ public async Task Handle(T value) { Console.WriteLine(e); } - await Task.CompletedTask; + await Task.CompletedTask.ConfigureAwait(false); } } @@ -37,7 +37,7 @@ public Task InitializeAsync() public async Task DisposeAsync() { - await Container.DisposeAsync(); + await Container.DisposeAsync().ConfigureAwait(false); } protected static async Task CreateOutboxTable( @@ -47,7 +47,7 @@ CancellationToken ct { var tableName = Randomise("outbox"); - await dataSource.EnsureTableExists(tableName, ct); + await dataSource.EnsureTableExists(tableName, ct).ConfigureAwait(false); return tableName; } @@ -70,7 +70,7 @@ protected static (TypeResolver typeResolver, TestConsumer consumer, Subscript .TypeResolver(typeResolver) .Consumes>(consumer) .WithPublicationOptions( - new PublicationManagement.PublicationSetupOptions(publicationName ?? Randomise("events_pub"), eventsTable) + new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub"), TableName: eventsTable) ) .WithReplicationOptions( new ReplicationSlotManagement.ReplicationSlotSetupOptions(slotName ?? Randomise("events_slot")) diff --git a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs index 93df4c3..898b0aa 100644 --- a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs +++ b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs @@ -20,11 +20,12 @@ public async Task Execute() var (typeResolver, testConsumer, subscriptionOptions) = SetupFor(connectionString, eventsTable, SourceGenerationContext.Default.UserCreated, testOutputHelper.WriteLine); - await using var subscription = new Subscription(); + var subscription = new Subscription(); + await using var subscription1 = subscription.ConfigureAwait(false); var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) { Assert.Equal(@event, ((OkEnvelope)envelope).Value); return; diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 0778e5e..2bfce80 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -30,8 +30,9 @@ public async Task Execute() var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); - await using var subscription = new Subscription(); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct)) + var subscription = new Subscription(); + await using var subscription1 = subscription.ConfigureAwait(false); + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) { Assert.Equal(@event, ((OkEnvelope)envelope).Value); return; @@ -45,14 +46,16 @@ private static async Task SetupReplication( string tableName, CancellationToken ct) { - await using var dataSource = NpgsqlDataSource.Create(connectionString); - await dataSource.Execute($"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", ct); - await using var connection = new LogicalReplicationConnection(connectionString); - await connection.Open(ct); + var dataSource = NpgsqlDataSource.Create(connectionString); + await using var source = dataSource.ConfigureAwait(false); + await dataSource.Execute($"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", ct).ConfigureAwait(false); + var connection = new LogicalReplicationConnection(connectionString); + await using var connection1 = connection.ConfigureAwait(false); + await connection.Open(ct).ConfigureAwait(false); await connection.CreatePgOutputReplicationSlot( slotName, slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, cancellationToken: ct - ); + ).ConfigureAwait(false); } } diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index f20c53a..884c365 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -27,9 +27,10 @@ public async Task Execute() var (_, testConsumer, subscriptionOptions) = SetupFor(connectionString, eventsTable, SourceGenerationContext.Default.UserDeleted, testOutputHelper.WriteLine); - await using var subscription = new Subscription(); + var subscription = new Subscription(); + await using var subscription1 = subscription.ConfigureAwait(false); - await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct)) + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) { Assert.Equal(@event, ((OkEnvelope)envelope).Value); return; From b72c81adae9138d98f5f45e3486f8bf0529b42f5 Mon Sep 17 00:00:00 2001 From: giordanol Date: Sat, 22 Jun 2024 17:43:11 +0200 Subject: [PATCH 02/19] move from initializer to getter --- src/Blumchen/Serialization/ITypeResolver.cs | 5 +++-- src/Subscriber/Contracts.cs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Blumchen/Serialization/ITypeResolver.cs b/src/Blumchen/Serialization/ITypeResolver.cs index a738116..1ce6d97 100644 --- a/src/Blumchen/Serialization/ITypeResolver.cs +++ b/src/Blumchen/Serialization/ITypeResolver.cs @@ -18,9 +18,9 @@ public class TypeResolver(JsonSerializerContext serializationContext, INamingPol public JsonSerializerContext SerializationContext { get; } = serializationContext; private static readonly ConcurrentDictionary TypeDictionary = []; private static readonly ConcurrentDictionary TypeInfoDictionary = []; - public ISet RegisteredTypes { get; } = TypeDictionary.Keys.ToHashSet(); + - public TypeResolver WhiteList() where T:class + public TypeResolver WhiteList() where T:class { var type = typeof(T); var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName); @@ -32,5 +32,6 @@ public TypeResolver WhiteList() where T:class public (string, JsonTypeInfo) Resolve(Type type) => (TypeDictionary.Single(kv => kv.Value == type).Key, TypeInfoDictionary[type]); + public ISet RegisteredTypes { get => TypeDictionary.Keys.ToHashSet(); } public Type Resolve(string type) => TypeDictionary[type]; } diff --git a/src/Subscriber/Contracts.cs b/src/Subscriber/Contracts.cs index 7827526..efbd26d 100644 --- a/src/Subscriber/Contracts.cs +++ b/src/Subscriber/Contracts.cs @@ -29,7 +29,7 @@ internal class SubscriberTypesResolver: ITypeResolver .WhiteList() .WhiteList(); - public ISet RegisteredTypes { get; } = Inner.RegisteredTypes; + public ISet RegisteredTypes { get => Inner.RegisteredTypes; } public Type Resolve(string value) => Inner.Resolve(value); public (string, JsonTypeInfo) Resolve(Type type) => Inner.Resolve(type); public JsonSerializerContext SerializationContext => Inner.SerializationContext; From 04e4b2ed4bf3b2927dafd2abdfd32c3b547d0540 Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 25 Jun 2024 22:56:01 +0200 Subject: [PATCH 03/19] Some heavy lifting on library ergonomis aimed at simplifyng usage. Enabled filters on snapshot reading --- src/Blumchen/Database/Run.cs | 14 +- .../MessageAppender.cs | 11 +- .../PublisherSetupOptionsBuilder.cs | 43 +++++ .../Serialization/IDictionaryExtensions.cs | 9 + src/Blumchen/Serialization/ITypeResolver.cs | 35 ++-- .../Serialization/SOHSkippingStream.cs | 160 ------------------ .../Management/PublicationManagement.cs | 26 +-- .../Management/ReplicationSlotManagement.cs | 12 +- .../Replication/ReplicationDataMapper.cs | 2 +- .../SnapshotReader/SnapshotReader.cs | 14 +- src/Blumchen/Subscriptions/Subscription.cs | 30 ++-- .../SubscriptionOptionsBuilder.cs | 46 +++-- src/Publisher/Contracts.cs | 34 ++-- src/Publisher/Program.cs | 32 ++-- src/Subscriber/Contracts.cs | 17 +- src/Subscriber/Program.cs | 3 +- src/Tests/DatabaseFixture.cs | 29 ++-- ...nerationContext.cs => PublisherContext.cs} | 10 +- src/Tests/SubscriberContext.cs | 14 ++ ...n_First_Subscription_And_Table_Is_Empty.cs | 28 ++- src/Tests/When_Subscription_Already_Exists.cs | 25 ++- ...n_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 29 ++-- 22 files changed, 283 insertions(+), 340 deletions(-) rename src/Blumchen/{Table => Publications}/MessageAppender.cs (90%) create mode 100644 src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs create mode 100644 src/Blumchen/Serialization/IDictionaryExtensions.cs delete mode 100644 src/Blumchen/Serialization/SOHSkippingStream.cs rename src/Tests/{SourceGenerationContext.cs => PublisherContext.cs} (53%) create mode 100644 src/Tests/SubscriberContext.cs diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index 51ecfef..59f55a8 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -44,13 +44,13 @@ public static async Task Exists( await using var command1 = command.ConfigureAwait(false); foreach (var parameter in parameters) command.Parameters.AddWithValue(parameter); - return ((await command.ExecuteScalarAsync(ct).ConfigureAwait(false)) as bool?) == true; + return await command.ExecuteScalarAsync(ct).ConfigureAwait(false) as bool? == true; } - internal static async IAsyncEnumerable QueryTransactionSnapshot( - this NpgsqlConnection connection, + internal static async IAsyncEnumerable QueryTransactionSnapshot(this NpgsqlConnection connection, string snapshotName, string tableName, + ISet registeredTypesKeys, IReplicationDataMapper dataMapper, [EnumeratorCancellation] CancellationToken ct) { @@ -61,13 +61,17 @@ internal static async IAsyncEnumerable QueryTransactionSnapshot( new NpgsqlCommand($"SET TRANSACTION SNAPSHOT '{snapshotName}';", connection, transaction); await using var command1 = command.ConfigureAwait(false); await command.ExecuteScalarAsync(ct).ConfigureAwait(false); - - var cmd = new NpgsqlCommand($"SELECT * FROM {tableName}", connection, transaction); + var whereClause = registeredTypesKeys.Count > 0 + ? $" WHERE message_type IN({PublicationFilter(registeredTypesKeys)})" + : null; + var cmd = new NpgsqlCommand($"SELECT * FROM {tableName}{whereClause}", connection, transaction); await using var cmd1 = cmd.ConfigureAwait(false); var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); await using var reader1 = reader.ConfigureAwait(false); while (await reader.ReadAsync(ct).ConfigureAwait(false)) yield return await dataMapper.ReadFromSnapshot(reader, ct).ConfigureAwait(false); + + static string PublicationFilter(ICollection input) => string.Join(", ", input.Select(s => $"'{s}'")); } } diff --git a/src/Blumchen/Table/MessageAppender.cs b/src/Blumchen/Publications/MessageAppender.cs similarity index 90% rename from src/Blumchen/Table/MessageAppender.cs rename to src/Blumchen/Publications/MessageAppender.cs index 7364529..7485006 100644 --- a/src/Blumchen/Table/MessageAppender.cs +++ b/src/Blumchen/Publications/MessageAppender.cs @@ -2,12 +2,13 @@ using Blumchen.Serialization; using Npgsql; -namespace Blumchen.Table; +namespace Blumchen.Publications; #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public static class MessageAppender { - public static async Task AppendAsync(string tableName, T @event, ITypeResolver resolver, string connectionString, CancellationToken ct) + + public static async Task AppendAsync(string tableName, T @event, IJsonTypeResolver resolver, string connectionString, CancellationToken ct) where T: class { var type = typeof(T); @@ -22,7 +23,7 @@ public static async Task AppendAsync(string tableName, T @event, ITypeResolve await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } - public static async Task AppendAsync(string tableName, T @input, ITypeResolver resolver, NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken ct) + public static async Task AppendAsync(string tableName, T @input, IJsonTypeResolver resolver, NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken ct) where T : class { switch (@input) @@ -41,7 +42,7 @@ public static async Task AppendAsync(string tableName, T @input, ITypeResolve private static async Task AppendBatchAsyncOfT( string tableName , T inputs - , ITypeResolver resolver + , IJsonTypeResolver resolver , NpgsqlConnection connection , NpgsqlTransaction transaction , CancellationToken ct) where T : class, IEnumerable @@ -64,7 +65,7 @@ string tableName private static async Task AppendAsyncOfT( string tableName , T @input - , ITypeResolver resolver + , IJsonTypeResolver resolver , NpgsqlConnection connection , NpgsqlTransaction transaction , CancellationToken ct) where T : class diff --git a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs new file mode 100644 index 0000000..6c811f2 --- /dev/null +++ b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs @@ -0,0 +1,43 @@ +using System.Text.Json.Serialization; +using Blumchen.Serialization; +using JetBrains.Annotations; + +namespace Blumchen.Publications; + +#pragma warning disable CS1591 +public class PublisherSetupOptionsBuilder +{ + private INamingPolicy? _namingPolicy; + private JsonSerializerContext? _jsonSerializerContext; + + [UsedImplicitly] + public PublisherSetupOptionsBuilder NamingPolicy(INamingPolicy namingPolicy) + { + _namingPolicy = namingPolicy; + return this; + } + + [UsedImplicitly] + public PublisherSetupOptionsBuilder JsonContext(JsonSerializerContext jsonSerializerContext) + { + _jsonSerializerContext = jsonSerializerContext; + return this; + } + + public IJsonTypeResolver Build() + { + ArgumentNullException.ThrowIfNull(_jsonSerializerContext); + ArgumentNullException.ThrowIfNull(_namingPolicy); + + var jsonTypeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); + using var typeEnum = _jsonSerializerContext.GetType() + .GetCustomAttributesData() + .Where(attributeData => attributeData.AttributeType == typeof(JsonSerializableAttribute)) + .Select(att => att.ConstructorArguments.Single()) + .Select(ca => ca.Value).OfType().GetEnumerator(); + while (typeEnum.MoveNext()) + jsonTypeResolver.WhiteList(typeEnum.Current); + + return jsonTypeResolver; + } +} diff --git a/src/Blumchen/Serialization/IDictionaryExtensions.cs b/src/Blumchen/Serialization/IDictionaryExtensions.cs new file mode 100644 index 0000000..f97da92 --- /dev/null +++ b/src/Blumchen/Serialization/IDictionaryExtensions.cs @@ -0,0 +1,9 @@ +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member +namespace Blumchen.Serialization; + + +public static class DictionaryExtensions +{ + internal static IEnumerable Keys(this JsonTypeResolver? resolver) => resolver?.RegisteredTypes.Keys ?? Enumerable.Empty(); + internal static IEnumerable Values(this JsonTypeResolver? resolver) => resolver?.RegisteredTypes.Values ?? Enumerable.Empty(); +} diff --git a/src/Blumchen/Serialization/ITypeResolver.cs b/src/Blumchen/Serialization/ITypeResolver.cs index 1ce6d97..049462b 100644 --- a/src/Blumchen/Serialization/ITypeResolver.cs +++ b/src/Blumchen/Serialization/ITypeResolver.cs @@ -5,33 +5,34 @@ namespace Blumchen.Serialization; #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member -public interface ITypeResolver +public interface ITypeResolver { - ISet RegisteredTypes { get; } - Type Resolve(string value); - (string, JsonTypeInfo) Resolve(Type type); - JsonSerializerContext SerializationContext { get; } + (string, T) Resolve(Type type); } -public class TypeResolver(JsonSerializerContext serializationContext, INamingPolicy? namingPolicy=default): ITypeResolver +public interface IJsonTypeResolver: ITypeResolver; + +internal sealed class JsonTypeResolver( + JsonSerializerContext serializationContext, + INamingPolicy? namingPolicy = default) + : IJsonTypeResolver { public JsonSerializerContext SerializationContext { get; } = serializationContext; - private static readonly ConcurrentDictionary TypeDictionary = []; - private static readonly ConcurrentDictionary TypeInfoDictionary = []; - + private readonly ConcurrentDictionary _typeDictionary = []; + private readonly ConcurrentDictionary _typeInfoDictionary = []; + private readonly INamingPolicy _namingPolicy = namingPolicy ?? new FQNNamingPolicy(); - public TypeResolver WhiteList() where T:class + internal void WhiteList(Type type) { - var type = typeof(T); var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName); - TypeDictionary.AddOrUpdate((namingPolicy ?? new FQNNamingPolicy()).Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type); - TypeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo); - return this; + _typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type); + _typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo); } public (string, JsonTypeInfo) Resolve(Type type) => - (TypeDictionary.Single(kv => kv.Value == type).Key, TypeInfoDictionary[type]); + (_typeDictionary.Single(kv => kv.Value == type).Key, _typeInfoDictionary[type]); - public ISet RegisteredTypes { get => TypeDictionary.Keys.ToHashSet(); } - public Type Resolve(string type) => TypeDictionary[type]; + internal IDictionary RegisteredTypes { get => _typeDictionary; } + internal Type Resolve(string type) => _typeDictionary[type]; } + diff --git a/src/Blumchen/Serialization/SOHSkippingStream.cs b/src/Blumchen/Serialization/SOHSkippingStream.cs deleted file mode 100644 index 92f84be..0000000 --- a/src/Blumchen/Serialization/SOHSkippingStream.cs +++ /dev/null @@ -1,160 +0,0 @@ -namespace Blumchen.Serialization; - -internal class SOHSkippingStream(Stream inner): Stream -{ - public override bool CanRead => true; - public override bool CanSeek => false; - public override bool CanWrite => false; - public override long Length => inner.Length; - - public override long Position - { - get => throw new NotSupportedException(); - set => throw new NotSupportedException(); - } - - public override int ReadByte() - { - var initialPosition = inner.Position; - var result = inner.ReadByte(); - if (result == 1 && initialPosition == 0) - { - result = inner.ReadByte(); - } - - return result; - } - - public override int Read(byte[] buffer, int offset, int count) - { - var totalRead = 0; - if (inner.Position == 0) - { - var readBytes = inner.Read(buffer, 0, 1); - if (readBytes <= 0) - { - return readBytes; - } - - if (buffer[0] != 1) - { - offset += 1; - count -= 1; - totalRead = 1; - } - } - - return totalRead + inner.Read(buffer, offset, count); - } - - public override async Task ReadAsync(byte[] buffer, int offset, int count, - CancellationToken cancellationToken) - { - var totalRead = 0; - if (inner.Position == 0) - { - var readBytes = await inner.ReadAsync(buffer.AsMemory(0, 1), cancellationToken).ConfigureAwait(false); - if (readBytes <= 0) - { - return readBytes; - } - - if (buffer[0] != 1) - { - offset += 1; - count -= 1; - totalRead = 1; - } - } - - return totalRead + await inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false); - } - - public override async ValueTask ReadAsync(Memory buffer, - CancellationToken cancellationToken = default) - { - var totalRead = 0; - if (inner.Position == 0) - { - var singleByteBuffer = buffer[..1]; - - var readBytes = await inner.ReadAsync(singleByteBuffer, cancellationToken).ConfigureAwait(false); - if (readBytes <= 0) - { - return readBytes; - } - - if (singleByteBuffer.Span[0] != 1) - { - totalRead = 1; - buffer = buffer[1..]; - } - } - - return totalRead + await inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); - } - - public override int Read(Span buffer) - { - var totalRead = 0; - if (inner.Position == 0) - { - var singleByteBuffer = buffer[..1]; - var readBytes = inner.Read(singleByteBuffer); - if (readBytes <= 0) - { - return readBytes; - } - - if (singleByteBuffer[0] != 1) - { - totalRead = 1; - buffer = buffer[1..]; - } - } - - return totalRead + inner.Read(buffer); - } - - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, - object? state) - { - throw new NotSupportedException(); - } - - public override int EndRead(IAsyncResult asyncResult) - { - throw new NotSupportedException(); - } - - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public override void SetLength(long value) - { - throw new NotSupportedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - throw new NotSupportedException(); - } - - public override void Flush() - { - throw new NotSupportedException(); - } - - protected override void Dispose(bool disposing) - { - inner.Dispose(); - } - - public override ValueTask DisposeAsync() - { - return inner.DisposeAsync(); - } -} diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index 63ff6fe..d941a92 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -21,10 +21,10 @@ CancellationToken ct return createStyle switch { - CreateStyle.Never => new None(), - CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), - CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableName, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false), - CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), + Subscription.CreateStyle.Never => new None(), + Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), + Subscription.CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableName, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false), + Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), _ => throw new ArgumentOutOfRangeException(nameof(setupOptions.CreateStyle)) }; @@ -32,7 +32,7 @@ static async Task ReCreate( NpgsqlDataSource dataSource, string publicationName, string tableName, - ITypeResolver? typeResolver, + JsonTypeResolver? typeResolver, CancellationToken ct ) { await dataSource.DropPublication(publicationName, ct).ConfigureAwait(false); @@ -42,11 +42,11 @@ CancellationToken ct static async Task Create(NpgsqlDataSource dataSource, string publicationName, string tableName, - ITypeResolver? typeResolver, + JsonTypeResolver? typeResolver, CancellationToken ct ) { await dataSource.CreatePublication(publicationName, tableName, - typeResolver?.RegisteredTypes ?? Enumerable.Empty().ToHashSet(), ct).ConfigureAwait(false); + typeResolver.Keys().ToHashSet(), ct).ConfigureAwait(false); return new Created(); } @@ -144,24 +144,24 @@ public record Created: SetupPublicationResult; public sealed record PublicationSetupOptions( string PublicationName = PublicationSetupOptions.DefaultPublicationName, string TableName = PublicationSetupOptions.DefaultTableName, - CreateStyle CreateStyle = CreateStyle.WhenNotExists, + Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, bool ShouldReAddTablesIfWereRecreated = false ) { internal const string DefaultTableName = "outbox"; internal const string DefaultPublicationName = "pub"; - public ITypeResolver? TypeResolver { get; internal init; } = default; + internal JsonTypeResolver? TypeResolver { get; init; } = default; - public void Deconstruct( + internal void Deconstruct( out string publicationName, out string tableName, - out CreateStyle createStyle, + out Subscription.CreateStyle createStyle, out bool reAddTablesIfWereRecreated, - out ITypeResolver? typeResolver) + out JsonTypeResolver? typeResolver) { publicationName = PublicationName; tableName = TableName; - createStyle = CreateStyle.WhenNotExists; + createStyle = Subscription.CreateStyle.WhenNotExists; reAddTablesIfWereRecreated = ShouldReAddTablesIfWereRecreated; typeResolver = TypeResolver; } diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index 53aeede..22e1226 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -21,11 +21,11 @@ CancellationToken ct return (createStyle, await dataSource.ReplicationSlotExists(slotName, ct).ConfigureAwait(false)) switch { - (CreateStyle.Never,_) => new None(), - (CreateStyle.WhenNotExists,true) => new AlreadyExists(), - (CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct).ConfigureAwait(false), - (CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct).ConfigureAwait(false), - (CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.Never,_) => new None(), + (Subscription.CreateStyle.WhenNotExists,true) => new AlreadyExists(), + (Subscription.CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct).ConfigureAwait(false), + (Subscription.CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct).ConfigureAwait(false), _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) }; @@ -62,7 +62,7 @@ CancellationToken ct public record ReplicationSlotSetupOptions( string SlotName = $"{PublicationManagement.PublicationSetupOptions.DefaultTableName}_slot", - CreateStyle CreateStyle = CreateStyle.WhenNotExists, + Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY ); diff --git a/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs b/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs index 5ebfb15..75be676 100644 --- a/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs +++ b/src/Blumchen/Subscriptions/Replication/ReplicationDataMapper.cs @@ -7,7 +7,7 @@ namespace Blumchen.Subscriptions.Replication; -internal sealed class ReplicationDataMapper(ITypeResolver resolver): IReplicationDataMapper +internal sealed class ReplicationDataMapper(JsonTypeResolver resolver): IReplicationDataMapper { public async Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct) { diff --git a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs index 51c8f67..4bf4b5f 100644 --- a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs +++ b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs @@ -9,15 +9,19 @@ namespace Blumchen.Subscriptions.SnapshotReader; public static class SnapshotReader { - internal static async IAsyncEnumerable GetRowsFromSnapshot( - this NpgsqlConnection connection, + internal static async IAsyncEnumerable GetRowsFromSnapshot(this NpgsqlConnection connection, string snapshotName, string tableName, IReplicationDataMapper dataMapper, - [EnumeratorCancellation] CancellationToken ct = default - ) + ISet registeredTypes, + [EnumeratorCancellation] CancellationToken ct = default) { - await foreach (var @event in connection.QueryTransactionSnapshot(snapshotName, tableName, dataMapper, ct).ConfigureAwait(false)) + await foreach (var @event in connection.QueryTransactionSnapshot( + snapshotName, + tableName, + registeredTypes, + dataMapper, + ct).ConfigureAwait(false)) yield return @event; } } diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index ea3cd17..9b274e6 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -1,6 +1,7 @@ using System.Reflection; using System.Runtime.CompilerServices; using Blumchen.Database; +using Blumchen.Serialization; using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.ReplicationMessageHandlers; using Blumchen.Subscriptions.SnapshotReader; @@ -17,22 +18,14 @@ namespace Blumchen.Subscriptions; using static ReplicationSlotManagement; using static ReplicationSlotManagement.CreateReplicationSlotResult; -public interface ISubscription -{ - IAsyncEnumerable Subscribe(Func builder, - ILoggerFactory? loggerFactory, - CancellationToken ct); -} - -public enum CreateStyle -{ - WhenNotExists, - AlwaysRecreate, - Never -} - -public sealed class Subscription: ISubscription, IAsyncDisposable +public sealed class Subscription: IAsyncDisposable { + public enum CreateStyle + { + WhenNotExists, + AlwaysRecreate, + Never + } private static LogicalReplicationConnection? _connection; private static readonly SubscriptionOptionsBuilder Builder = new(); private ISubscriptionOptions? _options; @@ -154,9 +147,14 @@ private static async IAsyncEnumerable ReadExistingRowsFromSnapshot( snapshotName, options.PublicationOptions.TableName, options.DataMapper, + options.PublicationOptions.TypeResolver.Keys().ToHashSet(), ct).ConfigureAwait(false)) yield return row; } - public async ValueTask DisposeAsync() => await _connection!.DisposeAsync().ConfigureAwait(false); + public async ValueTask DisposeAsync() + { + if(_connection != null) + await _connection.DisposeAsync().ConfigureAwait(false); + } } diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index 7810211..eadbd1e 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -2,6 +2,7 @@ using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.Replication; using JetBrains.Annotations; +using System.Text.Json.Serialization; namespace Blumchen.Subscriptions; #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member @@ -10,14 +11,19 @@ public sealed class SubscriptionOptionsBuilder { private static string? _connectionString; private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions; - private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _slotOptions; + private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions; private static IReplicationDataMapper? _dataMapper; + private readonly Dictionary _registry = []; + private IErrorProcessor? _errorProcessor; + private INamingPolicy? _namingPolicy; + private JsonSerializerContext? _jsonSerializerContext; + static SubscriptionOptionsBuilder() { _connectionString = null; _publicationSetupOptions = new(); - _slotOptions = default; + _replicationSlotSetupOptions = default; _dataMapper = default; } @@ -29,17 +35,16 @@ public SubscriptionOptionsBuilder ConnectionString(string connectionString) } [UsedImplicitly] - public SubscriptionOptionsBuilder TypeResolver(ITypeResolver resolver) + public SubscriptionOptionsBuilder NamingPolicy(INamingPolicy namingPolicy) { - _dataMapper = new ReplicationDataMapper(resolver); - _publicationSetupOptions = _publicationSetupOptions with{ TypeResolver = resolver }; + _namingPolicy = namingPolicy; return this; } [UsedImplicitly] - public SubscriptionOptionsBuilder WithMapper(IReplicationDataMapper dataMapper) + public SubscriptionOptionsBuilder JsonContext(JsonSerializerContext jsonSerializerContext) { - _dataMapper = dataMapper; + _jsonSerializerContext = jsonSerializerContext; return this; } @@ -54,13 +59,10 @@ public SubscriptionOptionsBuilder WithPublicationOptions(PublicationManagement.P [UsedImplicitly] public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotOptions) { - _slotOptions = replicationSlotOptions; + _replicationSlotSetupOptions = replicationSlotOptions; return this; } - private readonly Dictionary _registry = []; - private IErrorProcessor? _errorProcessor; - [UsedImplicitly] public SubscriptionOptionsBuilder Consumes(TU consumer) where T : class where TU : class, IConsumes @@ -75,20 +77,34 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce _errorProcessor = errorProcessor; return this; } - + internal ISubscriptionOptions Build() { ArgumentNullException.ThrowIfNull(_connectionString); - ArgumentNullException.ThrowIfNull(_dataMapper); - if(_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer()); + ArgumentNullException.ThrowIfNull(_jsonSerializerContext); + var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); + foreach (var type in _registry.Keys) typeResolver.WhiteList(type); + _dataMapper = new ReplicationDataMapper(typeResolver); + _publicationSetupOptions = _publicationSetupOptions with { TypeResolver = typeResolver }; + + Ensure(() =>_registry.Keys.Except(_publicationSetupOptions.TypeResolver.Values()), "Unregistered types:{0}"); + Ensure(() => _publicationSetupOptions.TypeResolver.Values().Except(_registry.Keys), "Unregistered consumer for type:{0}"); + if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer()); + return new SubscriptionOptions( _connectionString, _publicationSetupOptions, - _slotOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), + _replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), _errorProcessor ?? new ConsoleOutErrorProcessor(), _dataMapper, _registry); + static void Ensure(Func> evalFn, string formattedMsg) + { + var misses = evalFn().ToArray(); + if (misses.Length > 0) throw new Exception(string.Format(formattedMsg, string.Join(", ", misses.Select(t => $"'{t.Name}'")))); + } + } } diff --git a/src/Publisher/Contracts.cs b/src/Publisher/Contracts.cs index cd8fc4e..e9995e9 100644 --- a/src/Publisher/Contracts.cs +++ b/src/Publisher/Contracts.cs @@ -1,36 +1,30 @@ -using System.Text.Json.Serialization.Metadata; using System.Text.Json.Serialization; using Blumchen.Serialization; namespace Publisher; +public interface IContract{} [MessageUrn("user-created:v1")] internal record UserCreated( Guid Id, - string Name -); + string Name = "Created" +):IContract; [MessageUrn("user-deleted:v1")] internal record UserDeleted( Guid Id, - string Name -); + string Name = "Deleted" +): IContract; + +[MessageUrn("user-modified:v1")] //subscription ignored +internal record UserModified( + Guid Id, + string Name = "Modified" +): IContract; + [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(UserCreated))] [JsonSerializable(typeof(UserDeleted))] -internal partial class SourceGenerationContext: JsonSerializerContext -{ -} - -internal class PublisherTypesResolver: ITypeResolver -{ - private static readonly TypeResolver Inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) - .WhiteList() - .WhiteList(); - - public ISet RegisteredTypes { get; } = Inner.RegisteredTypes; - public Type Resolve(string value) => Inner.Resolve(value); - public (string, JsonTypeInfo) Resolve(Type type) => Inner.Resolve(type); - public JsonSerializerContext SerializationContext => Inner.SerializationContext; -} +[JsonSerializable(typeof(UserModified))] +internal partial class SourceGenerationContext: JsonSerializerContext; diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index f1da3d9..02ad312 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -1,14 +1,19 @@ -using Blumchen.Table; +using Blumchen.Publications; +using Blumchen.Serialization; using Commons; using Npgsql; using Publisher; using UserCreated = Publisher.UserCreated; using UserDeleted = Publisher.UserDeleted; +using UserModified = Publisher.UserModified; Console.Title = typeof(Program).Assembly.GetName().Name!; Console.WriteLine("How many messages do you want to publish?(press CTRL+C to exit):"); -var resolver = new PublisherTypesResolver(); +var resolver = new PublisherSetupOptionsBuilder() + .JsonContext(SourceGenerationContext.Default) + .NamingPolicy(new AttributeNamingPolicy()) + .Build(); do { @@ -25,9 +30,12 @@ //use a command for each message { var @events = Enumerable.Range(0, result).Select(i => - int.IsEvenInteger(i) - ? new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()) as object - : new UserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString())); + (i % 3) switch + { + 0 => new UserCreated(Guid.NewGuid()) as object, + 1 => new UserDeleted(Guid.NewGuid()), + _ => new UserModified(Guid.NewGuid()) + }); foreach (var @event in @events) { var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false); @@ -35,11 +43,14 @@ { switch (@event) { - case UserCreated c: - await MessageAppender.AppendAsync("outbox", c, resolver, connection, transaction, ct).ConfigureAwait(false); + case UserCreated m: + await MessageAppender.AppendAsync("outbox", m, resolver, connection, transaction, ct).ConfigureAwait(false); + break; + case UserDeleted m: + await MessageAppender.AppendAsync("outbox", m, resolver, connection, transaction, ct).ConfigureAwait(false); break; - case UserDeleted d: - await MessageAppender.AppendAsync("outbox", d, resolver, connection, transaction, ct).ConfigureAwait(false); + case UserModified m: + await MessageAppender.AppendAsync("outbox", m, resolver, connection, transaction, ct).ConfigureAwait(false); break; } @@ -70,6 +81,3 @@ //} } } while (true); - - - diff --git a/src/Subscriber/Contracts.cs b/src/Subscriber/Contracts.cs index efbd26d..527486b 100644 --- a/src/Subscriber/Contracts.cs +++ b/src/Subscriber/Contracts.cs @@ -1,5 +1,4 @@ using System.Text.Json.Serialization; -using System.Text.Json.Serialization.Metadata; using Blumchen.Serialization; namespace Subscriber @@ -19,19 +18,5 @@ string Name [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(UserCreatedContract))] [JsonSerializable(typeof(UserDeletedContract))] - internal partial class SourceGenerationContext: JsonSerializerContext - { - } - - internal class SubscriberTypesResolver: ITypeResolver - { - private static readonly TypeResolver Inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) - .WhiteList() - .WhiteList(); - - public ISet RegisteredTypes { get => Inner.RegisteredTypes; } - public Type Resolve(string value) => Inner.Resolve(value); - public (string, JsonTypeInfo) Resolve(Type type) => Inner.Resolve(type); - public JsonSerializerContext SerializationContext => Inner.SerializationContext; - } + internal partial class SourceGenerationContext: JsonSerializerContext; } diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index 0b5380b..6d9a473 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -23,7 +23,8 @@ var cursor = subscription.Subscribe( builder => builder .ConnectionString(Settings.ConnectionString) - .TypeResolver(new SubscriberTypesResolver()) + .NamingPolicy(new AttributeNamingPolicy()) + .JsonContext(SourceGenerationContext.Default) .Consumes(consumer) .Consumes(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct ).GetAsyncEnumerator(ct); diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index 5cd8c54..d538a1a 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -1,3 +1,5 @@ +using System.Diagnostics; +using System.Text.Json.Serialization; using System.Text.Json.Serialization.Metadata; using Blumchen.Database; using Blumchen.Serialization; @@ -8,8 +10,10 @@ namespace Tests; + public abstract class DatabaseFixture: IAsyncLifetime { + protected readonly Func TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(2)); protected class TestConsumer(Action log, JsonTypeInfo info): IConsumes where T : class { public async Task Handle(T value) @@ -30,15 +34,9 @@ public async Task Handle(T value) .WithCommand("-c", "wal_level=logical") .Build(); - public Task InitializeAsync() - { - return Container.StartAsync(); - } + public Task InitializeAsync() => Container.StartAsync(); - public async Task DisposeAsync() - { - await Container.DisposeAsync().ConfigureAwait(false); - } + public async Task DisposeAsync() => await Container.DisposeAsync().ConfigureAwait(false); protected static async Task CreateOutboxTable( NpgsqlDataSource dataSource, @@ -55,19 +53,22 @@ CancellationToken ct private static string Randomise(string prefix) => $"{prefix}_{Guid.NewGuid().ToString().Replace("-", "")}"; - protected static (TypeResolver typeResolver, TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( + protected static (TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( string connectionString, string eventsTable, - JsonTypeInfo info, + JsonSerializerContext info, + INamingPolicy namingPolicy, Action log, string? publicationName = null, string? slotName = null) where T : class { - var typeResolver = new TypeResolver(SourceGenerationContext.Default).WhiteList(); - var consumer = new TestConsumer(log, info); + var jsonTypeInfo = info.GetTypeInfo(typeof(T)); + ArgumentNullException.ThrowIfNull(jsonTypeInfo); + var consumer = new TestConsumer(log, jsonTypeInfo); var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() .ConnectionString(connectionString) - .TypeResolver(typeResolver) + .JsonContext(info) + .NamingPolicy(namingPolicy) .Consumes>(consumer) .WithPublicationOptions( new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub"), TableName: eventsTable) @@ -75,7 +76,7 @@ protected static (TypeResolver typeResolver, TestConsumer consumer, Subscript .WithReplicationOptions( new ReplicationSlotManagement.ReplicationSlotSetupOptions(slotName ?? Randomise("events_slot")) ); - return (typeResolver, consumer, subscriptionOptionsBuilder); + return (consumer, subscriptionOptionsBuilder); } } diff --git a/src/Tests/SourceGenerationContext.cs b/src/Tests/PublisherContext.cs similarity index 53% rename from src/Tests/SourceGenerationContext.cs rename to src/Tests/PublisherContext.cs index 1620367..a35dd25 100644 --- a/src/Tests/SourceGenerationContext.cs +++ b/src/Tests/PublisherContext.cs @@ -4,18 +4,18 @@ namespace Tests; [MessageUrn("user-created:v1")] -internal record UserCreated( +internal record PublisherUserCreated( Guid Id, string Name ); [MessageUrn("user-deleted:v1")] -internal record UserDeleted( +internal record PublisherUserDeleted( Guid Id, string Name ); [JsonSourceGenerationOptions(WriteIndented = true)] -[JsonSerializable(typeof(UserCreated))] -[JsonSerializable(typeof(UserDeleted))] -internal partial class SourceGenerationContext: JsonSerializerContext{} +[JsonSerializable(typeof(PublisherUserCreated))] +[JsonSerializable(typeof(PublisherUserDeleted))] +internal partial class PublisherContext: JsonSerializerContext; diff --git a/src/Tests/SubscriberContext.cs b/src/Tests/SubscriberContext.cs new file mode 100644 index 0000000..9eece14 --- /dev/null +++ b/src/Tests/SubscriberContext.cs @@ -0,0 +1,14 @@ +using System.Text.Json.Serialization; +using Blumchen.Serialization; + +namespace Tests; + +[MessageUrn("user-created:v1")] +internal record SubscriberUserCreated( + Guid Id, + string Name +); + +[JsonSourceGenerationOptions(WriteIndented = true)] +[JsonSerializable(typeof(SubscriberUserCreated))] +internal partial class SubscriberContext: JsonSerializerContext; diff --git a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs index 898b0aa..5df08ed 100644 --- a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs +++ b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs @@ -1,6 +1,8 @@ +using System.Diagnostics; +using Blumchen.Publications; +using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.ReplicationMessageHandlers; -using Blumchen.Table; using Npgsql; using Xunit.Abstractions; @@ -12,22 +14,30 @@ public class When_First_Subscription_And_Table_Is_Empty(ITestOutputHelper testOu [Fact] public async Task Execute() { - var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2)); - var ct = cancellationTokenSource.Token; + var ct = TimeoutTokenSource().Token; var connectionString = Container.GetConnectionString(); var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); + var sharedNamingPolicy = new AttributeNamingPolicy(); + var resolver = new PublisherSetupOptionsBuilder() + .JsonContext(PublisherContext.Default) + .NamingPolicy(sharedNamingPolicy) + .Build(); + //poison msg + await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); - var (typeResolver, testConsumer, subscriptionOptions) = SetupFor(connectionString, eventsTable, - SourceGenerationContext.Default.UserCreated, testOutputHelper.WriteLine); + var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); + var @expected = new SubscriberUserCreated(@event.Id, @event.Name); + + await MessageAppender.AppendAsync(eventsTable, @event, resolver, connectionString, ct); + + var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, + SubscriberContext.Default, sharedNamingPolicy, testOutputHelper.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); - - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) { - Assert.Equal(@event, ((OkEnvelope)envelope).Value); + Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; } } diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 2bfce80..bc0cbb4 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -1,7 +1,9 @@ +using System.Diagnostics; using Blumchen.Database; +using Blumchen.Publications; +using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.ReplicationMessageHandlers; -using Blumchen.Table; using Npgsql; using Npgsql.Replication; using Xunit.Abstractions; @@ -14,27 +16,32 @@ public class When_Subscription_Already_Exists(ITestOutputHelper testOutputHelper [Fact] public async Task Execute() { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; - + var ct = TimeoutTokenSource().Token; + var sharedNamingPolicy = new AttributeNamingPolicy(); var connectionString = Container.GetConnectionString(); var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); + var publisherResolver = new PublisherSetupOptionsBuilder() + .JsonContext(PublisherContext.Default) + .NamingPolicy(sharedNamingPolicy) + .Build(); var slotName = "subscription_test"; var publicationName = "publication_test"; await SetupReplication(connectionString, slotName, publicationName, eventsTable, ct); - var (typeResolver, testConsumer, subscriptionOptions) = SetupFor(connectionString, eventsTable, - SourceGenerationContext.Default.UserCreated, testOutputHelper.WriteLine, publicationName, slotName); + var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, + SubscriberContext.Default, sharedNamingPolicy, testOutputHelper.WriteLine, publicationName: publicationName, slotName: slotName); + + var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); + await MessageAppender.AppendAsync(eventsTable, @event, publisherResolver, connectionString, ct); - var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); + var @expected = new SubscriberUserCreated(@event.Id, @event.Name); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) { - Assert.Equal(@event, ((OkEnvelope)envelope).Value); + Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; } } diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index 884c365..f5026e5 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -1,38 +1,45 @@ +using Blumchen.Publications; using Blumchen.Serialization; using Blumchen.Subscriptions; using Blumchen.Subscriptions.ReplicationMessageHandlers; -using Blumchen.Table; using Npgsql; using Xunit.Abstractions; namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper) : DatabaseFixture +public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture { [Fact] public async Task Execute() { - var cancellationTokenSource = new CancellationTokenSource(); - var ct = cancellationTokenSource.Token; + var ct = TimeoutTokenSource().Token; + var sharedNamingPolicy = new AttributeNamingPolicy(); var connectionString = Container.GetConnectionString(); var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); + var resolver = new PublisherSetupOptionsBuilder() + .JsonContext(PublisherContext.Default) + .NamingPolicy(sharedNamingPolicy) + .Build(); + { + var @event1 = new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()); + await MessageAppender.AppendAsync(eventsTable, @event1, resolver, connectionString, ct); + } - var @event = new UserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()); - var typeResolver = new TypeResolver(SourceGenerationContext.Default).WhiteList(); - - await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); + var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); + await MessageAppender.AppendAsync(eventsTable, @event, resolver, connectionString, ct); + var @expected = new SubscriberUserCreated(@event.Id, @event.Name); - var (_, testConsumer, subscriptionOptions) = - SetupFor(connectionString, eventsTable, SourceGenerationContext.Default.UserDeleted, testOutputHelper.WriteLine); + var ( _, subscriptionOptions) = + SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, testOutputHelper.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) { - Assert.Equal(@event, ((OkEnvelope)envelope).Value); + Assert.Equal(@expected, ((OkEnvelope)envelope).Value); return; } } From c4c239a882d7e5126c702316a0f01f2aeac4d7f7 Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 25 Jun 2024 23:21:14 +0200 Subject: [PATCH 04/19] updated test deps --- src/Tests/Tests.csproj | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Tests/Tests.csproj b/src/Tests/Tests.csproj index 667d7cd..029d4bc 100644 --- a/src/Tests/Tests.csproj +++ b/src/Tests/Tests.csproj @@ -8,13 +8,13 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive From 1476e44ad71c02cb0d70d3eae16429bb2701e3c7 Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 25 Jun 2024 23:24:59 +0200 Subject: [PATCH 05/19] Marked assembly as private --- src/Blumchen/Blumchen.csproj | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 7489b2b..43596e3 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -27,7 +27,11 @@ - + + all + none + all + From a4c83d61db374bdc010098c4a599f3faf06fce04 Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 25 Jun 2024 23:25:28 +0200 Subject: [PATCH 06/19] updated assembly version --- src/Blumchen/Blumchen.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 43596e3..65103c3 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -1,7 +1,7 @@ - 0.0.1 + 0.1.0 net8.0 true true From 4256afa2464e800cd3fb31f1224b841cf25c58f9 Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 26 Jun 2024 13:26:04 +0200 Subject: [PATCH 07/19] test case renamed --- src/Tests/When_First_Subscription_And_Table_Is_Empty.cs | 2 +- src/Tests/When_Subscription_Already_Exists.cs | 2 +- ...When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs index 5df08ed..423278b 100644 --- a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs +++ b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs @@ -12,7 +12,7 @@ namespace Tests; public class When_First_Subscription_And_Table_Is_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture { [Fact] - public async Task Execute() + public async Task Read_from_table_using_named_transaction_snapshot() { var ct = TimeoutTokenSource().Token; diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index bc0cbb4..32159ec 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -14,7 +14,7 @@ namespace Tests; public class When_Subscription_Already_Exists(ITestOutputHelper testOutputHelper): DatabaseFixture { [Fact] - public async Task Execute() + public async Task Read_from_transaction_log() { var ct = TimeoutTokenSource().Token; var sharedNamingPolicy = new AttributeNamingPolicy(); diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index f5026e5..a6cd1fd 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -8,10 +8,10 @@ namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture +public class When_First_Subscription_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture { [Fact] - public async Task Execute() + public async Task Read_from_table_using_named_transaction_snapshot() { var ct = TimeoutTokenSource().Token; var sharedNamingPolicy = new AttributeNamingPolicy(); From 51eb9f828d7c31508ea33044b6fd85e65c58d863 Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 26 Jun 2024 15:38:38 +0200 Subject: [PATCH 08/19] cannot be null - not needed --- src/Blumchen/Subscriptions/Subscription.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index 9b274e6..d5bb7b2 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -129,7 +129,7 @@ private static (IConsume consumer, MethodInfo methodInfo) Consumer(Dictionarymi.GetParameters().Any(pa => pa.ParameterType == objType)) + var methodInfo = methodInfos.SingleOrDefault(mi=>mi.GetParameters().Any(pa => pa.ParameterType == objType)) ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}"); return (consumer, methodInfo); } From 0410268704c22c939d69d53946e2ff5d20bd58cf Mon Sep 17 00:00:00 2001 From: giordanol Date: Wed, 26 Jun 2024 15:39:04 +0200 Subject: [PATCH 09/19] renamed variable --- src/Blumchen/Subscriptions/Subscription.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index d5bb7b2..e79e920 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -36,7 +36,7 @@ public async IAsyncEnumerable Subscribe( ) { _options = builder(Builder).Build(); - var (connectionString, publicationSetupOptions, slotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; + var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString); dataSourceBuilder.UseLoggerFactory(loggerFactory); @@ -48,19 +48,19 @@ public async IAsyncEnumerable Subscribe( await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false); - var result = await dataSource.SetupReplicationSlot(_connection, slotSetupOptions, ct).ConfigureAwait(false); + var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false); PgOutputReplicationSlot slot; if (result is not Created created) { - slot = new PgOutputReplicationSlot(slotSetupOptions.SlotName); + slot = new PgOutputReplicationSlot(replicationSlotSetupOptions.SlotName); } else { slot = new PgOutputReplicationSlot( new ReplicationSlotOptions( - slotSetupOptions.SlotName, + replicationSlotSetupOptions.SlotName, created.LogSequenceNumber ) ); @@ -72,7 +72,7 @@ public async IAsyncEnumerable Subscribe( await foreach (var message in _connection.StartReplication(slot, - new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, slotSetupOptions.Binary), ct).ConfigureAwait(false)) + new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, replicationSlotSetupOptions.Binary), ct).ConfigureAwait(false)) { if (message is InsertMessage insertMessage) { From cf404b4f7b2bda328b6452b6bb5e2fbd462651df Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 08:21:11 +0200 Subject: [PATCH 10/19] =?UTF-8?q?moved=20demo=20apps=20a=C3=ACunder=20'dem?= =?UTF-8?q?o'=20folder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Blumchen.sln | 18 ++++++++++++------ README.md | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/Blumchen.sln b/Blumchen.sln index 0356752..a5020ed 100644 --- a/Blumchen.sln +++ b/Blumchen.sln @@ -1,4 +1,5 @@ -Microsoft Visual Studio Solution File, Format Version 12.00 + +Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.9.34622.214 MinimumVisualStudioVersion = 10.0.40219.1 @@ -11,8 +12,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blumchen", "src\Blumchen\Bl EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{D0AB0FF4-C8A1-4B4B-A682-64F353A2D248}" ProjectSection(SolutionItems) = preProject - src\Directory.Build.props = src\Directory.Build.props .github\workflows\build.dotnet.yml = .github\workflows\build.dotnet.yml + src\Directory.Build.props = src\Directory.Build.props .github\workflows\publish-nuget.yml = .github\workflows\publish-nuget.yml EndProjectSection EndProject @@ -31,8 +32,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "src\Tests\Tests.cs EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "pgAdmin", "pgAdmin", "{C050E9E8-3FB6-4581-953F-31826E385FB4}" ProjectSection(SolutionItems) = preProject - docker\pgAdmin\servers.json = docker\pgAdmin\servers.json docker\pgAdmin\pgpass = docker\pgAdmin\pgpass + docker\pgAdmin\servers.json = docker\pgAdmin\servers.json EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8AAAA344-B5FD-48D9-B2BA-379E374448D4}" @@ -40,6 +41,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8A docker\postgres\init.sql = docker\postgres\init.sql EndProjectSection EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -70,11 +73,14 @@ Global GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection - GlobalSection(ExtensibilityGlobals) = postSolution - SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614} - EndGlobalSection GlobalSection(NestedProjects) = preSolution + {06A60918-32EB-4C38-B369-7E1D76B809A0} = {A4044484-FE08-4399-8239-14AABFA30AD7} + {F2878625-0919-4C26-8DC9-58CD8FA34050} = {A4044484-FE08-4399-8239-14AABFA30AD7} + {F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7} {C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} {8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614} + EndGlobalSection EndGlobal diff --git a/README.md b/README.md index 9d3c668..48856b2 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr ```shell docker-compose up ``` -2. Run(order doesn't matter) Publisher and Subscriber apps from vs-studio and follow Publisher instructions. +2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions. ## Testing (against default docker instance) From 933e504a3b844b6a1c6bbd46d141be63f84fcdb3 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 08:25:39 +0200 Subject: [PATCH 11/19] tests renamed --- ...s => When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs} | 2 +- .../When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename src/Tests/{When_First_Subscription_And_Table_Is_Empty.cs => When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs} (93%) diff --git a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs similarity index 93% rename from src/Tests/When_First_Subscription_And_Table_Is_Empty.cs rename to src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index 423278b..7adef76 100644 --- a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -9,7 +9,7 @@ namespace Tests; // ReSharper disable once InconsistentNaming -public class When_First_Subscription_And_Table_Is_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture +public class When_Subscription_Does_Not_Exist_And_Table_Is_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture { [Fact] public async Task Read_from_table_using_named_transaction_snapshot() diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index a6cd1fd..e5b3866 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -8,7 +8,7 @@ namespace Tests; // ReSharper disable once InconsistentNaming -public class When_First_Subscription_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture +public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture { [Fact] public async Task Read_from_table_using_named_transaction_snapshot() From 10a6eec884b411e3485afb9722cbf7b407959f2f Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 08:31:18 +0200 Subject: [PATCH 12/19] verify subscriber does not receive massegas different by the ones specified in subscriptions --- src/Tests/When_Subscription_Already_Exists.cs | 3 +++ ...n_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 7 +++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 32159ec..71cc293 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -32,6 +32,9 @@ public async Task Read_from_transaction_log() var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, testOutputHelper.WriteLine, publicationName: publicationName, slotName: slotName); + //subscriber ignored msg + await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), publisherResolver, connectionString, ct); + var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); await MessageAppender.AppendAsync(eventsTable, @event, publisherResolver, connectionString, ct); diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index e5b3866..d57f339 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -22,10 +22,9 @@ public async Task Read_from_table_using_named_transaction_snapshot() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) .Build(); - { - var @event1 = new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()); - await MessageAppender.AppendAsync(eventsTable, @event1, resolver, connectionString, ct); - } + + //subscriber ignored msg + await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); await MessageAppender.AppendAsync(eventsTable, @event, resolver, connectionString, ct); From 7ecef6478c0ccd26e0926acffc573cd9725d95d0 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 09:00:32 +0200 Subject: [PATCH 13/19] mark CreatePublication internal to expose to testing suite --- src/Blumchen/Blumchen.csproj | 5 +++ .../Management/PublicationManagement.cs | 2 +- src/Tests/When_Subscription_Already_Exists.cs | 31 ++++--------------- ...ption_Does_Not_Exist_And_Table_Is_Empty.cs | 2 +- 4 files changed, 13 insertions(+), 27 deletions(-) diff --git a/src/Blumchen/Blumchen.csproj b/src/Blumchen/Blumchen.csproj index 65103c3..4e6b52c 100644 --- a/src/Blumchen/Blumchen.csproj +++ b/src/Blumchen/Blumchen.csproj @@ -25,6 +25,11 @@ snupkg Blumchen + + + <_Parameter1>Tests + + diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index d941a92..f4f2f4d 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -63,7 +63,7 @@ CancellationToken ct } } - private static Task CreatePublication( + internal static Task CreatePublication( this NpgsqlDataSource dataSource, string publicationName, string tableName, diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 71cc293..bf24d0e 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -1,11 +1,9 @@ -using System.Diagnostics; -using Blumchen.Database; using Blumchen.Publications; using Blumchen.Serialization; using Blumchen.Subscriptions; +using Blumchen.Subscriptions.Management; using Blumchen.Subscriptions.ReplicationMessageHandlers; using Npgsql; -using Npgsql.Replication; using Xunit.Abstractions; namespace Tests; @@ -19,14 +17,17 @@ public async Task Read_from_transaction_log() var ct = TimeoutTokenSource().Token; var sharedNamingPolicy = new AttributeNamingPolicy(); var connectionString = Container.GetConnectionString(); - var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); + var dataSource = NpgsqlDataSource.Create(connectionString); + var eventsTable = await CreateOutboxTable(dataSource, ct); var publisherResolver = new PublisherSetupOptionsBuilder() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) .Build(); var slotName = "subscription_test"; var publicationName = "publication_test"; - await SetupReplication(connectionString, slotName, publicationName, eventsTable, ct); + + + await dataSource.CreatePublication(publicationName, eventsTable, new HashSet{"urn:message:user-created:v1"}, ct); var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, @@ -48,24 +49,4 @@ public async Task Read_from_transaction_log() return; } } - - private static async Task SetupReplication( - string connectionString, - string slotName, - string publicationName, - string tableName, - CancellationToken ct) - { - var dataSource = NpgsqlDataSource.Create(connectionString); - await using var source = dataSource.ConfigureAwait(false); - await dataSource.Execute($"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", ct).ConfigureAwait(false); - var connection = new LogicalReplicationConnection(connectionString); - await using var connection1 = connection.ConfigureAwait(false); - await connection.Open(ct).ConfigureAwait(false); - await connection.CreatePgOutputReplicationSlot( - slotName, - slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, - cancellationToken: ct - ).ConfigureAwait(false); - } } diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index 7adef76..6bb64fb 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -23,7 +23,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) .Build(); - //poison msg + //subscriber ignored msg await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); From d4a9dc6643c418febd2564f0f038f7cbb11f6738 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 09:42:07 +0200 Subject: [PATCH 14/19] to explicit yield --- src/Blumchen/Database/Run.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index 59f55a8..ef4f3d3 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -71,6 +71,7 @@ internal static async IAsyncEnumerable QueryTransactionSnapshot(this while (await reader.ReadAsync(ct).ConfigureAwait(false)) yield return await dataMapper.ReadFromSnapshot(reader, ct).ConfigureAwait(false); + yield break; static string PublicationFilter(ICollection input) => string.Join(", ", input.Select(s => $"'{s}'")); } From 6a211f096d02d7d96776a6030d4f50103efb2ba4 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 09:42:34 +0200 Subject: [PATCH 15/19] norrowed method scope --- src/Blumchen/Database/Run.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index ef4f3d3..d1f0411 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -10,7 +10,7 @@ namespace Blumchen.Database; public static class Run { - public static async Task Execute( + private static async Task Execute( this NpgsqlDataSource dataSource, string sql, CancellationToken ct) From df63f297da04b24447e29690cac91ac3c0607f87 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 09:44:41 +0200 Subject: [PATCH 16/19] expose testoutputhelper to base class --- src/Tests/DatabaseFixture.cs | 5 ++++- src/Tests/When_Subscription_Already_Exists.cs | 2 +- .../When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs | 2 +- ...hen_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index d538a1a..198d2b1 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -7,12 +7,15 @@ using Blumchen.Subscriptions.Management; using Npgsql; using Testcontainers.PostgreSql; +using Xunit.Abstractions; +using Xunit.Sdk; namespace Tests; -public abstract class DatabaseFixture: IAsyncLifetime +public abstract class DatabaseFixture(ITestOutputHelper output): IAsyncLifetime { + protected ITestOutputHelper Output { get; } = output; protected readonly Func TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(2)); protected class TestConsumer(Action log, JsonTypeInfo info): IConsumes where T : class { diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index bf24d0e..65e6fc5 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -9,7 +9,7 @@ namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Already_Exists(ITestOutputHelper testOutputHelper): DatabaseFixture +public class When_Subscription_Already_Exists(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) { [Fact] public async Task Read_from_transaction_log() diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index 6bb64fb..edec49e 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -9,7 +9,7 @@ namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Does_Not_Exist_And_Table_Is_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture +public class When_Subscription_Does_Not_Exist_And_Table_Is_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) { [Fact] public async Task Read_from_table_using_named_transaction_snapshot() diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index d57f339..c6af1f3 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -8,7 +8,7 @@ namespace Tests; // ReSharper disable once InconsistentNaming -public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture +public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture(testOutputHelper) { [Fact] public async Task Read_from_table_using_named_transaction_snapshot() From ed4a262cbcaa845a92c62d3879e054c34fb97eb8 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 09:46:06 +0200 Subject: [PATCH 17/19] inject error handler to trace poisoining messages --- src/Tests/DatabaseFixture.cs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index 198d2b1..a08da09 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -56,7 +56,17 @@ CancellationToken ct private static string Randomise(string prefix) => $"{prefix}_{Guid.NewGuid().ToString().Replace("-", "")}"; - protected static (TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( + protected static async Task InsertPoisoningMessage(string connectionString, string eventsTable, CancellationToken ct) + { + var connection = new NpgsqlConnection(connectionString); + await using var connection1 = connection.ConfigureAwait(false); + await connection.OpenAsync(ct); + var command = connection.CreateCommand(); + command.CommandText = $"INSERT INTO {eventsTable}(message_type, data) values ('urn:message:user-created:v1', '{{\"prop\":\"some faking text\"}}')"; + await command.ExecuteNonQueryAsync(ct); + } + + protected (TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( string connectionString, string eventsTable, JsonSerializerContext info, @@ -69,6 +79,7 @@ protected static (TestConsumer consumer, SubscriptionOptionsBuilder subscript ArgumentNullException.ThrowIfNull(jsonTypeInfo); var consumer = new TestConsumer(log, jsonTypeInfo); var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() + .WithErrorProcessor(new TestOutErrorProcessor(Output)) .ConnectionString(connectionString) .JsonContext(info) .NamingPolicy(namingPolicy) @@ -82,4 +93,12 @@ protected static (TestConsumer consumer, SubscriptionOptionsBuilder subscript return (consumer, subscriptionOptionsBuilder); } + private sealed record TestOutErrorProcessor(ITestOutputHelper Output): IErrorProcessor + { + public Func Process => exception => + { + Output.WriteLine($"record id:{0} resulted in error:{exception.Message}"); + return Task.CompletedTask; + }; + } } From 0a98cf874d9224e5e13cb8dcf900616f40935636 Mon Sep 17 00:00:00 2001 From: giordanol Date: Mon, 1 Jul 2024 09:51:22 +0200 Subject: [PATCH 18/19] enforce required attrebutes on deserialization to catch invalid data --- src/Tests/SubscriberContext.cs | 12 +++++++++--- src/Tests/When_Subscription_Already_Exists.cs | 5 ++++- ...Subscription_Does_Not_Exist_And_Table_Is_Empty.cs | 5 ++++- ...cription_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 5 ++++- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/Tests/SubscriberContext.cs b/src/Tests/SubscriberContext.cs index 9eece14..892d0a8 100644 --- a/src/Tests/SubscriberContext.cs +++ b/src/Tests/SubscriberContext.cs @@ -1,13 +1,19 @@ using System.Text.Json.Serialization; using Blumchen.Serialization; +// ReSharper disable All namespace Tests; [MessageUrn("user-created:v1")] internal record SubscriberUserCreated( - Guid Id, - string Name -); + Guid Id, + string Name +) +{ + //JsonRequired is here a Guard clause to prevent null data + [JsonRequired] public Guid Id { get; init; } = Id; + [JsonRequired] public string Name { get; init; } = Name; +}; [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(SubscriberUserCreated))] diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 65e6fc5..495a579 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -31,11 +31,14 @@ public async Task Read_from_transaction_log() var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, - SubscriberContext.Default, sharedNamingPolicy, testOutputHelper.WriteLine, publicationName: publicationName, slotName: slotName); + SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine, publicationName: publicationName, slotName: slotName); //subscriber ignored msg await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), publisherResolver, connectionString, ct); + //poison message + await InsertPoisoningMessage(connectionString, eventsTable, ct); + var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); await MessageAppender.AppendAsync(eventsTable, @event, publisherResolver, connectionString, ct); diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index edec49e..5fbd24a 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -26,13 +26,16 @@ public async Task Read_from_table_using_named_transaction_snapshot() //subscriber ignored msg await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); + //poison message + await InsertPoisoningMessage(connectionString, eventsTable, ct); + var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); var @expected = new SubscriberUserCreated(@event.Id, @event.Name); await MessageAppender.AppendAsync(eventsTable, @event, resolver, connectionString, ct); var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, - SubscriberContext.Default, sharedNamingPolicy, testOutputHelper.WriteLine); + SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false)) diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index c6af1f3..574bb8e 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -26,13 +26,16 @@ public async Task Read_from_table_using_named_transaction_snapshot() //subscriber ignored msg await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); + //poison message + await InsertPoisoningMessage(connectionString, eventsTable, ct); + var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); await MessageAppender.AppendAsync(eventsTable, @event, resolver, connectionString, ct); var @expected = new SubscriberUserCreated(@event.Id, @event.Name); var ( _, subscriptionOptions) = - SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, testOutputHelper.WriteLine); + SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); var subscription = new Subscription(); await using var subscription1 = subscription.ConfigureAwait(false); From 3e149550c58bb4a1fd2ca9cdc7e4939eb7d38fb4 Mon Sep 17 00:00:00 2001 From: giordanol Date: Tue, 2 Jul 2024 10:48:30 +0200 Subject: [PATCH 19/19] Allowed message table customization: table name, columns names and dimension of varchar column --- src/Blumchen/Database/Run.cs | 21 ++--- src/Blumchen/MessageTableOptions.cs | 76 ++++++++++++++++ src/Blumchen/Publications/MessageAppender.cs | 90 ++++++++++--------- .../PublisherSetupOptionsBuilder.cs | 15 +++- .../Management/PublicationManagement.cs | 19 ++-- .../Management/ReplicationSlotManagement.cs | 2 +- src/Blumchen/Subscriptions/MimeType.cs | 7 ++ .../SnapshotReader/SnapshotReader.cs | 4 +- src/Blumchen/Subscriptions/Subscription.cs | 4 +- .../SubscriptionOptionsBuilder.cs | 17 +++- src/Publisher/Program.cs | 8 +- src/Subscriber/Program.cs | 5 ++ src/Tests/DatabaseFixture.cs | 8 +- src/Tests/When_Subscription_Already_Exists.cs | 16 ++-- ...ption_Does_Not_Exist_And_Table_Is_Empty.cs | 7 +- ...n_Does_Not_Exist_And_Table_Is_Not_Empty.cs | 5 +- 16 files changed, 208 insertions(+), 96 deletions(-) create mode 100644 src/Blumchen/MessageTableOptions.cs create mode 100644 src/Blumchen/Subscriptions/MimeType.cs diff --git a/src/Blumchen/Database/Run.cs b/src/Blumchen/Database/Run.cs index d1f0411..4e03b17 100644 --- a/src/Blumchen/Database/Run.cs +++ b/src/Blumchen/Database/Run.cs @@ -19,17 +19,8 @@ private static async Task Execute( await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } - public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, string tableName, CancellationToken ct) - { - var sql = @$" - CREATE TABLE IF NOT EXISTS {tableName} ( - id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, - message_type VARCHAR(250) NOT NULL, - data JSONB NOT NULL - ); - "; - await dataSource.Execute(sql, ct).ConfigureAwait(false); - } + public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, TableDescriptorBuilder.MessageTable tableDescriptor, CancellationToken ct) + => await dataSource.Execute(tableDescriptor.ToString(), ct).ConfigureAwait(false); public static async Task Exists( this NpgsqlDataSource dataSource, @@ -49,7 +40,7 @@ public static async Task Exists( internal static async IAsyncEnumerable QueryTransactionSnapshot(this NpgsqlConnection connection, string snapshotName, - string tableName, + TableDescriptorBuilder.MessageTable tableDescriptor, ISet registeredTypesKeys, IReplicationDataMapper dataMapper, [EnumeratorCancellation] CancellationToken ct) @@ -62,9 +53,9 @@ internal static async IAsyncEnumerable QueryTransactionSnapshot(this await using var command1 = command.ConfigureAwait(false); await command.ExecuteScalarAsync(ct).ConfigureAwait(false); var whereClause = registeredTypesKeys.Count > 0 - ? $" WHERE message_type IN({PublicationFilter(registeredTypesKeys)})" + ? $" WHERE {tableDescriptor.MessageType.Name} IN({PublicationFilter(registeredTypesKeys)})" : null; - var cmd = new NpgsqlCommand($"SELECT * FROM {tableName}{whereClause}", connection, transaction); + var cmd = new NpgsqlCommand($"SELECT * FROM {tableDescriptor.Name}{whereClause}", connection, transaction); await using var cmd1 = cmd.ConfigureAwait(false); var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); await using var reader1 = reader.ConfigureAwait(false); @@ -76,3 +67,5 @@ internal static async IAsyncEnumerable QueryTransactionSnapshot(this static string PublicationFilter(ICollection input) => string.Join(", ", input.Select(s => $"'{s}'")); } } + + diff --git a/src/Blumchen/MessageTableOptions.cs b/src/Blumchen/MessageTableOptions.cs new file mode 100644 index 0000000..a7fe0b1 --- /dev/null +++ b/src/Blumchen/MessageTableOptions.cs @@ -0,0 +1,76 @@ +using Blumchen.Subscriptions; +using NpgsqlTypes; + +namespace Blumchen; + +#pragma warning disable CS1591 +public record TableDescriptorBuilder +{ + private MessageTable TableDescriptor { get; set; } = new(); + + public MessageTable Build() => TableDescriptor.Build(); + + public TableDescriptorBuilder Name(string eventsTable) + { + TableDescriptor = new MessageTable(eventsTable); + return this; + } + + public TableDescriptorBuilder Id(string name) + { + TableDescriptor = TableDescriptor with { Id = new Column.Id(name) }; + return this; + } + + public TableDescriptorBuilder MessageData(string name, MimeType mime) + { + TableDescriptor = TableDescriptor with { Data = new Column.Data(name), MimeType = mime }; + return this; + } + + public TableDescriptorBuilder MessageType(string name, int dimension = 250) + { + TableDescriptor = TableDescriptor with { MessageType = new Column.MessageType(name, dimension) }; + return this; + } + + public record MessageTable(string Name = MessageTable.DefaultName) + { + internal const string DefaultName = "outbox"; + public Column.Id Id { get; internal init; } = Column.Id.Default(); + public Column.MessageType MessageType { get; internal init; } = Column.MessageType.Default(); + public Column.Data Data { get; internal init; } = Column.Data.Default(); + public MimeType MimeType { get; internal init; } = new MimeType.Json(); + public MessageTable Build() => this; + + public override string ToString() => @$" + CREATE TABLE IF NOT EXISTS {Name} ( + {Id} PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + {MessageType} NOT NULL, + {Data} NOT NULL + );"; + } + + public record Column(string Name, NpgsqlDbType Type) + { + public override string ToString() => $"{Name} {Type}"; + + public record Id(string Name): Column(Name, NpgsqlDbType.Bigint) + { + public override string ToString() => base.ToString(); + internal static readonly Func Default = () => new("id"); + } + + public record MessageType(string Name, int Dimension): Column(Name, NpgsqlDbType.Varchar) + { + internal static readonly Func Default = () => new("message_type", 250); + public override string ToString() => $"{base.ToString()}({Dimension})"; + } + + public record Data(string Name): Column(Name, NpgsqlDbType.Jsonb) + { + internal static readonly Func Default = () => new("data"); + public override string ToString() => base.ToString(); + } + } +} diff --git a/src/Blumchen/Publications/MessageAppender.cs b/src/Blumchen/Publications/MessageAppender.cs index 7485006..623c8b4 100644 --- a/src/Blumchen/Publications/MessageAppender.cs +++ b/src/Blumchen/Publications/MessageAppender.cs @@ -7,41 +7,64 @@ namespace Blumchen.Publications; public static class MessageAppender { - - public static async Task AppendAsync(string tableName, T @event, IJsonTypeResolver resolver, string connectionString, CancellationToken ct) - where T: class - { - var type = typeof(T); - var (typeName, jsonTypeInfo) = resolver.Resolve(type); - var data = JsonSerialization.ToJson(@event, jsonTypeInfo); - - var connection = new NpgsqlConnection(connectionString); - await using var connection1 = connection.ConfigureAwait(false); - await connection.OpenAsync(ct).ConfigureAwait(false); - var command = connection.CreateCommand(); - command.CommandText = $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')"; - await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); - } - - public static async Task AppendAsync(string tableName, T @input, IJsonTypeResolver resolver, NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken ct) - where T : class + public static async Task AppendAsync(T @input + , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver + , NpgsqlConnection connection + , NpgsqlTransaction transaction + , CancellationToken ct + ) where T : class { switch (@input) { case null: throw new ArgumentNullException(nameof(@input)); case IEnumerable inputs: - await AppendBatchAsyncOfT(tableName, inputs, resolver, connection, transaction, ct).ConfigureAwait(false); + await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; default: - await AppendAsyncOfT(tableName, input, resolver, connection, transaction, ct).ConfigureAwait(false); + await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false); break; } } - private static async Task AppendBatchAsyncOfT( - string tableName - , T inputs + private static async Task AppendAsyncOfT(T input + , TableDescriptorBuilder.MessageTable tableDescriptor + , IJsonTypeResolver typeResolver + , NpgsqlConnection connection + , NpgsqlTransaction transaction + , CancellationToken ct) where T : class + { + var (typeName, jsonTypeInfo) = typeResolver.Resolve(typeof(T)); + var data = JsonSerialization.ToJson(@input, jsonTypeInfo); + var command = new NpgsqlCommand( + $"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')", + connection, + transaction + ); + await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); + } + + public static async Task AppendAsync(T input + , (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options + , string connectionString + , CancellationToken ct) + where T: class + { + var type = typeof(T); + var (typeName, jsonTypeInfo) = options.resolver.Resolve(type); + var data = JsonSerialization.ToJson(input, jsonTypeInfo); + + var connection = new NpgsqlConnection(connectionString); + await using var connection1 = connection.ConfigureAwait(false); + await connection.OpenAsync(ct).ConfigureAwait(false); + var command = connection.CreateCommand(); + command.CommandText = + $"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; + await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); + } + + private static async Task AppendBatchAsyncOfT(T inputs + , TableDescriptorBuilder.MessageTable tableDescriptor , IJsonTypeResolver resolver , NpgsqlConnection connection , NpgsqlTransaction transaction @@ -56,28 +79,9 @@ string tableName batchCommand.CommandText = - $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')"; + $"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')"; batch.BatchCommands.Add(batchCommand); } await batch.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } - - private static async Task AppendAsyncOfT( - string tableName - , T @input - , IJsonTypeResolver resolver - , NpgsqlConnection connection - , NpgsqlTransaction transaction - , CancellationToken ct) where T : class - { - - var (typeName, jsonTypeInfo) = resolver.Resolve(typeof(T)); - var data = JsonSerialization.ToJson(@input, jsonTypeInfo); - var command = new NpgsqlCommand( - $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')", - connection, - transaction - ); - await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false); - } } diff --git a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs index 6c811f2..90e7792 100644 --- a/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs +++ b/src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs @@ -1,6 +1,7 @@ using System.Text.Json.Serialization; using Blumchen.Serialization; using JetBrains.Annotations; +using static Blumchen.TableDescriptorBuilder; namespace Blumchen.Publications; @@ -9,6 +10,8 @@ public class PublisherSetupOptionsBuilder { private INamingPolicy? _namingPolicy; private JsonSerializerContext? _jsonSerializerContext; + private static readonly TableDescriptorBuilder TableDescriptorBuilder = new(); + private MessageTable? _tableDescriptor; [UsedImplicitly] public PublisherSetupOptionsBuilder NamingPolicy(INamingPolicy namingPolicy) @@ -24,11 +27,19 @@ public PublisherSetupOptionsBuilder JsonContext(JsonSerializerContext jsonSerial return this; } - public IJsonTypeResolver Build() + [UsedImplicitly] + public PublisherSetupOptionsBuilder WithTable(Func builder) + { + _tableDescriptor = builder(TableDescriptorBuilder).Build(); + return this; + } + + public (MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) Build() { ArgumentNullException.ThrowIfNull(_jsonSerializerContext); ArgumentNullException.ThrowIfNull(_namingPolicy); + _tableDescriptor ??= TableDescriptorBuilder.Build(); var jsonTypeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); using var typeEnum = _jsonSerializerContext.GetType() .GetCustomAttributesData() @@ -38,6 +49,6 @@ public IJsonTypeResolver Build() while (typeEnum.MoveNext()) jsonTypeResolver.WhiteList(typeEnum.Current); - return jsonTypeResolver; + return (_tableDescriptor,jsonTypeResolver); } } diff --git a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs index f4f2f4d..64e6033 100644 --- a/src/Blumchen/Subscriptions/Management/PublicationManagement.cs +++ b/src/Blumchen/Subscriptions/Management/PublicationManagement.cs @@ -17,14 +17,14 @@ public static async Task SetupPublication( CancellationToken ct ) { - var (publicationName, tableName, createStyle, shouldReAddTablesIfWereRecreated, typeResolver) = setupOptions; + var (publicationName, createStyle, shouldReAddTablesIfWereRecreated, typeResolver, tableDescription) = setupOptions; return createStyle switch { Subscription.CreateStyle.Never => new None(), - Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), - Subscription.CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableName, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false), - Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableName, typeResolver, ct).ConfigureAwait(false), + Subscription.CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false), + Subscription.CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct).ConfigureAwait(false) => await Refresh(dataSource, publicationName, tableDescription.Name, shouldReAddTablesIfWereRecreated, ct).ConfigureAwait(false), + Subscription.CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableDescription.Name, typeResolver, ct).ConfigureAwait(false), _ => throw new ArgumentOutOfRangeException(nameof(setupOptions.CreateStyle)) }; @@ -143,27 +143,28 @@ public record Created: SetupPublicationResult; public sealed record PublicationSetupOptions( string PublicationName = PublicationSetupOptions.DefaultPublicationName, - string TableName = PublicationSetupOptions.DefaultTableName, Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, bool ShouldReAddTablesIfWereRecreated = false ) { - internal const string DefaultTableName = "outbox"; internal const string DefaultPublicationName = "pub"; internal JsonTypeResolver? TypeResolver { get; init; } = default; + internal TableDescriptorBuilder.MessageTable TableDescriptor { get; init; } = new TableDescriptorBuilder().Build(); + internal void Deconstruct( out string publicationName, - out string tableName, out Subscription.CreateStyle createStyle, out bool reAddTablesIfWereRecreated, - out JsonTypeResolver? typeResolver) + out JsonTypeResolver? typeResolver, + out TableDescriptorBuilder.MessageTable tableDescription) { publicationName = PublicationName; - tableName = TableName; createStyle = Subscription.CreateStyle.WhenNotExists; reAddTablesIfWereRecreated = ShouldReAddTablesIfWereRecreated; typeResolver = TypeResolver; + tableDescription = TableDescriptor; } + } } diff --git a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs index 22e1226..4595dd4 100644 --- a/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs +++ b/src/Blumchen/Subscriptions/Management/ReplicationSlotManagement.cs @@ -61,7 +61,7 @@ CancellationToken ct ) => dataSource.Exists("pg_replication_slots", "slot_name = $1", [slotName], ct); public record ReplicationSlotSetupOptions( - string SlotName = $"{PublicationManagement.PublicationSetupOptions.DefaultTableName}_slot", + string SlotName = $"{TableDescriptorBuilder.MessageTable.DefaultName}_slot", Subscription.CreateStyle CreateStyle = Subscription.CreateStyle.WhenNotExists, bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY ); diff --git a/src/Blumchen/Subscriptions/MimeType.cs b/src/Blumchen/Subscriptions/MimeType.cs new file mode 100644 index 0000000..8bb3ef9 --- /dev/null +++ b/src/Blumchen/Subscriptions/MimeType.cs @@ -0,0 +1,7 @@ +namespace Blumchen.Subscriptions; + +#pragma warning disable CS1591 +public abstract record MimeType(string mimeType) +{ + public record Json(): MimeType("application/json"); +} diff --git a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs index 4bf4b5f..5603e79 100644 --- a/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs +++ b/src/Blumchen/Subscriptions/SnapshotReader/SnapshotReader.cs @@ -11,14 +11,14 @@ public static class SnapshotReader { internal static async IAsyncEnumerable GetRowsFromSnapshot(this NpgsqlConnection connection, string snapshotName, - string tableName, + TableDescriptorBuilder.MessageTable tableDescriptor, IReplicationDataMapper dataMapper, ISet registeredTypes, [EnumeratorCancellation] CancellationToken ct = default) { await foreach (var @event in connection.QueryTransactionSnapshot( snapshotName, - tableName, + tableDescriptor, registeredTypes, dataMapper, ct).ConfigureAwait(false)) diff --git a/src/Blumchen/Subscriptions/Subscription.cs b/src/Blumchen/Subscriptions/Subscription.cs index e79e920..95854af 100644 --- a/src/Blumchen/Subscriptions/Subscription.cs +++ b/src/Blumchen/Subscriptions/Subscription.cs @@ -41,7 +41,7 @@ public async IAsyncEnumerable Subscribe( dataSourceBuilder.UseLoggerFactory(loggerFactory); var dataSource = dataSourceBuilder.Build(); - await dataSource.EnsureTableExists(publicationSetupOptions.TableName, ct).ConfigureAwait(false); + await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false); _connection = new LogicalReplicationConnection(connectionString); await _connection.Open(ct).ConfigureAwait(false); @@ -145,7 +145,7 @@ private static async IAsyncEnumerable ReadExistingRowsFromSnapshot( await using var connection1 = connection.ConfigureAwait(false); await foreach (var row in connection.GetRowsFromSnapshot( snapshotName, - options.PublicationOptions.TableName, + options.PublicationOptions.TableDescriptor, options.DataMapper, options.PublicationOptions.TypeResolver.Keys().ToHashSet(), ct).ConfigureAwait(false)) diff --git a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs index eadbd1e..13a97d8 100644 --- a/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs +++ b/src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs @@ -6,7 +6,6 @@ namespace Blumchen.Subscriptions; #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member - public sealed class SubscriptionOptionsBuilder { private static string? _connectionString; @@ -17,6 +16,8 @@ public sealed class SubscriptionOptionsBuilder private IErrorProcessor? _errorProcessor; private INamingPolicy? _namingPolicy; private JsonSerializerContext? _jsonSerializerContext; + private static readonly TableDescriptorBuilder TableDescriptorBuilder = new(); + private TableDescriptorBuilder.MessageTable? _messageTable; static SubscriptionOptionsBuilder() @@ -27,6 +28,15 @@ static SubscriptionOptionsBuilder() _dataMapper = default; } + + [UsedImplicitly] + public SubscriptionOptionsBuilder WithTable( + Func builder) + { + _messageTable = builder(TableDescriptorBuilder).Build(); + return this; + } + [UsedImplicitly] public SubscriptionOptionsBuilder ConnectionString(string connectionString) { @@ -52,7 +62,7 @@ public SubscriptionOptionsBuilder JsonContext(JsonSerializerContext jsonSerializ public SubscriptionOptionsBuilder WithPublicationOptions(PublicationManagement.PublicationSetupOptions publicationOptions) { _publicationSetupOptions = - publicationOptions with { TypeResolver = _publicationSetupOptions.TypeResolver }; + publicationOptions with { TypeResolver = _publicationSetupOptions.TypeResolver}; return this; } @@ -82,11 +92,12 @@ internal ISubscriptionOptions Build() { ArgumentNullException.ThrowIfNull(_connectionString); ArgumentNullException.ThrowIfNull(_jsonSerializerContext); + ArgumentNullException.ThrowIfNull(_messageTable); var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy); foreach (var type in _registry.Keys) typeResolver.WhiteList(type); _dataMapper = new ReplicationDataMapper(typeResolver); - _publicationSetupOptions = _publicationSetupOptions with { TypeResolver = typeResolver }; + _publicationSetupOptions = _publicationSetupOptions with { TypeResolver = typeResolver, TableDescriptor = _messageTable}; Ensure(() =>_registry.Keys.Except(_publicationSetupOptions.TypeResolver.Values()), "Unregistered types:{0}"); Ensure(() => _publicationSetupOptions.TypeResolver.Values().Except(_registry.Keys), "Unregistered consumer for type:{0}"); diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs index 02ad312..dae2987 100644 --- a/src/Publisher/Program.cs +++ b/src/Publisher/Program.cs @@ -44,13 +44,13 @@ switch (@event) { case UserCreated m: - await MessageAppender.AppendAsync("outbox", m, resolver, connection, transaction, ct).ConfigureAwait(false); + await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); break; case UserDeleted m: - await MessageAppender.AppendAsync("outbox", m, resolver, connection, transaction, ct).ConfigureAwait(false); + await MessageAppender.AppendAsync( m, resolver, connection, transaction, ct).ConfigureAwait(false); break; case UserModified m: - await MessageAppender.AppendAsync("outbox", m, resolver, connection, transaction, ct).ConfigureAwait(false); + await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false); break; } @@ -71,7 +71,7 @@ // { // var @events = Enumerable.Range(0, result) // .Select(i1 => new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString())); - // await EventsAppender.AppendAsync("outbox", @events, resolver, connection, transaction, ct); + // await MessageAppender.AppendAsync(@events, resolver, connection, transaction, ct); // } // catch (Exception e) // { diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index 6d9a473..c023ac0 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -23,6 +23,11 @@ var cursor = subscription.Subscribe( builder => builder .ConnectionString(Settings.ConnectionString) + .WithTable(options => options + .Id("id") + .MessageType("message_type") + .MessageData("data", new MimeType.Json()) + ) .NamingPolicy(new AttributeNamingPolicy()) .JsonContext(SourceGenerationContext.Default) .Consumes(consumer) diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index a08da09..fc71e14 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -1,6 +1,7 @@ using System.Diagnostics; using System.Text.Json.Serialization; using System.Text.Json.Serialization.Metadata; +using Blumchen; using Blumchen.Database; using Blumchen.Serialization; using Blumchen.Subscriptions; @@ -8,7 +9,6 @@ using Npgsql; using Testcontainers.PostgreSql; using Xunit.Abstractions; -using Xunit.Sdk; namespace Tests; @@ -48,7 +48,8 @@ CancellationToken ct { var tableName = Randomise("outbox"); - await dataSource.EnsureTableExists(tableName, ct).ConfigureAwait(false); + var tableDesc = new TableDescriptorBuilder().Name(tableName).Build(); + await dataSource.EnsureTableExists(tableDesc, ct).ConfigureAwait(false); return tableName; } @@ -84,8 +85,9 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri .JsonContext(info) .NamingPolicy(namingPolicy) .Consumes>(consumer) + .WithTable(o => o.Name(eventsTable)) .WithPublicationOptions( - new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub"), TableName: eventsTable) + new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub")) ) .WithReplicationOptions( new ReplicationSlotManagement.ReplicationSlotSetupOptions(slotName ?? Randomise("events_slot")) diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs index 495a579..4575b69 100644 --- a/src/Tests/When_Subscription_Already_Exists.cs +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -19,28 +19,28 @@ public async Task Read_from_transaction_log() var connectionString = Container.GetConnectionString(); var dataSource = NpgsqlDataSource.Create(connectionString); var eventsTable = await CreateOutboxTable(dataSource, ct); - var publisherResolver = new PublisherSetupOptionsBuilder() + var opts = new PublisherSetupOptionsBuilder() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) + .WithTable(o => o.Name(eventsTable)) .Build(); var slotName = "subscription_test"; var publicationName = "publication_test"; - - + await dataSource.CreatePublication(publicationName, eventsTable, new HashSet{"urn:message:user-created:v1"}, ct); - - - var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, + + var (_, subscriptionOptions) = SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine, publicationName: publicationName, slotName: slotName); //subscriber ignored msg - await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), publisherResolver, connectionString, ct); + await MessageAppender.AppendAsync( + new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), opts, connectionString, ct); //poison message await InsertPoisoningMessage(connectionString, eventsTable, ct); var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await MessageAppender.AppendAsync(eventsTable, @event, publisherResolver, connectionString, ct); + await MessageAppender.AppendAsync(@event, opts, connectionString, ct); var @expected = new SubscriberUserCreated(@event.Id, @event.Name); diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs index 5fbd24a..fd378d2 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs @@ -1,4 +1,4 @@ -using System.Diagnostics; +using Blumchen; using Blumchen.Publications; using Blumchen.Serialization; using Blumchen.Subscriptions; @@ -22,9 +22,10 @@ public async Task Read_from_table_using_named_transaction_snapshot() var resolver = new PublisherSetupOptionsBuilder() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) + .WithTable(o => o.Name(eventsTable)) .Build(); //subscriber ignored msg - await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); + await MessageAppender.AppendAsync(new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); //poison message await InsertPoisoningMessage(connectionString, eventsTable, ct); @@ -32,7 +33,7 @@ public async Task Read_from_table_using_named_transaction_snapshot() var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); var @expected = new SubscriberUserCreated(@event.Id, @event.Name); - await MessageAppender.AppendAsync(eventsTable, @event, resolver, connectionString, ct); + await MessageAppender.AppendAsync(@event, resolver, connectionString, ct); var ( _, subscriptionOptions) = SetupFor(connectionString, eventsTable, SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine); diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs index 574bb8e..6422396 100644 --- a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -21,16 +21,17 @@ public async Task Read_from_table_using_named_transaction_snapshot() var resolver = new PublisherSetupOptionsBuilder() .JsonContext(PublisherContext.Default) .NamingPolicy(sharedNamingPolicy) + .WithTable(o => o.Name(eventsTable)) .Build(); //subscriber ignored msg - await MessageAppender.AppendAsync(eventsTable, new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); + await MessageAppender.AppendAsync( new PublisherUserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()), resolver, connectionString, ct); //poison message await InsertPoisoningMessage(connectionString, eventsTable, ct); var @event = new PublisherUserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); - await MessageAppender.AppendAsync(eventsTable, @event, resolver, connectionString, ct); + await MessageAppender.AppendAsync(@event, resolver, connectionString, ct); var @expected = new SubscriberUserCreated(@event.Id, @event.Name);