diff --git a/.github/workflows/build.dotnet.yml b/.github/workflows/build.dotnet.yml index 0444b22..d7faf26 100644 --- a/.github/workflows/build.dotnet.yml +++ b/.github/workflows/build.dotnet.yml @@ -20,7 +20,7 @@ jobs: - name: Setup .NET Core uses: actions/setup-dotnet@v3 with: - dotnet-version: "7.0.x" + dotnet-version: "8.0.x" - name: Restore NuGet packages run: dotnet restore diff --git a/PostgresOutbox.Console/Events/UserCreated.cs b/PostgresOutbox.Console/Events/UserCreated.cs deleted file mode 100644 index 364c943..0000000 --- a/PostgresOutbox.Console/Events/UserCreated.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace PostgresOutbox.Console.Events; - -public record UserCreated( - Guid Id, - string Name -); diff --git a/PostgresOutbox.Console/Program.cs b/PostgresOutbox.Console/Program.cs deleted file mode 100644 index 21f96f2..0000000 --- a/PostgresOutbox.Console/Program.cs +++ /dev/null @@ -1,28 +0,0 @@ -using PostgresForDotnetDev.CLI; -using PostgresOutbox.Serialization; -using PostgresOutbox.Subscriptions; -using PostgresOutbox.Subscriptions.Replication; - -using static PostgresOutbox.Subscriptions.Management.PublicationManagement; -using static PostgresOutbox.Subscriptions.Management.ReplicationSlotManagement; - -var cancellationTokenSource = new CancellationTokenSource(); - -var ct = cancellationTokenSource.Token; - -var slotName = "events_slot" + Guid.NewGuid().ToString().Replace("-", ""); - -var subscriptionOptions = new SubscriptionOptions( - Settings.ConnectionString, - new PublicationSetupOptions("events_pub","events" ), - new ReplicationSlotSetupOptions(slotName), - new EventDataMapper() -); - -var subscription = new Subscription(); - -await foreach (var readEvent in subscription.Subscribe(subscriptionOptions, ct:ct)) -{ - Console.WriteLine(JsonSerialization.ToJson(readEvent)); -} - diff --git a/PostgresOutbox/Database/DatabaseResultToDictionaryMapper.cs b/PostgresOutbox/Database/DatabaseResultToDictionaryMapper.cs deleted file mode 100644 index 9839523..0000000 --- a/PostgresOutbox/Database/DatabaseResultToDictionaryMapper.cs +++ /dev/null @@ -1,37 +0,0 @@ -using Npgsql; -using Npgsql.Replication.PgOutput.Messages; - -namespace PostgresOutbox.Database; - -public static class DatabaseResultToDictionaryMapper -{ - public static async ValueTask> ToDictionary(this InsertMessage message, CancellationToken ct) - { - var result = new Dictionary(); - var columnIndex = 0; - - await foreach (var value in message.NewRow) - { - var fieldName = message.Relation.Columns[columnIndex].ColumnName; - var fieldValue = await value.Get(ct); - result.Add(fieldName, fieldValue); - - columnIndex++; - } - return result; - } - - - public static ValueTask> ToDictionary(this NpgsqlDataReader reader, CancellationToken ct) - { - var result = new Dictionary(); - - for (var i = 0; i < reader.FieldCount; i++) - { - var value = reader.GetValue(i); - result[reader.GetName(i)] = value; - } - - return new ValueTask>(result); - } -} diff --git a/PostgresOutbox/Events/EventsAppender.cs b/PostgresOutbox/Events/EventsAppender.cs deleted file mode 100644 index 5094a08..0000000 --- a/PostgresOutbox/Events/EventsAppender.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Npgsql; -using PostgresOutbox.Serialization; - -namespace PostgresOutbox.Events; - -public static class EventsAppender -{ - public static async Task AppendAsync(string tableName, T @event, string connectionString, CancellationToken ct) - where T: class - { - var eventTypeName = typeof(T).AssemblyQualifiedName; - var eventData = JsonSerialization.ToJson(@event); - - await using var connection = new NpgsqlConnection(connectionString); - await connection.OpenAsync(ct); - var command = connection.CreateCommand(); - command.CommandText = $"INSERT INTO {tableName}(event_type, data) values ('{eventTypeName}', '{eventData}')"; - await command.ExecuteNonQueryAsync(ct); - } -} diff --git a/PostgresOutbox/Events/EventsTable.cs b/PostgresOutbox/Events/EventsTable.cs deleted file mode 100644 index 737b362..0000000 --- a/PostgresOutbox/Events/EventsTable.cs +++ /dev/null @@ -1,21 +0,0 @@ -using Npgsql; -using PostgresOutbox.Database; - -namespace PostgresOutbox.Events; - -public class EventsTable -{ - public static async Task Create(string connectionString, string tableName, CancellationToken ct) - { - var sql = @$" - CREATE TABLE {tableName} ( - id INTEGER PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, - event_type VARCHAR(250) NOT NULL, - data JSONB NOT NULL - ); - "; - - await using var dataSource = NpgsqlDataSource.Create(connectionString); - await dataSource.Execute(sql, ct); - } -} diff --git a/PostgresOutbox/PostgresOutbox.csproj b/PostgresOutbox/PostgresOutbox.csproj deleted file mode 100644 index 0664618..0000000 --- a/PostgresOutbox/PostgresOutbox.csproj +++ /dev/null @@ -1,11 +0,0 @@ - - - - net7.0 - - - - - - - diff --git a/PostgresOutbox/Reflection/DictionaryToObjectMapper.cs b/PostgresOutbox/Reflection/DictionaryToObjectMapper.cs deleted file mode 100644 index d5c1304..0000000 --- a/PostgresOutbox/Reflection/DictionaryToObjectMapper.cs +++ /dev/null @@ -1,29 +0,0 @@ -namespace PostgresOutbox.Reflection; - -public static class DictionaryToObjectMapper -{ - public static T Map(this IDictionary dict, Func? transformName = null) - { - var properties = typeof(T).GetProperties(); - var obj = ObjectFactory.GetDefaultOrUninitialized(); - - foreach (var kvp in dict) - { - var property = properties.FirstOrDefault(p => - p.Name.Equals(transformName != null ? transformName(kvp.Key) : kvp.Key, - StringComparison.OrdinalIgnoreCase) - ); - if (property == null) continue; - - var targetType = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType;; - - var value = kvp.Value is IConvertible && kvp.GetType() != property.PropertyType - ? Convert.ChangeType(kvp.Value, targetType) - : kvp.Value; - - property.SetValue(obj, value); - } - - return obj; - } -} diff --git a/PostgresOutbox/Reflection/GetType.cs b/PostgresOutbox/Reflection/GetType.cs deleted file mode 100644 index 90f9583..0000000 --- a/PostgresOutbox/Reflection/GetType.cs +++ /dev/null @@ -1,27 +0,0 @@ -namespace PostgresOutbox.Reflection; - -public class GetType -{ - private static readonly Dictionary Types = new(); - - public static Type ByName(string typeName) - { - if (Types.ContainsKey(typeName)) - return Types[typeName]; - - var type = GetFirstMatchingTypeFromCurrentDomainAssembly(typeName); - - if (type is null) - throw new ArgumentOutOfRangeException(nameof(typeName)); - - return Types[typeName] = type; - } - - private static Type? GetFirstMatchingTypeFromCurrentDomainAssembly(string typeName) - { - return Type.GetType(typeName) ?? AppDomain.CurrentDomain.GetAssemblies() - .SelectMany(a => new[] { a.GetType(typeName) }.Union(a.GetTypes().Where(x => - x.AssemblyQualifiedName == typeName || x.FullName == typeName || x.Name == typeName))) - .FirstOrDefault(); - } -} diff --git a/PostgresOutbox/Reflection/ObjectFactory.cs b/PostgresOutbox/Reflection/ObjectFactory.cs deleted file mode 100644 index 2bc9815..0000000 --- a/PostgresOutbox/Reflection/ObjectFactory.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System.Linq.Expressions; -using System.Reflection; -using System.Runtime.Serialization; - -namespace PostgresOutbox.Reflection; - -public static class ObjectFactory -{ - public static readonly Func GetDefaultOrUninitialized = Creator(); - - private static Func Creator() - { - var t = typeof(T); - if (t == typeof(string)) - return Expression.Lambda>(Expression.Constant(string.Empty)).Compile(); - - if (t.HasDefaultConstructor()) - return Expression.Lambda>(Expression.New(t)).Compile(); - - return () => (T)FormatterServices.GetUninitializedObject(t); - } -} - -public static class ObjectFactory -{ - public static bool HasDefaultConstructor(this Type t) - { - return t.IsValueType || t.GetConstructor(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, - null, Type.EmptyTypes, null) != null; - } -} diff --git a/PostgresOutbox/Serialization/JsonSerialization.cs b/PostgresOutbox/Serialization/JsonSerialization.cs deleted file mode 100644 index bd49354..0000000 --- a/PostgresOutbox/Serialization/JsonSerialization.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Text.Json; -using PostgresOutbox.Streams; - -namespace PostgresOutbox.Serialization; - -public static class JsonSerialization -{ - public static string ToJson(object data, JsonSerializerOptions? options = null) => - JsonSerializer.Serialize(data, options ?? new JsonSerializerOptions()); - - public static ValueTask FromJsonAsync(Type type, Stream stream, CancellationToken ct = default) => - JsonSerializer.DeserializeAsync(stream.ToSOHSkippingStream(), type, new JsonSerializerOptions(), ct); -} diff --git a/PostgresOutbox/Streams/SOHSkippingStream.cs b/PostgresOutbox/Streams/SOHSkippingStream.cs deleted file mode 100644 index 2959d8a..0000000 --- a/PostgresOutbox/Streams/SOHSkippingStream.cs +++ /dev/null @@ -1,152 +0,0 @@ -namespace PostgresOutbox.Streams -{ - internal class SOHSkippingStream: Stream - { - private readonly Stream _inner; - - public SOHSkippingStream(Stream inner) - { - _inner = inner; - } - - public override int ReadByte() - { - var initialPosition = _inner.Position; - var result = _inner.ReadByte(); - if (result == 1 && initialPosition == 0) result = _inner.ReadByte(); - - return result; - } - - public override int Read(byte[] buffer, int offset, int count) - { - var totalRead = 0; - if (_inner.Position == 0) - { - var readBytes = _inner.Read(buffer, 0, 1); - if (readBytes <= 0) return readBytes; - - if (buffer[0] != 1) - { - offset += 1; - count -= 1; - totalRead = 1; - } - } - - return totalRead + _inner.Read(buffer, offset, count); - } - - public override async Task ReadAsync(byte[] buffer, int offset, int count, - CancellationToken cancellationToken) - { - var totalRead = 0; - if (_inner.Position == 0) - { - var readBytes = await _inner.ReadAsync(buffer, 0, 1, cancellationToken).ConfigureAwait(false); - if (readBytes <= 0) return readBytes; - - if (buffer[0] != 1) - { - offset += 1; - count -= 1; - totalRead = 1; - } - } - - return totalRead + await _inner.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); - } - - public override async ValueTask ReadAsync(Memory buffer, - CancellationToken cancellationToken = default) - { - var totalRead = 0; - if (_inner.Position == 0) - { - var singleByteBuffer = buffer[..1]; - - var readBytes = await _inner.ReadAsync(singleByteBuffer, cancellationToken).ConfigureAwait(false); - if (readBytes <= 0) return readBytes; - - if (singleByteBuffer.Span[0] != 1) - { - totalRead = 1; - buffer = buffer[1..]; - } - } - - return totalRead + await _inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); - } - - public override int Read(Span buffer) - { - var totalRead = 0; - if (_inner.Position == 0) - { - var singleByteBuffer = buffer[..1]; - var readBytes = _inner.Read(singleByteBuffer); - if (readBytes <= 0) return readBytes; - - if (singleByteBuffer[0] != 1) - { - totalRead = 1; - buffer = buffer[1..]; - } - } - - return totalRead + _inner.Read(buffer); - } - - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) - { - throw new NotSupportedException(); - } - - public override int EndRead(IAsyncResult asyncResult) - { - throw new NotSupportedException(); - } - - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public override void SetLength(long value) - { - throw new NotSupportedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - throw new NotSupportedException(); - } - - public override bool CanRead => true; - public override bool CanSeek => false; - public override bool CanWrite => false; - public override long Length => _inner.Length; - - public override long Position - { - get => throw new NotSupportedException(); - set => throw new NotSupportedException(); - } - - public override void Flush() - { - throw new NotSupportedException(); - } - - protected override void Dispose(bool disposing) - { - _inner.Dispose(); - } - - public override ValueTask DisposeAsync() - { - return _inner.DisposeAsync(); - } - } -} diff --git a/PostgresOutbox/Streams/StreamExtensions.cs b/PostgresOutbox/Streams/StreamExtensions.cs deleted file mode 100644 index dadce39..0000000 --- a/PostgresOutbox/Streams/StreamExtensions.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System.Text; - -namespace PostgresOutbox.Streams -{ - internal static class StreamExtensions - { - private const int BufferSize = 81920; - - static readonly Encoding UTF8NoBOM = new UTF8Encoding(false, true); - - public static SOHSkippingStream ToSOHSkippingStream(this Stream stream) - { - return new SOHSkippingStream(stream); - } - - public static async Task CopyStreamSkippingSOHAsync(this Stream input, Stream output, CancellationToken token = default) - { - var sohSkippingStream = new SOHSkippingStream(input); - await sohSkippingStream.CopyToAsync(output, 4096, token).ConfigureAwait(false); - } - - public static StreamReader GetStreamReader(this Stream stream) - { - var streamReader = new StreamReader(stream); - - var firstByte = streamReader.Peek(); - if (firstByte == 1) - { - streamReader.Read(); - } - - return streamReader; - } - } -} diff --git a/PostgresOutbox/Subscriptions/Management/ReplicationSlotManagement.cs b/PostgresOutbox/Subscriptions/Management/ReplicationSlotManagement.cs deleted file mode 100644 index c564ddb..0000000 --- a/PostgresOutbox/Subscriptions/Management/ReplicationSlotManagement.cs +++ /dev/null @@ -1,60 +0,0 @@ -using Npgsql; -using Npgsql.Replication; -using NpgsqlTypes; -using PostgresOutbox.Database; - -namespace PostgresOutbox.Subscriptions.Management; -using static ReplicationSlotManagement.CreateReplicationSlotResult; - -public static class ReplicationSlotManagement -{ - public static async Task SetupReplicationSlot( - this NpgsqlDataSource dataSource, - LogicalReplicationConnection connection, - ReplicationSlotSetupOptions options, - CancellationToken ct - ) - { - var (slotName, createStyle) = options; - - if(createStyle == CreateStyle.Never) - return new None(); - - if (await dataSource.ReplicationSlotExists(slotName, ct)) - { - if (createStyle == CreateStyle.WhenNotExists) - return new AlreadyExists(); - - await connection.DropReplicationSlot(slotName, true, ct); - } - - var result = await connection.CreatePgOutputReplicationSlot( - slotName, - slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, - cancellationToken: ct - ); - - return new Created(result.SnapshotName!, result.ConsistentPoint); - } - - private static Task ReplicationSlotExists( - this NpgsqlDataSource dataSource, - string slotName, - CancellationToken ct - ) => - dataSource.Exists("pg_replication_slots", "slot_name = $1", new object[] { slotName }, ct); - - public record ReplicationSlotSetupOptions( - string SlotName, - CreateStyle CreateStyle = CreateStyle.WhenNotExists - ); - - public abstract record CreateReplicationSlotResult - { - public record None: CreateReplicationSlotResult; - - public record AlreadyExists: CreateReplicationSlotResult; - - public record Created(string SnapshotName, NpgsqlLogSequenceNumber LogSequenceNumber): CreateReplicationSlotResult; - } -} diff --git a/PostgresOutbox/Subscriptions/Replication/EventDataMapper.cs b/PostgresOutbox/Subscriptions/Replication/EventDataMapper.cs deleted file mode 100644 index 1c4d14b..0000000 --- a/PostgresOutbox/Subscriptions/Replication/EventDataMapper.cs +++ /dev/null @@ -1,46 +0,0 @@ -using Npgsql; -using Npgsql.Replication.PgOutput.Messages; -using PostgresOutbox.Serialization; - -namespace PostgresOutbox.Subscriptions.Replication; - -public class EventDataMapper: IReplicationDataMapper -{ - public async Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct) - { - var columnNumber = 0; - var eventTypeName = string.Empty; - - await foreach (var value in insertMessage.NewRow) - { - switch (columnNumber) - { - case 1: - eventTypeName = await value.GetTextReader().ReadToEndAsync(ct); - break; - case 2 when value.GetDataTypeName().ToLower() == "jsonb": - { - var eventType = Reflection.GetType.ByName(eventTypeName); - - var @event = await JsonSerialization.FromJsonAsync(eventType, value.GetStream(), ct); - - return @event!; - } - } - - columnNumber++; - } - - throw new InvalidOperationException("You should not get here"); - } - - public async Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct) - { - var eventTypeName = reader.GetString(1); - var eventType = Reflection.GetType.ByName(eventTypeName); - - var @event = await JsonSerialization.FromJsonAsync(eventType, await reader.GetStreamAsync(2, ct), ct); - - return @event!; - } -} diff --git a/PostgresOutbox/Subscriptions/Replication/FlatObjectMapper.cs b/PostgresOutbox/Subscriptions/Replication/FlatObjectMapper.cs deleted file mode 100644 index 8c40dd7..0000000 --- a/PostgresOutbox/Subscriptions/Replication/FlatObjectMapper.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System.Globalization; -using PostgresOutbox.Reflection; - -namespace PostgresOutbox.Subscriptions.Replication; - -public class FlatObjectMapper: DictionaryReplicationDataMapper where T : notnull -{ - public FlatObjectMapper(Func? transformName = null): base(Map(transformName)) - { - } - - private static Func, CancellationToken, ValueTask> Map( - Func? transformName) => - (dictionary, _) => new ValueTask(dictionary.Map(transformName)); -} - -public static class NameTransformations -{ - public static string FromPostgres(string columnName) => - CultureInfo.InvariantCulture.TextInfo.ToTitleCase(columnName.Replace("_", "")).Replace(" ", ""); -} diff --git a/PostgresOutbox/Subscriptions/Replication/ReplicationDataMapper.cs b/PostgresOutbox/Subscriptions/Replication/ReplicationDataMapper.cs deleted file mode 100644 index 35e4f50..0000000 --- a/PostgresOutbox/Subscriptions/Replication/ReplicationDataMapper.cs +++ /dev/null @@ -1,53 +0,0 @@ -using Npgsql; -using Npgsql.Replication.PgOutput.Messages; -using PostgresOutbox.Database; - -namespace PostgresOutbox.Subscriptions.Replication; - -public interface IReplicationDataMapper -{ - Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct); - - Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct); -} - -public interface IReplicationDataMapper: IReplicationDataMapper where T : notnull -{ - new Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct); - - new Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct); - - async Task IReplicationDataMapper.ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct) => - await ReadFromSnapshot(reader, ct); - - async Task IReplicationDataMapper.ReadFromReplication(InsertMessage message, CancellationToken ct) => - await ReadFromReplication(message, ct); -} - -public class DictionaryReplicationDataMapper: IReplicationDataMapper where T : notnull -{ - private readonly Func, CancellationToken, ValueTask> map; - - public DictionaryReplicationDataMapper(Func, CancellationToken, ValueTask> map) => - this.map = map; - - public async Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct) => - await map(await reader.ToDictionary(ct), ct); - - public async Task ReadFromReplication(InsertMessage message, CancellationToken ct) => - await map(await message.ToDictionary(ct), ct); -} - -public class DictionaryReplicationDataMapper: DictionaryReplicationDataMapper -{ - public static DictionaryReplicationDataMapper For( - Func, CancellationToken, ValueTask> map) where T : notnull => new(map); - - public static DictionaryReplicationDataMapper For( - Func, CancellationToken, ValueTask> map) => new(map); - - public DictionaryReplicationDataMapper(Func, CancellationToken, ValueTask> map): - base(map) - { - } -} diff --git a/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/InsertMessageHandler.cs b/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/InsertMessageHandler.cs deleted file mode 100644 index f7fb038..0000000 --- a/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/InsertMessageHandler.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Npgsql.Replication.PgOutput.Messages; -using PostgresOutbox.Subscriptions.Replication; - -namespace PostgresOutbox.Subscriptions.ReplicationMessageHandlers; - -public static class InsertMessageHandler -{ - public static async Task Handle( - InsertMessage message, - IReplicationDataMapper dataMapper, - CancellationToken ct - ) => - await dataMapper.ReadFromReplication(message, ct); -} diff --git a/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/RelationMessageHandler.cs b/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/RelationMessageHandler.cs deleted file mode 100644 index ec9caca..0000000 --- a/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/RelationMessageHandler.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Collections.Concurrent; -using Npgsql.Replication.PgOutput.Messages; - -namespace PostgresOutbox.Subscriptions.ReplicationMessageHandlers; - -public static class RelationCache -{ - public static ConcurrentDictionary Relations = new(); -} - -public class RelationMessageHandler -{ - - public static void Handle(RelationMessage message) => - RelationCache.Relations[message.RelationId] = message; -} diff --git a/PostgresOutbox/Subscriptions/Subscription.cs b/PostgresOutbox/Subscriptions/Subscription.cs deleted file mode 100644 index c611520..0000000 --- a/PostgresOutbox/Subscriptions/Subscription.cs +++ /dev/null @@ -1,107 +0,0 @@ -using System.Runtime.CompilerServices; -using Npgsql; -using Npgsql.Replication; -using Npgsql.Replication.PgOutput; -using Npgsql.Replication.PgOutput.Messages; -using PostgresOutbox.Subscriptions.Management; -using PostgresOutbox.Subscriptions.Replication; -using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; -using PostgresOutbox.Subscriptions.SnapshotReader; - -namespace PostgresOutbox.Subscriptions; - -using static PublicationManagement; -using static ReplicationSlotManagement; -using static ReplicationSlotManagement.CreateReplicationSlotResult; - -public interface ISubscription -{ - IAsyncEnumerable Subscribe(SubscriptionOptions options, CancellationToken ct); -} - -public record SubscriptionOptions( - string ConnectionString, - PublicationSetupOptions PublicationSetupOptions, - ReplicationSlotSetupOptions SlotSetupOptions, - IReplicationDataMapper DataMapper -); - -public enum CreateStyle -{ - WhenNotExists, - AlwaysRecreate, - Never -} - -public class Subscription: ISubscription -{ - public async IAsyncEnumerable Subscribe( - SubscriptionOptions options, - [EnumeratorCancellation] CancellationToken ct = default - ) - { - var (connectionString, publicationSetupOptions, slotSetupOptions, replicationDataMapper) = options; - var dataSource = NpgsqlDataSource.Create(connectionString); - - await using var conn = new LogicalReplicationConnection(connectionString); - await conn.Open(ct); - - await dataSource.SetupPublication(publicationSetupOptions, ct); - var result = await dataSource.SetupReplicationSlot(conn, slotSetupOptions, ct); - - PgOutputReplicationSlot slot; - - if (result is not Created created) - { - slot = new PgOutputReplicationSlot(slotSetupOptions.SlotName); - } - else - { - slot = new PgOutputReplicationSlot( - new ReplicationSlotOptions( - slotSetupOptions.SlotName, - created.LogSequenceNumber - ) - ); - - await foreach (var @event in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, options, ct)) - { - yield return @event; - } - } - - await foreach (var message in - conn.StartReplication(slot, - new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1), ct)) - { - if (message is InsertMessage insertMessage) - { - yield return await InsertMessageHandler.Handle(insertMessage, replicationDataMapper, ct); - } - - // Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually - // so that Npgsql can inform the server which WAL files can be removed/recycled. - conn.SetReplicationStatus(message.WalEnd); - await conn.SendStatusUpdate(ct); - } - } - - private static async IAsyncEnumerable ReadExistingRowsFromSnapshot( - NpgsqlDataSource dataSource, - string snapshotName, - SubscriptionOptions options, - [EnumeratorCancellation] CancellationToken ct = default - ) - { - await using var connection = await dataSource.OpenConnectionAsync(ct); - - await foreach (var row in connection.GetRowsFromSnapshot( - snapshotName, - options.PublicationSetupOptions.TableName, - options.DataMapper, - ct)) - { - yield return row; - } - } -} diff --git a/PostgresOutboxPattern.sln b/PostgresOutboxPattern.sln new file mode 100644 index 0000000..eff6792 --- /dev/null +++ b/PostgresOutboxPattern.sln @@ -0,0 +1,79 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.9.34622.214 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docker", "docker", "{CD59A1A0-F40D-4047-87A3-66C0F1519FA5}" + ProjectSection(SolutionItems) = preProject + docker-compose.yml = docker-compose.yml + EndProjectSection +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PostgresOutbox", "src\PostgresOutbox\PostgresOutbox.csproj", "{E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{D0AB0FF4-C8A1-4B4B-A682-64F353A2D248}" + ProjectSection(SolutionItems) = preProject + src\Directory.Build.props = src\Directory.Build.props + .github\workflows\build.dotnet.yml = .github\workflows\build.dotnet.yml + EndProjectSection +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{47B6EDAC-2EBC-4368-BAD6-2B8695E7D067}" + ProjectSection(SolutionItems) = preProject + README.md = README.md + EndProjectSection +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Subscriber", "src\Subscriber\Subscriber.csproj", "{06A60918-32EB-4C38-B369-7E1D76B809A0}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Publisher", "src\Publisher\Publisher.csproj", "{F2878625-0919-4C26-8DC9-58CD8FA34050}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Commons", "src\Commons\Commons.csproj", "{F81E2D5B-FC59-4396-A911-56BE65E4FE80}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "src\Tests\Tests.csproj", "{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "pgAdmin", "pgAdmin", "{C050E9E8-3FB6-4581-953F-31826E385FB4}" + ProjectSection(SolutionItems) = preProject + docker\pgAdmin\servers.json = docker\pgAdmin\servers.json + docker\pgAdmin\pgpass = docker\pgAdmin\pgpass + EndProjectSection +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8AAAA344-B5FD-48D9-B2BA-379E374448D4}" + ProjectSection(SolutionItems) = preProject + docker\postgres\init.sql = docker\postgres\init.sql + EndProjectSection +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Release|Any CPU.Build.0 = Release|Any CPU + {06A60918-32EB-4C38-B369-7E1D76B809A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {06A60918-32EB-4C38-B369-7E1D76B809A0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {06A60918-32EB-4C38-B369-7E1D76B809A0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {06A60918-32EB-4C38-B369-7E1D76B809A0}.Release|Any CPU.Build.0 = Release|Any CPU + {F2878625-0919-4C26-8DC9-58CD8FA34050}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F2878625-0919-4C26-8DC9-58CD8FA34050}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F2878625-0919-4C26-8DC9-58CD8FA34050}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F2878625-0919-4C26-8DC9-58CD8FA34050}.Release|Any CPU.Build.0 = Release|Any CPU + {F81E2D5B-FC59-4396-A911-56BE65E4FE80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F81E2D5B-FC59-4396-A911-56BE65E4FE80}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F81E2D5B-FC59-4396-A911-56BE65E4FE80}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F81E2D5B-FC59-4396-A911-56BE65E4FE80}.Release|Any CPU.Build.0 = Release|Any CPU + {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614} + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} + {8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5} + EndGlobalSection +EndGlobal diff --git a/PostgresOutboxPatternWithCDC.NET.Tests/Events/UserCreated.cs b/PostgresOutboxPatternWithCDC.NET.Tests/Events/UserCreated.cs deleted file mode 100644 index 87a69e4..0000000 --- a/PostgresOutboxPatternWithCDC.NET.Tests/Events/UserCreated.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace PostgresOutboxPatternWithCDC.NET.Tests.Events; - -public record UserCreated( - Guid Id, - string Name -); diff --git a/PostgresOutboxPatternWithCDC.NET.sln b/PostgresOutboxPatternWithCDC.NET.sln deleted file mode 100644 index bcc4ea9..0000000 --- a/PostgresOutboxPatternWithCDC.NET.sln +++ /dev/null @@ -1,48 +0,0 @@ - -Microsoft Visual Studio Solution File, Format Version 12.00 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PostgresOutboxPatternWithCDC.NET.Tests", "PostgresOutboxPatternWithCDC.NET.Tests\PostgresOutboxPatternWithCDC.NET.Tests.csproj", "{CAC01DAE-078B-474C-8C38-B528F27B2E16}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docker", "docker", "{CD59A1A0-F40D-4047-87A3-66C0F1519FA5}" - ProjectSection(SolutionItems) = preProject - docker-compose.yml = docker-compose.yml - EndProjectSection -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sql", "sql", "{9F497531-24EF-4744-B06C-B766562A9852}" - ProjectSection(SolutionItems) = preProject - sql\init.sql = sql\init.sql - EndProjectSection -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PostgresOutbox", "PostgresOutbox\PostgresOutbox.csproj", "{E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{D0AB0FF4-C8A1-4B4B-A682-64F353A2D248}" - ProjectSection(SolutionItems) = preProject - Directory.Build.props = Directory.Build.props - EndProjectSection -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{47B6EDAC-2EBC-4368-BAD6-2B8695E7D067}" - ProjectSection(SolutionItems) = preProject - README.md = README.md - EndProjectSection -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PostgresOutbox.Console", "PostgresOutbox.Console\PostgresOutbox.Console.csproj", "{06A60918-32EB-4C38-B369-7E1D76B809A0}" -EndProject -Global - GlobalSection(SolutionConfigurationPlatforms) = preSolution - Debug|Any CPU = Debug|Any CPU - Release|Any CPU = Release|Any CPU - EndGlobalSection - GlobalSection(ProjectConfigurationPlatforms) = postSolution - {CAC01DAE-078B-474C-8C38-B528F27B2E16}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {CAC01DAE-078B-474C-8C38-B528F27B2E16}.Debug|Any CPU.Build.0 = Debug|Any CPU - {CAC01DAE-078B-474C-8C38-B528F27B2E16}.Release|Any CPU.ActiveCfg = Release|Any CPU - {CAC01DAE-078B-474C-8C38-B528F27B2E16}.Release|Any CPU.Build.0 = Release|Any CPU - {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Debug|Any CPU.Build.0 = Debug|Any CPU - {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Release|Any CPU.ActiveCfg = Release|Any CPU - {E1AD98FC-42B1-4B7B-AF44-8057B7A61FDD}.Release|Any CPU.Build.0 = Release|Any CPU - {06A60918-32EB-4C38-B369-7E1D76B809A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {06A60918-32EB-4C38-B369-7E1D76B809A0}.Debug|Any CPU.Build.0 = Debug|Any CPU - {06A60918-32EB-4C38-B369-7E1D76B809A0}.Release|Any CPU.ActiveCfg = Release|Any CPU - {06A60918-32EB-4C38-B369-7E1D76B809A0}.Release|Any CPU.Build.0 = Release|Any CPU - EndGlobalSection -EndGlobal diff --git a/README.md b/README.md index 75dd351..e9f9edf 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,14 @@ # Postgres Outbox Pattern with CDC and .NET PoC of doing Outbox Pattern with CDC and .NET +## Features: + +- Publication filter [is enabled](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-WITH) to trigger only INSERTed rows; +- AOT compliant compilation enforced by design + Uses [Postgres logical replication](https://www.postgresql.org/docs/current/logical-replication.html) with [Npgsql integration](https://www.npgsql.org/doc/replication.html). -Main logic is placed in [EventsSubscription](./PostgresOutbox/Subscriptions/EventsSubscription.cs). Check [LogicalReplicationTest](./PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs) for sample usage. +Main logic is placed in [EventsSubscription](./PostgresOutbox/Subscriptions/Subscription.cs). Check [LogicalReplicationTest](./PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs) for sample usage. Read more details in: - [Push-based Outbox Pattern with Postgres Logical Replication](https://event-driven.io/en/push_based_outbox_pattern_with_postgres_logical_replication/?utm_source=github_outbox_cdc). @@ -15,7 +20,11 @@ Read more details in: ```shell docker-compose up ``` -2. Run tests +2. Run(order doesn't matter) Publisher and Subscriber apps from vs-studio and follow Publisher instructions. + +## Testing (against default docker instance) + +Run tests ```shell dotnet test ``` @@ -70,6 +79,7 @@ dotnet test #### Performance - [2ndQuadrant - Performance limits of logical replication solutions](https://www.2ndquadrant.com/en/blog/performance-limits-of-logical-replication-solutions/) +- [Some benchmarking techniques](https://fluca1978.github.io/2021/07/15/PostgreSQLWalTraffic2.html) #### Snapshots - [Christos Christoudias - Creating a Logical Replica from a Snapshot in RDS Postgres](https://tech.instacart.com/creating-a-logical-replica-from-a-snapshot-in-rds-postgres-886d9d2c7343) diff --git a/docker-compose.yml b/docker-compose.yml index 5d8846c..e34c14e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,14 +1,39 @@ +version: '3.8' # Specify Docker Compose version + services: - postgres: - image: postgres:15.1-alpine - ports: - - "5432:5432" - environment: - - POSTGRES_DB=postgres - - POSTGRES_PASSWORD=Password12! - command: - - "postgres" - - "-c" - - "wal_level=logical" - volumes: - - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql + postgres: + image: postgres:15.1-alpine + ports: + - "5432:5432" + environment: + - POSTGRES_DB=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_USER=postgres + command: + - "postgres" + - "-c" + - "wal_level=logical" + - "-c" + - "wal_compression=on" + volumes: + - ./docker/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql + + pgadmin: + container_name: pgadmin_container + image: dpage/pgadmin4 + environment: + - PGADMIN_DEFAULT_EMAIL=${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org} + - PGADMIN_DEFAULT_PASSWORD=${PGADMIN_DEFAULT_PASSWORD:-postgres} + - PGADMIN_CONFIG_SERVER_MODE=False + - PGADMIN_CONFIG_MASTER_PASSWORD_REQUIRED=False + ports: + - "${PGADMIN_PORT:-5050}:80" + entrypoint: /bin/sh -c "chmod 600 /pgpass; /entrypoint.sh;" + user: root + volumes: + - ./docker/pgAdmin/pgpass:/pgpass + - ./docker/pgAdmin/servers.json:/pgadmin4/servers.json + depends_on: + - postgres + restart: unless-stopped + diff --git a/docker/pgAdmin/pgpass b/docker/pgAdmin/pgpass new file mode 100644 index 0000000..8755b48 --- /dev/null +++ b/docker/pgAdmin/pgpass @@ -0,0 +1 @@ +postgres:5432:*:postgres:postgres diff --git a/docker/pgAdmin/servers.json b/docker/pgAdmin/servers.json new file mode 100644 index 0000000..0a47106 --- /dev/null +++ b/docker/pgAdmin/servers.json @@ -0,0 +1,14 @@ +{ + "Servers": { + "1": { + "Group": "Servers", + "Name": "docker_postgres", + "Host": "postgres", + "Port": 5432, + "MaintenanceDB": "postgres", + "Username": "postgres", + "PassFile": "/pgpass", + "SSLMode": "prefer" + } + } +} diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql new file mode 100644 index 0000000..5887ecd --- /dev/null +++ b/docker/postgres/init.sql @@ -0,0 +1,6 @@ + +CREATE TABLE outbox ( + id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + message_type VARCHAR(250) NOT NULL, + data JSONB NOT NULL +); diff --git a/sql/init.sql b/sql/init.sql deleted file mode 100644 index 773c9ab..0000000 --- a/sql/init.sql +++ /dev/null @@ -1,6 +0,0 @@ - -CREATE TABLE events ( - id INTEGER PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, - event_type VARCHAR(250) NOT NULL, - data JSONB NOT NULL -); diff --git a/src/Commons/Commons.csproj b/src/Commons/Commons.csproj new file mode 100644 index 0000000..f1332f1 --- /dev/null +++ b/src/Commons/Commons.csproj @@ -0,0 +1,13 @@ + + + + net8.0 + enable + enable + + + + + + + diff --git a/PostgresOutbox.Console/Settings.cs b/src/Commons/Settings.cs similarity index 74% rename from PostgresOutbox.Console/Settings.cs rename to src/Commons/Settings.cs index ba054eb..a730f44 100644 --- a/PostgresOutbox.Console/Settings.cs +++ b/src/Commons/Settings.cs @@ -1,8 +1,8 @@ -namespace PostgresForDotnetDev.CLI; +namespace Commons; public static class Settings { - public static string ConnectionString = + public static readonly string ConnectionString = "PORT = 5432; HOST = 127.0.0.1; TIMEOUT = 15; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; Include Error Detail=True; DATABASE = 'postgres'; PASSWORD = 'postgres'; USER ID = 'postgres';"; } diff --git a/Directory.Build.props b/src/Directory.Build.props similarity index 61% rename from Directory.Build.props rename to src/Directory.Build.props index 1220aa9..d23bfcb 100644 --- a/Directory.Build.props +++ b/src/Directory.Build.props @@ -4,5 +4,7 @@ enable true enable + true + false diff --git a/PostgresOutbox/Database/Run.cs b/src/PostgresOutbox/Database/Run.cs similarity index 70% rename from PostgresOutbox/Database/Run.cs rename to src/PostgresOutbox/Database/Run.cs index 07a27fa..f2e7cc1 100644 --- a/PostgresOutbox/Database/Run.cs +++ b/src/PostgresOutbox/Database/Run.cs @@ -1,7 +1,8 @@ -using System.Data; +using System.Data; using System.Runtime.CompilerServices; using Npgsql; using PostgresOutbox.Subscriptions.Replication; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; namespace PostgresOutbox.Database; @@ -16,14 +17,16 @@ public static async Task Execute( await command.ExecuteNonQueryAsync(ct); } - public static async Task Execute( - string connectionString, - string sql, - CancellationToken ct) + public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, string tableName, CancellationToken ct) { - await using var dataSource = NpgsqlDataSource.Create(connectionString); - await using var command = dataSource.CreateCommand(sql); - await command.ExecuteNonQueryAsync(ct); + var sql = @$" + CREATE TABLE IF NOT EXISTS {tableName} ( + id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + message_type VARCHAR(250) NOT NULL, + data JSONB NOT NULL + ); + "; + await dataSource.Execute(sql, ct); } public static async Task Exists( @@ -36,15 +39,12 @@ public static async Task Exists( await using var command = dataSource.CreateCommand( $"SELECT EXISTS(SELECT 1 FROM {table} WHERE {where})" ); - foreach (var parameter in parameters) - { - command.Parameters.AddWithValue(parameter); - } + foreach (var parameter in parameters) command.Parameters.AddWithValue(parameter); return ((await command.ExecuteScalarAsync(ct)) as bool?) == true; } - public static async IAsyncEnumerable QueryTransactionSnapshot( + internal static async IAsyncEnumerable QueryTransactionSnapshot( this NpgsqlConnection connection, string snapshotName, string tableName, @@ -61,8 +61,6 @@ public static async IAsyncEnumerable QueryTransactionSnapshot( await using var reader = await cmd.ExecuteReaderAsync(ct); while (await reader.ReadAsync(ct)) - { yield return await dataMapper.ReadFromSnapshot(reader, ct); - } } } diff --git a/src/PostgresOutbox/PostgresOutbox.csproj b/src/PostgresOutbox/PostgresOutbox.csproj new file mode 100644 index 0000000..b320d40 --- /dev/null +++ b/src/PostgresOutbox/PostgresOutbox.csproj @@ -0,0 +1,13 @@ + + + + net8.0 + + + + + + + + + diff --git a/src/PostgresOutbox/Serialization/INamingPolicy.cs b/src/PostgresOutbox/Serialization/INamingPolicy.cs new file mode 100644 index 0000000..2a0c939 --- /dev/null +++ b/src/PostgresOutbox/Serialization/INamingPolicy.cs @@ -0,0 +1,16 @@ +namespace PostgresOutbox.Serialization; + +public interface INamingPolicy +{ + Func Bind { get; } +} + +public abstract record NamingPolicy(Func Bind):INamingPolicy +{ + public Func Bind { get; } = Bind; +} + +//This should be used in shared kernel scenario where common library is shared between Pub and Sub +public record FQNNamingPolicy(): NamingPolicy(type => type.FullName!); +//This policy is better suited for distributed components +public record AttributeNamingPolicy(): NamingPolicy(MessageUrn.ForTypeString); diff --git a/src/PostgresOutbox/Serialization/ITypeResolver.cs b/src/PostgresOutbox/Serialization/ITypeResolver.cs new file mode 100644 index 0000000..a4959f7 --- /dev/null +++ b/src/PostgresOutbox/Serialization/ITypeResolver.cs @@ -0,0 +1,33 @@ +using System.Collections.Concurrent; +using System.Text.Json.Serialization; +using System.Text.Json.Serialization.Metadata; + +namespace PostgresOutbox.Serialization; + +public interface ITypeResolver +{ + Type Resolve(string value); + (string, JsonTypeInfo) Resolve(Type type); + JsonSerializerContext SerializationContext { get; } +} + +public class TypeResolver(JsonSerializerContext serializationContext, INamingPolicy? namingPolicy=default): ITypeResolver +{ + public JsonSerializerContext SerializationContext { get; } = serializationContext; + private static readonly ConcurrentDictionary TypeDictionary = []; + private static readonly ConcurrentDictionary TypeInfoDictionary = []; + + public TypeResolver WhiteList() where T:class + { + var type = typeof(T); + var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName); + TypeDictionary.AddOrUpdate((namingPolicy ?? new FQNNamingPolicy()).Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type); + TypeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo); + return this; + } + + public (string, JsonTypeInfo) Resolve(Type type) => + (TypeDictionary.Single(kv => kv.Value == type).Key, TypeInfoDictionary[type]); + + public Type Resolve(string type) => TypeDictionary[type]; +} diff --git a/src/PostgresOutbox/Serialization/JsonSerialization.cs b/src/PostgresOutbox/Serialization/JsonSerialization.cs new file mode 100644 index 0000000..0b6e1f5 --- /dev/null +++ b/src/PostgresOutbox/Serialization/JsonSerialization.cs @@ -0,0 +1,16 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Text.Json.Serialization.Metadata; +using PostgresOutbox.Streams; + +namespace PostgresOutbox.Serialization; + +public static class JsonSerialization +{ + public static string ToJson(T data, JsonTypeInfo typeInfo) where T:class=> + JsonSerializer.Serialize(data, typeInfo); + + public static async ValueTask FromJsonAsync(Type type, Stream stream, JsonSerializerContext context, CancellationToken ct = default) + => await JsonSerializer.DeserializeAsync(stream.ToSohSkippingStream(), type, context, ct).ConfigureAwait(false) + ?? throw new InvalidOperationException(); +} diff --git a/src/PostgresOutbox/Serialization/MessageUrnAttribute.cs b/src/PostgresOutbox/Serialization/MessageUrnAttribute.cs new file mode 100644 index 0000000..a384d5c --- /dev/null +++ b/src/PostgresOutbox/Serialization/MessageUrnAttribute.cs @@ -0,0 +1,59 @@ +using System.Collections.Concurrent; + +namespace PostgresOutbox.Serialization; + +[AttributeUsage(AttributeTargets.Class | AttributeTargets.Interface)] +public class MessageUrnAttribute: + Attribute +{ + /// + /// + /// The urn value to use for this message type. + public MessageUrnAttribute(string urn) + { + ArgumentException.ThrowIfNullOrEmpty(urn, nameof(urn)); + + if (urn.StartsWith(MessageUrn.Prefix)) + throw new ArgumentException($"Value should not contain the default prefix '{MessageUrn.Prefix}'.", nameof(urn)); + + Urn = FormatUrn(urn); + } + + public Uri Urn { get; } + + private static Uri FormatUrn(string urn) + { + var fullValue = MessageUrn.Prefix + urn; + + if (Uri.TryCreate(fullValue, UriKind.Absolute, out var uri)) + return uri; + + throw new UriFormatException($"Invalid URN: {fullValue}"); + } +} + + +public static class MessageUrn +{ + public const string Prefix = "urn:message:"; + + private static readonly ConcurrentDictionary Cache = new(); + + + public static string ForTypeString(Type type) => + Cache.GetOrAdd(type,t => + { + var attribute = Attribute.GetCustomAttribute(t, typeof(MessageUrnAttribute)) as MessageUrnAttribute ?? + throw new NotSupportedException($"Attribute not defined fot type '{type}'"); + return new Cached(attribute.Urn, attribute.Urn.ToString()); + }).UrnString; + + + private interface ICached + { + Uri Urn { get; } + string UrnString { get; } + } + + private record Cached(Uri Urn, string UrnString): ICached; +} diff --git a/src/PostgresOutbox/Serialization/SOHSkippingStream.cs b/src/PostgresOutbox/Serialization/SOHSkippingStream.cs new file mode 100644 index 0000000..8e76f75 --- /dev/null +++ b/src/PostgresOutbox/Serialization/SOHSkippingStream.cs @@ -0,0 +1,160 @@ +namespace PostgresOutbox.Serialization; + +internal class SOHSkippingStream(Stream inner): Stream +{ + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => inner.Length; + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override int ReadByte() + { + var initialPosition = inner.Position; + var result = inner.ReadByte(); + if (result == 1 && initialPosition == 0) + { + result = inner.ReadByte(); + } + + return result; + } + + public override int Read(byte[] buffer, int offset, int count) + { + var totalRead = 0; + if (inner.Position == 0) + { + var readBytes = inner.Read(buffer, 0, 1); + if (readBytes <= 0) + { + return readBytes; + } + + if (buffer[0] != 1) + { + offset += 1; + count -= 1; + totalRead = 1; + } + } + + return totalRead + inner.Read(buffer, offset, count); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, + CancellationToken cancellationToken) + { + var totalRead = 0; + if (inner.Position == 0) + { + var readBytes = await inner.ReadAsync(buffer.AsMemory(0, 1), cancellationToken).ConfigureAwait(false); + if (readBytes <= 0) + { + return readBytes; + } + + if (buffer[0] != 1) + { + offset += 1; + count -= 1; + totalRead = 1; + } + } + + return totalRead + await inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false); + } + + public override async ValueTask ReadAsync(Memory buffer, + CancellationToken cancellationToken = default) + { + var totalRead = 0; + if (inner.Position == 0) + { + var singleByteBuffer = buffer[..1]; + + var readBytes = await inner.ReadAsync(singleByteBuffer, cancellationToken).ConfigureAwait(false); + if (readBytes <= 0) + { + return readBytes; + } + + if (singleByteBuffer.Span[0] != 1) + { + totalRead = 1; + buffer = buffer[1..]; + } + } + + return totalRead + await inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + + public override int Read(Span buffer) + { + var totalRead = 0; + if (inner.Position == 0) + { + var singleByteBuffer = buffer[..1]; + var readBytes = inner.Read(singleByteBuffer); + if (readBytes <= 0) + { + return readBytes; + } + + if (singleByteBuffer[0] != 1) + { + totalRead = 1; + buffer = buffer[1..]; + } + } + + return totalRead + inner.Read(buffer); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, + object? state) + { + throw new NotSupportedException(); + } + + public override int EndRead(IAsyncResult asyncResult) + { + throw new NotSupportedException(); + } + + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override void Flush() + { + throw new NotSupportedException(); + } + + protected override void Dispose(bool disposing) + { + inner.Dispose(); + } + + public override ValueTask DisposeAsync() + { + return inner.DisposeAsync(); + } +} diff --git a/src/PostgresOutbox/Streams/SohSkippingStream.cs b/src/PostgresOutbox/Streams/SohSkippingStream.cs new file mode 100644 index 0000000..9a67f70 --- /dev/null +++ b/src/PostgresOutbox/Streams/SohSkippingStream.cs @@ -0,0 +1,125 @@ +namespace PostgresOutbox.Streams +{ + internal class SohSkippingStream(Stream inner): Stream + { + public override int ReadByte() + { + var result = inner.ReadByte(); + if (result == 1 && inner.Position == 0) result = inner.ReadByte(); + return result; + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (inner.Position != 0) return inner.Read(buffer, offset, count); + var readBytes = inner.Read(buffer, 0, 1); + if (readBytes <= 0) return readBytes; + + if (buffer[0] == 1) return inner.Read(buffer, offset, count); + offset += 1; + count -= 1; + return 1 + inner.Read(buffer, offset, count); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, + CancellationToken cancellationToken) + { + if (inner.Position != 0) + return await inner.ReadAsync(new Memory(buffer, offset, count), cancellationToken) + .ConfigureAwait(false); + var readBytes = await inner.ReadAsync(new Memory(buffer, 0, 1), cancellationToken) + .ConfigureAwait(false); + if (readBytes <= 0) return readBytes; + + if (buffer[0] == 1) + return await inner.ReadAsync(new Memory(buffer, offset, count), cancellationToken) + .ConfigureAwait(false); + offset += 1; + count -= 1; + return 1 + await inner.ReadAsync(new Memory(buffer, offset, count), cancellationToken) + .ConfigureAwait(false); + } + + public override async ValueTask ReadAsync(Memory buffer, + CancellationToken cancellationToken = default) + { + if (inner.Position != 0) + return await inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + var singleByteBuffer = buffer[..1]; + + var readBytes = await inner.ReadAsync(singleByteBuffer, cancellationToken).ConfigureAwait(false); + if (readBytes <= 0) return readBytes; + + if (singleByteBuffer.Span[0] == 1) + return await inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + buffer = buffer[1..]; + + return 1 + await inner.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + + public override int Read(Span buffer) + { + if (inner.Position != 0) return inner.Read(buffer); + var readBytes = inner.Read(buffer[..1]); + if (readBytes <= 0) return readBytes; + + if (buffer[..1][0] == 1) return inner.Read(buffer); + buffer = buffer[1..]; + + return 1 + inner.Read(buffer); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, + object? state) + { + throw new NotSupportedException(); + } + + public override int EndRead(IAsyncResult asyncResult) + { + throw new NotSupportedException(); + } + + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => inner.Length; + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() + { + throw new NotSupportedException(); + } + + protected override void Dispose(bool disposing) + { + inner.Dispose(); + } + + public override ValueTask DisposeAsync() + { + return inner.DisposeAsync(); + } + } +} diff --git a/src/PostgresOutbox/Streams/StreamExtensions.cs b/src/PostgresOutbox/Streams/StreamExtensions.cs new file mode 100644 index 0000000..d752e54 --- /dev/null +++ b/src/PostgresOutbox/Streams/StreamExtensions.cs @@ -0,0 +1,7 @@ +namespace PostgresOutbox.Streams +{ + internal static class StreamExtensions + { + public static SohSkippingStream ToSohSkippingStream(this Stream stream) => new(inner: stream); + } +} diff --git a/src/PostgresOutbox/Subscriptions/IConsume.cs b/src/PostgresOutbox/Subscriptions/IConsume.cs new file mode 100644 index 0000000..95e6eb9 --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/IConsume.cs @@ -0,0 +1,8 @@ +namespace PostgresOutbox.Subscriptions; + +public interface IConsume; + +public interface IConsumes: IConsume where T : class +{ + Task Handle(T value); +} diff --git a/src/PostgresOutbox/Subscriptions/ISubscriptionOptions.cs b/src/PostgresOutbox/Subscriptions/ISubscriptionOptions.cs new file mode 100644 index 0000000..50aa95f --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/ISubscriptionOptions.cs @@ -0,0 +1,31 @@ +using JetBrains.Annotations; +using PostgresOutbox.Subscriptions.Replication; +using static PostgresOutbox.Subscriptions.Management.PublicationManagement; +using static PostgresOutbox.Subscriptions.Management.ReplicationSlotManagement; + +namespace PostgresOutbox.Subscriptions; + +internal interface ISubscriptionOptions +{ + [UsedImplicitly] string ConnectionString { get; } + IReplicationDataMapper DataMapper { get; } + PublicationSetupOptions PublicationOptions { get; } + [UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; } + [UsedImplicitly] IErrorProcessor ErrorProcessor { get; } + + void Deconstruct( + out string connectionString, + out PublicationSetupOptions publicationSetupOptions, + out ReplicationSlotSetupOptions replicationSlotSetupOptions, + out IErrorProcessor errorProcessor, + out IReplicationDataMapper dataMapper, + out Dictionary registry); +} + +internal record SubscriptionOptions( + string ConnectionString, + PublicationSetupOptions PublicationOptions, + ReplicationSlotSetupOptions ReplicationOptions, + IErrorProcessor ErrorProcessor, + IReplicationDataMapper DataMapper, + Dictionary Registry): ISubscriptionOptions; diff --git a/PostgresOutbox/Subscriptions/Management/PublicationManagement.cs b/src/PostgresOutbox/Subscriptions/Management/PublicationManagement.cs similarity index 57% rename from PostgresOutbox/Subscriptions/Management/PublicationManagement.cs rename to src/PostgresOutbox/Subscriptions/Management/PublicationManagement.cs index 54f650e..16707f3 100644 --- a/PostgresOutbox/Subscriptions/Management/PublicationManagement.cs +++ b/src/PostgresOutbox/Subscriptions/Management/PublicationManagement.cs @@ -1,5 +1,6 @@ using Npgsql; using PostgresOutbox.Database; +#pragma warning disable CA2208 namespace PostgresOutbox.Subscriptions.Management; @@ -15,24 +16,43 @@ CancellationToken ct { var (publicationName, tableName, createStyle, shouldReAddTablesIfWereRecreated) = options; - if (createStyle == CreateStyle.Never) - return new None(); - - if (createStyle == CreateStyle.AlwaysRecreate) + return createStyle switch + { + CreateStyle.Never => new None(), + CreateStyle.AlwaysRecreate => await ReCreate(dataSource, publicationName, tableName, ct), + CreateStyle.WhenNotExists when await dataSource.PublicationExists(publicationName, ct) => await Refresh(dataSource, publicationName, tableName, shouldReAddTablesIfWereRecreated, ct), + CreateStyle.WhenNotExists => await Create(dataSource, publicationName, tableName, ct), + _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) + }; + + static async Task ReCreate( + NpgsqlDataSource dataSource, + string publicationName, + string tableName, CancellationToken ct) + { await dataSource.DropPublication(publicationName, ct); + return await Create(dataSource, publicationName, tableName, ct); + } - if (createStyle == CreateStyle.WhenNotExists - && await dataSource.PublicationExists(publicationName, ct)) + static async Task Create( + NpgsqlDataSource dataSource, + string publicationName, + string tableName, CancellationToken ct) { - if (shouldReAddTablesIfWereRecreated) - await dataSource.RefreshPublicationTables(publicationName, tableName, ct); + await dataSource.CreatePublication(publicationName, tableName, ct); + return new Created(); + } + static async Task Refresh(NpgsqlDataSource dataSource, + string publicationName, + string tableName, + bool shouldReAddTablesIfWereRecreated, + CancellationToken ct) + { + if(shouldReAddTablesIfWereRecreated) + await dataSource.RefreshPublicationTables(publicationName, tableName, ct); return new AlreadyExists(); } - - await dataSource.CreatePublication(publicationName, tableName, ct); - - return new Created(); } private static Task CreatePublication( @@ -41,7 +61,7 @@ private static Task CreatePublication( string tableName, CancellationToken ct ) => - dataSource.Execute($"CREATE PUBLICATION {publicationName} FOR TABLE {tableName};", ct); + dataSource.Execute($"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", ct); private static Task DropPublication( this NpgsqlDataSource dataSource, @@ -80,7 +100,7 @@ private static Task PublicationExists( string publicationName, CancellationToken ct ) => - dataSource.Exists("pg_publication", "pubname = $1", new object[] { publicationName }, ct); + dataSource.Exists("pg_publication", "pubname = $1", [publicationName], ct); public abstract record SetupPublicationResult { @@ -91,10 +111,13 @@ public record AlreadyExists: SetupPublicationResult; public record Created: SetupPublicationResult; } - public record PublicationSetupOptions( - string PublicationName, - string TableName, + public sealed record PublicationSetupOptions( + string PublicationName = "pub", + string TableName = PublicationSetupOptions.DefaultTableName, CreateStyle CreateStyle = CreateStyle.WhenNotExists, bool ShouldReAddTablesIfWereRecreated = false - ); + ) + { + internal const string DefaultTableName = "outbox"; + } } diff --git a/src/PostgresOutbox/Subscriptions/Management/ReplicationSlotManagement.cs b/src/PostgresOutbox/Subscriptions/Management/ReplicationSlotManagement.cs new file mode 100644 index 0000000..a1d8c3d --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/Management/ReplicationSlotManagement.cs @@ -0,0 +1,76 @@ +using Npgsql; +using Npgsql.Replication; +using NpgsqlTypes; +using PostgresOutbox.Database; + +namespace PostgresOutbox.Subscriptions.Management; +using static ReplicationSlotManagement.CreateReplicationSlotResult; + +public static class ReplicationSlotManagement +{ + #pragma warning disable CA2208 + public static async Task SetupReplicationSlot( + this NpgsqlDataSource dataSource, + LogicalReplicationConnection connection, + ReplicationSlotSetupOptions options, + CancellationToken ct + ) + { + var (slotName, createStyle, _) = options; + + return (createStyle, await dataSource.ReplicationSlotExists(slotName, ct)) switch + { + (CreateStyle.Never,_) => new None(), + (CreateStyle.WhenNotExists,true) => new AlreadyExists(), + (CreateStyle.WhenNotExists,false) => await Create(connection, slotName, ct), + (CreateStyle.AlwaysRecreate,true) => await ReCreate(connection, slotName, ct), + (CreateStyle.AlwaysRecreate, false) => await Create(connection, slotName, ct), + + _ => throw new ArgumentOutOfRangeException(nameof(options.CreateStyle)) + }; + + static async Task ReCreate( + LogicalReplicationConnection connection, + string slotName, + CancellationToken ct) + { + await connection.DropReplicationSlot(slotName, true, ct); + return await Create(connection, slotName, ct); + } + + static async Task Create( + LogicalReplicationConnection connection, + string slotName, + CancellationToken ct) + { + var result = await connection.CreatePgOutputReplicationSlot( + slotName, + slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, + cancellationToken: ct + ); + + return new Created(result.SnapshotName!, result.ConsistentPoint); + } + } + + private static Task ReplicationSlotExists( + this NpgsqlDataSource dataSource, + string slotName, + CancellationToken ct + ) => dataSource.Exists("pg_replication_slots", "slot_name = $1", [slotName], ct); + + public record ReplicationSlotSetupOptions( + string SlotName = $"{PublicationManagement.PublicationSetupOptions.DefaultTableName}_slot", + CreateStyle CreateStyle = CreateStyle.WhenNotExists, + bool Binary = false //https://www.postgresql.org/docs/current/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-WITH-BINARY + ); + + public abstract record CreateReplicationSlotResult + { + public record None: CreateReplicationSlotResult; + + public record AlreadyExists: CreateReplicationSlotResult; + + public record Created(string SnapshotName, NpgsqlLogSequenceNumber LogSequenceNumber): CreateReplicationSlotResult; + } +} diff --git a/src/PostgresOutbox/Subscriptions/Replication/IReplicationDataMapper.cs b/src/PostgresOutbox/Subscriptions/Replication/IReplicationDataMapper.cs new file mode 100644 index 0000000..f89883c --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/Replication/IReplicationDataMapper.cs @@ -0,0 +1,12 @@ +using Npgsql; +using Npgsql.Replication.PgOutput.Messages; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; + +namespace PostgresOutbox.Subscriptions.Replication; + +public interface IReplicationDataMapper +{ + Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct); + + Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct); +} diff --git a/src/PostgresOutbox/Subscriptions/Replication/ReplicationDataMapper.cs b/src/PostgresOutbox/Subscriptions/Replication/ReplicationDataMapper.cs new file mode 100644 index 0000000..6bc4070 --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/Replication/ReplicationDataMapper.cs @@ -0,0 +1,68 @@ +using Npgsql; +using Npgsql.Replication.PgOutput.Messages; +using PostgresOutbox.Serialization; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; +using System.Text.Json; +using Npgsql.Replication.PgOutput; + +namespace PostgresOutbox.Subscriptions.Replication; + +internal sealed class ReplicationDataMapper(ITypeResolver resolver): IReplicationDataMapper +{ + public async Task ReadFromReplication(InsertMessage insertMessage, CancellationToken ct) + { + var id = string.Empty; + var columnNumber = 0; + var typeName = string.Empty; + await foreach (var column in insertMessage.NewRow) + { + try + { + switch (columnNumber) + { + case 0: + id = column.Kind == TupleDataKind.BinaryValue + ? (await column.Get(ct)).ToString() + : await column.Get(ct); + break; + case 1: + using (var textReader = column.GetTextReader()) + { + typeName = await textReader.ReadToEndAsync(ct); + break; + } + case 2 when column.GetDataTypeName().Equals("jsonb", StringComparison.OrdinalIgnoreCase): + { + var type = resolver.Resolve(typeName); + ArgumentNullException.ThrowIfNull(type, typeName); + return new OkEnvelope(await JsonSerialization.FromJsonAsync(type, column.GetStream(), resolver.SerializationContext, ct)); + } + } + } + catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException or JsonException) + { + return new KoEnvelope(ex,id); + } + columnNumber++; + } + throw new InvalidOperationException("You should not get here"); + } + + public async Task ReadFromSnapshot(NpgsqlDataReader reader, CancellationToken ct) + { + long id = default; + try + { + id = reader.GetInt64(0); + var eventTypeName = reader.GetString(1); + var eventType = resolver.Resolve(eventTypeName); + ArgumentNullException.ThrowIfNull(eventType, eventTypeName); + await using var stream = await reader.GetStreamAsync(2, ct); + return new OkEnvelope(await JsonSerialization.FromJsonAsync(eventType, stream, resolver.SerializationContext, ct)); + } + catch (Exception ex) when (ex is ArgumentException or NotSupportedException or InvalidOperationException or JsonException) + { + return new KoEnvelope(ex, id.ToString()); + } + } +} diff --git a/src/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/Envelope.cs b/src/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/Envelope.cs new file mode 100644 index 0000000..944260f --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/ReplicationMessageHandlers/Envelope.cs @@ -0,0 +1,7 @@ +namespace PostgresOutbox.Subscriptions.ReplicationMessageHandlers; + +public interface IEnvelope; + +public sealed record OkEnvelope(object Value): IEnvelope; + +public sealed record KoEnvelope(Exception Error, string Id): IEnvelope; diff --git a/PostgresOutbox/Subscriptions/SnapshotReader/SnapshotReader.cs b/src/PostgresOutbox/Subscriptions/SnapshotReader/SnapshotReader.cs similarity index 75% rename from PostgresOutbox/Subscriptions/SnapshotReader/SnapshotReader.cs rename to src/PostgresOutbox/Subscriptions/SnapshotReader/SnapshotReader.cs index 0a23fa3..28af9fb 100644 --- a/PostgresOutbox/Subscriptions/SnapshotReader/SnapshotReader.cs +++ b/src/PostgresOutbox/Subscriptions/SnapshotReader/SnapshotReader.cs @@ -1,13 +1,14 @@ -using System.Runtime.CompilerServices; +using System.Runtime.CompilerServices; using Npgsql; using PostgresOutbox.Database; using PostgresOutbox.Subscriptions.Replication; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; namespace PostgresOutbox.Subscriptions.SnapshotReader; public static class SnapshotReader { - public static async IAsyncEnumerable GetRowsFromSnapshot( + internal static async IAsyncEnumerable GetRowsFromSnapshot( this NpgsqlConnection connection, string snapshotName, string tableName, @@ -16,8 +17,6 @@ public static async IAsyncEnumerable GetRowsFromSnapshot( ) { await foreach (var @event in connection.QueryTransactionSnapshot(snapshotName, tableName, dataMapper, ct)) - { yield return @event; - } } } diff --git a/src/PostgresOutbox/Subscriptions/Subscription.cs b/src/PostgresOutbox/Subscriptions/Subscription.cs new file mode 100644 index 0000000..9a0d016 --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/Subscription.cs @@ -0,0 +1,160 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging; +using Npgsql; +using Npgsql.Replication; +using Npgsql.Replication.PgOutput; +using Npgsql.Replication.PgOutput.Messages; +using PostgresOutbox.Database; +using PostgresOutbox.Subscriptions.Management; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; +using PostgresOutbox.Subscriptions.SnapshotReader; + +namespace PostgresOutbox.Subscriptions; + +using static PublicationManagement; +using static ReplicationSlotManagement; +using static ReplicationSlotManagement.CreateReplicationSlotResult; + +public interface ISubscription +{ + IAsyncEnumerable Subscribe(Func builder, + ILoggerFactory? loggerFactory, + CancellationToken ct); +} + +public enum CreateStyle +{ + WhenNotExists, + AlwaysRecreate, + Never +} + +public sealed class Subscription: ISubscription, IAsyncDisposable +{ + private static LogicalReplicationConnection? _connection; + private static readonly SubscriptionOptionsBuilder Builder = new(); + private ISubscriptionOptions? _options; + public async IAsyncEnumerable Subscribe( + Func builder, + ILoggerFactory? loggerFactory = null, + [EnumeratorCancellation] CancellationToken ct = default + ) + { + _options = builder(Builder).Build(); + var (connectionString, publicationSetupOptions, slotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options; + var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString); + dataSourceBuilder.UseLoggerFactory(loggerFactory); + + var dataSource = dataSourceBuilder.Build(); + await dataSource.EnsureTableExists(publicationSetupOptions.TableName, ct); + + _connection = new LogicalReplicationConnection(connectionString); + await _connection.Open(ct); + + + await dataSource.SetupPublication(publicationSetupOptions, ct); + var result = await dataSource.SetupReplicationSlot(_connection, slotSetupOptions, ct); + + PgOutputReplicationSlot slot; + + if (result is not Created created) + { + slot = new PgOutputReplicationSlot(slotSetupOptions.SlotName); + } + else + { + slot = new PgOutputReplicationSlot( + new ReplicationSlotOptions( + slotSetupOptions.SlotName, + created.LogSequenceNumber + ) + ); + + await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct)) + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct)) + yield return subscribe; + } + + await foreach (var message in + _connection.StartReplication(slot, + new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, slotSetupOptions.Binary), ct)) + { + if (message is InsertMessage insertMessage) + { + var envelope = await replicationDataMapper.ReadFromReplication(insertMessage, ct); + await foreach (var subscribe in ProcessEnvelope(envelope, registry, errorProcessor).WithCancellation(ct)) + yield return subscribe; + } + // Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually + // so that Npgsql can inform the server which WAL files can be removed/recycled. + _connection.SetReplicationStatus(message.WalEnd); + await _connection.SendStatusUpdate(ct); + } + } + + private static async IAsyncEnumerable ProcessEnvelope( + IEnvelope envelope, + Dictionary registry, + IErrorProcessor errorProcessor + ) where T:class + { + switch (envelope) + { + case KoEnvelope error: + await errorProcessor.Process(error.Error); + yield break; + case OkEnvelope okEnvelope: + { + var obj = okEnvelope.Value; + var objType = obj.GetType(); + var (consumer, methodInfo) = Memoize(registry, objType, Consumer); + await ((Task)methodInfo.Invoke(consumer, [obj])!).ConfigureAwait(false); + yield return (T)envelope; + yield break; + } + } + } + + private static readonly Dictionary Cache = []; + + + private static (IConsume consumer, MethodInfo methodInfo) Memoize + ( + Dictionary registry, + Type objType, + Func, Type, (IConsume consumer, MethodInfo methodInfo)> func + ) + { + if (!Cache.TryGetValue(objType, out var entry)) + entry = func(registry, objType); + Cache[objType] = entry; + return entry; + } + private static (IConsume consumer, MethodInfo methodInfo) Consumer(Dictionary registry, Type objType) + { + var consumer = registry[objType] ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}"); + var methodInfos = consumer.GetType().GetMethods(BindingFlags.Instance|BindingFlags.Public); + var methodInfo = methodInfos?.SingleOrDefault(mi=>mi.GetParameters().Any(pa => pa.ParameterType == objType)) + ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}"); + return (consumer, methodInfo); + } + + private static async IAsyncEnumerable ReadExistingRowsFromSnapshot( + NpgsqlDataSource dataSource, + string snapshotName, + ISubscriptionOptions options, + [EnumeratorCancellation] CancellationToken ct = default + ) + { + await using var connection = await dataSource.OpenConnectionAsync(ct); + await foreach (var row in connection.GetRowsFromSnapshot( + snapshotName, + options.PublicationOptions.TableName, + options.DataMapper, + ct)) + yield return row; + } + + public async ValueTask DisposeAsync() => await _connection!.DisposeAsync(); +} diff --git a/src/PostgresOutbox/Subscriptions/SubscriptionOptionsBuilder.cs b/src/PostgresOutbox/Subscriptions/SubscriptionOptionsBuilder.cs new file mode 100644 index 0000000..edd776c --- /dev/null +++ b/src/PostgresOutbox/Subscriptions/SubscriptionOptionsBuilder.cs @@ -0,0 +1,106 @@ +using JetBrains.Annotations; +using PostgresOutbox.Serialization; +using PostgresOutbox.Subscriptions.Management; +using PostgresOutbox.Subscriptions.Replication; + +namespace PostgresOutbox.Subscriptions; + +public sealed class SubscriptionOptionsBuilder +{ + private static string? _connectionString; + private static PublicationManagement.PublicationSetupOptions? _publicationSetupOptions; + private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _slotOptions; + private static IReplicationDataMapper? _dataMapper; + + static SubscriptionOptionsBuilder() + { + _connectionString = null; + _publicationSetupOptions = default; + _slotOptions = default; + _dataMapper = default; + } + + public SubscriptionOptionsBuilder ConnectionString(string connectionString) + { + _connectionString = connectionString; + return this; + } + + public SubscriptionOptionsBuilder TypeResolver(ITypeResolver resolver) + { + _dataMapper = new ReplicationDataMapper(resolver); + return this; + } + + [UsedImplicitly] + public SubscriptionOptionsBuilder WithMapper(IReplicationDataMapper dataMapper) + { + _dataMapper = dataMapper; + return this; + } + + [UsedImplicitly] + public SubscriptionOptionsBuilder WithPublicationOptions(PublicationManagement.PublicationSetupOptions publicationSetupOptions) + { + _publicationSetupOptions = publicationSetupOptions; + return this; + } + + [UsedImplicitly] + public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotOptions) + { + _slotOptions = replicationSlotOptions; + return this; + } + + private readonly Dictionary _registry = []; + private IErrorProcessor? _errorProcessor; + + public SubscriptionOptionsBuilder Consumes(TU consumer) where T : class + where TU : class, IConsumes + { + _registry.TryAdd(typeof(T), consumer); + return this; + } + + public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProcessor) + { + _errorProcessor = errorProcessor; + return this; + } + + internal ISubscriptionOptions Build() + { + ArgumentNullException.ThrowIfNull(_connectionString); + ArgumentNullException.ThrowIfNull(_dataMapper); + if(_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer()); + + return new SubscriptionOptions( + _connectionString, + _publicationSetupOptions ?? new PublicationManagement.PublicationSetupOptions(), + _slotOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(), + _errorProcessor ?? new ConsoleOutErrorProcessor(), + _dataMapper, + _registry); + } +} + +public class ObjectTracingConsumer: IConsumes +{ + private static ulong _counter = 0; + public Task Handle(object value) + { + Interlocked.Increment(ref _counter); + return Console.Out.WriteLineAsync(); + } +} + +public interface IErrorProcessor +{ + Func Process { get; } +} + +public record ConsoleOutErrorProcessor: IErrorProcessor +{ + public Func Process => exception => Console.Out.WriteLineAsync($"record id:{0} resulted in error:{exception.Message}"); +} diff --git a/src/PostgresOutbox/Table/MessageAppender.cs b/src/PostgresOutbox/Table/MessageAppender.cs new file mode 100644 index 0000000..c9b4e01 --- /dev/null +++ b/src/PostgresOutbox/Table/MessageAppender.cs @@ -0,0 +1,80 @@ +using System.Collections; +using Npgsql; +using PostgresOutbox.Serialization; + +namespace PostgresOutbox.Table; + +public static class MessageAppender +{ + public static async Task AppendAsync(string tableName, T @event, ITypeResolver resolver, string connectionString, CancellationToken ct) + where T: class + { + var type = typeof(T); + var (typeName, jsonTypeInfo) = resolver.Resolve(type); + var data = JsonSerialization.ToJson(@event, jsonTypeInfo); + + await using var connection = new NpgsqlConnection(connectionString); + await connection.OpenAsync(ct); + var command = connection.CreateCommand(); + command.CommandText = $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')"; + await command.ExecuteNonQueryAsync(ct); + } + + public static async Task AppendAsync(string tableName, T @input, ITypeResolver resolver, NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken ct) + where T : class + { + switch (@input) + { + case null: + throw new ArgumentNullException(nameof(@input)); + case IEnumerable inputs: + await AppendBatchAsyncOfT(tableName, inputs, resolver, connection, transaction, ct); + break; + default: + await AppendAsyncOfT(tableName, input, resolver, connection, transaction, ct); + break; + } + } + + private static async Task AppendBatchAsyncOfT( + string tableName + , T inputs + , ITypeResolver resolver + , NpgsqlConnection connection + , NpgsqlTransaction transaction + , CancellationToken ct) where T : class, IEnumerable + { + var batch = new NpgsqlBatch(connection, transaction); + foreach (var input in inputs) + { + var (typeName, jsonTypeInfo) = resolver.Resolve(input.GetType()); + var batchCommand = batch.CreateBatchCommand(); + var data = JsonSerialization.ToJson(input, jsonTypeInfo); + + + batchCommand.CommandText = + $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')"; + batch.BatchCommands.Add(batchCommand); + } + await batch.ExecuteNonQueryAsync(ct); + } + + private static async Task AppendAsyncOfT( + string tableName + , T @input + , ITypeResolver resolver + , NpgsqlConnection connection + , NpgsqlTransaction transaction + , CancellationToken ct) where T : class + { + + var (typeName, jsonTypeInfo) = resolver.Resolve(typeof(T)); + var data = JsonSerialization.ToJson(@input, jsonTypeInfo); + var command = new NpgsqlCommand( + $"INSERT INTO {tableName}(message_type, data) values ('{typeName}', '{data}')", + connection, + transaction + ); + await command.ExecuteNonQueryAsync(ct); + } +} diff --git a/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs similarity index 98% rename from PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs rename to src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs index 470c11b..93195a2 100644 --- a/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs +++ b/src/PostgresOutboxPatternWithCDC.NET.Tests/LogicalReplicationTest.cs @@ -1,8 +1,8 @@ +using Commons.Events; using PostgresOutbox.Events; using PostgresOutbox.Serialization; using PostgresOutbox.Subscriptions; using PostgresOutbox.Subscriptions.Replication; -using PostgresOutboxPatternWithCDC.NET.Tests.Events; using Xunit.Abstractions; using static PostgresOutbox.Subscriptions.Management.PublicationManagement; using static PostgresOutbox.Subscriptions.Management.ReplicationSlotManagement; diff --git a/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj b/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj similarity index 63% rename from PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj rename to src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj index 56a9f81..9e0d56c 100644 --- a/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj +++ b/src/PostgresOutboxPatternWithCDC.NET.Tests/PostgresOutboxPatternWithCDC.NET.Tests.csproj @@ -1,23 +1,24 @@ - net7.0 + net8.0 - - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all - - runtime; build; native; contentfiles; analyzers; buildtransitive - all + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs b/src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs similarity index 100% rename from PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs rename to src/PostgresOutboxPatternWithCDC.NET.Tests/Usings.cs diff --git a/src/Publisher/Contracts.cs b/src/Publisher/Contracts.cs new file mode 100644 index 0000000..a4e2ef0 --- /dev/null +++ b/src/Publisher/Contracts.cs @@ -0,0 +1,35 @@ +using PostgresOutbox.Serialization; +using System.Text.Json.Serialization.Metadata; +using System.Text.Json.Serialization; + +namespace Publisher; + +[MessageUrn("user-created:v1")] +internal record UserCreated( + Guid Id, + string Name +); + +[MessageUrn("user-deleted:v1")] +internal record UserDeleted( + Guid Id, + string Name +); + +[JsonSourceGenerationOptions(WriteIndented = true)] +[JsonSerializable(typeof(UserCreated))] +[JsonSerializable(typeof(UserDeleted))] +internal partial class SourceGenerationContext: JsonSerializerContext +{ +} + +internal class PublisherTypesResolver: ITypeResolver +{ + private readonly TypeResolver _inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) + .WhiteList() + .WhiteList(); + + public Type Resolve(string value) => _inner.Resolve(value); + public (string, JsonTypeInfo) Resolve(Type type) => _inner.Resolve(type); + public JsonSerializerContext SerializationContext => _inner.SerializationContext; +} diff --git a/src/Publisher/Program.cs b/src/Publisher/Program.cs new file mode 100644 index 0000000..36e3384 --- /dev/null +++ b/src/Publisher/Program.cs @@ -0,0 +1,74 @@ +using Commons; +using Npgsql; +using PostgresOutbox.Table; +using Publisher; +using UserCreated = Publisher.UserCreated; +using UserDeleted = Publisher.UserDeleted; + +Console.Title = typeof(Program).Assembly.GetName().Name!; +Console.WriteLine("How many messages do you want to publish?(press CTRL+C to exit):"); + +var resolver = new PublisherTypesResolver(); + +do +{ + + var line = Console.ReadLine(); + if (line != null && int.TryParse(line, out var result)) + { + var cts = new CancellationTokenSource(); + + var ct = cts.Token; + await using var connection = new NpgsqlConnection(Settings.ConnectionString); + await connection.OpenAsync(ct); + //use a command for each message + { + var @events = Enumerable.Range(0, result).Select(i => + int.IsEvenInteger(i) + ? new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()) as object + : new UserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString())); + foreach (var @event in @events) + { + var transaction = await connection.BeginTransactionAsync(ct); + try + { + switch (@event) + { + case UserCreated c: + await MessageAppender.AppendAsync("outbox", c, resolver, connection, transaction, ct); + break; + case UserDeleted d: + await MessageAppender.AppendAsync("outbox", d, resolver, connection, transaction, ct); + break; + } + + await transaction.CommitAsync(ct); + } + catch (Exception e) + { + await transaction.RollbackAsync(ct); + Console.WriteLine(e); + throw; + } + } + } + //use a batch command + //{ + // var transaction = await connection.BeginTransactionAsync(ct); + // try + // { + // var @events = Enumerable.Range(0, result) + // .Select(i1 => new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString())); + // await EventsAppender.AppendAsync("outbox", @events, resolver, connection, transaction, ct); + // } + // catch (Exception e) + // { + // Console.WriteLine(e); + // throw; + // } + //} + } +} while (true); + + + diff --git a/src/Publisher/Publisher.csproj b/src/Publisher/Publisher.csproj new file mode 100644 index 0000000..a0aeff5 --- /dev/null +++ b/src/Publisher/Publisher.csproj @@ -0,0 +1,15 @@ + + + + net8.0 + enable + enable + Exe + + + + + + + + diff --git a/src/Subscriber/Contracts.cs b/src/Subscriber/Contracts.cs new file mode 100644 index 0000000..5038759 --- /dev/null +++ b/src/Subscriber/Contracts.cs @@ -0,0 +1,36 @@ +using PostgresOutbox.Serialization; +using System.Text.Json.Serialization; +using System.Text.Json.Serialization.Metadata; + +namespace Subscriber +{ + [MessageUrn("user-created:v1")] + public record UserCreatedContract( + Guid Id, + string Name + ); + + [MessageUrn("user-deleted:v1")] + public record UserDeletedContract( + Guid Id, + string Name + ); + + [JsonSourceGenerationOptions(WriteIndented = true)] + [JsonSerializable(typeof(UserCreatedContract))] + [JsonSerializable(typeof(UserDeletedContract))] + internal partial class SourceGenerationContext: JsonSerializerContext + { + } + + internal class SubscriberTypesResolver: ITypeResolver + { + private readonly TypeResolver _inner = new TypeResolver(SourceGenerationContext.Default, new AttributeNamingPolicy()) + .WhiteList() + .WhiteList(); + + public Type Resolve(string value) => _inner.Resolve(value); + public (string, JsonTypeInfo) Resolve(Type type) => _inner.Resolve(type); + public JsonSerializerContext SerializationContext => _inner.SerializationContext; + } +} diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs new file mode 100644 index 0000000..bd265be --- /dev/null +++ b/src/Subscriber/Program.cs @@ -0,0 +1,44 @@ +using Commons; +using Microsoft.Extensions.Logging; +using PostgresOutbox.Serialization; +using PostgresOutbox.Subscriptions; +using Subscriber; + +#pragma warning disable CS8601 // Possible null reference assignment. +Console.Title = typeof(Program).Assembly.GetName().Name; +#pragma warning restore CS8601 // Possible null reference assignment. +var cancellationTokenSource = new CancellationTokenSource(); + + +AppDomain.CurrentDomain.UnhandledException += (_, e) => Console.Out.WriteLine(e.ExceptionObject.ToString()); +TaskScheduler.UnobservedTaskException += (_,e) => Console.Out.WriteLine(e.Exception.ToString()); + +var ct = cancellationTokenSource.Token; +var consumer = new Consumer(); +await using var subscription = new Subscription(); + +try +{ + await using var cursor = subscription.Subscribe( + builder => builder + .ConnectionString(Settings.ConnectionString) + .TypeResolver(new SubscriberTypesResolver()) + .Consumes(consumer) + .Consumes(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct + ).GetAsyncEnumerator(ct); + while (await cursor.MoveNextAsync() && !ct.IsCancellationRequested); +} +catch (Exception e) +{ + Console.WriteLine(e); +} + +Console.ReadKey(); + +internal class Consumer: + IConsumes, + IConsumes +{ + public Task Handle(UserCreatedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserCreatedContract)); + public Task Handle(UserDeletedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserDeletedContract)); +} diff --git a/PostgresOutbox.Console/PostgresOutbox.Console.csproj b/src/Subscriber/Subscriber.csproj similarity index 53% rename from PostgresOutbox.Console/PostgresOutbox.Console.csproj rename to src/Subscriber/Subscriber.csproj index 04a178a..c349e1c 100644 --- a/PostgresOutbox.Console/PostgresOutbox.Console.csproj +++ b/src/Subscriber/Subscriber.csproj @@ -2,12 +2,18 @@ Exe - net7.0 + net8.0 enable enable + true + + + + + diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs new file mode 100644 index 0000000..05c89e5 --- /dev/null +++ b/src/Tests/DatabaseFixture.cs @@ -0,0 +1,81 @@ +using System.Text.Json.Serialization.Metadata; +using Npgsql; +using PostgresOutbox.Database; +using PostgresOutbox.Serialization; +using PostgresOutbox.Subscriptions; +using PostgresOutbox.Subscriptions.Management; +using Testcontainers.PostgreSql; + +namespace Tests; + +public abstract class DatabaseFixture: IAsyncLifetime +{ + protected class TestConsumer(Action log, JsonTypeInfo info): IConsumes where T : class + { + public async Task Handle(T value) + { + try + { + log(JsonSerialization.ToJson(value, info)); + } + catch (Exception e) + { + Console.WriteLine(e); + } + await Task.CompletedTask; + } + } + + protected readonly PostgreSqlContainer Container = new PostgreSqlBuilder() + .WithCommand("-c", "wal_level=logical") + .Build(); + + public Task InitializeAsync() + { + return Container.StartAsync(); + } + + public async Task DisposeAsync() + { + await Container.DisposeAsync(); + } + + protected static async Task CreateOutboxTable( + NpgsqlDataSource dataSource, + CancellationToken ct + ) + { + var tableName = Randomise("outbox"); + + await dataSource.EnsureTableExists(tableName, ct); + + return tableName; + } + + private static string Randomise(string prefix) => + $"{prefix}_{Guid.NewGuid().ToString().Replace("-", "")}"; + + protected static (TypeResolver typeResolver, TestConsumer consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor( + string connectionString, + string eventsTable, + JsonTypeInfo info, + Action log, + string? publicationName = null, + string? slotName = null) where T : class + { + var typeResolver = new TypeResolver(SourceGenerationContext.Default).WhiteList(); + var consumer = new TestConsumer(log, info); + var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder() + .ConnectionString(connectionString) + .TypeResolver(typeResolver) + .Consumes>(consumer) + .WithPublicationOptions( + new PublicationManagement.PublicationSetupOptions(publicationName ?? Randomise("events_pub"), eventsTable) + ) + .WithReplicationOptions( + new ReplicationSlotManagement.ReplicationSlotSetupOptions(slotName ?? Randomise("events_slot")) + ); + return (typeResolver, consumer, subscriptionOptionsBuilder); + } + +} diff --git a/src/Tests/SourceGenerationContext.cs b/src/Tests/SourceGenerationContext.cs new file mode 100644 index 0000000..9b91127 --- /dev/null +++ b/src/Tests/SourceGenerationContext.cs @@ -0,0 +1,21 @@ +using System.Text.Json.Serialization; +using PostgresOutbox.Serialization; + +namespace Tests; + +[MessageUrn("user-created:v1")] +internal record UserCreated( + Guid Id, + string Name +); + +[MessageUrn("user-deleted:v1")] +internal record UserDeleted( + Guid Id, + string Name +); + +[JsonSourceGenerationOptions(WriteIndented = true)] +[JsonSerializable(typeof(UserCreated))] +[JsonSerializable(typeof(UserDeleted))] +internal partial class SourceGenerationContext: JsonSerializerContext{} diff --git a/src/Tests/Tests.csproj b/src/Tests/Tests.csproj new file mode 100644 index 0000000..f85dc3a --- /dev/null +++ b/src/Tests/Tests.csproj @@ -0,0 +1,28 @@ + + + + net8.0 + enable + enable + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + diff --git a/src/Tests/Usings.cs b/src/Tests/Usings.cs new file mode 100644 index 0000000..8bd2293 --- /dev/null +++ b/src/Tests/Usings.cs @@ -0,0 +1,3 @@ +global using Xunit; + +[assembly: CollectionBehavior(DisableTestParallelization = true)] diff --git a/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs new file mode 100644 index 0000000..90742d1 --- /dev/null +++ b/src/Tests/When_First_Subscription_And_Table_Is_Empty.cs @@ -0,0 +1,33 @@ +using Npgsql; +using PostgresOutbox.Subscriptions; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; +using PostgresOutbox.Table; +using Xunit.Abstractions; + +namespace Tests; + +// ReSharper disable once InconsistentNaming +public class When_First_Subscription_And_Table_Is_Empty(ITestOutputHelper testOutputHelper): DatabaseFixture +{ + [Fact] + public async Task Execute() + { + var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var ct = cancellationTokenSource.Token; + + var connectionString = Container.GetConnectionString(); + var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); + + var (typeResolver, testConsumer, subscriptionOptions) = SetupFor(connectionString, eventsTable, + SourceGenerationContext.Default.UserCreated, testOutputHelper.WriteLine); + await using var subscription = new Subscription(); + + var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); + await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct)) + { + Assert.Equal(@event, ((OkEnvelope)envelope).Value); + return; + } + } +} diff --git a/src/Tests/When_Subscription_Already_Exists.cs b/src/Tests/When_Subscription_Already_Exists.cs new file mode 100644 index 0000000..e9539ef --- /dev/null +++ b/src/Tests/When_Subscription_Already_Exists.cs @@ -0,0 +1,58 @@ +using Npgsql; +using Npgsql.Replication; +using PostgresOutbox.Database; +using PostgresOutbox.Subscriptions; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; +using PostgresOutbox.Table; +using Xunit.Abstractions; + +namespace Tests; + +// ReSharper disable once InconsistentNaming +public class When_Subscription_Already_Exists(ITestOutputHelper testOutputHelper): DatabaseFixture +{ + [Fact] + public async Task Execute() + { + var cancellationTokenSource = new CancellationTokenSource(); + var ct = cancellationTokenSource.Token; + + var connectionString = Container.GetConnectionString(); + var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); + var slotName = "subscription_test"; + var publicationName = "publication_test"; + await SetupReplication(connectionString, slotName, publicationName, eventsTable, ct); + + + var (typeResolver, testConsumer, subscriptionOptions) = SetupFor(connectionString, eventsTable, + SourceGenerationContext.Default.UserCreated, testOutputHelper.WriteLine, publicationName, slotName); + + var @event = new UserCreated(Guid.NewGuid(), Guid.NewGuid().ToString()); + await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); + + await using var subscription = new Subscription(); + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct)) + { + Assert.Equal(@event, ((OkEnvelope)envelope).Value); + return; + } + } + + private static async Task SetupReplication( + string connectionString, + string slotName, + string publicationName, + string tableName, + CancellationToken ct) + { + await using var dataSource = NpgsqlDataSource.Create(connectionString); + await dataSource.Execute($"CREATE PUBLICATION {publicationName} FOR TABLE {tableName} WITH (publish = 'insert');", ct); + await using var connection = new LogicalReplicationConnection(connectionString); + await connection.Open(ct); + await connection.CreatePgOutputReplicationSlot( + slotName, + slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, + cancellationToken: ct + ); + } +} diff --git a/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs new file mode 100644 index 0000000..3481f35 --- /dev/null +++ b/src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs @@ -0,0 +1,38 @@ +using Npgsql; +using PostgresOutbox.Serialization; +using PostgresOutbox.Subscriptions; +using PostgresOutbox.Subscriptions.ReplicationMessageHandlers; +using PostgresOutbox.Table; +using Xunit.Abstractions; + +namespace Tests; + +// ReSharper disable once InconsistentNaming +public class When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty(ITestOutputHelper testOutputHelper) : DatabaseFixture +{ + [Fact] + public async Task Execute() + { + var cancellationTokenSource = new CancellationTokenSource(); + var ct = cancellationTokenSource.Token; + var connectionString = Container.GetConnectionString(); + var eventsTable = await CreateOutboxTable(NpgsqlDataSource.Create(connectionString), ct); + + + var @event = new UserDeleted(Guid.NewGuid(), Guid.NewGuid().ToString()); + var typeResolver = new TypeResolver(SourceGenerationContext.Default).WhiteList(); + + await MessageAppender.AppendAsync(eventsTable, @event, typeResolver, connectionString, ct); + + + var (_, testConsumer, subscriptionOptions) = + SetupFor(connectionString, eventsTable, SourceGenerationContext.Default.UserDeleted, testOutputHelper.WriteLine); + await using var subscription = new Subscription(); + + await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct)) + { + Assert.Equal(@event, ((OkEnvelope)envelope).Value); + return; + } + } +}