Skip to content

Commit

Permalink
Test is passing. Need to find a better way to solve this
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Sep 4, 2023
1 parent 3561fe6 commit 995719a
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 18 deletions.
17 changes: 17 additions & 0 deletions src/KafkaFlow.IntegrationTests/ConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace KafkaFlow.IntegrationTests
using AutoFixture;
using global::Microsoft.Extensions.DependencyInjection;
using global::Microsoft.VisualStudio.TestTools.UnitTesting;
using KafkaFlow.Consumers;
using KafkaFlow.IntegrationTests.Core;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
Expand Down Expand Up @@ -141,5 +142,21 @@ await Task.WhenAll(
await MessageStorage.AssertMessageAsync(message);
}
}

[TestMethod]
public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguratedIndependently()
{
// Act
var consumers = this.provider.GetRequiredService<IConsumerAccessor>().All;

// Assert
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.AvroGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.GzipGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGzipGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.PauseResumeGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId)));
}
}
}
34 changes: 25 additions & 9 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ internal static class Bootstrapper
public const string PauseResumeTopicName = "test-pause-resume";
public const int MaxPollIntervalMs = 7000;

internal const string ProtobufGroupId = "consumer-protobuf";
internal const string GzipGroupId = "consumer-gzip";
internal const string JsonGzipGroupId = "consumer-json-gzip";
internal const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
internal const string PauseResumeGroupId = "consumer-pause-resume";
internal const string AvroGroupId = "consumer-avro";
internal const string JsonGroupId = "consumer-json";

private const string ProtobufTopicName = "test-protobuf";
private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr";
private const string JsonSchemaRegistryTopicName = "test-json-sr";
Expand All @@ -35,12 +43,6 @@ internal static class Bootstrapper
private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
private const string AvroTopicName = "test-avro";

private const string ProtobufGroupId = "consumer-protobuf";
private const string GzipGroupId = "consumer-gzip";
private const string JsonGzipGroupId = "consumer-json-gzip";
private const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
private const string PauseResumeGroupId = "consumer-pause-resume";

private static readonly Lazy<IServiceProvider> LazyProvider = new(SetupProvider);

public static IServiceProvider GetServiceProvider() => LazyProvider.Value;
Expand Down Expand Up @@ -83,6 +85,17 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
var kafkaBrokers = context.Configuration.GetValue<string>("Kafka:Brokers");
var schemaRegistryUrl = context.Configuration.GetValue<string>("SchemaRegistry:Url");

ConsumerConfig defaultConfig = new()
{
Acks = Confluent.Kafka.Acks.All,
AllowAutoCreateTopics = false,
AutoCommitIntervalMs = 5000,
AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
LogConnectionClose = false,
ReconnectBackoffMs = 1000,
ReconnectBackoffMaxMs = 6000
};

services.AddKafka(
kafka => kafka
.UseLogHandler<TraceLogHandler>()
Expand All @@ -109,10 +122,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(AvroTopicName)
.WithGroupId("consumer-avro")
.WithGroupId(AvroGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentAvroSerializer>()
Expand All @@ -136,10 +150,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(ProtobufSchemaRegistryTopicName)
.WithGroupId("consumer-protobuf")
.WithGroupId(ProtobufGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentProtobufSerializer>()
Expand All @@ -163,10 +178,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(JsonSchemaRegistryTopicName)
.WithGroupId("consumer-json")
.WithGroupId(JsonGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentJsonSerializer>()
Expand Down
34 changes: 25 additions & 9 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,33 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
{
var middlewareConfiguration = this.middlewareConfigurationBuilder.Build();

this.consumerConfig ??= new ConsumerConfig();
this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers);
this.consumerConfig.GroupId ??= this.groupId;
this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset;
this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs;
this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval;
// Fazer um clone do ConsumerConfig ???? Ou copiar tudo aqui?
//this.consumerConfig ??= new ConsumerConfig();
//this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers);
//this.consumerConfig.GroupId ??= this.groupId;
//this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset;
//this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs;
//this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval;

this.consumerConfig.EnableAutoOffsetStore = false;
this.consumerConfig.EnableAutoCommit = false;
//this.consumerConfig.EnableAutoOffsetStore = false;
//this.consumerConfig.EnableAutoCommit = false;

this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration);
//this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration);

var consumerConfigCopy = new ConsumerConfig();

consumerConfigCopy.BootstrapServers = this.consumerConfig?.BootstrapServers ?? string.Join(",", clusterConfiguration.Brokers);
consumerConfigCopy.GroupId = this.consumerConfig?.GroupId ?? this.groupId;
consumerConfigCopy.AutoOffsetReset = this.consumerConfig?.AutoOffsetReset ?? this.autoOffsetReset;
consumerConfigCopy.MaxPollIntervalMs = this.consumerConfig?.MaxPollIntervalMs ?? this.maxPollIntervalMs;
consumerConfigCopy.StatisticsIntervalMs = this.consumerConfig?.StatisticsIntervalMs ?? this.statisticsInterval;

consumerConfigCopy.EnableAutoOffsetStore = false;
consumerConfigCopy.EnableAutoCommit = false;

consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);

this.consumerConfig = consumerConfigCopy;

return new ConsumerConfiguration(
this.consumerConfig,
Expand Down

0 comments on commit 995719a

Please sign in to comment.