Skip to content

Commit

Permalink
#346 - Make kafka supplier public
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Jul 16, 2024
1 parent 0991e07 commit e56848d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,54 +1,87 @@
using System;
using Confluent.Kafka;
using Newtonsoft.Json;
using Streamiz.Kafka.Net.Kafka.Internal;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Librdkafka;
using Streamiz.Kafka.Net.Table;

namespace Streamiz.Kafka.Net.Kafka.Internal
namespace Streamiz.Kafka.Net.Kafka
{
internal class DefaultKafkaClientBuilder
/// <summary>
/// Default builder used to provide Kafka clients builder
/// </summary>
public class DefaultKafkaClientBuilder
{
/// <summary>
/// Get the consumer builder
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public virtual ConsumerBuilder<byte[], byte[]> GetConsumerBuilder(ConsumerConfig config)
=> new(config);

/// <summary>
/// Get the admin client builder
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public virtual AdminClientBuilder GetAdminBuilder(AdminClientConfig config)
=> new(config);

/// <summary>
/// Get the producer builder
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public virtual ProducerBuilder<byte[], byte[]> GetProducerBuilder(ProducerConfig config)
=> new(config);
}

internal class DefaultKafkaClientSupplier : IKafkaSupplier
/// <summary>
/// Default <see cref="IKafkaSupplier"/> can be used to provide custom Kafka clients to a <see cref="KafkaStream"/> instance.
/// </summary>
public class DefaultKafkaClientSupplier : IKafkaSupplier
{
private readonly KafkaLoggerAdapter loggerAdapter;
private readonly IStreamConfig streamConfig;
private readonly bool exposeLibrdKafka;
private readonly DefaultKafkaClientBuilder builderKafkaHandler;

public DefaultKafkaClientSupplier(KafkaLoggerAdapter loggerAdapter)
: this(loggerAdapter, null)
{ }

/// <summary>
///
/// </summary>
/// <param name="loggerAdapter"></param>
/// <param name="streamConfig"></param>
public DefaultKafkaClientSupplier(
KafkaLoggerAdapter loggerAdapter,
IStreamConfig streamConfig)
: this(loggerAdapter, streamConfig, new DefaultKafkaClientBuilder())
{ }

internal DefaultKafkaClientSupplier(
/// <summary>
///
/// </summary>
/// <param name="loggerAdapter"></param>
/// <param name="streamConfig"></param>
/// <param name="builderKafkaHandler"></param>
/// <exception cref="ArgumentNullException"></exception>
public DefaultKafkaClientSupplier(
KafkaLoggerAdapter loggerAdapter,
IStreamConfig streamConfig,
DefaultKafkaClientBuilder builderKafkaHandler)
{
if (loggerAdapter == null)
throw new ArgumentNullException(nameof(loggerAdapter));

this.loggerAdapter = loggerAdapter;
this.loggerAdapter = loggerAdapter ?? throw new ArgumentNullException(nameof(loggerAdapter));
this.streamConfig = streamConfig;
exposeLibrdKafka = streamConfig?.ExposeLibrdKafkaStats ?? false;
this.builderKafkaHandler = builderKafkaHandler ?? new DefaultKafkaClientBuilder();
}

/// <summary>
/// Create an admin kafka client which is used for internal topic management.
/// </summary>
/// <param name="config">Admin configuration can't be null</param>
/// <returns>Return an admin client instance</returns>
public IAdminClient GetAdmin(AdminClientConfig config)
{
AdminClientBuilder builder = builderKafkaHandler.GetAdminBuilder(config);
Expand All @@ -57,6 +90,12 @@ public IAdminClient GetAdmin(AdminClientConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka consumer with <see cref="ConsumerConfig"/> instance and <see cref="IConsumerRebalanceListener"/> listener.
/// </summary>
/// <param name="config">Consumer configuration can't be null</param>
/// <param name="rebalanceListener">Rebalance listener (Nullable)</param>
/// <returns>Return a kafka consumer built</returns>
public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerRebalanceListener rebalanceListener)
{
ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config);
Expand Down Expand Up @@ -88,6 +127,11 @@ public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerReb
return builder.Build();
}

/// <summary>
/// Build a kafka producer with <see cref="ProducerConfig"/> instance.
/// </summary>
/// <param name="config">Producer configuration can't be null</param>
/// <returns>Return a kafka producer built</returns>
public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
{
ProducerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetProducerBuilder(config);
Expand All @@ -113,6 +157,11 @@ public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka restore consumer with <see cref="ConsumerConfig"/> instance for read record to restore statestore.
/// </summary>
/// <param name="config">Restore consumer configuration can't be null</param>
/// <returns>Return a kafka restore consumer built</returns>
public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config)
{
ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config);
Expand All @@ -121,6 +170,11 @@ public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka global consumer with <see cref="ConsumerConfig"/> which is used to consume records for <see cref="IGlobalKTable{K,V}"/>.
/// </summary>
/// <param name="config">Global consumer configuration can't be null</param>
/// <returns>Return a kafka global consumer built</returns>
public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
{
config.AutoOffsetReset = AutoOffsetReset.Earliest;
Expand Down Expand Up @@ -148,6 +202,10 @@ public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
return builder.Build();
}

/// <summary>
/// Get or set the metrics registry.
/// This registry will be capture all librdkafka statistics if <see cref="IStreamConfig.ExposeLibrdKafkaStats"/> is enable and forward these into the metrics reporter
/// </summary>
public StreamMetricsRegistry MetricsRegistry { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
using Confluent.Kafka;
using Streamiz.Kafka.Net.Crosscutting;
using System;
using System.Threading;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Errors;

namespace Streamiz.Kafka.Net.Kafka.Internal
namespace Streamiz.Kafka.Net.Kafka
{
internal class KafkaLoggerAdapter
/// <summary>
/// Kafka log adapter to intercept librdkafka internal logs
/// </summary>
public class KafkaLoggerAdapter
{
private readonly ILogger log;

/// <summary>
///
/// </summary>
/// <param name="configuration"></param>
public KafkaLoggerAdapter(IStreamConfig configuration)
: this(configuration, configuration.Logger.CreateLogger(typeof(KafkaLoggerAdapter)))
{
Expand Down Expand Up @@ -71,10 +76,11 @@ internal void LogAdmin(IAdminClient admin, LogMessage message)

private string GetName(IClient client)
{
// FOR FIX
string name = "";
try
{
if (client.Handle == null || client.Handle.IsInvalid)
return "Unknown";
name = client.Name;
}
catch (NullReferenceException)
Expand Down

0 comments on commit e56848d

Please sign in to comment.