diff --git a/src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs b/src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs index 297f49e..c37179b 100644 --- a/src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs +++ b/src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs @@ -20,24 +20,24 @@ internal OptionsBuilder NamingPolicy(INamingPolicy namingPolicy) public interface INamingOptionsContext { - OptionsBuilder NamingPolicy(INamingPolicy namingPolicy); + OptionsBuilder AndNamingPolicy(INamingPolicy namingPolicy); } internal class NamingOptionsContext(OptionsBuilder builder): INamingOptionsContext { - public OptionsBuilder NamingPolicy(INamingPolicy namingPolicy) + public OptionsBuilder AndNamingPolicy(INamingPolicy namingPolicy) => builder.NamingPolicy(namingPolicy); } public interface IConsumesTypedJsonOptionsContext { - INamingOptionsContext JsonContext(JsonSerializerContext jsonSerializerContext); + INamingOptionsContext WithJsonContext(JsonSerializerContext jsonSerializerContext); IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) where T : class; } internal class ConsumesTypedJsonTypedJsonOptionsContext(OptionsBuilder builder): IConsumesTypedJsonOptionsContext { - public INamingOptionsContext JsonContext(JsonSerializerContext jsonSerializerContext) + public INamingOptionsContext WithJsonContext(JsonSerializerContext jsonSerializerContext) { builder._jsonSerializerContext = jsonSerializerContext; return new NamingOptionsContext(builder); @@ -49,7 +49,7 @@ public IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) } } - public IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) where T : class + internal IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) where T : class { Ensure.Empty(_replicationDataMapperSelector, nameof(Consumes)); var methodInfo = handler @@ -61,4 +61,7 @@ public IConsumesTypedJsonOptionsContext Consumes(IMessageHandler handler) _typeRegistry.Add(typeof(T), new Tuple(handler, methodInfo)); return new ConsumesTypedJsonTypedJsonOptionsContext(this); } + + public OptionsBuilder Consumes(IMessageHandler handler, Func opts) where T : class + => opts(Consumes(handler)); } diff --git a/src/Subscriber/Program.cs b/src/Subscriber/Program.cs index 590931f..4fa5b27 100644 --- a/src/Subscriber/Program.cs +++ b/src/Subscriber/Program.cs @@ -44,9 +44,10 @@ .MessageType("message_type") .MessageData("data") ) - .Consumes(consumer) - .JsonContext(SourceGenerationContext.Default) - .NamingPolicy(new AttributeNamingPolicy()) + .Consumes(consumer, opts => + opts + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(new AttributeNamingPolicy())) .ConsumesRawString(consumer) .ConsumesRawObject(consumer); } diff --git a/src/SubscriberWorker/Program.cs b/src/SubscriberWorker/Program.cs index 129844e..dee6dca 100644 --- a/src/SubscriberWorker/Program.cs +++ b/src/SubscriberWorker/Program.cs @@ -63,32 +63,38 @@ subscriptionOptions .ConnectionString(Settings.ConnectionString) .DataSource(provider.GetRequiredService()) - .WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl1)}_slot")) + .WithReplicationOptions( + new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl1)}_slot")) .WithPublicationOptions(new PublicationManagement.PublicationOptions($"{nameof(HandleImpl1)}_pub")) .WithErrorProcessor(provider.GetRequiredService()) - - .Consumes(provider.GetRequiredService>()) - .Consumes(provider.GetRequiredService>()) - .JsonContext(SourceGenerationContext.Default) - .NamingPolicy(provider.GetRequiredService()) - ) - .ResiliencyPipeline(provider.GetRequiredService>().GetPipeline("default")) + + .Consumes(provider.GetRequiredService>(), opts => + opts + .Consumes(provider.GetRequiredService>()) + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(provider.GetRequiredService())) + ) + .ResiliencyPipeline( + provider.GetRequiredService>().GetPipeline("default")) ) .AddBlumchen((provider, workerOptions) => workerOptions .Subscription(subscriptionOptions => subscriptionOptions.ConnectionString(Settings.ConnectionString) .DataSource(provider.GetRequiredService()) - .WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl2)}_slot")) + .WithReplicationOptions( + new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl2)}_slot")) .WithPublicationOptions(new PublicationManagement.PublicationOptions($"{nameof(HandleImpl2)}_pub")) .WithErrorProcessor(provider.GetRequiredService()) - - .Consumes(provider.GetRequiredService>()) - .JsonContext(SourceGenerationContext.Default) - .NamingPolicy(provider.GetRequiredService()) - ) - .ResiliencyPipeline(provider.GetRequiredService>().GetPipeline("default")) - ); + + .Consumes(provider.GetRequiredService>(), opts => + opts + .WithJsonContext(SourceGenerationContext.Default) + .AndNamingPolicy(provider.GetRequiredService()) + )) + .ResiliencyPipeline( + provider.GetRequiredService>().GetPipeline("default")) + ); await builder .Build() diff --git a/src/Tests/DatabaseFixture.cs b/src/Tests/DatabaseFixture.cs index b6ad753..31f6cc4 100644 --- a/src/Tests/DatabaseFixture.cs +++ b/src/Tests/DatabaseFixture.cs @@ -85,13 +85,14 @@ protected OptionsBuilder SetupFor( .DataSource(new NpgsqlDataSourceBuilder(connectionString).Build()) .ConnectionString(connectionString) - .Consumes(consumer) - .JsonContext(info) - .NamingPolicy(namingPolicy) + .Consumes(consumer, opts => opts + .WithJsonContext(info) + .AndNamingPolicy(namingPolicy)) .WithTable(o => o.Named(eventsTable)) .WithPublicationOptions( - new PublicationManagement.PublicationOptions(PublicationName: publicationName ?? Randomise("events_pub")) + new PublicationManagement.PublicationOptions( + PublicationName: publicationName ?? Randomise("events_pub")) ) .WithReplicationOptions( new ReplicationSlotManagement.ReplicationSlotOptions(slotName ?? Randomise("events_slot"))