From 19b50f074998c0768d8395ab234c0e3f740349b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Tue, 5 Sep 2023 11:06:54 +0100 Subject: [PATCH] fix: Create ConsumerConfig independently in builder --- .../ConsumerConfigurationBuilder.cs | 19 +++---------------- .../Configuration/KafkaFlowConsumerConfig.cs | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 16 deletions(-) create mode 100644 src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 02cb7a2e7..66f41d4ce 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -233,20 +233,9 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) { var middlewareConfiguration = this.middlewareConfigurationBuilder.Build(); - // Fazer um clone do ConsumerConfig ???? Ou copiar tudo aqui? - //this.consumerConfig ??= new ConsumerConfig(); - //this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers); - //this.consumerConfig.GroupId ??= this.groupId; - //this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset; - //this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs; - //this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval; + this.consumerConfig ??= new ConsumerConfig(); - //this.consumerConfig.EnableAutoOffsetStore = false; - //this.consumerConfig.EnableAutoCommit = false; - - //this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration); - - var consumerConfigCopy = new ConsumerConfig(); + var consumerConfigCopy = new KafkaFlowConsumerConfig(this.consumerConfig); consumerConfigCopy.BootstrapServers = this.consumerConfig?.BootstrapServers ?? string.Join(",", clusterConfiguration.Brokers); consumerConfigCopy.GroupId = this.consumerConfig?.GroupId ?? this.groupId; @@ -259,10 +248,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration); - this.consumerConfig = consumerConfigCopy; - return new ConsumerConfiguration( - this.consumerConfig, + consumerConfigCopy, this.topics, this.topicsPartitions, this.name, diff --git a/src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs b/src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs new file mode 100644 index 000000000..6bb05c88a --- /dev/null +++ b/src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Confluent.Kafka; + +namespace KafkaFlow.Configuration +{ + internal class KafkaFlowConsumerConfig : ConsumerConfig + { + public KafkaFlowConsumerConfig(ConsumerConfig consumerConfig) + :base(consumerConfig) + { + this.properties = new Dictionary(this.properties); + } + } +}