Skip to content

Commit

Permalink
Merge pull request #350 from LGouellec/issue/338
Browse files Browse the repository at this point in the history
Issue/338
  • Loading branch information
LGouellec authored Jul 30, 2024
2 parents 5546c5a + 3986d9c commit dbe7eb6
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,54 +1,87 @@
using System;
using Confluent.Kafka;
using Newtonsoft.Json;
using Streamiz.Kafka.Net.Kafka.Internal;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Librdkafka;
using Streamiz.Kafka.Net.Table;

namespace Streamiz.Kafka.Net.Kafka.Internal
namespace Streamiz.Kafka.Net.Kafka
{
internal class DefaultKafkaClientBuilder
/// <summary>
/// Default builder used to provide Kafka clients builder
/// </summary>
public class DefaultKafkaClientBuilder
{
/// <summary>
/// Get the consumer builder
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public virtual ConsumerBuilder<byte[], byte[]> GetConsumerBuilder(ConsumerConfig config)
=> new(config);

/// <summary>
/// Get the admin client builder
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public virtual AdminClientBuilder GetAdminBuilder(AdminClientConfig config)
=> new(config);

/// <summary>
/// Get the producer builder
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public virtual ProducerBuilder<byte[], byte[]> GetProducerBuilder(ProducerConfig config)
=> new(config);
}

internal class DefaultKafkaClientSupplier : IKafkaSupplier
/// <summary>
/// Default <see cref="IKafkaSupplier"/> can be used to provide custom Kafka clients to a <see cref="KafkaStream"/> instance.
/// </summary>
public class DefaultKafkaClientSupplier : IKafkaSupplier
{
private readonly KafkaLoggerAdapter loggerAdapter;
private readonly IStreamConfig streamConfig;
private readonly bool exposeLibrdKafka;
private readonly DefaultKafkaClientBuilder builderKafkaHandler;

public DefaultKafkaClientSupplier(KafkaLoggerAdapter loggerAdapter)
: this(loggerAdapter, null)
{ }

/// <summary>
///
/// </summary>
/// <param name="loggerAdapter"></param>
/// <param name="streamConfig"></param>
public DefaultKafkaClientSupplier(
KafkaLoggerAdapter loggerAdapter,
IStreamConfig streamConfig)
: this(loggerAdapter, streamConfig, new DefaultKafkaClientBuilder())
{ }

internal DefaultKafkaClientSupplier(
/// <summary>
///
/// </summary>
/// <param name="loggerAdapter"></param>
/// <param name="streamConfig"></param>
/// <param name="builderKafkaHandler"></param>
/// <exception cref="ArgumentNullException"></exception>
public DefaultKafkaClientSupplier(
KafkaLoggerAdapter loggerAdapter,
IStreamConfig streamConfig,
DefaultKafkaClientBuilder builderKafkaHandler)
{
if (loggerAdapter == null)
throw new ArgumentNullException(nameof(loggerAdapter));

this.loggerAdapter = loggerAdapter;
this.loggerAdapter = loggerAdapter ?? throw new ArgumentNullException(nameof(loggerAdapter));
this.streamConfig = streamConfig;
exposeLibrdKafka = streamConfig?.ExposeLibrdKafkaStats ?? false;
this.builderKafkaHandler = builderKafkaHandler ?? new DefaultKafkaClientBuilder();
}

/// <summary>
/// Create an admin kafka client which is used for internal topic management.
/// </summary>
/// <param name="config">Admin configuration can't be null</param>
/// <returns>Return an admin client instance</returns>
public IAdminClient GetAdmin(AdminClientConfig config)
{
AdminClientBuilder builder = builderKafkaHandler.GetAdminBuilder(config);
Expand All @@ -57,6 +90,12 @@ public IAdminClient GetAdmin(AdminClientConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka consumer with <see cref="ConsumerConfig"/> instance and <see cref="IConsumerRebalanceListener"/> listener.
/// </summary>
/// <param name="config">Consumer configuration can't be null</param>
/// <param name="rebalanceListener">Rebalance listener (Nullable)</param>
/// <returns>Return a kafka consumer built</returns>
public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerRebalanceListener rebalanceListener)
{
ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config);
Expand Down Expand Up @@ -88,6 +127,11 @@ public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerReb
return builder.Build();
}

/// <summary>
/// Build a kafka producer with <see cref="ProducerConfig"/> instance.
/// </summary>
/// <param name="config">Producer configuration can't be null</param>
/// <returns>Return a kafka producer built</returns>
public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
{
ProducerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetProducerBuilder(config);
Expand All @@ -113,6 +157,11 @@ public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka restore consumer with <see cref="ConsumerConfig"/> instance for read record to restore statestore.
/// </summary>
/// <param name="config">Restore consumer configuration can't be null</param>
/// <returns>Return a kafka restore consumer built</returns>
public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config)
{
ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config);
Expand All @@ -121,6 +170,11 @@ public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka global consumer with <see cref="ConsumerConfig"/> which is used to consume records for <see cref="IGlobalKTable{K,V}"/>.
/// </summary>
/// <param name="config">Global consumer configuration can't be null</param>
/// <returns>Return a kafka global consumer built</returns>
public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
{
config.AutoOffsetReset = AutoOffsetReset.Earliest;
Expand Down Expand Up @@ -148,6 +202,10 @@ public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
return builder.Build();
}

/// <summary>
/// Get or set the metrics registry.
/// This registry will be capture all librdkafka statistics if <see cref="IStreamConfig.ExposeLibrdKafkaStats"/> is enable and forward these into the metrics reporter
/// </summary>
public StreamMetricsRegistry MetricsRegistry { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
using Confluent.Kafka;
using Streamiz.Kafka.Net.Crosscutting;
using System;
using System.Threading;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Errors;

namespace Streamiz.Kafka.Net.Kafka.Internal
namespace Streamiz.Kafka.Net.Kafka
{
internal class KafkaLoggerAdapter
/// <summary>
/// Kafka log adapter to intercept librdkafka internal logs
/// </summary>
public class KafkaLoggerAdapter
{
private readonly ILogger log;

/// <summary>
///
/// </summary>
/// <param name="configuration"></param>
public KafkaLoggerAdapter(IStreamConfig configuration)
: this(configuration, configuration.Logger.CreateLogger(typeof(KafkaLoggerAdapter)))
{
Expand Down Expand Up @@ -71,10 +76,11 @@ internal void LogAdmin(IAdminClient admin, LogMessage message)

private string GetName(IClient client)
{
// FOR FIX
string name = "";
try
{
if (client.Handle == null || client.Handle.IsInvalid)
return "Unknown";
name = client.Name;
}
catch (NullReferenceException)
Expand Down
2 changes: 1 addition & 1 deletion core/Mock/Kafka/MockAdminClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public MockAdminClient(MockCluster cluster, string name)
this.cluster = cluster;
}

public override Handle Handle => throw new NotImplementedException();
public override Handle Handle => null;

public override string Name { get; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Linq;
using System.Reflection;

namespace Streamiz.Kafka.Net.Tests.Helpers;

public static class ReflectionHelperExtensionMethods
{
public static void SetPrivatePropertyValue<T, V>(this T member, string propName, V newValue)
{
PropertyInfo[] propertiesInfo =
typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
var propertyInfo = propertiesInfo.FirstOrDefault(p => p.Name.Equals(propName));

if (propertyInfo == null) return;
propertyInfo.SetValue(member, newValue);
}

public static void SetPrivateFieldsValue<V>(this object member, Type type, string propName, V newValue)
{
FieldInfo[] fieldsInfo =
type.GetFields(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
var fieldInfo = fieldsInfo.FirstOrDefault(p => p.Name.Equals(propName));

if (fieldInfo == null) return;
fieldInfo.SetValue(member, newValue);
}

public static object GetInstance(string strFullyQualifiedName, ref Type outputType)
{
Type type = Type.GetType(strFullyQualifiedName);
if (type != null)
{
outputType = type;
return Activator.CreateInstance(type);
}

foreach (var asm in AppDomain.CurrentDomain.GetAssemblies())
{
type = asm.GetType(strFullyQualifiedName);
if (type != null)
{
outputType = type;
return Activator.CreateInstance(type);
}
}
return null;
}
}
Loading

0 comments on commit dbe7eb6

Please sign in to comment.