From fb3b005f369d6bfe02e3142aef6b78a0f7e21e2c Mon Sep 17 00:00:00 2001 From: Robert Coltheart Date: Thu, 7 Nov 2024 06:30:16 +1100 Subject: [PATCH] feat: Create topic with additional configuration if present --- .../Configuration/IClusterConfigurationBuilder.cs | 4 +++- src/KafkaFlow/Clusters/ClusterManager.cs | 1 + .../Configuration/ClusterConfigurationBuilder.cs | 5 +++-- src/KafkaFlow/Configuration/TopicConfiguration.cs | 11 ++++++++++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs index 2c6968baa..78b92c816 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs @@ -77,9 +77,11 @@ public interface IClusterConfigurationBuilder /// The topic name /// The number of Topic partitions. Default is to use the cluster-defined partitions. /// The Topic replication factor. Default is to use the cluster-defined replication factor. + /// Additional topic creation configuration values. /// IClusterConfigurationBuilder CreateTopicIfNotExists( string topicName, int numberOfPartitions = -1, - short replicationFactor = -1); + short replicationFactor = -1, + Dictionary configs = null); } diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs index 814088c50..73ba26ba2 100644 --- a/src/KafkaFlow/Clusters/ClusterManager.cs +++ b/src/KafkaFlow/Clusters/ClusterManager.cs @@ -122,6 +122,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable configu Name = topicConfiguration.Name, ReplicationFactor = topicConfiguration.Replicas, NumPartitions = topicConfiguration.Partitions, + Configs = topicConfiguration.Configs, }) .ToArray(); diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index 4867262cf..135a5b1fd 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -111,9 +111,10 @@ public IClusterConfigurationBuilder OnStarted(Action handle public IClusterConfigurationBuilder CreateTopicIfNotExists( string topicName, int numberOfPartitions = -1, - short replicationFactor = -1) + short replicationFactor = -1, + Dictionary configs = null) { - _topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor)); + _topicsToCreateIfNotExist.Add(new TopicConfiguration(topicName, numberOfPartitions, replicationFactor, configs)); return this; } } diff --git a/src/KafkaFlow/Configuration/TopicConfiguration.cs b/src/KafkaFlow/Configuration/TopicConfiguration.cs index f2eac31bb..e22d4a3f0 100644 --- a/src/KafkaFlow/Configuration/TopicConfiguration.cs +++ b/src/KafkaFlow/Configuration/TopicConfiguration.cs @@ -1,3 +1,5 @@ +using System.Collections.Generic; + namespace KafkaFlow.Configuration; /// @@ -11,11 +13,13 @@ public class TopicConfiguration /// The topic name /// The number of partitions for the topic /// Replication factor for the topic - public TopicConfiguration(string name, int partitions, short replicas) + /// Additional topic creation configuration values. + public TopicConfiguration(string name, int partitions, short replicas, Dictionary configs) { this.Name = name; this.Partitions = partitions; this.Replicas = replicas; + this.Configs = configs; } /// @@ -32,4 +36,9 @@ public TopicConfiguration(string name, int partitions, short replicas) /// Gets the Topic Replication Factor /// public short Replicas { get; } + + /// + /// Gets the topic creation configuration + /// + public Dictionary Configs { get; } }