diff --git a/core/Processors/IRecordTimestampExtractor.cs b/core/Processors/IRecordTimestampExtractor.cs
new file mode 100644
index 00000000..1cd44dae
--- /dev/null
+++ b/core/Processors/IRecordTimestampExtractor.cs
@@ -0,0 +1,20 @@
+namespace Streamiz.Kafka.Net.Processors
+{
+ ///
+ /// An interface that allows to dynamically determine the timestamp of the record stored in the Kafka topic.
+ ///
+ /// Key type
+ /// Value type
+ public interface IRecordTimestampExtractor
+ {
+ ///
+ /// Extracts the timestamp of the record stored in the Kafka topic.
+ ///
+ /// the record key
+ /// the record value
+ /// current context metadata of the record
+ /// the timestamp of the record
+ long Extract(K key, V value, IRecordContext recordContext);
+
+ }
+}
diff --git a/core/Processors/Internal/DefaultRecordTimestampExtractor.cs b/core/Processors/Internal/DefaultRecordTimestampExtractor.cs
new file mode 100644
index 00000000..942b1df3
--- /dev/null
+++ b/core/Processors/Internal/DefaultRecordTimestampExtractor.cs
@@ -0,0 +1,16 @@
+using System;
+
+namespace Streamiz.Kafka.Net.Processors.Internal
+{
+ internal class DefaultRecordTimestampExtractor : IRecordTimestampExtractor
+ {
+ private readonly Func timestampExtractor;
+
+ public DefaultRecordTimestampExtractor()
+ {
+ this.timestampExtractor = (k, v, ctx) => ctx.Timestamp;
+ }
+
+ public long Extract(K key, V value, IRecordContext recordContext) => timestampExtractor(key, value, recordContext);
+ }
+}
diff --git a/core/Processors/Internal/InternalTopologyBuilder.cs b/core/Processors/Internal/InternalTopologyBuilder.cs
index b779204a..e426295a 100644
--- a/core/Processors/Internal/InternalTopologyBuilder.cs
+++ b/core/Processors/Internal/InternalTopologyBuilder.cs
@@ -153,7 +153,7 @@ internal void AddSourceOperator(string topic, string nameNode, ConsumedInt
nodeGroups = null;
}
- internal void AddSinkOperator(ITopicNameExtractor topicNameExtractor, string nameNode, Produced produced, params string[] previousProcessorNames)
+ internal void AddSinkOperator(ITopicNameExtractor topicNameExtractor, IRecordTimestampExtractor timestampExtractor, string nameNode, Produced produced, params string[] previousProcessorNames)
{
if (nodeFactories.ContainsKey(nameNode))
{
@@ -161,7 +161,7 @@ internal void AddSinkOperator(ITopicNameExtractor topicNameExtractor
}
nodeFactories.Add(nameNode,
- new SinkNodeFactory(nameNode, previousProcessorNames, topicNameExtractor, produced.KeySerdes, produced.ValueSerdes, produced.Partitioner));
+ new SinkNodeFactory(nameNode, previousProcessorNames, topicNameExtractor, timestampExtractor, produced.KeySerdes, produced.ValueSerdes, produced.Partitioner));
nodeGrouper.Add(nameNode);
nodeGrouper.Unite(nameNode, previousProcessorNames);
nodeGroups = null;
diff --git a/core/Processors/Internal/NodeFactory.cs b/core/Processors/Internal/NodeFactory.cs
index f1461736..3527a292 100644
--- a/core/Processors/Internal/NodeFactory.cs
+++ b/core/Processors/Internal/NodeFactory.cs
@@ -76,7 +76,8 @@ internal interface ISinkNodeFactory : INodeFactory
internal class SinkNodeFactory : NodeFactory, ISinkNodeFactory
{
- public ITopicNameExtractor Extractor { get; }
+ public ITopicNameExtractor TopicExtractor { get; }
+ public IRecordTimestampExtractor TimestampExtractor { get; }
public ISerDes KeySerdes { get; }
public ISerDes ValueSerdes { get; }
public Func ProducedPartitioner { get; }
@@ -85,29 +86,31 @@ public string Topic
{
get
{
- return Extractor is StaticTopicNameExtractor ?
- ((StaticTopicNameExtractor)Extractor).TopicName :
+ return TopicExtractor is StaticTopicNameExtractor ?
+ ((StaticTopicNameExtractor)TopicExtractor).TopicName :
null;
}
}
- public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor topicExtractor,
+ public SinkNodeFactory(string name, string[] previous, ITopicNameExtractor topicExtractor,
+ IRecordTimestampExtractor timestampExtractor,
ISerDes keySerdes, ISerDes valueSerdes, Func producedPartitioner)
: base(name, previous)
{
- Extractor = topicExtractor;
+ TopicExtractor = topicExtractor;
+ TimestampExtractor = timestampExtractor;
KeySerdes = keySerdes;
ValueSerdes = valueSerdes;
ProducedPartitioner = producedPartitioner;
}
public override IProcessor Build()
- => new SinkProcessor(Name, Extractor, KeySerdes, ValueSerdes, ProducedPartitioner);
+ => new SinkProcessor(Name, TopicExtractor, TimestampExtractor, KeySerdes, ValueSerdes, ProducedPartitioner);
public override NodeDescription Describe()
- => Extractor is StaticTopicNameExtractor ?
- new SinkNodeDescription(Name, ((StaticTopicNameExtractor)Extractor).TopicName) :
- new SinkNodeDescription(Name, Extractor?.GetType());
+ => TopicExtractor is StaticTopicNameExtractor ?
+ new SinkNodeDescription(Name, ((StaticTopicNameExtractor)TopicExtractor).TopicName) :
+ new SinkNodeDescription(Name, TopicExtractor?.GetType());
}
#endregion
diff --git a/core/Processors/Internal/WrapperRecordTimestampExtractor.cs b/core/Processors/Internal/WrapperRecordTimestampExtractor.cs
new file mode 100644
index 00000000..39c969c4
--- /dev/null
+++ b/core/Processors/Internal/WrapperRecordTimestampExtractor.cs
@@ -0,0 +1,16 @@
+using System;
+
+namespace Streamiz.Kafka.Net.Processors.Internal
+{
+ internal class WrapperRecordTimestampExtractor : IRecordTimestampExtractor
+ {
+ private readonly Func timestampExtractor;
+
+ public WrapperRecordTimestampExtractor(Func timestampExtractor)
+ {
+ this.timestampExtractor = timestampExtractor;
+ }
+
+ public long Extract(K key, V value, IRecordContext recordContext) => timestampExtractor(key, value ,recordContext);
+ }
+}
diff --git a/core/Processors/SinkProcessor.cs b/core/Processors/SinkProcessor.cs
index a7ca8077..63c4a128 100644
--- a/core/Processors/SinkProcessor.cs
+++ b/core/Processors/SinkProcessor.cs
@@ -14,12 +14,14 @@ internal interface ISinkProcessor
internal class SinkProcessor : AbstractProcessor, ISinkProcessor
{
private ITopicNameExtractor topicNameExtractor;
+ private readonly IRecordTimestampExtractor timestampExtractor;
private readonly Func partitioner;
- internal SinkProcessor(string name, ITopicNameExtractor topicNameExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func partitioner = null)
+ internal SinkProcessor(string name, ITopicNameExtractor topicNameExtractor, IRecordTimestampExtractor timestampExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func partitioner = null)
: base(name, keySerdes, valueSerdes)
{
this.topicNameExtractor = topicNameExtractor;
+ this.timestampExtractor = timestampExtractor;
this.partitioner = partitioner;
}
@@ -55,8 +57,7 @@ public override void Process(K key, V value)
}
var topicName = topicNameExtractor.Extract(key, value, Context.RecordContext);
-
- var timestamp = Context.Timestamp;
+ var timestamp = timestampExtractor.Extract(key, value, Context.RecordContext);
if (timestamp < 0)
{
throw new StreamsException($"Invalid (negative) timestamp of {timestamp} for output record <{key}:{value}>.");
diff --git a/core/Stream/IKStream.cs b/core/Stream/IKStream.cs
index 52770626..f2972074 100644
--- a/core/Stream/IKStream.cs
+++ b/core/Stream/IKStream.cs
@@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Streamiz.Kafka.Net.Processors.Public;
+using Streamiz.Kafka.Net.Processors.Internal;
namespace Streamiz.Kafka.Net.Stream
{
@@ -160,8 +161,47 @@ public interface IKStream
/// Key serializer
/// Value serializer
/// A config used to name the processor in the topology. Default : null
- void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null);
-
+ void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null);
+
+ ///
+ /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's.
+ /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>.
+ ///
+ /// Extractor function to determine the name of the Kafka topic to write to for each record
+ /// Extractor function to determine the timestamp of the record stored in the Kafka topic
+ /// A config used to name the processor in the topology. Default : null
+ void To(Func topicExtractor, Func recordTimestampExtractor, string named = null);
+
+ ///
+ /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's.
+ /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>.
+ ///
+ /// Extractor function to determine the name of the Kafka topic to write to for each record
+ /// Key serializer
+ /// Value serializer
+ /// Extractor function to determine the timestamp of the record stored in the Kafka topic
+ /// A config used to name the processor in the topology. Default : null
+ void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func recordTimestampExtractor, string named = null);
+
+ ///
+ /// Dynamically materialize this stream to topics using default serializers specified in the config and producer's.
+ /// The topic names for each record to send to is dynamically determined based on the }.
+ ///
+ /// The extractor to determine the name of the Kafka topic to write to for each record
+ /// Extractor function to determine the timestamp of the record stored in the Kafka topic
+ /// A config used to name the processor in the topology. Default : null
+ void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null);
+
+ ///
+ /// Dynamically materialize this stream to a topic using serializers specified in the method parameters.
+ /// The topic names for each record to send to is dynamically determined based on the }.
+ ///
+ /// The extractor to determine the name of the Kafka topic to write to for each record4
+ /// Key serializer
+ /// Value serializer
+ /// Extractor function to determine the timestamp of the record stored in the Kafka topic
+ /// A config used to name the processor in the topology. Default : null
+ void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, IRecordTimestampExtractor recordTimestampExtractor, string named = null);
///
/// Materialize this stream to a topic using and serializers specified in the method parameters.
@@ -192,12 +232,34 @@ public interface IKStream
/// New type value serializer
/// The extractor to determine the name of the Kafka topic to write to for each record
/// A config used to name the processor in the topology. Default : null
- void To(ITopicNameExtractor topicExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new();
-
- #endregion
+ void To(ITopicNameExtractor topicExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new();
+
+ ///
+ /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters.
+ /// The topic names for each record to send to is dynamically determined based on the Func<K, V, IRecordContext, string>.
+ ///
+ /// New type key serializer
+ /// New type value serializer
+ /// Extractor function to determine the name of the Kafka topic to write to for each record
+ /// Extractor function to determine the timestamp of the record stored in the Kafka topic
+ /// A config used to name the processor in the topology. Default : null
+ void To(Func topicExtractor, Func recordTimestampExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new();
+
+ ///
+ /// Dynamically materialize this stream to a topic using and serializers specified in the method parameters.
+ /// The topic names for each record to send to is dynamically determined based on the }.
+ ///
+ /// New type key serializer
+ /// New type value serializer
+ /// The extractor to determine the name of the Kafka topic to write to for each record
+ /// Extractor function to determine the timestamp of the record stored in the Kafka topic
+ /// A config used to name the processor in the topology. Default : null
+ void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null) where KS : ISerDes, new() where VS : ISerDes, new();
+ #endregion
+
#region FlatMap
-
+
///
/// Transform each record of the input stream into zero or more records in the output stream (bot
/// can be altered arbitrarily).
diff --git a/core/Stream/Internal/Graph/Nodes/AsyncNode.cs b/core/Stream/Internal/Graph/Nodes/AsyncNode.cs
index ce894a84..b58e5a13 100644
--- a/core/Stream/Internal/Graph/Nodes/AsyncNode.cs
+++ b/core/Stream/Internal/Graph/Nodes/AsyncNode.cs
@@ -34,6 +34,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
builder.AddInternalTopic(RepartitionTopic, null);
builder.AddSinkOperator(
new StaticTopicNameExtractor(RepartitionTopic),
+ new DefaultRecordTimestampExtractor(),
SinkName,
Produced.Create(KeySerdes, ValueSerdes),
ParentNodeNames());
@@ -70,6 +71,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
builder.AddInternalTopic(RepartitionTopic, null);
builder.AddProcessor(ProcessorParameters.ProcessorName, ProcessorParameters.Processor, ParentNodeNames());
builder.AddSinkOperator(new StaticTopicNameExtractor(RepartitionTopic),
+ new DefaultRecordTimestampExtractor(),
SinkName,
Produced.Create(KeySerdes, ValueSerdes),
ProcessorParameters.ProcessorName);
@@ -112,6 +114,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
{
builder.AddInternalTopic(RequestTopic, null);
builder.AddSinkOperator(new StaticTopicNameExtractor(RequestTopic),
+ new DefaultRecordTimestampExtractor(),
SinkName,
Produced.Create(KeySerdes, ValueSerdes),
ParentNodeNames());
diff --git a/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs b/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs
index 8c87d51d..ce4f14e9 100644
--- a/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs
+++ b/core/Stream/Internal/Graph/Nodes/RepartitionNode.cs
@@ -23,6 +23,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
ParentNodeNames());
builder.AddSinkOperator(
new StaticTopicNameExtractor(RepartitionTopic),
+ new DefaultRecordTimestampExtractor(),
SinkName,
Produced.Create(KeySerdes, ValueSerdes).WithPartitioner(StreamPartitioner),
ProcessorParameters.ProcessorName);
@@ -36,6 +37,7 @@ public override void WriteToTopology(InternalTopologyBuilder builder)
builder.AddInternalTopic(RepartitionTopic, NumberOfPartition);
builder.AddSinkOperator(
new StaticTopicNameExtractor(RepartitionTopic),
+ new DefaultRecordTimestampExtractor(),
SinkName,
Produced.Create(KeySerdes, ValueSerdes).WithPartitioner(StreamPartitioner),
ParentNodeNames());
diff --git a/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs b/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs
index 743892a9..2ed39ac2 100644
--- a/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs
+++ b/core/Stream/Internal/Graph/Nodes/StreamSinkNode.cs
@@ -15,17 +15,19 @@ internal class StreamSinkNode : StreamSinkNode
{
private readonly ITopicNameExtractor topicNameExtractor;
private readonly Produced produced;
+ private readonly IRecordTimestampExtractor timestampExtractor;
- public StreamSinkNode(ITopicNameExtractor topicNameExtractor, string streamGraphNode, Produced produced)
+ public StreamSinkNode(ITopicNameExtractor topicNameExtractor, IRecordTimestampExtractor timestampExtractor, string streamGraphNode, Produced produced)
: base(streamGraphNode)
{
this.topicNameExtractor = topicNameExtractor;
+ this.timestampExtractor = timestampExtractor;
this.produced = produced;
}
public override void WriteToTopology(InternalTopologyBuilder builder)
{
- builder.AddSinkOperator(topicNameExtractor, this.streamGraphNode, produced, ParentNodeNames());
+ builder.AddSinkOperator(topicNameExtractor, timestampExtractor, this.streamGraphNode, produced, ParentNodeNames());
}
}
}
diff --git a/core/Stream/Internal/KStream.cs b/core/Stream/Internal/KStream.cs
index 19f7642d..b3bb2076 100644
--- a/core/Stream/Internal/KStream.cs
+++ b/core/Stream/Internal/KStream.cs
@@ -206,15 +206,27 @@ public void To(string topicName, ISerDes keySerdes, ISerDes valueSerdes, s
To(new StaticTopicNameExtractor(topicName), keySerdes, valueSerdes, named);
}
- public void To(ITopicNameExtractor topicExtractor, string named = null) => DoTo(topicExtractor, Produced.Create(KeySerdes, ValueSerdes).WithName(named));
+ public void To(ITopicNameExtractor topicExtractor, string named = null) => DoTo(topicExtractor, new DefaultRecordTimestampExtractor(), Produced.Create(KeySerdes, ValueSerdes).WithName(named));
public void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null)
- => DoTo(topicExtractor, Produced.Create(keySerdes, valueSerdes).WithName(named));
+ => DoTo(topicExtractor, new DefaultRecordTimestampExtractor(), Produced.Create(keySerdes, valueSerdes).WithName(named));
public void To(Func topicExtractor, string named = null) => To(new WrapperTopicNameExtractor(topicExtractor), named);
public void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, string named = null)
- => To(new WrapperTopicNameExtractor(topicExtractor), keySerdes, valueSerdes, named);
+ => To(new WrapperTopicNameExtractor(topicExtractor), keySerdes, valueSerdes, named);
+
+ public void To(Func topicExtractor, Func recordTimestampExtractor, string named = null) =>
+ DoTo(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), Produced.Create(KeySerdes, ValueSerdes).WithName(named));
+
+ public void To(Func topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, Func recordTimestampExtractor, string named = null) =>
+ DoTo(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), Produced.Create(keySerdes, valueSerdes).WithName(named));
+
+ public void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null) =>
+ DoTo(topicExtractor, recordTimestampExtractor, Produced.Create(KeySerdes, ValueSerdes).WithName(named));
+
+ public void To(ITopicNameExtractor topicExtractor, ISerDes keySerdes, ISerDes valueSerdes, IRecordTimestampExtractor recordTimestampExtractor, string named = null) =>
+ DoTo(topicExtractor, recordTimestampExtractor, Produced.Create(keySerdes, valueSerdes).WithName(named));
public void To(Func topicExtractor, string named = null)
where KS : ISerDes, new()
@@ -229,7 +241,17 @@ public void To(string topicName, string named = null)
public void To(ITopicNameExtractor topicExtractor, string named = null)
where KS : ISerDes, new()
where VS : ISerDes, new()
- => DoTo(topicExtractor, Produced.Create().WithName(named));
+ => DoTo(topicExtractor, new DefaultRecordTimestampExtractor(), Produced.Create().WithName(named));
+
+ public void To(Func topicExtractor, Func recordTimestampExtractor, string named = null)
+ where KS : ISerDes, new()
+ where VS : ISerDes, new()
+ => To(new WrapperTopicNameExtractor(topicExtractor), new WrapperRecordTimestampExtractor(recordTimestampExtractor), named);
+
+ public void To(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor recordTimestampExtractor, string named = null)
+ where KS : ISerDes, new()
+ where VS : ISerDes, new()
+ => DoTo(topicExtractor, recordTimestampExtractor, Produced.Create().WithName(named));
#endregion
@@ -1036,13 +1058,13 @@ private IKStream DoFilter(Func predicate, string named, bool n
builder.AddGraphNode(Node, filterProcessorNode);
return new KStream(name, KeySerdes, ValueSerdes, SetSourceNodes, RepartitionRequired, filterProcessorNode, builder);
- }
-
- private void DoTo(ITopicNameExtractor topicExtractor, Produced produced)
+ }
+
+ private void DoTo(ITopicNameExtractor topicExtractor, IRecordTimestampExtractor timestampExtractor, Produced produced)
{
string name = new Named(produced.Named).OrElseGenerateWithPrefix(builder, KStream.SINK_NAME);
- StreamSinkNode sinkNode = new StreamSinkNode(topicExtractor, name, produced);
+ StreamSinkNode sinkNode = new StreamSinkNode(topicExtractor, timestampExtractor, name, produced);
builder.AddGraphNode(Node, sinkNode);
}
@@ -1320,8 +1342,8 @@ string asyncProcessorName
responseSinkProcessorName,
responseSourceProcessorName,
asyncProcessorName);
- }
-
+ }
+
#endregion
}
}
\ No newline at end of file
diff --git a/test/Streamiz.Kafka.Net.Tests/Private/StreamGraphNodeTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/StreamGraphNodeTests.cs
index e099bdab..4a8dc36d 100644
--- a/test/Streamiz.Kafka.Net.Tests/Private/StreamGraphNodeTests.cs
+++ b/test/Streamiz.Kafka.Net.Tests/Private/StreamGraphNodeTests.cs
@@ -77,7 +77,9 @@ public void WriteTopologyTest()
nodes.Add(filter);
var to = new StreamSinkNode(
- new StaticTopicNameExtractor("topic2"), "to-03",
+ new StaticTopicNameExtractor("topic2"),
+ new DefaultRecordTimestampExtractor(),
+ "to-03",
new Stream.Internal.Produced(
new StringSerDes(),
new StringSerDes())
diff --git a/test/Streamiz.Kafka.Net.Tests/Processors/KStreamRecordTimestampExtractorTests.cs b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamRecordTimestampExtractorTests.cs
new file mode 100644
index 00000000..2c4cbed4
--- /dev/null
+++ b/test/Streamiz.Kafka.Net.Tests/Processors/KStreamRecordTimestampExtractorTests.cs
@@ -0,0 +1,120 @@
+using Confluent.Kafka;
+using NUnit.Framework;
+using Streamiz.Kafka.Net.Mock;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.Stream;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Streamiz.Kafka.Net.Tests.Processors
+{
+ public class KStreamRecordTimestampExtractorTests
+ {
+ private class MyTimestampExtractor : ITimestampExtractor
+ {
+ public long Extract(ConsumeResult