Topic record timestamp extractor #329

Merged (2 commits, Jun 20, 2024)
20 changes: 20 additions & 0 deletions core/Processors/IRecordTimestampExtractor.cs
@@ -0,0 +1,20 @@
namespace Streamiz.Kafka.Net.Processors
{
/// <summary>
/// An interface that allows the timestamp of the record stored in the Kafka topic to be determined dynamically.
/// </summary>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
public interface IRecordTimestampExtractor<K,V>
{
/// <summary>
/// Extracts the timestamp of the record stored in the Kafka topic.
/// </summary>
/// <param name="key">the record key</param>
/// <param name="value">the record value</param>
/// <param name="recordContext">current context metadata of the record</param>
/// <returns>the timestamp of the record</returns>
long Extract(K key, V value, IRecordContext recordContext);

}
}
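For context, a custom extractor might look like the sketch below. OrderTimestampExtractor and Order are hypothetical illustration types, not part of this PR; only the IRecordTimestampExtractor<K, V> contract and the IRecordContext.Timestamp member visible in this diff are assumed.

// Hypothetical sketch: take the sink timestamp from a field of the value
// instead of reusing the upstream record metadata.
public class OrderTimestampExtractor : IRecordTimestampExtractor<string, Order>
{
    public long Extract(string key, Order value, IRecordContext recordContext)
        => value != null
            ? value.CreatedAtUnixMs        // epoch-millisecond field assumed to exist on the payload
            : recordContext.Timestamp;     // otherwise fall back to the inbound record timestamp
}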
16 changes: 16 additions & 0 deletions core/Processors/Internal/DefaultRecordTimestampExtractor.cs
@@ -0,0 +1,16 @@
using System;

namespace Streamiz.Kafka.Net.Processors.Internal
{
internal class DefaultRecordTimestampExtractor<K, V> : IRecordTimestampExtractor<K, V>
{
private readonly Func<K, V, IRecordContext, long> timestampExtractor;

public DefaultRecordTimestampExtractor()
{
this.timestampExtractor = (k, v, ctx) => ctx.Timestamp;
}

public long Extract(K key, V value, IRecordContext recordContext) => timestampExtractor(key, value, recordContext);
}
}
4 changes: 2 additions & 2 deletions core/Processors/Internal/InternalTopologyBuilder.cs
@@ -153,15 +153,15 @@ internal void AddSourceOperator<K, V>(string topic, string nameNode, ConsumedInt
nodeGroups = null;
}

internal void AddSinkOperator<K, V>(ITopicNameExtractor<K, V> topicNameExtractor, string nameNode, Produced<K, V> produced, params string[] previousProcessorNames)
internal void AddSinkOperator<K, V>(ITopicNameExtractor<K, V> topicNameExtractor, IRecordTimestampExtractor<K, V> timestampExtractor, string nameNode, Produced<K, V> produced, params string[] previousProcessorNames)
{
if (nodeFactories.ContainsKey(nameNode))
{
throw new TopologyException($"Sink processor {nameNode} is already added.");
}

nodeFactories.Add(nameNode,
new SinkNodeFactory<K, V>(nameNode, previousProcessorNames, topicNameExtractor, produced.KeySerdes, produced.ValueSerdes, produced.Partitioner));
new SinkNodeFactory<K, V>(nameNode, previousProcessorNames, topicNameExtractor, timestampExtractor, produced.KeySerdes, produced.ValueSerdes, produced.Partitioner));
nodeGrouper.Add(nameNode);
nodeGrouper.Unite(nameNode, previousProcessorNames);
nodeGroups = null;
21 changes: 12 additions & 9 deletions core/Processors/Internal/NodeFactory.cs
@@ -76,7 +76,8 @@ internal interface ISinkNodeFactory : INodeFactory

internal class SinkNodeFactory<K, V> : NodeFactory, ISinkNodeFactory
{
public ITopicNameExtractor<K, V> Extractor { get; }
public ITopicNameExtractor<K, V> TopicExtractor { get; }
public IRecordTimestampExtractor<K, V> TimestampExtractor { get; }
public ISerDes<K> KeySerdes { get; }
public ISerDes<V> ValueSerdes { get; }
public Func<string, K, V, int> ProducedPartitioner { get; }
@@ -85,29 +86,31 @@ public string Topic
{
get
{
return Extractor is StaticTopicNameExtractor<K, V> ?
((StaticTopicNameExtractor<K, V>)Extractor).TopicName :
return TopicExtractor is StaticTopicNameExtractor<K, V> ?
((StaticTopicNameExtractor<K, V>)TopicExtractor).TopicName :
null;
}
}

public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor<K, V> topicExtractor,
public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor<K, V> topicExtractor,
IRecordTimestampExtractor<K, V> timestampExtractor,
ISerDes<K> keySerdes, ISerDes<V> valueSerdes, Func<string, K, V, int> producedPartitioner)
: base(name, previous)
{
Extractor = topicExtractor;
TopicExtractor = topicExtractor;
TimestampExtractor = timestampExtractor;
KeySerdes = keySerdes;
ValueSerdes = valueSerdes;
ProducedPartitioner = producedPartitioner;
}

public override IProcessor Build()
=> new SinkProcessor<K, V>(Name, Extractor, KeySerdes, ValueSerdes, ProducedPartitioner);
=> new SinkProcessor<K, V>(Name, TopicExtractor, TimestampExtractor, KeySerdes, ValueSerdes, ProducedPartitioner);

public override NodeDescription Describe()
=> Extractor is StaticTopicNameExtractor<K, V> ?
new SinkNodeDescription(Name, ((StaticTopicNameExtractor<K, V>)Extractor).TopicName) :
new SinkNodeDescription(Name, Extractor?.GetType());
=> TopicExtractor is StaticTopicNameExtractor<K, V> ?
new SinkNodeDescription(Name, ((StaticTopicNameExtractor<K, V>)TopicExtractor).TopicName) :
new SinkNodeDescription(Name, TopicExtractor?.GetType());
}

#endregion
16 changes: 16 additions & 0 deletions core/Processors/Internal/WrapperRecordTimestampExtractor.cs
@@ -0,0 +1,16 @@
using System;

namespace Streamiz.Kafka.Net.Processors.Internal
{
internal class WrapperRecordTimestampExtractor<K, V> : IRecordTimestampExtractor<K, V>
{
private readonly Func<K, V, IRecordContext, long> timestampExtractor;

public WrapperRecordTimestampExtractor(Func<K,V,IRecordContext,long> timestampExtractor)
{
this.timestampExtractor = timestampExtractor;
}

public long Extract(K key, V value, IRecordContext recordContext) => timestampExtractor(key, value, recordContext);
}
}
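This wrapper presumably backs the new Func-based To(...) overloads by adapting the lambda into the IRecordTimestampExtractor<K, V> abstraction before it reaches the sink node. A minimal sketch of that adaptation, with serde types chosen purely for illustration:

// Illustrative only: wrapping a lambda the way the Func-based overloads are expected to internally.
IRecordTimestampExtractor<string, string> extractor =
    new WrapperRecordTimestampExtractor<string, string>(
        (k, v, ctx) => ctx.Timestamp + 1000); // e.g. shift the outgoing timestamp by one second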
7 changes: 4 additions & 3 deletions core/Processors/SinkProcessor.cs
@@ -14,12 +14,14 @@ internal interface ISinkProcessor
internal class SinkProcessor<K, V> : AbstractProcessor<K, V>, ISinkProcessor
{
private ITopicNameExtractor<K, V> topicNameExtractor;
private readonly IRecordTimestampExtractor<K, V> timestampExtractor;
private readonly Func<string, K, V, int> partitioner;

internal SinkProcessor(string name, ITopicNameExtractor<K, V> topicNameExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, Func<string, K, V, int> partitioner = null)
internal SinkProcessor(string name, ITopicNameExtractor<K, V> topicNameExtractor, IRecordTimestampExtractor<K, V> timestampExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, Func<string, K, V, int> partitioner = null)
: base(name, keySerdes, valueSerdes)
{
this.topicNameExtractor = topicNameExtractor;
this.timestampExtractor = timestampExtractor;
this.partitioner = partitioner;
}

@@ -55,8 +57,7 @@ public override void Process(K key, V value)
}

var topicName = topicNameExtractor.Extract(key, value, Context.RecordContext);

var timestamp = Context.Timestamp;
var timestamp = timestampExtractor.Extract(key, value, Context.RecordContext);
if (timestamp < 0)
{
throw new StreamsException($"Invalid (negative) timestamp of {timestamp} for output record <{key}:{value}>.");
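Note the sink now validates whatever the extractor returns and throws for negative values. If a custom extractor might produce such values, a defensive decorator along these lines could fall back to the inbound timestamp; this is an illustrative assumption, not code from the PR:

// Hypothetical decorator: never let a negative timestamp reach the sink's guard above.
internal class NonNegativeTimestampExtractor<K, V> : IRecordTimestampExtractor<K, V>
{
    private readonly IRecordTimestampExtractor<K, V> inner;

    public NonNegativeTimestampExtractor(IRecordTimestampExtractor<K, V> inner)
        => this.inner = inner;

    public long Extract(K key, V value, IRecordContext recordContext)
    {
        var ts = inner.Extract(key, value, recordContext);
        return ts >= 0 ? ts : recordContext.Timestamp; // fall back to the inbound record timestamp
    }
}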
74 changes: 68 additions & 6 deletions core/Stream/IKStream.cs
@@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Streamiz.Kafka.Net.Processors.Public;
using Streamiz.Kafka.Net.Processors.Internal;

namespace Streamiz.Kafka.Net.Stream
{
@@ -160,8 +161,47 @@ public interface IKStream<K, V>
/// <param name="keySerdes">Key serializer</param>
/// <param name="valueSerdes">Value serializer</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(ITopicNameExtractor<K, V> topicExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null);

void To(ITopicNameExtractor<K, V> topicExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, string named = null);

/// <summary>
/// Dynamically materialize this stream to topics using the default serializers specified in the config and the producer's default partitioner.
/// The topic name each record is sent to is determined dynamically based on the <code>Func&lt;K, V, IRecordContext, string&gt;</code>.
/// </summary>
/// <param name="topicExtractor">Extractor function to determine the name of the Kafka topic to write to for each record</param>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, string named = null);

/// <summary>
/// Dynamically materialize this stream to topics using the default serializers specified in the config and the producer's default partitioner.
/// The topic name each record is sent to is determined dynamically based on the <code>Func&lt;K, V, IRecordContext, string&gt;</code>.
/// </summary>
/// <param name="topicExtractor">Extractor function to determine the name of the Kafka topic to write to for each record</param>
/// <param name="keySerdes">Key serializer</param>
/// <param name="valueSerdes">Value serializer</param>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(Func<K, V, IRecordContext, string> topicExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, Func<K, V, IRecordContext, long> recordTimestampExtractor, string named = null);

/// <summary>
/// Dynamically materialize this stream to topics using the default serializers specified in the config and the producer's default partitioner.
/// The topic name each record is sent to is determined dynamically based on the <see cref="ITopicNameExtractor&lt;K, V&gt;"/>.
/// </summary>
/// <param name="topicExtractor">The extractor to determine the name of the Kafka topic to write to for each record</param>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(ITopicNameExtractor<K, V> topicExtractor, IRecordTimestampExtractor<K, V> recordTimestampExtractor, string named = null);

/// <summary>
/// Dynamically materialize this stream to topics using the serializers specified in the method parameters.
/// The topic name each record is sent to is determined dynamically based on the <see cref="ITopicNameExtractor&lt;K, V&gt;"/>.
/// </summary>
/// <param name="topicExtractor">The extractor to determine the name of the Kafka topic to write to for each record</param>4
/// <param name="keySerdes">Key serializer</param>
/// <param name="valueSerdes">Value serializer</param>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To(ITopicNameExtractor<K, V> topicExtractor, ISerDes<K> keySerdes, ISerDes<V> valueSerdes, IRecordTimestampExtractor<K, V> recordTimestampExtractor, string named = null);

/// <summary>
/// Materialize this stream to a topic using <typeparamref name="KS"/> and <typeparamref name="VS"/> serializers specified in the method parameters.
@@ -192,12 +232,34 @@ public interface IKStream<K, V>
/// <typeparam name="VS">New type value serializer</typeparam>
/// <param name="topicExtractor">The extractor to determine the name of the Kafka topic to write to for each record</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To<KS, VS>(ITopicNameExtractor<K, V> topicExtractor, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();

#endregion
void To<KS, VS>(ITopicNameExtractor<K, V> topicExtractor, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();

/// <summary>
/// Dynamically materialize this stream to topics using <typeparamref name="KS"/> and <typeparamref name="VS"/> serializers specified in the method parameters.
/// The topic name each record is sent to is determined dynamically based on the <code>Func&lt;K, V, IRecordContext, string&gt;</code>.
/// </summary>
/// <typeparam name="KS">New type key serializer</typeparam>
/// <typeparam name="VS">New type value serializer</typeparam>
/// <param name="topicExtractor">Extractor function to determine the name of the Kafka topic to write to for each record</param>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To<KS, VS>(Func<K, V, IRecordContext, string> topicExtractor, Func<K, V, IRecordContext, long> recordTimestampExtractor, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();

/// <summary>
/// Dynamically materialize this stream to topics using <typeparamref name="KS"/> and <typeparamref name="VS"/> serializers specified in the method parameters.
/// The topic name each record is sent to is determined dynamically based on the <see cref="ITopicNameExtractor&lt;K, V&gt;"/>.
/// </summary>
/// <typeparam name="KS">New type key serializer</typeparam>
/// <typeparam name="VS">New type value serializer</typeparam>
/// <param name="topicExtractor">The extractor to determine the name of the Kafka topic to write to for each record</param>
/// <param name="recordTimestampExtractor">Extractor function to determine the timestamp of the record stored in the Kafka topic</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void To<KS, VS>(ITopicNameExtractor<K, V> topicExtractor, IRecordTimestampExtractor<K, V> recordTimestampExtractor, string named = null) where KS : ISerDes<K>, new() where VS : ISerDes<V>, new();

#endregion
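Taken together, the overloads added above would presumably be used along these lines; topic names, routing logic and the StreamBuilder setup are illustrative assumptions rather than code from this PR:

// Illustrative usage sketch of the lambda-based overload.
var builder = new StreamBuilder();
var stream = builder.Stream<string, string>("input-topic");

stream.To(
    (k, v, ctx) => v != null && v.StartsWith("error") ? "errors" : "output",  // dynamic topic routing
    (k, v, ctx) => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());           // stamp with wall-clock time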

#region FlatMap

/// <summary>
/// Transform each record of the input stream into zero or more records in the output stream (both key and value type
/// can be altered arbitrarily).
3 changes: 3 additions & 0 deletions core/Stream/Internal/Graph/Nodes/AsyncNode.cs
@@ -34,6 +34,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
builder.AddInternalTopic(RepartitionTopic, null);
builder.AddSinkOperator(
new StaticTopicNameExtractor<TK, TV>(RepartitionTopic),
new DefaultRecordTimestampExtractor<TK, TV>(),
SinkName,
Produced<TK, TV>.Create(KeySerdes, ValueSerdes),
ParentNodeNames());
@@ -70,6 +71,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
builder.AddInternalTopic(RepartitionTopic, null);
builder.AddProcessor(ProcessorParameters.ProcessorName, ProcessorParameters.Processor, ParentNodeNames());
builder.AddSinkOperator(new StaticTopicNameExtractor<TK1, TV1>(RepartitionTopic),
new DefaultRecordTimestampExtractor<TK1, TV1>(),
SinkName,
Produced<TK1, TV1>.Create(KeySerdes, ValueSerdes),
ProcessorParameters.ProcessorName);
@@ -112,6 +114,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
{
builder.AddInternalTopic(RequestTopic, null);
builder.AddSinkOperator(new StaticTopicNameExtractor<TK, TV>(RequestTopic),
new DefaultRecordTimestampExtractor<TK, TV>(),
SinkName,
Produced<TK, TV>.Create(KeySerdes, ValueSerdes),
ParentNodeNames());
2 changes: 2 additions & 0 deletions core/Stream/Internal/Graph/Nodes/RepartitionNode.cs
@@ -23,6 +23,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
ParentNodeNames());
builder.AddSinkOperator(
new StaticTopicNameExtractor<K, V>(RepartitionTopic),
new DefaultRecordTimestampExtractor<K, V>(),
SinkName,
Produced<K, V>.Create(KeySerdes, ValueSerdes).WithPartitioner(StreamPartitioner),
ProcessorParameters.ProcessorName);
@@ -36,6 +37,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
builder.AddInternalTopic(RepartitionTopic, NumberOfPartition);
builder.AddSinkOperator(
new StaticTopicNameExtractor<K, V>(RepartitionTopic),
new DefaultRecordTimestampExtractor<K, V>(),
SinkName,
Produced<K, V>.Create(KeySerdes, ValueSerdes).WithPartitioner(StreamPartitioner),
ParentNodeNames());
6 changes: 4 additions & 2 deletions core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs
@@ -15,17 +15,19 @@ internal class StreamSinkNode<K, V> : StreamSinkNode
{
private readonly ITopicNameExtractor<K, V> topicNameExtractor;
private readonly Produced<K, V> produced;
private readonly IRecordTimestampExtractor<K, V> timestampExtractor;

public StreamSinkNode(ITopicNameExtractor<K, V> topicNameExtractor, string streamGraphNode, Produced<K, V> produced)
public StreamSinkNode(ITopicNameExtractor<K, V> topicNameExtractor, IRecordTimestampExtractor<K, V> timestampExtractor, string streamGraphNode, Produced<K, V> produced)
: base(streamGraphNode)
{
this.topicNameExtractor = topicNameExtractor;
this.timestampExtractor = timestampExtractor;
this.produced = produced;
}

public override void WriteToTopology(InternalTopologyBuilder builder)
{
builder.AddSinkOperator(topicNameExtractor, this.streamGraphNode, produced, ParentNodeNames());
builder.AddSinkOperator(topicNameExtractor, timestampExtractor, this.streamGraphNode, produced, ParentNodeNames());
}
}
}