diff --git a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs b/core/Kafka/DefaultKafkaClientSupplier.cs similarity index 65% rename from core/Kafka/Internal/DefaultKafkaClientSupplier.cs rename to core/Kafka/DefaultKafkaClientSupplier.cs index df80259a..1026a832 100644 --- a/core/Kafka/Internal/DefaultKafkaClientSupplier.cs +++ b/core/Kafka/DefaultKafkaClientSupplier.cs @@ -1,54 +1,87 @@ using System; using Confluent.Kafka; using Newtonsoft.Json; +using Streamiz.Kafka.Net.Kafka.Internal; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Metrics.Librdkafka; +using Streamiz.Kafka.Net.Table; -namespace Streamiz.Kafka.Net.Kafka.Internal +namespace Streamiz.Kafka.Net.Kafka { - internal class DefaultKafkaClientBuilder + /// <summary> + /// Default builder used to provide Kafka clients builder + /// </summary> + public class DefaultKafkaClientBuilder { + /// <summary> + /// Get the consumer builder + /// </summary> + /// <param name="config"></param> + /// <returns></returns> public virtual ConsumerBuilder<byte[], byte[]> GetConsumerBuilder(ConsumerConfig config) => new(config); + /// <summary> + /// Get the admin client builder + /// </summary> + /// <param name="config"></param> + /// <returns></returns> public virtual AdminClientBuilder GetAdminBuilder(AdminClientConfig config) => new(config); + /// <summary> + /// Get the producer builder + /// </summary> + /// <param name="config"></param> + /// <returns></returns> public virtual ProducerBuilder<byte[], byte[]> GetProducerBuilder(ProducerConfig config) => new(config); } - internal class DefaultKafkaClientSupplier : IKafkaSupplier + /// <summary> + /// Default <see cref="IKafkaSupplier"/> can be used to provide custom Kafka clients to a <see cref="KafkaStream"/> instance. + /// </summary> + public class DefaultKafkaClientSupplier : IKafkaSupplier { private readonly KafkaLoggerAdapter loggerAdapter; private readonly IStreamConfig streamConfig; private readonly bool exposeLibrdKafka; private readonly DefaultKafkaClientBuilder builderKafkaHandler; - public DefaultKafkaClientSupplier(KafkaLoggerAdapter loggerAdapter) - : this(loggerAdapter, null) - { } - + /// <summary> + /// + /// </summary> + /// <param name="loggerAdapter"></param> + /// <param name="streamConfig"></param> public DefaultKafkaClientSupplier( KafkaLoggerAdapter loggerAdapter, IStreamConfig streamConfig) : this(loggerAdapter, streamConfig, new DefaultKafkaClientBuilder()) { } - internal DefaultKafkaClientSupplier( + /// <summary> + /// + /// </summary> + /// <param name="loggerAdapter"></param> + /// <param name="streamConfig"></param> + /// <param name="builderKafkaHandler"></param> + /// <exception cref="ArgumentNullException"></exception> + public DefaultKafkaClientSupplier( KafkaLoggerAdapter loggerAdapter, IStreamConfig streamConfig, DefaultKafkaClientBuilder builderKafkaHandler) { - if (loggerAdapter == null) - throw new ArgumentNullException(nameof(loggerAdapter)); - - this.loggerAdapter = loggerAdapter; + this.loggerAdapter = loggerAdapter ?? throw new ArgumentNullException(nameof(loggerAdapter)); this.streamConfig = streamConfig; exposeLibrdKafka = streamConfig?.ExposeLibrdKafkaStats ?? false; this.builderKafkaHandler = builderKafkaHandler ?? new DefaultKafkaClientBuilder(); } + /// <summary> + /// Create an admin kafka client which is used for internal topic management. + /// </summary> + /// <param name="config">Admin configuration can't be null</param> + /// <returns>Return an admin client instance</returns> public IAdminClient GetAdmin(AdminClientConfig config) { AdminClientBuilder builder = builderKafkaHandler.GetAdminBuilder(config); @@ -57,6 +90,12 @@ public IAdminClient GetAdmin(AdminClientConfig config) return builder.Build(); } + /// <summary> + /// Build a kafka consumer with <see cref="ConsumerConfig"/> instance and <see cref="IConsumerRebalanceListener"/> listener. + /// </summary> + /// <param name="config">Consumer configuration can't be null</param> + /// <param name="rebalanceListener">Rebalance listener (Nullable)</param> + /// <returns>Return a kafka consumer built</returns> public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerRebalanceListener rebalanceListener) { ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config); @@ -88,6 +127,11 @@ public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerReb return builder.Build(); } + /// <summary> + /// Build a kafka producer with <see cref="ProducerConfig"/> instance. + /// </summary> + /// <param name="config">Producer configuration can't be null</param> + /// <returns>Return a kafka producer built</returns> public IProducer<byte[], byte[]> GetProducer(ProducerConfig config) { ProducerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetProducerBuilder(config); @@ -113,6 +157,11 @@ public IProducer<byte[], byte[]> GetProducer(ProducerConfig config) return builder.Build(); } + /// <summary> + /// Build a kafka restore consumer with <see cref="ConsumerConfig"/> instance for read record to restore statestore. + /// </summary> + /// <param name="config">Restore consumer configuration can't be null</param> + /// <returns>Return a kafka restore consumer built</returns> public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config) { ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config); @@ -121,6 +170,11 @@ public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config) return builder.Build(); } + /// <summary> + /// Build a kafka global consumer with <see cref="ConsumerConfig"/> which is used to consume records for <see cref="IGlobalKTable{K,V}"/>. + /// </summary> + /// <param name="config">Global consumer configuration can't be null</param> + /// <returns>Return a kafka global consumer built</returns> public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config) { config.AutoOffsetReset = AutoOffsetReset.Earliest; @@ -148,6 +202,10 @@ public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config) return builder.Build(); } + /// <summary> + /// Get or set the metrics registry. + /// This registry will be capture all librdkafka statistics if <see cref="IStreamConfig.ExposeLibrdKafkaStats"/> is enable and forward these into the metrics reporter + /// </summary> public StreamMetricsRegistry MetricsRegistry { get; set; } } } \ No newline at end of file diff --git a/core/Kafka/Internal/KafkaLoggerAdapter.cs b/core/Kafka/KafkaLoggerAdapter.cs similarity index 88% rename from core/Kafka/Internal/KafkaLoggerAdapter.cs rename to core/Kafka/KafkaLoggerAdapter.cs index f2b9da9f..4a895ca9 100644 --- a/core/Kafka/Internal/KafkaLoggerAdapter.cs +++ b/core/Kafka/KafkaLoggerAdapter.cs @@ -1,16 +1,21 @@ using Confluent.Kafka; -using Streamiz.Kafka.Net.Crosscutting; using System; using System.Threading; using Microsoft.Extensions.Logging; -using Streamiz.Kafka.Net.Errors; -namespace Streamiz.Kafka.Net.Kafka.Internal +namespace Streamiz.Kafka.Net.Kafka { - internal class KafkaLoggerAdapter + /// <summary> + /// Kafka log adapter to intercept librdkafka internal logs + /// </summary> + public class KafkaLoggerAdapter { private readonly ILogger log; + /// <summary> + /// + /// </summary> + /// <param name="configuration"></param> public KafkaLoggerAdapter(IStreamConfig configuration) : this(configuration, configuration.Logger.CreateLogger(typeof(KafkaLoggerAdapter))) { @@ -71,10 +76,11 @@ internal void LogAdmin(IAdminClient admin, LogMessage message) private string GetName(IClient client) { - // FOR FIX string name = ""; try { + if (client.Handle == null || client.Handle.IsInvalid) + return "Unknown"; name = client.Name; } catch (NullReferenceException) diff --git a/core/Mock/Kafka/MockAdminClient.cs b/core/Mock/Kafka/MockAdminClient.cs index 17c56e39..a62be9f4 100644 --- a/core/Mock/Kafka/MockAdminClient.cs +++ b/core/Mock/Kafka/MockAdminClient.cs @@ -18,7 +18,7 @@ public MockAdminClient(MockCluster cluster, string name) this.cluster = cluster; } - public override Handle Handle => throw new NotImplementedException(); + public override Handle Handle => null; public override string Name { get; } diff --git a/test/Streamiz.Kafka.Net.Tests/Helpers/ReflectionHelperExtensionMethods.cs b/test/Streamiz.Kafka.Net.Tests/Helpers/ReflectionHelperExtensionMethods.cs new file mode 100644 index 00000000..ddc85691 --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Helpers/ReflectionHelperExtensionMethods.cs @@ -0,0 +1,49 @@ +using System; +using System.Linq; +using System.Reflection; + +namespace Streamiz.Kafka.Net.Tests.Helpers; + +public static class ReflectionHelperExtensionMethods +{ + public static void SetPrivatePropertyValue<T, V>(this T member, string propName, V newValue) + { + PropertyInfo[] propertiesInfo = + typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + var propertyInfo = propertiesInfo.FirstOrDefault(p => p.Name.Equals(propName)); + + if (propertyInfo == null) return; + propertyInfo.SetValue(member, newValue); + } + + public static void SetPrivateFieldsValue<V>(this object member, Type type, string propName, V newValue) + { + FieldInfo[] fieldsInfo = + type.GetFields(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + var fieldInfo = fieldsInfo.FirstOrDefault(p => p.Name.Equals(propName)); + + if (fieldInfo == null) return; + fieldInfo.SetValue(member, newValue); + } + + public static object GetInstance(string strFullyQualifiedName, ref Type outputType) + { + Type type = Type.GetType(strFullyQualifiedName); + if (type != null) + { + outputType = type; + return Activator.CreateInstance(type); + } + + foreach (var asm in AppDomain.CurrentDomain.GetAssemblies()) + { + type = asm.GetType(strFullyQualifiedName); + if (type != null) + { + outputType = type; + return Activator.CreateInstance(type); + } + } + return null; + } +} \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs index bbd21a20..e58297b2 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/DefaultKafkaClientSupplierTests.cs @@ -1,6 +1,11 @@ using NUnit.Framework; using Streamiz.Kafka.Net.Kafka.Internal; using System; +using System.Linq; +using System.Threading; +using Moq; +using Confluent.Kafka; +using Streamiz.Kafka.Net.Kafka; using Streamiz.Kafka.Net.Metrics; using Streamiz.Kafka.Net.Processors.Internal; @@ -8,8 +13,6 @@ namespace Streamiz.Kafka.Net.Tests.Private { public class DefaultKafkaClientSupplierTest { - private readonly StreamConfig config = GetConfig(); - private static StreamConfig GetConfig() { var config = new StreamConfig(); @@ -20,13 +23,14 @@ private static StreamConfig GetConfig() [Test] public void ShouldArgumentNullException() { - Assert.Throws<ArgumentNullException>(() => new DefaultKafkaClientSupplier(null)); + Assert.Throws<ArgumentNullException>(() => new DefaultKafkaClientSupplier(null, null)); } [Test] public void CreateAdminClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); var adminClient = supplier.GetAdmin(config.ToAdminConfig("admin")); Assert.IsNotNull(adminClient); Assert.AreEqual("admin", adminClient.Name.Split("#")[0]); @@ -35,7 +39,8 @@ public void CreateAdminClient() [Test] public void CreateConsumerClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); var consumer = supplier.GetConsumer(config.ToConsumerConfig("consume"), new StreamsRebalanceListener(null)); Assert.IsNotNull(consumer); Assert.AreEqual("consume", consumer.Name.Split("#")[0]); @@ -44,24 +49,37 @@ public void CreateConsumerClient() [Test] public void CreateRestoreClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); - var restore = supplier.GetRestoreConsumer(config.ToConsumerConfig("retore")); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); + var restore = supplier.GetRestoreConsumer(config.ToConsumerConfig("restore")); Assert.IsNotNull(restore); - Assert.AreEqual("retore", restore.Name.Split("#")[0]); + Assert.AreEqual("restore", restore.Name.Split("#")[0]); } [Test] public void CreateProducerClient() { - var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config)); + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); var produce = supplier.GetProducer(config.ToProducerConfig("produce")); Assert.IsNotNull(produce); Assert.AreEqual("produce", produce.Name.Split("#")[0]); } + + [Test] + public void CreateGlobalConsumerClient() + { + var config = GetConfig(); + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); + var globalConsumer = supplier.GetGlobalConsumer(config.ToGlobalConsumerConfig("global-consumer")); + Assert.IsNotNull(globalConsumer); + Assert.AreEqual("global-consumer", globalConsumer.Name.Split("#")[0]); + } [Test] public void CreateConsumerWithStats() { + var config = GetConfig(); config.ExposeLibrdKafkaStats = true; config.ApplicationId = "test-app"; config.ClientId = "test-client"; @@ -78,6 +96,7 @@ public void CreateConsumerWithStats() [Test] public void CreateProducerWithStats() { + var config = GetConfig(); config.ExposeLibrdKafkaStats = true; config.ApplicationId = "test-app"; config.ClientId = "test-client"; @@ -90,5 +109,127 @@ public void CreateProducerWithStats() var producer = supplier.GetProducer(wrapper); Assert.IsNotNull(producer); } + + [Test] + public void CreateGlobalConsumerWithStats() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + + var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config); + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var consumerConfig = config.ToGlobalConsumerConfig("global-consume"); + StreamizConsumerConfig wrapper = new StreamizConsumerConfig(consumerConfig, "global-thread-1"); + + var globalConsumer = supplier.GetGlobalConsumer(wrapper); + Assert.IsNotNull(globalConsumer); + } + + [Test] + public void ProducerStatisticHandler() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + config.StatisticsIntervalMs = 10; + + var kafkaClientBuilder = new DefaultKafkaClientBuilder(); + + var supplier = new DefaultKafkaClientSupplier( + new KafkaLoggerAdapter(config), + config, + kafkaClientBuilder); + + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var producerConfig = config.ToProducerConfig("producer"); + StreamizProducerConfig wrapper = new StreamizProducerConfig(producerConfig, "thread-1", new TaskId(){Id = 0, Partition = 0}); + var producer = supplier.GetProducer(wrapper); + Assert.IsNotNull(producer); + + Thread.Sleep(150); + + var sensor = supplier + .MetricsRegistry + .GetSensors() + .FirstOrDefault(s => s.Name.Equals("librdkafka.producer.sensor.messages-produced-total")); + + Assert.IsNotNull(sensor); + Assert.AreEqual(0, sensor.Metrics.Values.Sum(v => (double)v.Value)); + } + + [Test] + public void ConsumerStatisticHandler() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + config.StatisticsIntervalMs = 10; + + var kafkaClientBuilder = new DefaultKafkaClientBuilder(); + + var supplier = new DefaultKafkaClientSupplier( + new KafkaLoggerAdapter(config), + config, + kafkaClientBuilder); + + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var consumerConfig = config.ToConsumerConfig("consume"); + StreamizConsumerConfig wrapper = new StreamizConsumerConfig(consumerConfig, "thread-1"); + + var consumer = supplier.GetConsumer(wrapper, new StreamsRebalanceListener(null)); + Assert.IsNotNull(consumer); + + Thread.Sleep(150); + + var sensor = supplier + .MetricsRegistry + .GetSensors() + .FirstOrDefault(s => s.Name.Equals("librdkafka.consume.sensor.messages-consumed-total")); + + Assert.IsNotNull(sensor); + Assert.AreEqual(0, sensor.Metrics.Values.Sum(v => (double)v.Value)); + } + + [Test] + public void GlobalConsumerStatisticHandler() + { + var config = GetConfig(); + config.ExposeLibrdKafkaStats = true; + config.ApplicationId = "test-app"; + config.ClientId = "test-client"; + config.StatisticsIntervalMs = 10; + + var kafkaClientBuilder = new DefaultKafkaClientBuilder(); + + var supplier = new DefaultKafkaClientSupplier( + new KafkaLoggerAdapter(config), + config, + kafkaClientBuilder); + + supplier.MetricsRegistry = new StreamMetricsRegistry(); + + var consumerConfig = config.ToGlobalConsumerConfig("consume"); + StreamizConsumerConfig wrapper = new StreamizConsumerConfig(consumerConfig, "thread-1"); + + var consumer = supplier.GetGlobalConsumer(wrapper); + Assert.IsNotNull(consumer); + + Thread.Sleep(150); + + var sensor = supplier + .MetricsRegistry + .GetSensors() + .FirstOrDefault(s => s.Name.Equals("librdkafka.consume.sensor.messages-consumed-total")); + + Assert.IsNotNull(sensor); + Assert.AreEqual(0, sensor.Metrics.Values.Sum(v => (double)v.Value)); + } } } \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs index f0f92a67..6ea0247d 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/KafkaLoggerAdapterTests.cs @@ -1,11 +1,17 @@ using System; using System.Collections.Generic; using System.IO; +using System.Reflection; +using System.Threading; +using Confluent.Kafka; +using Moq; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using NUnit.Framework; -using Streamiz.Kafka.Net.Kafka.Internal; +using Streamiz.Kafka.Net.Crosscutting; +using Streamiz.Kafka.Net.Kafka; using Streamiz.Kafka.Net.Mock.Kafka; +using Streamiz.Kafka.Net.Tests.Helpers; namespace Streamiz.Kafka.Net.Tests.Private { @@ -124,6 +130,34 @@ private static string GetLogLevelString(LogLevel logLevel) public List<string> Logs { get; } = new List<string>(); } + [Test] + public void LogAndErrorWithThreadName() + { + Thread.CurrentThread.Name = "test-adapter"; + + var mockConsumer = new MockConsumer(null, "group", "CONSUMER"); + var mockProducer = new MockProducer(null, "PRODUCER"); + var mockAdmin = new MockAdminClient(null, "ADMIN"); + + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + adapter.LogConsume(mockConsumer, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + adapter.ErrorConsume(mockConsumer, new Error(ErrorCode.ConcurrentTransactions)); + + adapter.LogProduce(mockProducer, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + adapter.ErrorProduce(mockProducer, new Error(ErrorCode.RecordListTooLarge)); + + adapter.LogAdmin(mockAdmin, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + adapter.ErrorAdmin(mockAdmin, + new Confluent.Kafka.Error(Confluent.Kafka.ErrorCode.ClusterAuthorizationFailed)); + } + [Test] public void TestAdapterLogProducer() { @@ -189,5 +223,85 @@ public void TestAdapterLogAdmin() Assert.AreEqual(1, logger.Logs.Count); logger.Logs.Clear(); } + + [Test] + public void TestAdapterGetNameNPE() + { + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + var client = new Mock<IConsumer<byte[], byte[]>>(); + client + .Setup(c => c.Name) + .Throws<NullReferenceException>(); + + int v = 1233; + Type handleType = typeof(object); + object handle = ReflectionHelperExtensionMethods.GetInstance( + "Confluent.Kafka.Impl.SafeKafkaHandle", ref handleType); + handle.SetPrivateFieldsValue(handleType, "handle", new IntPtr(v)); + + Handle h = new Handle(); + h.SetPrivatePropertyValue("LibrdkafkaHandle", handle); + + client.Setup(c => c.Handle) + .Returns(() => h); + + adapter.LogConsume(client.Object, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + + handle.SetPrivateFieldsValue(handleType, "handle", IntPtr.Zero); + handle = null; + } + + [Test] + public void TestAdapterNullHandle() + { + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + var client = new Mock<IConsumer<byte[], byte[]>>(); + client + .Setup(c => c.Name) + .Throws<NullReferenceException>(); + + client.Setup(c => c.Handle) + .Returns(() => new Handle()); + + adapter.LogConsume(client.Object, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + } + + [Test] + public void TestAdapterHandleInvalid() + { + var config = new StreamConfig(); + config.ApplicationId = "test-logger-adapter"; + var logger = new InMemoryLogger(); + var adapter = new KafkaLoggerAdapter(config, logger); + + var client = new Mock<IConsumer<byte[], byte[]>>(); + client + .Setup(c => c.Name) + .Throws<NullReferenceException>(); + + Type handleType = typeof(object); + object handle = ReflectionHelperExtensionMethods.GetInstance( + "Confluent.Kafka.Impl.SafeKafkaHandle", ref handleType); + Handle h = new Handle(); + h.SetPrivatePropertyValue("LibrdkafkaHandle", handle); + + client.Setup(c => c.Handle) + .Returns(() => h); + + adapter.LogConsume(client.Object, + new Confluent.Kafka.LogMessage("error", Confluent.Kafka.SyslogLevel.Critical, "", "error")); + + handle = null; + } } } \ No newline at end of file