From e56848dca66dbf5c3e62a57c08d2b9c93e408753 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Tue, 16 Jul 2024 10:23:47 -0700 Subject: [PATCH] #346 - Make kafka supplier public --- .../DefaultKafkaClientSupplier.cs | 82 ++++++++++++++++--- .../{Internal => }/KafkaLoggerAdapter.cs | 16 ++-- 2 files changed, 81 insertions(+), 17 deletions(-) rename core/Kafka/{Internal => }/DefaultKafkaClientSupplier.cs (65%) rename core/Kafka/{Internal => }/KafkaLoggerAdapter.cs (88%) diff --git a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs b/core/Kafka/DefaultKafkaClientSupplier.cs similarity index 65% rename from core/Kafka/Internal/DefaultKafkaClientSupplier.cs rename to core/Kafka/DefaultKafkaClientSupplier.cs index df80259a..1026a832 100644 --- a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs +++ b/core/Kafka/DefaultKafkaClientSupplier.cs @@ -1,54 +1,87 @@ using System; using Confluent.Kafka; using Newtonsoft.Json; +using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Librdkafka; +using Streamiz.Kafka.Net.Table; -namespace Streamiz.Kafka.Net.Kafka.Internal +namespace Streamiz.Kafka.Net.Kafka { - internal class DefaultKafkaClientBuilder + /// + /// Default builder used to provide Kafka clients builder + /// + public class DefaultKafkaClientBuilder { + /// + /// Get the consumer builder + /// + /// + /// public virtual ConsumerBuilder GetConsumerBuilder(ConsumerConfig config) => new(config); + /// + /// Get the admin client builder + /// + /// + /// public virtual AdminClientBuilder GetAdminBuilder(AdminClientConfig config) => new(config); + /// + /// Get the producer builder + /// + /// + /// public virtual ProducerBuilder GetProducerBuilder(ProducerConfig config) => new(config); } - internal class DefaultKafkaClientSupplier : IKafkaSupplier + /// + /// Default can be used to provide custom Kafka clients to a instance. + /// + public class DefaultKafkaClientSupplier : IKafkaSupplier { private readonly KafkaLoggerAdapter loggerAdapter; private readonly IStreamConfig streamConfig; private readonly bool exposeLibrdKafka; private readonly DefaultKafkaClientBuilder builderKafkaHandler; - public DefaultKafkaClientSupplier(KafkaLoggerAdapter loggerAdapter) - : this(loggerAdapter, null) - { } - + /// + /// + /// + /// + /// public DefaultKafkaClientSupplier( KafkaLoggerAdapter loggerAdapter, IStreamConfig streamConfig) : this(loggerAdapter, streamConfig, new DefaultKafkaClientBuilder()) { } - internal DefaultKafkaClientSupplier( + /// + /// + /// + /// + /// + /// + /// + public DefaultKafkaClientSupplier( KafkaLoggerAdapter loggerAdapter, IStreamConfig streamConfig, DefaultKafkaClientBuilder builderKafkaHandler) { - if (loggerAdapter == null) - throw new ArgumentNullException(nameof(loggerAdapter)); - - this.loggerAdapter = loggerAdapter; + this.loggerAdapter = loggerAdapter ?? throw new ArgumentNullException(nameof(loggerAdapter)); this.streamConfig = streamConfig; exposeLibrdKafka = streamConfig?.ExposeLibrdKafkaStats ?? false; this.builderKafkaHandler = builderKafkaHandler ?? new DefaultKafkaClientBuilder(); } + /// + /// Create an admin kafka client which is used for internal topic management. + /// + /// Admin configuration can't be null + /// Return an admin client instance public IAdminClient GetAdmin(AdminClientConfig config) { AdminClientBuilder builder = builderKafkaHandler.GetAdminBuilder(config); @@ -57,6 +90,12 @@ public IAdminClient GetAdmin(AdminClientConfig config) return builder.Build(); } + /// + /// Build a kafka consumer with instance and listener. + /// + /// Consumer configuration can't be null + /// Rebalance listener (Nullable) + /// Return a kafka consumer built public IConsumer GetConsumer(ConsumerConfig config, IConsumerRebalanceListener rebalanceListener) { ConsumerBuilder builder = builderKafkaHandler.GetConsumerBuilder(config); @@ -88,6 +127,11 @@ public IConsumer GetConsumer(ConsumerConfig config, IConsumerReb return builder.Build(); } + /// + /// Build a kafka producer with instance. + /// + /// Producer configuration can't be null + /// Return a kafka producer built public IProducer GetProducer(ProducerConfig config) { ProducerBuilder builder = builderKafkaHandler.GetProducerBuilder(config); @@ -113,6 +157,11 @@ public IProducer GetProducer(ProducerConfig config) return builder.Build(); } + /// + /// Build a kafka restore consumer with instance for read record to restore statestore. + /// + /// Restore consumer configuration can't be null + /// Return a kafka restore consumer built public IConsumer GetRestoreConsumer(ConsumerConfig config) { ConsumerBuilder builder = builderKafkaHandler.GetConsumerBuilder(config); @@ -121,6 +170,11 @@ public IConsumer GetRestoreConsumer(ConsumerConfig config) return builder.Build(); } + /// + /// Build a kafka global consumer with which is used to consume records for . + /// + /// Global consumer configuration can't be null + /// Return a kafka global consumer built public IConsumer GetGlobalConsumer(ConsumerConfig config) { config.AutoOffsetReset = AutoOffsetReset.Earliest; @@ -148,6 +202,10 @@ public IConsumer GetGlobalConsumer(ConsumerConfig config) return builder.Build(); } + /// + /// Get or set the metrics registry. + /// This registry will be capture all librdkafka statistics if is enable and forward these into the metrics reporter + /// public StreamMetricsRegistry MetricsRegistry { get; set; } } } \ No newline at end of file diff --git a/core/Kafka/Internal/KafkaLoggerAdapter.cs b/core/Kafka/KafkaLoggerAdapter.cs similarity index 88% rename from core/Kafka/Internal/KafkaLoggerAdapter.cs rename to core/Kafka/KafkaLoggerAdapter.cs index f2b9da9f..4a895ca9 100644 --- a/core/Kafka/Internal/KafkaLoggerAdapter.cs +++ b/core/Kafka/KafkaLoggerAdapter.cs @@ -1,16 +1,21 @@ using Confluent.Kafka; -using Streamiz.Kafka.Net.Crosscutting; using System; using System.Threading; using Microsoft.Extensions.Logging; -using Streamiz.Kafka.Net.Errors; -namespace Streamiz.Kafka.Net.Kafka.Internal +namespace Streamiz.Kafka.Net.Kafka { - internal class KafkaLoggerAdapter + /// + /// Kafka log adapter to intercept librdkafka internal logs + /// + public class KafkaLoggerAdapter { private readonly ILogger log; + /// + /// + /// + /// public KafkaLoggerAdapter(IStreamConfig configuration) : this(configuration, configuration.Logger.CreateLogger(typeof(KafkaLoggerAdapter))) { @@ -71,10 +76,11 @@ internal void LogAdmin(IAdminClient admin, LogMessage message) private string GetName(IClient client) { - // FOR FIX string name = ""; try { + if (client.Handle == null || client.Handle.IsInvalid) + return "Unknown"; name = client.Name; } catch (NullReferenceException)