diff --git a/core/Stream/IKStream.cs b/core/Stream/IKStream.cs index 0819f052..52770626 100644 --- a/core/Stream/IKStream.cs +++ b/core/Stream/IKStream.cs @@ -1920,12 +1920,12 @@ void ForeachAsync( Func, ExternalContext, Task> asyncAction, RetryPolicy retryPolicy = null, RequestSerDes requestSerDes = null, - string named = null); - + string named = null); + #endregion - + #region Process - + /// /// Process all records in this stream, one record at a time, by applying a processor (provided by the given /// . @@ -1963,12 +1963,13 @@ void ForeachAsync( /// /// an instance of which contains the processor and a potential state store. Use to build this supplier. /// A config used to name the processor in the topology. Default : null - void Process(ProcessorSupplier processorSupplier, string named = null); - + /// The names of the state stores used by the processor. + void Process(ProcessorSupplier processorSupplier, string named = null, params string[] storeNames); + #endregion - + #region Transform - + /// /// Transform each record of the input stream into zero or one record in the output stream (both key and value type /// can be altered arbitrarily). @@ -2009,11 +2010,12 @@ void ForeachAsync( /// /// an instance of which contains the transformer /// A config used to name the processor in the topology. Default : null + /// The names of the state stores used by the processor. /// the key type of the new stream /// the value type of the new stream /// a that contains more or less records with new key and value (possibly of different type) - IKStream Transform(TransformerSupplier transformerSupplier, string named = null); - + IKStream Transform(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames); + /// /// Transform each record of the input stream into zero or one record in the output stream (only value type, the original key will be through to the upstream processors ). /// A (provided by the given ) is applied to each input record and @@ -2053,9 +2055,10 @@ void ForeachAsync( /// /// an instance of which contains the transformer /// A config used to name the processor in the topology. Default : null + /// The names of the state stores used by the processor. /// the value type of the new stream /// a that contains more or less records with new key and value (possibly of different type) - IKStream TransformValues(TransformerSupplier transformerSupplier, string named = null); + IKStream TransformValues(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames); #endregion diff --git a/core/Stream/Internal/Graph/Nodes/StateStoreNode.cs b/core/Stream/Internal/Graph/Nodes/StateStoreNode.cs new file mode 100644 index 00000000..82221a6d --- /dev/null +++ b/core/Stream/Internal/Graph/Nodes/StateStoreNode.cs @@ -0,0 +1,20 @@ +using Streamiz.Kafka.Net.Processors.Internal; +using Streamiz.Kafka.Net.State; + +namespace Streamiz.Kafka.Net.Stream.Internal.Graph.Nodes +{ + internal class StateStoreNode : StreamGraphNode + { + private readonly IStoreBuilder storeBuilder; + + public StateStoreNode(IStoreBuilder storeBuilder, string streamGraphNode) : base(streamGraphNode) + { + this.storeBuilder = storeBuilder; + } + + public override void WriteToTopology(InternalTopologyBuilder builder) + { + builder.AddStateStore(storeBuilder); + } + } +} diff --git a/core/Stream/Internal/Graph/Nodes/StatefulProcessorNode.cs b/core/Stream/Internal/Graph/Nodes/StatefulProcessorNode.cs index eff835b4..e4e41e70 100644 --- a/core/Stream/Internal/Graph/Nodes/StatefulProcessorNode.cs +++ b/core/Stream/Internal/Graph/Nodes/StatefulProcessorNode.cs @@ -1,5 +1,4 @@ -using Streamiz.Kafka.Net.Processors; -using Streamiz.Kafka.Net.Processors.Internal; +using Streamiz.Kafka.Net.Processors.Internal; using Streamiz.Kafka.Net.State; namespace Streamiz.Kafka.Net.Stream.Internal.Graph.Nodes @@ -7,8 +6,8 @@ namespace Streamiz.Kafka.Net.Stream.Internal.Graph.Nodes internal class StatefulProcessorNode : ProcessorGraphNode { private readonly string[] storeNames; - private readonly IStoreBuilder storeBuilder; - + private readonly IStoreBuilder storeBuilder; + /// /// Create a node representing a stateful processor, /// where the store needs to be built and registered as part of building this node. @@ -16,10 +15,12 @@ internal class StatefulProcessorNode : ProcessorGraphNode /// /// /// - public StatefulProcessorNode(string nameNode, ProcessorParameters parameters, IStoreBuilder storeBuilder) + /// The names of the state stores used by the processor. + + public StatefulProcessorNode(string nameNode, ProcessorParameters parameters, IStoreBuilder storeBuilder, params string[] storeNames) : base(nameNode, parameters) { - storeNames = null; + this.storeNames = storeNames; this.storeBuilder = storeBuilder; } diff --git a/core/Stream/Internal/InternalStreamBuilder.cs b/core/Stream/Internal/InternalStreamBuilder.cs index 8ab13041..31c8d24a 100644 --- a/core/Stream/Internal/InternalStreamBuilder.cs +++ b/core/Stream/Internal/InternalStreamBuilder.cs @@ -137,9 +137,19 @@ internal void AddGraphNode(StreamGraphNode root, StreamGraphNode node) logger.LogDebug("Adding node {Node} in root node {Root}", node, root); root.AppendChild(node); nodes.Add(node); - } - - + } + + #endregion + + #region Build Store + public void AddStateStore(IStoreBuilder storeBuilder) + { + var name = NewStoreName(string.Empty); + + var node = new StateStoreNode(storeBuilder, name); + this.AddGraphNode(root, node); + } + #endregion } } \ No newline at end of file diff --git a/core/Stream/Internal/KStream.cs b/core/Stream/Internal/KStream.cs index 3a1bf186..19f7642d 100644 --- a/core/Stream/Internal/KStream.cs +++ b/core/Stream/Internal/KStream.cs @@ -104,12 +104,12 @@ public IKStream FilterNot(Func predicate, string named = null) #region Process - public void Process(ProcessorSupplier processorSupplier, string named = null) + public void Process(ProcessorSupplier processorSupplier, string named = null, params string[] storeNames) { string name = new Named(named).OrElseGenerateWithPrefix(builder, KStream.PROCESSOR_NAME); ProcessorParameters processorParameters = new ProcessorParameters( new KStreamProcessorSupplier(processorSupplier), name); - StatefulProcessorNode processorNode = new StatefulProcessorNode(name, processorParameters, processorSupplier.StoreBuilder); + StatefulProcessorNode processorNode = new StatefulProcessorNode(name, processorParameters, processorSupplier.StoreBuilder, storeNames); builder.AddGraphNode(Node, processorNode); } @@ -118,19 +118,19 @@ public void Process(ProcessorSupplier processorSupplier, string named = nu #region Transform - public IKStream Transform(TransformerSupplier transformerSupplier, string named = null) - => DoTransform(transformerSupplier, true, named); + public IKStream Transform(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames) + => DoTransform(transformerSupplier, true, named, storeNames); - public IKStream TransformValues(TransformerSupplier transformerSupplier, string named = null) - => DoTransform(transformerSupplier, false, named); + public IKStream TransformValues(TransformerSupplier transformerSupplier, string named = null, params string[] storeNames) + => DoTransform(transformerSupplier, false, named, storeNames); private IKStream DoTransform(TransformerSupplier transformerSupplier, bool changeKey, string named = - null) + null, params string[] storeNames) { string name = new Named(named).OrElseGenerateWithPrefix(builder, changeKey ? KStream.TRANSFORM_NAME : KStream.TRANSFORMVALUES_NAME ); ProcessorParameters processorParameters = new ProcessorParameters( new KStreamTransformerSupplier(transformerSupplier, changeKey), name); - StatefulProcessorNode processorNode = new StatefulProcessorNode(name, processorParameters, transformerSupplier.StoreBuilder); + StatefulProcessorNode processorNode = new StatefulProcessorNode(name, processorParameters, transformerSupplier.StoreBuilder, storeNames); builder.AddGraphNode(Node, processorNode); diff --git a/core/StreamBuilder.cs b/core/StreamBuilder.cs index 1358406f..de129ccc 100644 --- a/core/StreamBuilder.cs +++ b/core/StreamBuilder.cs @@ -730,13 +730,29 @@ public IGlobalKTable GlobalTable(string topic, Materialized< public IGlobalKTable GlobalTable(string topic, Materialized> materialized, string named, ITimestampExtractor extractor) where KS : ISerDes, new() where VS : ISerDes, new() - => GlobalTable(topic, new KS(), new VS(), materialized, named, extractor); - - + => GlobalTable(topic, new KS(), new VS(), materialized, named, extractor); + + #endregion - + #endregion + + #region State Store + + /// + /// Adds a state store to the underlying . + /// It is required to connect state stores to + /// or + /// before they can be used. + /// + /// The builder used to obtain the instance. + public void AddStateStore(IStoreBuilder storeBuilder) + { + internalStreamBuilder.AddStateStore(storeBuilder); + } + #endregion + /// /// Returns the that represents the specified processing logic. /// Note that using this method means no optimizations are performed. diff --git a/test/Streamiz.Kafka.Net.Tests/Public/AddStoreManuallyTests.cs b/test/Streamiz.Kafka.Net.Tests/Public/AddStoreManuallyTests.cs new file mode 100644 index 00000000..83749f9c --- /dev/null +++ b/test/Streamiz.Kafka.Net.Tests/Public/AddStoreManuallyTests.cs @@ -0,0 +1,90 @@ +using System.Collections.Generic; +using System.Linq; +using NUnit.Framework; +using Streamiz.Kafka.Net.Mock; +using Streamiz.Kafka.Net.Processors.Public; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.Stream; + +namespace Streamiz.Kafka.Net.Tests.Public +{ + public class AddStoreManuallyTests + { + private class TTransformer : ITransformer + { + private IKeyValueStore stateStore; + + public void Init(ProcessorContext context) + { + stateStore = (IKeyValueStore)context.GetStateStore("output-store"); + } + + public Record Process(Record record) + { + // De-duplication process + if (stateStore.Get(record.Key) == null) + { + stateStore.Put(record.Key, record.Value); + return record; + } + + return null; + } + + public void Close() + { + + } + } + + [Test] + public void AddOneStoreManually() + { + var config = new StreamConfig(); + config.ApplicationId = "test-manually-state-store"; + + var builder = new StreamBuilder(); + var storeBuilder = State.Stores.KeyValueStoreBuilder( + State.Stores.InMemoryKeyValueStore("output-store"), + new StringSerDes(), + new StringSerDes()) + .WithLoggingDisabled(); + + builder.AddStateStore(storeBuilder); + + builder.Stream("input-topic") + .Transform( + TransformerBuilder + .New() + .Transformer() + .Build(), + storeNames: "output-store") + .To("output-topic"); + + var s = builder.Build().Describe().ToString(); + + Topology t = builder.Build(); + + using (var driver = new TopologyTestDriver(t, config)) + { + var inputTopic = driver.CreateInputTopic("input-topic"); + var outputTopic = driver.CreateOuputTopic("output-topic"); + + inputTopic.PipeInput("key1", "value1"); + inputTopic.PipeInput("key2", "value1"); + inputTopic.PipeInput("key1", "value2"); + inputTopic.PipeInput("key3", "value1"); + + var expected = new List<(string, string)>(); + expected.Add(("key1", "value1")); + expected.Add(("key2", "value1")); + expected.Add(("key3", "value1")); + + var list = outputTopic.ReadKeyValueList().Select(r => (r.Message.Key, r.Message.Value)).ToList(); + + Assert.AreEqual(expected, list); + } + } + } +} \ No newline at end of file