Issue/338 #350

Merged · 8 commits · Jul 30, 2024
@@ -1,54 +1,87 @@
using System;
using Confluent.Kafka;
using Newtonsoft.Json;
using Streamiz.Kafka.Net.Kafka.Internal;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Librdkafka;
using Streamiz.Kafka.Net.Table;

namespace Streamiz.Kafka.Net.Kafka.Internal
namespace Streamiz.Kafka.Net.Kafka
{
internal class DefaultKafkaClientBuilder
/// <summary>
/// Default builder used to provide the Kafka client builders
/// </summary>
public class DefaultKafkaClientBuilder
{
/// <summary>
/// Get the consumer builder
/// </summary>
/// <param name="config">Consumer configuration</param>
/// <returns>Return a consumer builder instance</returns>
public virtual ConsumerBuilder<byte[], byte[]> GetConsumerBuilder(ConsumerConfig config)
=> new(config);

/// <summary>
/// Get the admin client builder
/// </summary>
/// <param name="config">Admin client configuration</param>
/// <returns>Return an admin client builder instance</returns>
public virtual AdminClientBuilder GetAdminBuilder(AdminClientConfig config)
=> new(config);

/// <summary>
/// Get the producer builder
/// </summary>
/// <param name="config">Producer configuration</param>
/// <returns>Return a producer builder instance</returns>
public virtual ProducerBuilder<byte[], byte[]> GetProducerBuilder(ProducerConfig config)
=> new(config);
}

internal class DefaultKafkaClientSupplier : IKafkaSupplier
/// <summary>
/// Default <see cref="IKafkaSupplier"/> can be used to provide custom Kafka clients to a <see cref="KafkaStream"/> instance.
/// </summary>
public class DefaultKafkaClientSupplier : IKafkaSupplier
{
private readonly KafkaLoggerAdapter loggerAdapter;
private readonly IStreamConfig streamConfig;
private readonly bool exposeLibrdKafka;
private readonly DefaultKafkaClientBuilder builderKafkaHandler;

public DefaultKafkaClientSupplier(KafkaLoggerAdapter loggerAdapter)
: this(loggerAdapter, null)
{ }

/// <summary>
/// Create a new <see cref="DefaultKafkaClientSupplier"/> instance
/// </summary>
/// <param name="loggerAdapter">Logger adapter for librdkafka logs</param>
/// <param name="streamConfig">Stream configuration</param>
public DefaultKafkaClientSupplier(
KafkaLoggerAdapter loggerAdapter,
IStreamConfig streamConfig)
: this(loggerAdapter, streamConfig, new DefaultKafkaClientBuilder())
{ }

internal DefaultKafkaClientSupplier(
/// <summary>
/// Create a new <see cref="DefaultKafkaClientSupplier"/> instance
/// </summary>
/// <param name="loggerAdapter">Logger adapter for librdkafka logs</param>
/// <param name="streamConfig">Stream configuration</param>
/// <param name="builderKafkaHandler">Builder providing the underlying Kafka client builders</param>
/// <exception cref="ArgumentNullException">Throws if <paramref name="loggerAdapter"/> is null</exception>
public DefaultKafkaClientSupplier(
KafkaLoggerAdapter loggerAdapter,
IStreamConfig streamConfig,
DefaultKafkaClientBuilder builderKafkaHandler)
{
if (loggerAdapter == null)
throw new ArgumentNullException(nameof(loggerAdapter));

this.loggerAdapter = loggerAdapter;
this.loggerAdapter = loggerAdapter ?? throw new ArgumentNullException(nameof(loggerAdapter));
this.streamConfig = streamConfig;
exposeLibrdKafka = streamConfig?.ExposeLibrdKafkaStats ?? false;
this.builderKafkaHandler = builderKafkaHandler ?? new DefaultKafkaClientBuilder();
}

/// <summary>
/// Create an admin kafka client which is used for internal topic management.
/// </summary>
/// <param name="config">Admin configuration can't be null</param>
/// <returns>Return an admin client instance</returns>
public IAdminClient GetAdmin(AdminClientConfig config)
{
AdminClientBuilder builder = builderKafkaHandler.GetAdminBuilder(config);
@@ -57,6 +90,12 @@ public IAdminClient GetAdmin(AdminClientConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka consumer from a <see cref="ConsumerConfig"/> instance and an <see cref="IConsumerRebalanceListener"/> listener.
/// </summary>
/// <param name="config">Consumer configuration can't be null</param>
/// <param name="rebalanceListener">Rebalance listener (nullable)</param>
/// <returns>Return a kafka consumer instance</returns>
public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerRebalanceListener rebalanceListener)
{
ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config);
@@ -88,6 +127,11 @@ public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerReb
return builder.Build();
}

/// <summary>
/// Build a kafka producer from a <see cref="ProducerConfig"/> instance.
/// </summary>
/// <param name="config">Producer configuration can't be null</param>
/// <returns>Return a kafka producer instance</returns>
public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
{
ProducerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetProducerBuilder(config);
@@ -113,6 +157,11 @@ public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka restore consumer from a <see cref="ConsumerConfig"/> instance, used to read records and restore state stores.
/// </summary>
/// <param name="config">Restore consumer configuration can't be null</param>
/// <returns>Return a kafka restore consumer instance</returns>
public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config)
{
ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config);
@@ -121,6 +170,11 @@ public IConsumer<byte[], byte[]> GetRestoreConsumer(ConsumerConfig config)
return builder.Build();
}

/// <summary>
/// Build a kafka global consumer from a <see cref="ConsumerConfig"/>, used to consume records for <see cref="IGlobalKTable{K,V}"/>.
/// </summary>
/// <param name="config">Global consumer configuration can't be null</param>
/// <returns>Return a kafka global consumer instance</returns>
public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
{
config.AutoOffsetReset = AutoOffsetReset.Earliest;
@@ -148,6 +202,10 @@ public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
return builder.Build();
}

/// <summary>
/// Get or set the metrics registry.
/// This registry will capture all librdkafka statistics if <see cref="IStreamConfig.ExposeLibrdKafkaStats"/> is enabled and forward them to the metrics reporter.
/// </summary>
public StreamMetricsRegistry MetricsRegistry { get; set; }
}
}
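With DefaultKafkaClientBuilder and DefaultKafkaClientSupplier now public, an application can override a single builder hook and hand the supplier to a stream instance. A minimal sketch, assuming a KafkaStream constructor overload that accepts an IKafkaSupplier; the TracingKafkaClientBuilder name and the StatisticsIntervalMs tweak are illustrative, not part of this PR:

using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Kafka;
using Streamiz.Kafka.Net.SerDes;

// Illustrative subclass: override one hook of the now-public builder.
public class TracingKafkaClientBuilder : DefaultKafkaClientBuilder
{
    public override ConsumerBuilder<byte[], byte[]> GetConsumerBuilder(ConsumerConfig config)
    {
        config.StatisticsIntervalMs = 5000; // hypothetical customization
        return base.GetConsumerBuilder(config);
    }
}

public static class CustomSupplierDemo
{
    public static void Main()
    {
        var config = new StreamConfig<StringSerDes, StringSerDes> { ApplicationId = "custom-supplier-demo" };
        // New public constructor: logger adapter + stream config + custom builder.
        var supplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(config), config, new TracingKafkaClientBuilder());
        // var stream = new KafkaStream(topology, config, supplier); // assumed overload accepting IKafkaSupplier
    }
}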
@@ -1,16 +1,21 @@
using Confluent.Kafka;
using Streamiz.Kafka.Net.Crosscutting;
using System;
using System.Threading;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Errors;

namespace Streamiz.Kafka.Net.Kafka.Internal
namespace Streamiz.Kafka.Net.Kafka
{
internal class KafkaLoggerAdapter
/// <summary>
/// Kafka log adapter to intercept librdkafka internal logs
/// </summary>
public class KafkaLoggerAdapter
{
private readonly ILogger log;

/// <summary>
/// Create a new <see cref="KafkaLoggerAdapter"/> from the stream configuration
/// </summary>
/// <param name="configuration">Stream configuration, used to resolve the logger factory</param>
public KafkaLoggerAdapter(IStreamConfig configuration)
: this(configuration, configuration.Logger.CreateLogger(typeof(KafkaLoggerAdapter)))
{
@@ -71,10 +76,11 @@ internal void LogAdmin(IAdminClient admin, LogMessage message)

private string GetName(IClient client)
{
// FOR FIX
string name = "";
try
{
if (client.Handle == null || client.Handle.IsInvalid)
return "Unknown";
name = client.Name;
}
catch (NullReferenceException)
core/Mock/Kafka/MockAdminClient.cs (1 addition, 1 deletion)
@@ -18,7 +18,7 @@ public MockAdminClient(MockCluster cluster, string name)
this.cluster = cluster;
}

public override Handle Handle => throw new NotImplementedException();
public override Handle Handle => null;

public override string Name { get; }

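The diff truncates the tail of GetName; below is a sketch of the complete defensive pattern, assuming the catch path falls back to the same placeholder the new guard returns. The MockAdminClient change above is the other half of the fix: with Handle returning null instead of throwing, the guard classifies mock clients as "Unknown" rather than crashing the logger adapter.

private string GetName(IClient client)
{
    try
    {
        // Mock clients (e.g. MockAdminClient) now surface a null Handle,
        // so guard before touching the native handle.
        if (client.Handle == null || client.Handle.IsInvalid)
            return "Unknown";
        return client.Name;
    }
    catch (NullReferenceException)
    {
        return "Unknown"; // assumption: same fallback when the handle is gone
    }
}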
core/Mock/Kafka/MockCluster.cs (5 additions, 4 deletions)
@@ -760,7 +760,8 @@ internal ConsumeResult<byte[], byte[]> Consume(MockConsumer mockConsumer, TimeSp
Key = record.Key,
Value = record.Value,
Timestamp = new Timestamp(
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable)
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable),
Headers = record.Headers
}
};
++offset.OffsetConsumed;
@@ -800,7 +801,7 @@ internal DeliveryReport<byte[], byte[]> Produce(string topic, Message<byte[], by
else
partition = Math.Abs(MurMurHash3.Hash(new MemoryStream(message.Key))) % topics[topic].PartitionNumber;

topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs);
topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs, message.Headers);

r.Message = message;
r.Partition = partition;
@@ -822,13 +823,13 @@ internal DeliveryReport<byte[], byte[]> Produce(TopicPartition topicPartition, M
CreateTopic(topicPartition.Topic);
if (topics[topicPartition.Topic].PartitionNumber > topicPartition.Partition)
{
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}
else
{
topics[topicPartition.Topic].CreateNewPartitions(topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}

core/Mock/Kafka/MockPartition.cs (5 additions, 4 deletions)
@@ -7,7 +7,7 @@ namespace Streamiz.Kafka.Net.Mock.Kafka
{
internal class MockPartition
{
private readonly List<(byte[], byte[], long)> log = new();
private readonly List<(byte[], byte[], long, Headers)> log = new();
private readonly Dictionary<long, long> mappingOffsets = new();

public MockPartition(int indice)
@@ -21,10 +21,10 @@ public MockPartition(int indice)
public long LowOffset { get; private set; } = Offset.Unset;
public long HighOffset { get; private set; } = Offset.Unset;

internal void AddMessageInLog(byte[] key, byte[] value, long timestamp)
internal void AddMessageInLog(byte[] key, byte[] value, long timestamp, Headers headers)
{
mappingOffsets.Add(Size, log.Count);
log.Add((key, value, timestamp));
log.Add((key, value, timestamp, headers));
++Size;
UpdateOffset();
}
@@ -47,7 +47,8 @@ internal TestRecord<byte[], byte[]> GetMessage(long offset)
{
Key = record.Item1,
Value = record.Item2,
Timestamp = record.Item3.FromMilliseconds()
Timestamp = record.Item3.FromMilliseconds(),
Headers = record.Item4
};
}

core/Mock/Kafka/MockTopic.cs (2 additions, 2 deletions)
@@ -23,9 +23,9 @@ public MockTopic(string topic, int part)
public int PartitionNumber { get; private set; }
public IEnumerable<MockPartition> Partitions => partitions.AsReadOnly();

public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0)
public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0, Headers headers = null)
{
partitions[partition].AddMessageInLog(key, value, timestamp);
partitions[partition].AddMessageInLog(key, value, timestamp, headers);
}

public TestRecord<byte[], byte[]> GetMessage(int partition, long consumerOffset)
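With headers threaded through Produce, AddMessage and AddMessageInLog, and returned from GetMessage, records now round-trip their headers through the mock cluster. A short sketch of what survives the trip (Confluent.Kafka types only; the mock producer/consumer wiring is elided):

using System.Text;
using Confluent.Kafka;

var headers = new Headers { { "trace-id", Encoding.UTF8.GetBytes("abc-123") } };
var message = new Message<byte[], byte[]>
{
    Key = Encoding.UTF8.GetBytes("key1"),
    Value = Encoding.UTF8.GetBytes("value1"),
    Headers = headers // previously dropped by the mock, now stored alongside key/value/timestamp
};
// After producing via the mock cluster, the matching ConsumeResult and
// TestRecord expose the same Headers instance (see MockPartition.GetMessage).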
core/Mock/Sync/SyncPipeBuilder.cs (8 additions, 6 deletions)
@@ -24,21 +24,23 @@ public StreamTaskPublisher(StreamTask task)
private int offset = 0;

public void PublishRecord(string topic, byte[] key, byte[] value, DateTime timestamp, Headers headers)
=> task.AddRecord(new ConsumeResult<byte[], byte[]>
{
Topic = topic,
TopicPartitionOffset = new TopicPartitionOffset(new TopicPartition(topic, task.Id.Partition), offset++),
Message = new Message<byte[], byte[]> { Key = key, Value = value, Timestamp = new Timestamp(timestamp), Headers = headers }
});
=> task.AddRecord(new ConsumeResult<byte[], byte[]>
{
Topic = topic,
TopicPartitionOffset = new TopicPartitionOffset(new TopicPartition(topic, task.Id.Partition), offset++),
Message = new Message<byte[], byte[]> { Key = key, Value = value, Timestamp = new Timestamp(timestamp), Headers = headers }
});

public void Flush()
{
long now = DateTime.Now.GetMilliseconds();
TaskManager.CurrentTask = task;
while (task.CanProcess(now))
task.Process();

task.PunctuateStreamTime();
task.PunctuateSystemTime();
TaskManager.CurrentTask = null;
}

public void Close()
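Flush now punctuates after draining the task, so scheduled callbacks fire deterministically in sync-pipe tests. A hedged sketch of the observable effect; the Schedule call and PunctuationType member below follow the Kafka Streams convention and are assumptions about the processor API, not names confirmed by this diff:

// Hypothetical punctuator registration inside a processor (names assumed, see above).
context.Schedule(
    TimeSpan.FromSeconds(10),
    PunctuationType.STREAM_TIME,
    timestamp =>
    {
        // Before this change, a test Flush() only processed records and this callback
        // never ran; now Flush() also fires stream-time and system-time punctuations.
        ForwardAggregate(timestamp); // hypothetical helper
    });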
@@ -0,0 +1,49 @@
using System;
using System.Linq;
using System.Reflection;

namespace Streamiz.Kafka.Net.Tests.Helpers;

public static class ReflectionHelperExtensionMethods
{
public static void SetPrivatePropertyValue<T, V>(this T member, string propName, V newValue)
{
PropertyInfo[] propertiesInfo =
typeof(T).GetProperties(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
var propertyInfo = propertiesInfo.FirstOrDefault(p => p.Name.Equals(propName));

if (propertyInfo == null) return;
propertyInfo.SetValue(member, newValue);
}

public static void SetPrivateFieldsValue<V>(this object member, Type type, string propName, V newValue)
{
FieldInfo[] fieldsInfo =
type.GetFields(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
var fieldInfo = fieldsInfo.FirstOrDefault(p => p.Name.Equals(propName));

if (fieldInfo == null) return;
fieldInfo.SetValue(member, newValue);
}

public static object GetInstance(string strFullyQualifiedName, ref Type outputType)
{
Type type = Type.GetType(strFullyQualifiedName);
if (type != null)
{
outputType = type;
return Activator.CreateInstance(type);
}

foreach (var asm in AppDomain.CurrentDomain.GetAssemblies())
{
type = asm.GetType(strFullyQualifiedName);
if (type != null)
{
outputType = type;
return Activator.CreateInstance(type);
}
}
return null;
}
}
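A usage sketch for the new reflection helper; Widget and its members are hypothetical stand-ins, not types from this PR:

using System;
using Streamiz.Kafka.Net.Tests.Helpers;

// Hypothetical target with non-public state.
public class Widget
{
    private int Counter { get; set; }
    private string name = "default";
}

public static class ReflectionHelperDemo
{
    public static void Main()
    {
        var w = new Widget();
        w.SetPrivatePropertyValue("Counter", 5);              // writes the private property
        w.SetPrivateFieldsValue(typeof(Widget), "name", "x"); // writes the private field

        // Resolve and instantiate a type by name, scanning loaded assemblies as a fallback.
        Type resolved = null;
        object instance = ReflectionHelperExtensionMethods.GetInstance("Widget", ref resolved);
    }
}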