Skip to content

Commit

Permalink
fix: create consumerconfig independently in builder
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Sep 5, 2023
1 parent 3561fe6 commit e09732f
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 20 deletions.
17 changes: 17 additions & 0 deletions src/KafkaFlow.IntegrationTests/ConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace KafkaFlow.IntegrationTests
using AutoFixture;
using global::Microsoft.Extensions.DependencyInjection;
using global::Microsoft.VisualStudio.TestTools.UnitTesting;
using KafkaFlow.Consumers;
using KafkaFlow.IntegrationTests.Core;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
Expand Down Expand Up @@ -141,5 +142,21 @@ await Task.WhenAll(
await MessageStorage.AssertMessageAsync(message);
}
}

[TestMethod]
public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguratedIndependently()
{
// Act
var consumers = this.provider.GetRequiredService<IConsumerAccessor>().All;

// Assert
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.AvroGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.GzipGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGzipGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.PauseResumeGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId)));
}
}
}
34 changes: 25 additions & 9 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ internal static class Bootstrapper
public const string PauseResumeTopicName = "test-pause-resume";
public const int MaxPollIntervalMs = 7000;

internal const string ProtobufGroupId = "consumer-protobuf";
internal const string GzipGroupId = "consumer-gzip";
internal const string JsonGzipGroupId = "consumer-json-gzip";
internal const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
internal const string PauseResumeGroupId = "consumer-pause-resume";
internal const string AvroGroupId = "consumer-avro";
internal const string JsonGroupId = "consumer-json";

private const string ProtobufTopicName = "test-protobuf";
private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr";
private const string JsonSchemaRegistryTopicName = "test-json-sr";
Expand All @@ -35,12 +43,6 @@ internal static class Bootstrapper
private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
private const string AvroTopicName = "test-avro";

private const string ProtobufGroupId = "consumer-protobuf";
private const string GzipGroupId = "consumer-gzip";
private const string JsonGzipGroupId = "consumer-json-gzip";
private const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
private const string PauseResumeGroupId = "consumer-pause-resume";

private static readonly Lazy<IServiceProvider> LazyProvider = new(SetupProvider);

public static IServiceProvider GetServiceProvider() => LazyProvider.Value;
Expand Down Expand Up @@ -83,6 +85,17 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
var kafkaBrokers = context.Configuration.GetValue<string>("Kafka:Brokers");
var schemaRegistryUrl = context.Configuration.GetValue<string>("SchemaRegistry:Url");

ConsumerConfig defaultConfig = new()
{
Acks = Confluent.Kafka.Acks.All,
AllowAutoCreateTopics = false,
AutoCommitIntervalMs = 5000,
AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
LogConnectionClose = false,
ReconnectBackoffMs = 1000,
ReconnectBackoffMaxMs = 6000
};

services.AddKafka(
kafka => kafka
.UseLogHandler<TraceLogHandler>()
Expand All @@ -109,10 +122,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(AvroTopicName)
.WithGroupId("consumer-avro")
.WithGroupId(AvroGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentAvroSerializer>()
Expand All @@ -136,10 +150,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(ProtobufSchemaRegistryTopicName)
.WithGroupId("consumer-protobuf")
.WithGroupId(ProtobufGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentProtobufSerializer>()
Expand All @@ -163,10 +178,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(JsonSchemaRegistryTopicName)
.WithGroupId("consumer-json")
.WithGroupId(JsonGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentJsonSerializer>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ public void Build_AllCalls_ReturnPassedValues()
Action<IDependencyResolver, List<TopicPartition>> partitionsAssignedHandler = (_, _) => { };
Action<IDependencyResolver, List<TopicPartitionOffset>> partitionsRevokedHandler = (_, _) => { };
const int statisticsIntervalMs = 100;
var consumerConfig = new ConsumerConfig();
var consumerConfig = new ConsumerConfig
{
ClientId = "testeclient"
};

this.target
.Topics(topic1)
Expand Down Expand Up @@ -128,7 +131,7 @@ public void Build_AllCalls_ReturnPassedValues()
configuration.StatisticsHandlers.Should().HaveElementAt(0, statisticsHandler);
configuration.PartitionsAssignedHandlers.Should().HaveElementAt(0, partitionsAssignedHandler);
configuration.PartitionsRevokedHandlers.Should().HaveElementAt(0, partitionsRevokedHandler);
configuration.GetKafkaConfig().Should().BeSameAs(consumerConfig);
configuration.GetKafkaConfig().ClientId.Should().Be(consumerConfig.ClientId);
configuration.MiddlewaresConfigurations.Should().HaveCount(1);
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,22 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
var middlewareConfiguration = this.middlewareConfigurationBuilder.Build();

this.consumerConfig ??= new ConsumerConfig();
this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers);
this.consumerConfig.GroupId ??= this.groupId;
this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset;
this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs;
this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval;

this.consumerConfig.EnableAutoOffsetStore = false;
this.consumerConfig.EnableAutoCommit = false;
var consumerConfigCopy = new KafkaFlowConsumerConfig(this.consumerConfig);

this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration);
consumerConfigCopy.BootstrapServers = this.consumerConfig.BootstrapServers ?? string.Join(",", clusterConfiguration.Brokers);
consumerConfigCopy.GroupId = this.consumerConfig.GroupId ?? this.groupId;
consumerConfigCopy.AutoOffsetReset = this.consumerConfig.AutoOffsetReset ?? this.autoOffsetReset;
consumerConfigCopy.MaxPollIntervalMs = this.consumerConfig.MaxPollIntervalMs ?? this.maxPollIntervalMs;
consumerConfigCopy.StatisticsIntervalMs = this.consumerConfig.StatisticsIntervalMs ?? this.statisticsInterval;

consumerConfigCopy.EnableAutoOffsetStore = false;
consumerConfigCopy.EnableAutoCommit = false;

consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);

return new ConsumerConfiguration(
this.consumerConfig,
consumerConfigCopy,
this.topics,
this.topicsPartitions,
this.name,
Expand Down
14 changes: 14 additions & 0 deletions src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace KafkaFlow.Configuration
{
using System.Collections.Generic;
using Confluent.Kafka;

internal class KafkaFlowConsumerConfig : ConsumerConfig
{
public KafkaFlowConsumerConfig(ConsumerConfig consumerConfig)
: base(consumerConfig)
{
this.properties = new Dictionary<string, string>(this.properties);
}
}
}

0 comments on commit e09732f

Please sign in to comment.