Merge branch 'develop' into feature/197
LGouellec authored Jun 20, 2024
2 parents 51faddc + 54608e4 commit 86c7deb
Showing 48 changed files with 604 additions and 132 deletions.
19 changes: 18 additions & 1 deletion core/Kafka/Internal/DefaultKafkaClientSupplier.cs
@@ -64,7 +64,6 @@ public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config, IConsumerReb
{
builder.SetPartitionsAssignedHandler(rebalanceListener.PartitionsAssigned);
builder.SetPartitionsRevokedHandler(rebalanceListener.PartitionsRevoked);
- builder.SetPartitionsLostHandler(rebalanceListener.PartitionsLost);
builder.SetLogHandler(loggerAdapter.LogConsume);
builder.SetErrorHandler(loggerAdapter.ErrorConsume);
}
@@ -129,6 +128,24 @@ public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
ConsumerBuilder<byte[], byte[]> builder = builderKafkaHandler.GetConsumerBuilder(config);
builder.SetLogHandler(loggerAdapter.LogConsume);
builder.SetErrorHandler(loggerAdapter.ErrorConsume);

if (exposeLibrdKafka)
{
var consumerStatisticsHandler = new ConsumerStatisticsHandler(
config.ClientId,
config is StreamizConsumerConfig streamizConsumerConfig && streamizConsumerConfig.Config != null ?
streamizConsumerConfig.Config.ApplicationId :
streamConfig.ApplicationId,
(config as StreamizConsumerConfig)?.ThreadId,
true);
consumerStatisticsHandler.Register(MetricsRegistry);
builder.SetStatisticsHandler((c, stat) =>
{
var statistics = JsonConvert.DeserializeObject<Statistics>(stat);
consumerStatisticsHandler.Publish(statistics);
});
}

return builder.Build();
}

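When librdkafka metrics are exposed, the supplier above registers a ConsumerStatisticsHandler and feeds it the statistics JSON that librdkafka emits on a timer. For context, here is a minimal, self-contained sketch of the same wiring on a raw Confluent.Kafka consumer; StatsDto is a hypothetical stand-in for Streamiz's internal Statistics type and maps only two of the payload's many fields.

using System;
using Confluent.Kafka;
using Newtonsoft.Json;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "stats-demo",
    StatisticsIntervalMs = 5000 // librdkafka emits a statistics JSON blob at this interval
};

using var consumer = new ConsumerBuilder<byte[], byte[]>(config)
    .SetStatisticsHandler((c, json) =>
    {
        // Deserialize the statistics payload, as the supplier does above.
        var stats = JsonConvert.DeserializeObject<StatsDto>(json);
        Console.WriteLine($"{stats.Name}: {stats.TotalMessagesConsumed} messages consumed");
    })
    .Build();

// Hypothetical DTO; the real Statistics type maps many more librdkafka fields.
class StatsDto
{
    [JsonProperty("name")] public string Name;
    [JsonProperty("rxmsgs")] public long TotalMessagesConsumed;
}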
4 changes: 2 additions & 2 deletions core/Kafka/Internal/RecordCollector.cs
@@ -16,7 +16,7 @@ namespace Streamiz.Kafka.Net.Kafka.Internal
// TODO : Need to refactor; it is no longer necessary to have one producer per thread if EOS is enabled, see EOS_V2
internal class RecordCollector : IRecordCollector
{
private class RetryRecord
private sealed class RetryRecord
{
public byte[] Key { get; set; }
public byte[] Value { get; set; }
@@ -26,7 +26,7 @@ private class RetryRecord
public string Topic { get; set; }
}

private class RetryRecordContext
private sealed class RetryRecordContext
{
private readonly Dictionary<string, RetryRecord> records;
private readonly Queue<RetryRecord> queueRecords;
4 changes: 2 additions & 2 deletions core/KafkaStream.cs
@@ -343,7 +343,7 @@ string Protect(string str)
GlobalStreamThreadFactory globalStreamThreadFactory = new GlobalStreamThreadFactory(
globalTaskTopology,
globalThreadId,
- kafkaSupplier.GetGlobalConsumer(configuration.ToGlobalConsumerConfig(globalThreadId)),
+ kafkaSupplier.GetGlobalConsumer(configuration.ToGlobalConsumerConfig(globalThreadId).Wrap(globalThreadId)),
configuration,
kafkaSupplier.GetAdmin(configuration.ToAdminConfig(clientId)),
metricsRegistry);
@@ -444,7 +444,7 @@ await Task.Factory.StartNew(async () =>
{
foreach (var innerE in e.InnerExceptions)
{
- logger.LogError($"{logPrefix}Error during initializing internal topics : {innerE.Message}");
+ logger.Log(LogLevel.Error, innerE, $"{logPrefix}Error during initializing internal topics");
SetState(State.PENDING_SHUTDOWN);
SetState(State.ERROR);
}
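The logging change above is worth a note: passing the exception object to the logger preserves the stack trace and inner exceptions for structured sinks, whereas interpolating innerE.Message kept only the text. A small sketch of the difference (NullLogger is used only to keep it self-contained):

using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

ILogger logger = NullLogger.Instance;
var e = new InvalidOperationException("topic creation failed");

// Before: only the message text reaches the log entry.
logger.LogError($"Error during initializing internal topics : {e.Message}");

// After: the full exception travels with the entry.
logger.Log(LogLevel.Error, e, "Error during initializing internal topics");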
14 changes: 11 additions & 3 deletions core/Metrics/Librdkafka/ConsumerStatisticsHandler.cs
@@ -7,6 +7,8 @@ namespace Streamiz.Kafka.Net.Metrics.Librdkafka
{
internal class ConsumerStatisticsHandler : LibrdKafkaStatisticsHandler
{
private readonly bool isGlobalConsumer;

private Sensor TotalNumberOfMessagesConsumedSensor;
private Sensor TotalNumberOfMessageBytesConsumedSensor;
private Sensor NumberOfOpsWaitinInQueueSensor; // Sensor
@@ -32,8 +34,13 @@ internal class ConsumerStatisticsHandler : LibrdKafkaStatisticsHandler
public ConsumerStatisticsHandler(
string clientId,
string streamAppId,
- string threadId = null) : base(clientId, streamAppId, threadId)
- { }
+ string threadId = null,
+ bool isGlobalConsumer = false)
+ : base(clientId, streamAppId, threadId)
+ {
+ this.isGlobalConsumer = isGlobalConsumer;
+ }


public override void Register(StreamMetricsRegistry metricsRegistry)
{
@@ -118,7 +125,8 @@ private void PublishTopicsStats(Dictionary<string, TopicStatistic> statisticsTop
(LibrdKafkaBaseMetrics.TOPIC_TAG, topic.Value.TopicName),
(LibrdKafkaBaseMetrics.BROKER_ID_TAG, partition.Value.BrokerId.ToString()),
(LibrdKafkaBaseMetrics.PARTITION_ID_TAG, partition.Value.PartitionId.ToString()))
- , partition.Value.ConsumerLag, now);
+ , !isGlobalConsumer ? partition.Value.ConsumerLag : partition.Value.ConsumerLagStored,
+ now);

LibrdKafkaSensor.ScopedLibrdKafkaSensor.Record(TotalNumberOfMessagesConsumedByPartitionSensor
.Scoped(
3 changes: 3 additions & 0 deletions core/Metrics/Librdkafka/PartitionStatistic.cs
@@ -80,6 +80,9 @@ public enum PartitionState

[JsonProperty(PropertyName = "consumer_lag")]
public long ConsumerLag; // Gauge

[JsonProperty(PropertyName = "consumer_lag_stored")]
public long ConsumerLagStored; // Gauge

[JsonProperty(PropertyName = "txmsgs")]
public long TotalNumberOfMessagesProduced; // Gauge
4 changes: 0 additions & 4 deletions core/Mock/Kafka/MockConsumer.cs
@@ -132,14 +132,10 @@ public void Seek(TopicPartitionOffset tpo)

public void StoreOffset(TopicPartitionOffset offset)
{
- // TODO
- throw new NotImplementedException();
}

public void StoreOffset(ConsumeResult<byte[], byte[]> result)
{
- // TODO
- throw new NotImplementedException();
}

public void Subscribe(IEnumerable<string> topics)
4 changes: 0 additions & 4 deletions core/Mock/Sync/SyncConsumer.cs
@@ -227,14 +227,10 @@ public void Seek(TopicPartitionOffset tpo)

public void StoreOffset(TopicPartitionOffset offset)
{
- // TODO
- throw new NotImplementedException();
}

public void StoreOffset(ConsumeResult<byte[], byte[]> result)
{
- // TODO
- throw new NotImplementedException();
}

public void Subscribe(IEnumerable<string> topics)
35 changes: 27 additions & 8 deletions core/Processors/GlobalStreamThread.cs
@@ -7,6 +7,7 @@
using System.Linq;
using System.Threading;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Metrics;

namespace Streamiz.Kafka.Net.Processors
{
@@ -22,6 +23,7 @@ private class StateConsumer
private readonly long maxPollRecords;
private DateTime lastFlush;


public StateConsumer(
IConsumer<byte[], byte[]> globalConsumer,
IGlobalStateMaintainer globalStateMaintainer,
@@ -39,13 +41,16 @@ public StateConsumer(
public void Initialize()
{
IDictionary<TopicPartition, long> partitionOffsets = globalStateMaintainer.Initialize();
- globalConsumer.Assign(
- partitionOffsets
- .Keys
- .Select(
- x => partitionOffsets[x] >= 0 ?
- new TopicPartitionOffset(x, partitionOffsets[x] + 1 )
- : new TopicPartitionOffset(x, Offset.Beginning)));
+ var mappedPartitions = partitionOffsets
+ .Keys
+ .Select(
+ x => partitionOffsets[x] >= 0
+ ? new TopicPartitionOffset(x, partitionOffsets[x] + 1)
+ : new TopicPartitionOffset(x, Offset.Beginning)).ToList();
+
+ globalConsumer.Assign(mappedPartitions);
+ foreach(var tpo in mappedPartitions)
+ globalConsumer.StoreOffset(tpo);

lastFlush = DateTime.Now;
}
@@ -58,6 +63,7 @@ public void PollAndUpdate()
foreach (var record in received)
{
globalStateMaintainer.Update(record);
globalConsumer.StoreOffset(record);
}

DateTime dt = DateTime.Now;
@@ -101,23 +107,29 @@ public void Close()
private readonly ILogger log = Logger.GetLogger(typeof(GlobalStreamThread));
private readonly Thread thread;
private readonly string logPrefix;
private readonly string threadClientId;
private readonly IConsumer<byte[], byte[]> globalConsumer;
private CancellationToken token;
private readonly object stateLock = new object();
private readonly IStreamConfig configuration;
private StateConsumer stateConsumer;
private readonly IGlobalStateMaintainer globalStateMaintainer;
private readonly StreamMetricsRegistry metricsRegistry;
private DateTime lastMetrics = DateTime.Now;

public GlobalStreamThread(string threadClientId,
IConsumer<byte[], byte[]> globalConsumer,
IStreamConfig configuration,
- IGlobalStateMaintainer globalStateMaintainer)
+ IGlobalStateMaintainer globalStateMaintainer,
+ StreamMetricsRegistry metricsRegistry)
{
logPrefix = $"global-stream-thread {threadClientId} ";

this.threadClientId = threadClientId;
this.globalConsumer = globalConsumer;
this.configuration = configuration;
this.globalStateMaintainer = globalStateMaintainer;
this.metricsRegistry = metricsRegistry;

thread = new Thread(Run);
State = GlobalThreadState.CREATED;
@@ -131,6 +143,13 @@ private void Run()
while (!token.IsCancellationRequested && State.IsRunning())
{
stateConsumer.PollAndUpdate();

if (lastMetrics.Add(TimeSpan.FromMilliseconds(configuration.MetricsIntervalMs)) <
DateTime.Now)
{
MetricUtils.ExportMetrics(metricsRegistry, configuration, threadClientId);
lastMetrics = DateTime.Now;
}
}
}
finally
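Two behavior changes land in this file: the global consumer now calls StoreOffset for every assigned partition and every record it applies, and the run loop exports the metrics registry once per configured interval. Stored offsets are what librdkafka uses to compute the consumer_lag_stored gauge that ConsumerStatisticsHandler reports for global consumers, which assign partitions directly and never commit. A minimal sketch of that pattern on a raw Confluent.Kafka consumer, assuming a local broker and an existing "global-topic":

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "global-demo",       // required by the client even though only Assign() is used
    EnableAutoCommit = false,
    EnableAutoOffsetStore = false  // offsets then advance only via StoreOffset()
};

using var consumer = new ConsumerBuilder<byte[], byte[]>(config).Build();
consumer.Assign(new TopicPartitionOffset("global-topic", 0, Offset.Beginning));

var result = consumer.Consume(TimeSpan.FromSeconds(1));
if (result != null)
{
    // apply the record to the local state store, then record progress:
    consumer.StoreOffset(result); // feeds librdkafka's stored_offset, hence consumer_lag_stored
}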
2 changes: 1 addition & 1 deletion core/Processors/GlobalStreamThreadFactory.cs
@@ -36,7 +36,7 @@ public GlobalStreamThread GetGlobalStreamThread()
stateManager.SetGlobalProcessorContext(context);
var globalStateUpdateTask = new GlobalStateUpdateTask(stateManager, topology, context);

- return new GlobalStreamThread(threadClientId, globalConsumer, configuration, globalStateUpdateTask);
+ return new GlobalStreamThread(threadClientId, globalConsumer, configuration, globalStateUpdateTask, streamMetricsRegistry);
}
}
}
20 changes: 20 additions & 0 deletions core/Processors/IRecordTimestampExtractor.cs
@@ -0,0 +1,20 @@
namespace Streamiz.Kafka.Net.Processors
{
/// <summary>
/// An interface that allows the timestamp of the record stored in the Kafka topic to be determined dynamically.
/// </summary>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
public interface IRecordTimestampExtractor<K,V>
{
/// <summary>
/// Extracts the timestamp of the record stored in the Kafka topic.
/// </summary>
/// <param name="key">the record key</param>
/// <param name="value">the record value</param>
/// <param name="recordContext">current context metadata of the record</param>
/// <returns>the timestamp of the record</returns>
long Extract(K key, V value, IRecordContext recordContext);

}
}
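For illustration, here is a sketch of a custom extractor, assuming a hypothetical Order payload that carries its own event time; the fallback to the context timestamp mirrors what the default extractor (next file) does unconditionally.

using Streamiz.Kafka.Net.Processors;

public class Order
{
    public long CreatedAtEpochMs { get; set; }
}

// Stamps outgoing records with the event time carried in the payload and
// falls back to the record context's timestamp when none is present.
public class OrderTimestampExtractor : IRecordTimestampExtractor<string, Order>
{
    public long Extract(string key, Order value, IRecordContext recordContext)
        => value != null && value.CreatedAtEpochMs > 0
            ? value.CreatedAtEpochMs
            : recordContext.Timestamp;
}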
16 changes: 16 additions & 0 deletions core/Processors/Internal/DefaultRecordTimestampExtractor.cs
@@ -0,0 +1,16 @@
using System;

namespace Streamiz.Kafka.Net.Processors.Internal
{
internal class DefaultRecordTimestampExtractor<K, V> : IRecordTimestampExtractor<K, V>
{
private readonly Func<K, V, IRecordContext, long> timestampExtractor;

public DefaultRecordTimestampExtractor()
{
this.timestampExtractor = (k, v, ctx) => ctx.Timestamp;
}

public long Extract(K key, V value, IRecordContext recordContext) => timestampExtractor(key, value, recordContext);
}
}
4 changes: 2 additions & 2 deletions core/Processors/Internal/InternalTopologyBuilder.cs
@@ -153,15 +153,15 @@ internal void AddSourceOperator<K, V>(string topic, string nameNode, ConsumedInt
nodeGroups = null;
}

- internal void AddSinkOperator<K, V>(ITopicNameExtractor<K, V> topicNameExtractor, string nameNode, Produced<K, V> produced, params string[] previousProcessorNames)
+ internal void AddSinkOperator<K, V>(ITopicNameExtractor<K, V> topicNameExtractor, IRecordTimestampExtractor<K, V> timestampExtractor, string nameNode, Produced<K, V> produced, params string[] previousProcessorNames)
{
if (nodeFactories.ContainsKey(nameNode))
{
throw new TopologyException($"Sink processor {nameNode} is already added.");
}

nodeFactories.Add(nameNode,
- new SinkNodeFactory<K, V>(nameNode, previousProcessorNames, topicNameExtractor, produced.KeySerdes, produced.ValueSerdes, produced.Partitioner));
+ new SinkNodeFactory<K, V>(nameNode, previousProcessorNames, topicNameExtractor, timestampExtractor, produced.KeySerdes, produced.ValueSerdes, produced.Partitioner));
nodeGrouper.Add(nameNode);
nodeGrouper.Unite(nameNode, previousProcessorNames);
nodeGroups = null;
21 changes: 12 additions & 9 deletions core/Processors/Internal/NodeFactory.cs
@@ -76,7 +76,8 @@ internal interface ISinkNodeFactory : INodeFactory

internal class SinkNodeFactory<K, V> : NodeFactory, ISinkNodeFactory
{
- public ITopicNameExtractor<K, V> Extractor { get; }
+ public ITopicNameExtractor<K, V> TopicExtractor { get; }
+ public IRecordTimestampExtractor<K, V> TimestampExtractor { get; }
public ISerDes<K> KeySerdes { get; }
public ISerDes<V> ValueSerdes { get; }
public Func<string, K, V, int> ProducedPartitioner { get; }
@@ -85,29 +86,31 @@ public string Topic
{
get
{
- return Extractor is StaticTopicNameExtractor<K, V> ?
- ((StaticTopicNameExtractor<K, V>)Extractor).TopicName :
+ return TopicExtractor is StaticTopicNameExtractor<K, V> ?
+ ((StaticTopicNameExtractor<K, V>)TopicExtractor).TopicName :
null;
}
}

- public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor<K, V> topicExtractor,
+ public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor<K, V> topicExtractor,
+ IRecordTimestampExtractor<K, V> timestampExtractor,
ISerDes<K> keySerdes, ISerDes<V> valueSerdes, Func<string, K, V, int> producedPartitioner)
: base(name, previous)
{
- Extractor = topicExtractor;
+ TopicExtractor = topicExtractor;
+ TimestampExtractor = timestampExtractor;
KeySerdes = keySerdes;
ValueSerdes = valueSerdes;
ProducedPartitioner = producedPartitioner;
}

public override IProcessor Build()
- => new SinkProcessor<K, V>(Name, Extractor, KeySerdes, ValueSerdes, ProducedPartitioner);
+ => new SinkProcessor<K, V>(Name, TopicExtractor, TimestampExtractor, KeySerdes, ValueSerdes, ProducedPartitioner);

public override NodeDescription Describe()
- => Extractor is StaticTopicNameExtractor<K, V> ?
- new SinkNodeDescription(Name, ((StaticTopicNameExtractor<K, V>)Extractor).TopicName) :
- new SinkNodeDescription(Name, Extractor?.GetType());
+ => TopicExtractor is StaticTopicNameExtractor<K, V> ?
+ new SinkNodeDescription(Name, ((StaticTopicNameExtractor<K, V>)TopicExtractor).TopicName) :
+ new SinkNodeDescription(Name, TopicExtractor?.GetType());
}

#endregion
16 changes: 16 additions & 0 deletions core/Processors/Internal/WrapperRecordTimestampExtractor.cs
@@ -0,0 +1,16 @@
using System;

namespace Streamiz.Kafka.Net.Processors.Internal
{
internal class WrapperRecordTimestampExtractor<K, V> : IRecordTimestampExtractor<K, V>
{
private readonly Func<K, V, IRecordContext, long> timestampExtractor;

public WrapperRecordTimestampExtractor(Func<K,V,IRecordContext,long> timestampExtractor)
{
this.timestampExtractor = timestampExtractor;
}

public long Extract(K key, V value, IRecordContext recordContext) => timestampExtractor(key, value, recordContext);
}
}
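WrapperRecordTimestampExtractor is the lambda-to-interface adapter: DefaultRecordTimestampExtractor is equivalent to this wrapper specialized with (k, v, ctx) => ctx.Timestamp. Both types are internal, so the sketch below compiles only inside the Streamiz assembly and is purely illustrative.

using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Processors.Internal;

internal static class TimestampExtractors
{
    // Same behavior as DefaultRecordTimestampExtractor<string, string>:
    internal static IRecordTimestampExtractor<string, string> ByContext() =>
        new WrapperRecordTimestampExtractor<string, string>((k, v, ctx) => ctx.Timestamp);

    // A user-style delegate that shifts every outgoing timestamp by one second:
    internal static IRecordTimestampExtractor<string, string> Shifted() =>
        new WrapperRecordTimestampExtractor<string, string>((k, v, ctx) => ctx.Timestamp + 1000);
}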
7 changes: 4 additions & 3 deletions core/Processors/SinkProcessor.cs
@@ -14,12 +14,14 @@ internal interface ISinkProcessor
internal class SinkProcessor<K, V> : AbstractProcessor<K, V>, ISinkProcessor
{
private ITopicNameExtractor<K, V> topicNameExtractor;
private readonly IRecordTimestampExtractor<K, V> timestampExtractor;
private readonly Func<string, K, V, int> partitioner;

- internal SinkProcessor(string name, ITopicNameExtractor<K, V> topicNameExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, Func<string, K, V, int> partitioner = null)
+ internal SinkProcessor(string name, ITopicNameExtractor<K, V> topicNameExtractor, IRecordTimestampExtractor<K, V> timestampExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, Func<string, K, V, int> partitioner = null)
: base(name, keySerdes, valueSerdes)
{
this.topicNameExtractor = topicNameExtractor;
this.timestampExtractor = timestampExtractor;
this.partitioner = partitioner;
}

@@ -55,8 +57,7 @@ public override void Process(K key, V value)
}

var topicName = topicNameExtractor.Extract(key, value, Context.RecordContext);

- var timestamp = Context.Timestamp;
+ var timestamp = timestampExtractor.Extract(key, value, Context.RecordContext);
if (timestamp < 0)
{
throw new StreamsException($"Invalid (negative) timestamp of {timestamp} for output record <{key}:{value}>.");
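With this change the sink derives each outgoing record's timestamp from the extractor instead of always reusing the processor context's timestamp, still rejecting negative values. Presumably the feature is surfaced through the DSL when declaring a sink; the To(...) overload below is a guess at that surface for illustration, not an API confirmed by this diff.

using Streamiz.Kafka.Net;

var builder = new StreamBuilder();

// Hypothetical usage: write to "orders-out", stamping each record with the
// context timestamp plus a one-second skew. The To(...) overload taking a
// (key, value, context) => long delegate is assumed, not shown in this commit.
builder.Stream<string, string>("orders")
    .To("orders-out", (k, v, ctx) => ctx.Timestamp + 1000);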