Skip to content

Commit

Permalink
fix: Create ConsumerConfig independently in builder
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Sep 5, 2023
1 parent 995719a commit 19b50f0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
19 changes: 3 additions & 16 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,9 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
{
var middlewareConfiguration = this.middlewareConfigurationBuilder.Build();

// Fazer um clone do ConsumerConfig ???? Ou copiar tudo aqui?
//this.consumerConfig ??= new ConsumerConfig();
//this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers);
//this.consumerConfig.GroupId ??= this.groupId;
//this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset;
//this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs;
//this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval;
this.consumerConfig ??= new ConsumerConfig();

//this.consumerConfig.EnableAutoOffsetStore = false;
//this.consumerConfig.EnableAutoCommit = false;

//this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration);

var consumerConfigCopy = new ConsumerConfig();
var consumerConfigCopy = new KafkaFlowConsumerConfig(this.consumerConfig);

consumerConfigCopy.BootstrapServers = this.consumerConfig?.BootstrapServers ?? string.Join(",", clusterConfiguration.Brokers);
consumerConfigCopy.GroupId = this.consumerConfig?.GroupId ?? this.groupId;
Expand All @@ -259,10 +248,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)

consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);

this.consumerConfig = consumerConfigCopy;

return new ConsumerConfiguration(
this.consumerConfig,
consumerConfigCopy,
this.topics,
this.topicsPartitions,
this.name,
Expand Down
16 changes: 16 additions & 0 deletions src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;

Check warning on line 1 in src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration
using System.Collections.Generic;

Check warning on line 2 in src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration
using System.Text;
using Confluent.Kafka;

namespace KafkaFlow.Configuration
{
internal class KafkaFlowConsumerConfig : ConsumerConfig
{
public KafkaFlowConsumerConfig(ConsumerConfig consumerConfig)
:base(consumerConfig)
{
this.properties = new Dictionary<string, string>(this.properties);
}
}
}

0 comments on commit 19b50f0

Please sign in to comment.