Skip to content

Commit

Permalink
Merge branch 'ppilev-streambuilder-state-store' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Oct 30, 2023
2 parents 72b4735 + 5cab163 commit d09845a
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 32 deletions.
25 changes: 14 additions & 11 deletions core/Stream/IKStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1920,12 +1920,12 @@ void ForeachAsync(
Func<ExternalRecord<K, V>, ExternalContext, Task> asyncAction,
RetryPolicy retryPolicy = null,
RequestSerDes<K, V> requestSerDes = null,
string named = null);
string named = null);

#endregion


#region Process


/// <summary>
/// Process all records in this stream, one record at a time, by applying a processor (provided by the given
/// <see cref="ProcessorSupplier{K,V}"/>.
Expand Down Expand Up @@ -1963,12 +1963,13 @@ void ForeachAsync(
/// </summary>
/// <param name="processorSupplier">an instance of <see cref="ProcessorSupplier{K,V}"/> which contains the processor and a potential state store. Use <see cref="ProcessorBuilder"/> to build this supplier.</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
void Process(ProcessorSupplier<K, V> processorSupplier, string named = null);

/// <param name="storeNames">The names of the state stores used by the processor.</param>
void Process(ProcessorSupplier<K, V> processorSupplier, string named = null, params string[] storeNames);

#endregion


#region Transform


/// <summary>
/// Transform each record of the input stream into zero or one record in the output stream (both key and value type
/// can be altered arbitrarily).
Expand Down Expand Up @@ -2009,11 +2010,12 @@ void ForeachAsync(
/// </summary>
/// <param name="transformerSupplier">an instance of <see cref="TransformerSupplier{K,V,K1,V1}"/> which contains the transformer</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
/// <param name="storeNames">The names of the state stores used by the processor.</param>
/// <typeparam name="K1">the key type of the new stream</typeparam>
/// <typeparam name="V1">the value type of the new stream</typeparam>
/// <returns>a <see cref="IKStream{K1,V1}"/> that contains more or less records with new key and value (possibly of different type)</returns>
IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null);
IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null, params string[] storeNames);

/// <summary>
/// Transform each record of the input stream into zero or one record in the output stream (only the value type can change; the original key is passed through unchanged to the downstream processors).
/// A <see cref="Transform{K,V1}"/> (provided by the given <see cref="TransformerSupplier{K,V,K,V1}"/>) is applied to each input record and
Expand Down Expand Up @@ -2053,9 +2055,10 @@ void ForeachAsync(
/// </summary>
/// <param name="transformerSupplier">an instance of <see cref="TransformerSupplier{K,V,K,V1}"/> which contains the transformer</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
/// <param name="storeNames">The names of the state stores used by the processor.</param>
/// <typeparam name="V1">the value type of the new stream</typeparam>
/// <returns>a <see cref="IKStream{K,V1}"/> that contains more or fewer records with the original key and a new value (possibly of a different type)</returns>
IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null);
IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null, params string[] storeNames);

#endregion

Expand Down
20 changes: 20 additions & 0 deletions core/Stream/Internal/Graph/Nodes/StateStoreNode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.State;

namespace Streamiz.Kafka.Net.Stream.Internal.Graph.Nodes
{
/// <summary>
/// Stream graph node that holds a single <see cref="IStoreBuilder"/> and hands it to the
/// <see cref="InternalTopologyBuilder"/> when the graph is written to the topology.
/// </summary>
internal class StateStoreNode : StreamGraphNode
{
    private readonly IStoreBuilder stateStoreBuilder;

    /// <summary>
    /// Create a node wrapping the given store builder.
    /// </summary>
    /// <param name="storeBuilder">store builder passed to the topology builder by <see cref="WriteToTopology"/></param>
    /// <param name="streamGraphNode">name forwarded to the base <see cref="StreamGraphNode"/> constructor</param>
    public StateStoreNode(IStoreBuilder storeBuilder, string streamGraphNode)
        : base(streamGraphNode)
    {
        stateStoreBuilder = storeBuilder;
    }

    /// <summary>
    /// Add the wrapped store builder to <paramref name="builder"/>.
    /// </summary>
    /// <param name="builder">topology builder receiving the state store</param>
    public override void WriteToTopology(InternalTopologyBuilder builder)
        => builder.AddStateStore(stateStoreBuilder);
}
}
13 changes: 7 additions & 6 deletions core/Stream/Internal/Graph/Nodes/StatefulProcessorNode.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.State;

namespace Streamiz.Kafka.Net.Stream.Internal.Graph.Nodes
{
internal class StatefulProcessorNode<K, V> : ProcessorGraphNode<K, V>
{
private readonly string[] storeNames;
private readonly IStoreBuilder storeBuilder;

private readonly IStoreBuilder storeBuilder;

/// <summary>
/// Create a node representing a stateful processor,
/// where the store needs to be built and registered as part of building this node.
/// </summary>
/// <param name="nameNode"></param>
/// <param name="parameters"></param>
/// <param name="storeBuilder"></param>
public StatefulProcessorNode(string nameNode, ProcessorParameters<K, V> parameters, IStoreBuilder storeBuilder)
/// <param name="storeNames">The names of the state stores used by the processor.</param>

public StatefulProcessorNode(string nameNode, ProcessorParameters<K, V> parameters, IStoreBuilder storeBuilder, params string[] storeNames)
: base(nameNode, parameters)
{
storeNames = null;
this.storeNames = storeNames;
this.storeBuilder = storeBuilder;
}

Expand Down
16 changes: 13 additions & 3 deletions core/Stream/Internal/InternalStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,19 @@ internal void AddGraphNode(StreamGraphNode root, StreamGraphNode node)
logger.LogDebug("Adding node {Node} in root node {Root}", node, root);
root.AppendChild(node);
nodes.Add(node);
}


}


#endregion

#region Build Store
/// <summary>
/// Wrap <paramref name="storeBuilder"/> in a <see cref="StateStoreNode"/> and attach that node
/// directly under the root of the stream graph.
/// </summary>
/// <param name="storeBuilder">store builder carried by the new graph node</param>
public void AddStateStore(IStoreBuilder storeBuilder)
{
    // The graph node name comes from the store-name generator, called with an empty prefix.
    StateStoreNode stateStoreNode = new StateStoreNode(storeBuilder, NewStoreName(string.Empty));

    AddGraphNode(root, stateStoreNode);
}
#endregion
}
}
16 changes: 8 additions & 8 deletions core/Stream/Internal/KStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ public IKStream<K, V> FilterNot(Func<K, V, bool> predicate, string named = null)

#region Process

public void Process(ProcessorSupplier<K, V> processorSupplier, string named = null)
public void Process(ProcessorSupplier<K, V> processorSupplier, string named = null, params string[] storeNames)
{
string name = new Named(named).OrElseGenerateWithPrefix(builder, KStream.PROCESSOR_NAME);
ProcessorParameters<K, V> processorParameters = new ProcessorParameters<K, V>(
new KStreamProcessorSupplier<K, V>(processorSupplier), name);
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, processorSupplier.StoreBuilder);
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, processorSupplier.StoreBuilder, storeNames);

builder.AddGraphNode(Node, processorNode);
}
Expand All @@ -118,19 +118,19 @@ public void Process(ProcessorSupplier<K, V> processorSupplier, string named = nu

#region Transform

public IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null)
=> DoTransform(transformerSupplier, true, named);
public IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null, params string[] storeNames)
=> DoTransform(transformerSupplier, true, named, storeNames);

public IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null)
=> DoTransform(transformerSupplier, false, named);
public IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null, params string[] storeNames)
=> DoTransform(transformerSupplier, false, named, storeNames);

private IKStream<K1, V1> DoTransform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, bool changeKey, string named =
null)
null, params string[] storeNames)
{
string name = new Named(named).OrElseGenerateWithPrefix(builder, changeKey ? KStream.TRANSFORM_NAME : KStream.TRANSFORMVALUES_NAME );
ProcessorParameters<K, V> processorParameters = new ProcessorParameters<K, V>(
new KStreamTransformerSupplier<K, V, K1, V1>(transformerSupplier, changeKey), name);
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, transformerSupplier.StoreBuilder);
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, transformerSupplier.StoreBuilder, storeNames);

builder.AddGraphNode(Node, processorNode);

Expand Down
24 changes: 20 additions & 4 deletions core/StreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -730,13 +730,29 @@ public IGlobalKTable<K, V> GlobalTable<K, V, KS, VS>(string topic, Materialized<
public IGlobalKTable<K, V> GlobalTable<K, V, KS, VS>(string topic, Materialized<K, V, IKeyValueStore<Bytes, byte[]>> materialized, string named, ITimestampExtractor extractor)
where KS : ISerDes<K>, new()
where VS : ISerDes<V>, new()
=> GlobalTable(topic, new KS(), new VS(), materialized, named, extractor);


=> GlobalTable(topic, new KS(), new VS(), materialized, named, extractor);


#endregion


#endregion

#region State Store

/// <summary>
/// Adds a state store to the underlying <see cref="Topology"/>.
/// A store added this way must still be connected to the
/// <see cref="Streamiz.Kafka.Net.Processors.Public.IProcessor{K, V}"/> or
/// <see cref="Streamiz.Kafka.Net.Processors.Public.ITransformer{K, V, K1, V1}"/> that uses it
/// (by passing its name through the <c>storeNames</c> argument of <c>Process</c>, <c>Transform</c>
/// or <c>TransformValues</c>) before it can be used.
/// </summary>
/// <param name="storeBuilder">The builder used to obtain the <see cref="IStateStore"/> instance.</param>
/// <exception cref="System.ArgumentNullException">if <paramref name="storeBuilder"/> is null</exception>
public void AddStateStore(IStoreBuilder storeBuilder)
{
    // Fail fast at the public boundary: without this check a null builder is silently stored
    // in the stream graph and only blows up later, far from the faulty call site.
    if (storeBuilder == null)
    {
        throw new System.ArgumentNullException(nameof(storeBuilder));
    }

    internalStreamBuilder.AddStateStore(storeBuilder);
}

#endregion

/// <summary>
/// Returns the <see cref="Topology"/> that represents the specified processing logic.
/// Note that using this method means no optimizations are performed.
Expand Down
90 changes: 90 additions & 0 deletions test/Streamiz.Kafka.Net.Tests/Public/AddStoreManuallyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System.Collections.Generic;
using System.Linq;
using NUnit.Framework;
using Streamiz.Kafka.Net.Mock;
using Streamiz.Kafka.Net.Processors.Public;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.Stream;

namespace Streamiz.Kafka.Net.Tests.Public
{
/// <summary>
/// Verifies that a state store registered by hand through <c>StreamBuilder.AddStateStore</c>
/// can be reached from a transformer that names it in <c>storeNames</c>.
/// </summary>
public class AddStoreManuallyTests
{
    // Single source of truth for the store name shared by the builder, the transformer and the DSL call.
    private const string StoreName = "output-store";

    /// <summary>
    /// De-duplicating transformer: forwards a record only the first time its key is seen,
    /// remembering seen keys in the manually added key/value store.
    /// </summary>
    private class TTransformer : ITransformer<string, string, string, string>
    {
        private IKeyValueStore<string, string> stateStore;

        /// <summary>
        /// Look up the manually registered store from the processor context.
        /// </summary>
        public void Init(ProcessorContext<string, string> context)
        {
            stateStore = (IKeyValueStore<string, string>)context.GetStateStore(StoreName);
        }

        /// <summary>
        /// Return the record when its key is not yet in the store (and remember it);
        /// return null for a key that was already stored.
        /// </summary>
        public Record<string, string> Process(Record<string, string> record)
        {
            // De-duplication process
            if (stateStore.Get(record.Key) == null)
            {
                stateStore.Put(record.Key, record.Value);
                return record;
            }

            return null;
        }

        public void Close()
        {
            // Nothing to release.
        }
    }

    [Test]
    public void AddOneStoreManually()
    {
        var config = new StreamConfig<StringSerDes, StringSerDes>();
        config.ApplicationId = "test-manually-state-store";

        var builder = new StreamBuilder();
        var storeBuilder = State.Stores.KeyValueStoreBuilder(
                State.Stores.InMemoryKeyValueStore(StoreName),
                new StringSerDes(),
                new StringSerDes())
            .WithLoggingDisabled();

        builder.AddStateStore(storeBuilder);

        builder.Stream<string, string>("input-topic")
            .Transform(
                TransformerBuilder
                    .New<string, string, string, string>()
                    .Transformer<TTransformer>()
                    .Build(),
                storeNames: StoreName)
            .To("output-topic");

        // Build the topology once and reuse it for both the description check and the driver.
        Topology t = builder.Build();

        // Smoke check: describing a topology that contains a manually added store must not fail.
        Assert.IsNotNull(t.Describe().ToString());

        using (var driver = new TopologyTestDriver(t, config))
        {
            var inputTopic = driver.CreateInputTopic<string, string>("input-topic");
            var outputTopic = driver.CreateOuputTopic<string, string>("output-topic");

            inputTopic.PipeInput("key1", "value1");
            inputTopic.PipeInput("key2", "value1");
            inputTopic.PipeInput("key1", "value2"); // duplicate key: must be filtered out
            inputTopic.PipeInput("key3", "value1");

            var expected = new List<(string, string)>
            {
                ("key1", "value1"),
                ("key2", "value1"),
                ("key3", "value1")
            };

            var list = outputTopic.ReadKeyValueList().Select(r => (r.Message.Key, r.Message.Value)).ToList();

            Assert.AreEqual(expected, list);
        }
    }
}
}

0 comments on commit d09845a

Please sign in to comment.