From e56848dca66dbf5c3e62a57c08d2b9c93e408753 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Tue, 16 Jul 2024 10:23:47 -0700 Subject: [PATCH 1/6] #346 - Make kafka supplier public --- .../DefaultKafkaClientSupplier.cs | 82 ++++++++++++++++--- .../{Internal => }/KafkaLoggerAdapter.cs | 16 ++-- 2 files changed, 81 insertions(+), 17 deletions(-) rename core/Kafka/{Internal => }/DefaultKafkaClientSupplier.cs (65%) rename core/Kafka/{Internal => }/KafkaLoggerAdapter.cs (88%) diff --git a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs b/core/Kafka/DefaultKafkaClientSupplier.cs similarity index 65% rename from core/Kafka/Internal/DefaultKafkaClientSupplier.cs rename to core/Kafka/DefaultKafkaClientSupplier.cs index df80259a..1026a832 100644 --- a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs +++ b/core/Kafka/DefaultKafkaClientSupplier.cs @@ -1,54 +1,87 @@ using System; using Confluent.Kafka; using Newtonsoft.Json; +using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Librdkafka; +using Streamiz.Kafka.Net.Table; -namespace Streamiz.Kafka.Net.Kafka.Internal +namespace Streamiz.Kafka.Net.Kafka { - internal class DefaultKafkaClientBuilder + /// + /// Default builder used to provide Kafka clients builder + /// + public class DefaultKafkaClientBuilder { + /// + /// Get the consumer builder + /// + /// + /// public virtual ConsumerBuilder GetConsumerBuilder(ConsumerConfig config) => new(config); + /// + /// Get the admin client builder + /// + /// + /// public virtual AdminClientBuilder GetAdminBuilder(AdminClientConfig config) => new(config); + /// + /// Get the producer builder + /// + /// + /// public virtual ProducerBuilder GetProducerBuilder(ProducerConfig config) => new(config); } - internal class DefaultKafkaClientSupplier : IKafkaSupplier + /// + /// Default can be used to provide custom Kafka clients to a instance. + /// + public class DefaultKafkaClientSupplier : IKafkaSupplier { private readonly KafkaLoggerAdapter loggerAdapter; private readonly IStreamConfig streamConfig; private readonly bool exposeLibrdKafka; private readonly DefaultKafkaClientBuilder builderKafkaHandler; - public DefaultKafkaClientSupplier(KafkaLoggerAdapter loggerAdapter) - : this(loggerAdapter, null) - { } - + /// + /// + /// + /// + /// public DefaultKafkaClientSupplier( KafkaLoggerAdapter loggerAdapter, IStreamConfig streamConfig) : this(loggerAdapter, streamConfig, new DefaultKafkaClientBuilder()) { } - internal DefaultKafkaClientSupplier( + /// + /// + /// + /// + /// + /// + /// + public DefaultKafkaClientSupplier( KafkaLoggerAdapter loggerAdapter, IStreamConfig streamConfig, DefaultKafkaClientBuilder builderKafkaHandler) { - if (loggerAdapter == null) - throw new ArgumentNullException(nameof(loggerAdapter)); - - this.loggerAdapter = loggerAdapter; + this.loggerAdapter = loggerAdapter ?? throw new ArgumentNullException(nameof(loggerAdapter)); this.streamConfig = streamConfig; exposeLibrdKafka = streamConfig?.ExposeLibrdKafkaStats ?? false; this.builderKafkaHandler = builderKafkaHandler ?? new DefaultKafkaClientBuilder(); } + /// + /// Create an admin kafka client which is used for internal topic management. + /// + /// Admin configuration can't be null + /// Return an admin client instance public IAdminClient GetAdmin(AdminClientConfig config) { AdminClientBuilder builder = builderKafkaHandler.GetAdminBuilder(config); @@ -57,6 +90,12 @@ public IAdminClient GetAdmin(AdminClientConfig config) return builder.Build(); } + /// + /// Build a kafka consumer with instance and listener. + /// + /// Consumer configuration can't be null + /// Rebalance listener (Nullable) + /// Return a kafka consumer built public IConsumer GetConsumer(ConsumerConfig config, IConsumerRebalanceListener rebalanceListener) { ConsumerBuilder builder = builderKafkaHandler.GetConsumerBuilder(config); @@ -88,6 +127,11 @@ public IConsumer GetConsumer(ConsumerConfig config, IConsumerReb return builder.Build(); } + /// + /// Build a kafka producer with instance. + /// + /// Producer configuration can't be null + /// Return a kafka producer built public IProducer GetProducer(ProducerConfig config) { ProducerBuilder builder = builderKafkaHandler.GetProducerBuilder(config); @@ -113,6 +157,11 @@ public IProducer GetProducer(ProducerConfig config) return builder.Build(); } + /// + /// Build a kafka restore consumer with instance for read record to restore statestore. + /// + /// Restore consumer configuration can't be null + /// Return a kafka restore consumer built public IConsumer GetRestoreConsumer(ConsumerConfig config) { ConsumerBuilder builder = builderKafkaHandler.GetConsumerBuilder(config); @@ -121,6 +170,11 @@ public IConsumer GetRestoreConsumer(ConsumerConfig config) return builder.Build(); } + /// + /// Build a kafka global consumer with which is used to consume records for . + /// + /// Global consumer configuration can't be null + /// Return a kafka global consumer built public IConsumer GetGlobalConsumer(ConsumerConfig config) { config.AutoOffsetReset = AutoOffsetReset.Earliest; @@ -148,6 +202,10 @@ public IConsumer GetGlobalConsumer(ConsumerConfig config) return builder.Build(); } + /// + /// Get or set the metrics registry. + /// This registry will be capture all librdkafka statistics if is enable and forward these into the metrics reporter + /// public StreamMetricsRegistry MetricsRegistry { get; set; } } } \ No newline at end of file diff --git a/core/Kafka/Internal/KafkaLoggerAdapter.cs b/core/Kafka/KafkaLoggerAdapter.cs similarity index 88% rename from core/Kafka/Internal/KafkaLoggerAdapter.cs rename to core/Kafka/KafkaLoggerAdapter.cs index f2b9da9f..4a895ca9 100644 --- a/core/Kafka/Internal/KafkaLoggerAdapter.cs +++ b/core/Kafka/KafkaLoggerAdapter.cs @@ -1,16 +1,21 @@ using Confluent.Kafka; -using Streamiz.Kafka.Net.Crosscutting; using System; using System.Threading; using Microsoft.Extensions.Logging; -using Streamiz.Kafka.Net.Errors; -namespace Streamiz.Kafka.Net.Kafka.Internal +namespace Streamiz.Kafka.Net.Kafka { - internal class KafkaLoggerAdapter + /// + /// Kafka log adapter to intercept librdkafka internal logs + /// + public class KafkaLoggerAdapter { private readonly ILogger log; + /// + /// + /// + /// public KafkaLoggerAdapter(IStreamConfig configuration) : this(configuration, configuration.Logger.CreateLogger(typeof(KafkaLoggerAdapter))) { @@ -71,10 +76,11 @@ internal void LogAdmin(IAdminClient admin, LogMessage message) private string GetName(IClient client) { - // FOR FIX string name = ""; try { + if (client.Handle == null || client.Handle.IsInvalid) + return "Unknown"; name = client.Name; } catch (NullReferenceException) From f6b1da11b5186009591d38562f9ea6e949e86554 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Wed, 24 Jul 2024 10:01:37 -0700 Subject: [PATCH 2/6] fix #338 --- core/Mock/Kafka/MockAdminClient.cs | 2 +- .../DefaultKafkaClientSupplierTests.cs | 19 ++++++++++++------- .../Private/KafkaLoggerAdapterTests.cs | 1 + 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/Mock/Kafka/MockAdminClient.cs b/core/Mock/Kafka/MockAdminClient.cs index 17c56e39..a62be9f4 100644 --- a/core/Mock/Kafka/MockAdminClient.cs +++ b/core/Mock/Kafka/MockAdminClient.cs @@ -18,7 +18,7 @@ public MockAdminClient(MockCluster cluster, string name) this.cluster = cluster; } - public override Handle Handle => throw new NotImplementedException(); + public override Handle Handle => null; public override string Name { get; } diff --git a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs index bbd21a20..f400b1c6 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs @@ -1,6 +1,7 @@ using NUnit.Framework; using Streamiz.Kafka.Net.Kafka.Internal; using System; +using Streamiz.Kafka.Net.Kafka; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Processors.Internal; @@ -8,8 +9,6 @@ namespace Streamiz.Kafka.Net.Tests.Private { public class DefaultKafkaClientSupplierTest { - private readonly StreamConfig config = GetConfig(); - private static StreamConfig GetConfig() { var config = new StreamConfig(); @@ -20,13 +19,14 @@ private static StreamConfig GetConfig() [Test] public void ShouldArgumentNullException() { - Assert.Throws(() => new DefaultKafkaClientSupplier(null)); + Assert.Throws(() => new DefaultKafkaClientSupplier(null, null)); } [Test] public void CreateAdminClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); var adminClient = supplier.GetAdmin(config.ToAdminConfig("admin")); Assert.IsNotNull(adminClient); Assert.AreEqual("admin", adminClient.Name.Split("#")[0]); @@ -35,7 +35,8 @@ public void CreateAdminClient() [Test] public void CreateConsumerClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); var consumer = supplier.GetConsumer(config.ToConsumerConfig("consume"), new StreamsRebalanceListener(null)); Assert.IsNotNull(consumer); Assert.AreEqual("consume", consumer.Name.Split("#")[0]); @@ -44,7 +45,8 @@ public void CreateConsumerClient() [Test] public void CreateRestoreClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); var restore = supplier.GetRestoreConsumer(config.ToConsumerConfig("retore")); Assert.IsNotNull(restore); Assert.AreEqual("retore", restore.Name.Split("#")[0]); @@ -53,7 +55,8 @@ public void CreateRestoreClient() [Test] public void CreateProducerClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); var produce = supplier.GetProducer(config.ToProducerConfig("produce")); Assert.IsNotNull(produce); Assert.AreEqual("produce", produce.Name.Split("#")[0]); @@ -62,6 +65,7 @@ public void CreateProducerClient() [Test] public void CreateConsumerWithStats() { + var config = GetConfig(); config.ExposeLibrdKafkaStats = true; config.ApplicationId = "test-app"; config.ClientId = "test-client"; @@ -78,6 +82,7 @@ public void CreateConsumerWithStats() [Test] public void CreateProducerWithStats() { + var config = GetConfig(); config.ExposeLibrdKafkaStats = true; config.ApplicationId = "test-app"; config.ClientId = "test-client"; diff --git a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs index f0f92a67..9e412b9c 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using NUnit.Framework; +using Streamiz.Kafka.Net.Kafka; using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Mock.Kafka; From df8b181cfa280e6875dc68a2e233f1197f770130 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Wed, 24 Jul 2024 14:36:23 -0700 Subject: [PATCH 3/6] Fix Log Adapter unit tests --- .../ReflectionHelperExtensionMethods.cs | 49 +++++++++++ .../DefaultKafkaClientSupplierTests.cs | 32 +++++++- .../Private/KafkaLoggerAdapterTests.cs | 81 ++++++++++++++++++- 3 files changed, 159 insertions(+), 3 deletions(-) create mode 100644 test/Streamiz.Kafka.Net.Tests/Helpers/ReflectionHelperExtensionMethods.cs diff --git a/test/Streamiz.Kafka.Net.Tests/Helpers/ReflectionHelperExtensionMethods.cs b/test/Streamiz.Kafka.Net.Tests/Helpers/ReflectionHelperExtensionMethods.cs new file mode 100644 index 00000000..ddc85691 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Helpers/ReflectionHelperExtensionMethods.cs @@ -0,0 +1,49 @@ +using System; +using System.Linq; +using System.Reflection; + +namespace Streamiz.Kafka.Net.Tests.Helpers; + +public static class ReflectionHelperExtensionMethods +{ + public static void SetPrivatePropertyValue(this T member, string propName, V newValue) + { + PropertyInfo[] propertiesInfo = + typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + var propertyInfo = propertiesInfo.FirstOrDefault(p => p.Name.Equals(propName)); + + if (propertyInfo == null) return; + propertyInfo.SetValue(member, newValue); + } + + public static void SetPrivateFieldsValue(this object member, Type type, string propName, V newValue) + { + FieldInfo[] fieldsInfo = + type.GetFields(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + var fieldInfo = fieldsInfo.FirstOrDefault(p => p.Name.Equals(propName)); + + if (fieldInfo == null) return; + fieldInfo.SetValue(member, newValue); + } + + public static object GetInstance(string strFullyQualifiedName, ref Type outputType) + { + Type type = Type.GetType(strFullyQualifiedName); + if (type != null) + { + outputType = type; + return Activator.CreateInstance(type); + } + + foreach (var asm in AppDomain.CurrentDomain.GetAssemblies()) + { + type = asm.GetType(strFullyQualifiedName); + if (type != null) + { + outputType = type; + return Activator.CreateInstance(type); + } + } + return null; + } +} \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs index f400b1c6..b468a582 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs @@ -47,9 +47,9 @@ public void CreateRestoreClient() { var config = GetConfig(); var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); - var restore = supplier.GetRestoreConsumer(config.ToConsumerConfig("retore")); + var restore = supplier.GetRestoreConsumer(config.ToConsumerConfig("restore")); Assert.IsNotNull(restore); - Assert.AreEqual("retore", restore.Name.Split("#")[0]); + Assert.AreEqual("restore", restore.Name.Split("#")[0]); } [Test] @@ -61,6 +61,16 @@ public void CreateProducerClient() Assert.IsNotNull(produce); Assert.AreEqual("produce", produce.Name.Split("#")[0]); } + + [Test] + public void CreateGlobalConsumerClient() + { + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); + var globalConsumer = supplier.GetGlobalConsumer(config.ToGlobalConsumerConfig("global-consumer")); + Assert.IsNotNull(globalConsumer); + Assert.AreEqual("global-consumer", globalConsumer.Name.Split("#")[0]); + } [Test] public void CreateConsumerWithStats() @@ -95,5 +105,23 @@ public void CreateProducerWithStats() var producer = supplier.GetProducer(wrapper); Assert.IsNotNull(producer); } + + [Test] + public void CreateGlobalConsumerWithStats() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var consumerConfig = config.ToGlobalConsumerConfig("global-consume"); + StreamizConsumerConfig wrapper = new StreamizConsumerConfig(consumerConfig, "global-thread-1"); + + var globalConsumer = supplier.GetGlobalConsumer(wrapper); + Assert.IsNotNull(globalConsumer); + } } } \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs index 9e412b9c..ade7c40c 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs @@ -1,12 +1,16 @@ using System; using System.Collections.Generic; using System.IO; +using System.Reflection; +using Confluent.Kafka; +using Moq; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using NUnit.Framework; +using Streamiz.Kafka.Net.Crosscutting; using Streamiz.Kafka.Net.Kafka; -using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Mock.Kafka; +using Streamiz.Kafka.Net.Tests.Helpers; namespace Streamiz.Kafka.Net.Tests.Private { @@ -190,5 +194,80 @@ public void TestAdapterLogAdmin() Assert.AreEqual(1, logger.Logs.Count); logger.Logs.Clear(); } + + [Test] + public void TestAdapterGetNameNPE() + { + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + var client = new Mock>(); + client + .Setup(c => c.Name) + .Throws(); + + int v = 1233; + Type handleType = typeof(object); + object handle = ReflectionHelperExtensionMethods.GetInstance( + "Confluent.Kafka.Impl.SafeKafkaHandle", ref handleType); + handle.SetPrivateFieldsValue(handleType, "handle", new IntPtr(v)); + + Handle h = new Handle(); + h.SetPrivatePropertyValue("LibrdkafkaHandle", handle); + + client.Setup(c => c.Handle) + .Returns(() => h); + + adapter.LogConsume(client.Object, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + } + + [Test] + public void TestAdapterNullHandle() + { + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + var client = new Mock>(); + client + .Setup(c => c.Name) + .Throws(); + + client.Setup(c => c.Handle) + .Returns(() => new Handle()); + + adapter.LogConsume(client.Object, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + } + + [Test] + public void TestAdapterHandleInvalid() + { + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + var client = new Mock>(); + client + .Setup(c => c.Name) + .Throws(); + + Type handleType = typeof(object); + object handle = ReflectionHelperExtensionMethods.GetInstance( + "Confluent.Kafka.Impl.SafeKafkaHandle", ref handleType); + Handle h = new Handle(); + h.SetPrivatePropertyValue("LibrdkafkaHandle", handle); + + client.Setup(c => c.Handle) + .Returns(() => h); + + adapter.LogConsume(client.Object, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + } } } \ No newline at end of file From 4105157aecdb812cd6d71d3ecee87903ba6e4d19 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Wed, 24 Jul 2024 15:48:26 -0700 Subject: [PATCH 4/6] fix unmanaged npe exception --- .../Private/KafkaLoggerAdapterTests.cs | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs index ade7c40c..09cffb6d 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs @@ -195,33 +195,36 @@ public void TestAdapterLogAdmin() logger.Logs.Clear(); } - [Test] - public void TestAdapterGetNameNPE() + [Test] + public void TestAdapterGetNameNPE() { - var config = new StreamConfig(); - config.ApplicationId = "test-logger-adapter"; - var logger = new InMemoryLogger(); - var adapter = new KafkaLoggerAdapter(config, logger); - - var client = new Mock>(); - client - .Setup(c => c.Name) - .Throws(); + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); - int v = 1233; - Type handleType = typeof(object); - object handle = ReflectionHelperExtensionMethods.GetInstance( - "Confluent.Kafka.Impl.SafeKafkaHandle", ref handleType); - handle.SetPrivateFieldsValue(handleType, "handle", new IntPtr(v)); - - Handle h = new Handle(); - h.SetPrivatePropertyValue("LibrdkafkaHandle", handle); - - client.Setup(c => c.Handle) - .Returns(() => h); - - adapter.LogConsume(client.Object, - new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + var client = new Mock>(); + client + .Setup(c => c.Name) + .Throws(); + + int v = 1233; + Type handleType = typeof(object); + object handle = ReflectionHelperExtensionMethods.GetInstance( + "Confluent.Kafka.Impl.SafeKafkaHandle", ref handleType); + handle.SetPrivateFieldsValue(handleType, "handle", new IntPtr(v)); + + Handle h = new Handle(); + h.SetPrivatePropertyValue("LibrdkafkaHandle", handle); + + client.Setup(c => c.Handle) + .Returns(() => h); + + adapter.LogConsume(client.Object, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + + handle.SetPrivateFieldsValue(handleType, "handle", IntPtr.Zero); + handle = null; } [Test] @@ -268,6 +271,8 @@ public void TestAdapterHandleInvalid() adapter.LogConsume(client.Object, new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + + handle = null; } } } \ No newline at end of file From b74b18b83709e91772bddc67e60577b5a40be5ef Mon Sep 17 00:00:00 2001 From: LGouellec Date: Wed, 24 Jul 2024 16:45:11 -0700 Subject: [PATCH 5/6] fix sonar lint --- .../Private/KafkaLoggerAdapterTests.cs | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs index 09cffb6d..6ea0247d 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using System.Reflection; +using System.Threading; using Confluent.Kafka; using Moq; using Microsoft.Extensions.Logging; @@ -129,6 +130,34 @@ private static string GetLogLevelString(LogLevel logLevel) public List Logs { get; } = new List(); } + [Test] + public void LogAndErrorWithThreadName() + { + Thread.CurrentThread.Name = "test-adapter"; + + var mockConsumer = new MockConsumer(null, "group", "CONSUMER"); + var mockProducer = new MockProducer(null, "PRODUCER"); + var mockAdmin = new MockAdminClient(null, "ADMIN"); + + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + adapter.LogConsume(mockConsumer, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + adapter.ErrorConsume(mockConsumer, new Error(ErrorCode.ConcurrentTransactions)); + + adapter.LogProduce(mockProducer, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + adapter.ErrorProduce(mockProducer, new Error(ErrorCode.RecordListTooLarge)); + + adapter.LogAdmin(mockAdmin, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + adapter.ErrorAdmin(mockAdmin, + new Confluent.Kafka.Error(Confluent.Kafka.ErrorCode.ClusterAuthorizationFailed)); + } + [Test] public void TestAdapterLogProducer() { From 3986d9c7f9b054d9139fa6101dcfcb78bcbbe079 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Tue, 30 Jul 2024 14:45:03 -0700 Subject: [PATCH 6/6] statistic handler unit test --- .../DefaultKafkaClientSupplierTests.cs | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs index b468a582..e58297b2 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs @@ -1,6 +1,10 @@ using NUnit.Framework; using Streamiz.Kafka.Net.Kafka.Internal; using System; +using System.Linq; +using System.Threading; +using Moq; +using Confluent.Kafka; using Streamiz.Kafka.Net.Kafka; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Processors.Internal; @@ -123,5 +127,109 @@ public void CreateGlobalConsumerWithStats() var globalConsumer = supplier.GetGlobalConsumer(wrapper); Assert.IsNotNull(globalConsumer); } + + [Test] + public void ProducerStatisticHandler() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + config.StatisticsIntervalMs = 10; + + var kafkaClientBuilder = new DefaultKafkaClientBuilder(); + + var supplier = new DefaultKafkaClientSupplier( + new KafkaLoggerAdapter(config), + config, + kafkaClientBuilder); + + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var producerConfig = config.ToProducerConfig("producer"); + StreamizProducerConfig wrapper = new StreamizProducerConfig(producerConfig, "thread-1", new TaskId(){Id = 0, Partition = 0}); + var producer = supplier.GetProducer(wrapper); + Assert.IsNotNull(producer); + + Thread.Sleep(150); + + var sensor = supplier + .MetricsRegistry + .GetSensors() + .FirstOrDefault(s => s.Name.Equals("librdkafka.producer.sensor.messages-produced-total")); + + Assert.IsNotNull(sensor); + Assert.AreEqual(0, sensor.Metrics.Values.Sum(v => (double)v.Value)); + } + + [Test] + public void ConsumerStatisticHandler() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + config.StatisticsIntervalMs = 10; + + var kafkaClientBuilder = new DefaultKafkaClientBuilder(); + + var supplier = new DefaultKafkaClientSupplier( + new KafkaLoggerAdapter(config), + config, + kafkaClientBuilder); + + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var consumerConfig = config.ToConsumerConfig("consume"); + StreamizConsumerConfig wrapper = new StreamizConsumerConfig(consumerConfig, "thread-1"); + + var consumer = supplier.GetConsumer(wrapper, new StreamsRebalanceListener(null)); + Assert.IsNotNull(consumer); + + Thread.Sleep(150); + + var sensor = supplier + .MetricsRegistry + .GetSensors() + .FirstOrDefault(s => s.Name.Equals("librdkafka.consume.sensor.messages-consumed-total")); + + Assert.IsNotNull(sensor); + Assert.AreEqual(0, sensor.Metrics.Values.Sum(v => (double)v.Value)); + } + + [Test] + public void GlobalConsumerStatisticHandler() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + config.StatisticsIntervalMs = 10; + + var kafkaClientBuilder = new DefaultKafkaClientBuilder(); + + var supplier = new DefaultKafkaClientSupplier( + new KafkaLoggerAdapter(config), + config, + kafkaClientBuilder); + + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var consumerConfig = config.ToGlobalConsumerConfig("consume"); + StreamizConsumerConfig wrapper = new StreamizConsumerConfig(consumerConfig, "thread-1"); + + var consumer = supplier.GetGlobalConsumer(wrapper); + Assert.IsNotNull(consumer); + + Thread.Sleep(150); + + var sensor = supplier + .MetricsRegistry + .GetSensors() + .FirstOrDefault(s => s.Name.Equals("librdkafka.consume.sensor.messages-consumed-total")); + + Assert.IsNotNull(sensor); + Assert.AreEqual(0, sensor.Metrics.Values.Sum(v => (double)v.Value)); + } } } \ No newline at end of file