Skip to content

Commit

Permalink
embed ConsumeOptions with typed Consumes
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera committed Aug 1, 2024
1 parent 8fd8041 commit e56c3c1
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 28 deletions.
13 changes: 8 additions & 5 deletions src/Blumchen/Subscriber/OptionsBuilder.Consumes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ internal OptionsBuilder NamingPolicy(INamingPolicy namingPolicy)

public interface INamingOptionsContext
{
OptionsBuilder NamingPolicy(INamingPolicy namingPolicy);
OptionsBuilder AndNamingPolicy(INamingPolicy namingPolicy);
}

internal class NamingOptionsContext(OptionsBuilder builder): INamingOptionsContext
{
public OptionsBuilder NamingPolicy(INamingPolicy namingPolicy)
public OptionsBuilder AndNamingPolicy(INamingPolicy namingPolicy)
=> builder.NamingPolicy(namingPolicy);
}

public interface IConsumesTypedJsonOptionsContext
{
INamingOptionsContext JsonContext(JsonSerializerContext jsonSerializerContext);
INamingOptionsContext WithJsonContext(JsonSerializerContext jsonSerializerContext);
IConsumesTypedJsonOptionsContext Consumes<T>(IMessageHandler<T> handler) where T : class;
}

internal class ConsumesTypedJsonTypedJsonOptionsContext(OptionsBuilder builder): IConsumesTypedJsonOptionsContext
{
public INamingOptionsContext JsonContext(JsonSerializerContext jsonSerializerContext)
public INamingOptionsContext WithJsonContext(JsonSerializerContext jsonSerializerContext)
{
builder._jsonSerializerContext = jsonSerializerContext;
return new NamingOptionsContext(builder);
Expand All @@ -49,7 +49,7 @@ public IConsumesTypedJsonOptionsContext Consumes<T>(IMessageHandler<T> handler)
}
}

public IConsumesTypedJsonOptionsContext Consumes<T>(IMessageHandler<T> handler) where T : class
internal IConsumesTypedJsonOptionsContext Consumes<T>(IMessageHandler<T> handler) where T : class
{
Ensure.Empty(_replicationDataMapperSelector, nameof(Consumes));
var methodInfo = handler
Expand All @@ -61,4 +61,7 @@ public IConsumesTypedJsonOptionsContext Consumes<T>(IMessageHandler<T> handler)
_typeRegistry.Add(typeof(T), new Tuple<IMessageHandler, MethodInfo>(handler, methodInfo));
return new ConsumesTypedJsonTypedJsonOptionsContext(this);
}

public OptionsBuilder Consumes<T>(IMessageHandler<T> handler, Func<IConsumesTypedJsonOptionsContext, OptionsBuilder> opts) where T : class
=> opts(Consumes<T>(handler));
}
7 changes: 4 additions & 3 deletions src/Subscriber/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@
.MessageType("message_type")
.MessageData("data")
)
.Consumes<UserCreatedContract>(consumer)
.JsonContext(SourceGenerationContext.Default)
.NamingPolicy(new AttributeNamingPolicy())
.Consumes<UserCreatedContract>(consumer, opts =>
opts
.WithJsonContext(SourceGenerationContext.Default)
.AndNamingPolicy(new AttributeNamingPolicy()))
.ConsumesRawString<MessageString>(consumer)
.ConsumesRawObject<MessageObjects>(consumer);
}
Expand Down
38 changes: 22 additions & 16 deletions src/SubscriberWorker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,38 @@
subscriptionOptions
.ConnectionString(Settings.ConnectionString)
.DataSource(provider.GetRequiredService<NpgsqlDataSource>())
.WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl1)}_slot"))
.WithReplicationOptions(
new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl1)}_slot"))
.WithPublicationOptions(new PublicationManagement.PublicationOptions($"{nameof(HandleImpl1)}_pub"))
.WithErrorProcessor(provider.GetRequiredService<IErrorProcessor>())

.Consumes(provider.GetRequiredService<IMessageHandler<UserCreatedContract>>())
.Consumes(provider.GetRequiredService<IMessageHandler<UserModifiedContract>>())
.JsonContext(SourceGenerationContext.Default)
.NamingPolicy(provider.GetRequiredService<INamingPolicy>())
)
.ResiliencyPipeline(provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("default"))

.Consumes(provider.GetRequiredService<IMessageHandler<UserCreatedContract>>(), opts =>
opts
.Consumes(provider.GetRequiredService<IMessageHandler<UserModifiedContract>>())
.WithJsonContext(SourceGenerationContext.Default)
.AndNamingPolicy(provider.GetRequiredService<INamingPolicy>()))
)
.ResiliencyPipeline(
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("default"))
)
.AddBlumchen<HandleImpl2>((provider, workerOptions) =>
workerOptions
.Subscription(subscriptionOptions =>
subscriptionOptions.ConnectionString(Settings.ConnectionString)
.DataSource(provider.GetRequiredService<NpgsqlDataSource>())
.WithReplicationOptions(new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl2)}_slot"))
.WithReplicationOptions(
new ReplicationSlotManagement.ReplicationSlotOptions($"{nameof(HandleImpl2)}_slot"))
.WithPublicationOptions(new PublicationManagement.PublicationOptions($"{nameof(HandleImpl2)}_pub"))
.WithErrorProcessor(provider.GetRequiredService<IErrorProcessor>())

.Consumes(provider.GetRequiredService<IMessageHandler<UserDeletedContract>>())
.JsonContext(SourceGenerationContext.Default)
.NamingPolicy(provider.GetRequiredService<INamingPolicy>())
)
.ResiliencyPipeline(provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("default"))
);

.Consumes(provider.GetRequiredService<IMessageHandler<UserDeletedContract>>(), opts =>
opts
.WithJsonContext(SourceGenerationContext.Default)
.AndNamingPolicy(provider.GetRequiredService<INamingPolicy>())
))
.ResiliencyPipeline(
provider.GetRequiredService<ResiliencePipelineProvider<string>>().GetPipeline("default"))
);

await builder
.Build()
Expand Down
9 changes: 5 additions & 4 deletions src/Tests/DatabaseFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ protected OptionsBuilder SetupFor<T>(
.DataSource(new NpgsqlDataSourceBuilder(connectionString).Build())
.ConnectionString(connectionString)

.Consumes(consumer)
.JsonContext(info)
.NamingPolicy(namingPolicy)
.Consumes(consumer, opts => opts
.WithJsonContext(info)
.AndNamingPolicy(namingPolicy))

.WithTable(o => o.Named(eventsTable))
.WithPublicationOptions(
new PublicationManagement.PublicationOptions(PublicationName: publicationName ?? Randomise("events_pub"))
new PublicationManagement.PublicationOptions(
PublicationName: publicationName ?? Randomise("events_pub"))
)
.WithReplicationOptions(
new ReplicationSlotManagement.ReplicationSlotOptions(slotName ?? Randomise("events_slot"))
Expand Down

0 comments on commit e56c3c1

Please sign in to comment.