diff --git a/core/Metrics/Internal/ProcessorNodeMetrics.cs b/core/Metrics/Internal/ProcessorNodeMetrics.cs index f97fe5c6..6f000846 100644 --- a/core/Metrics/Internal/ProcessorNodeMetrics.cs +++ b/core/Metrics/Internal/ProcessorNodeMetrics.cs @@ -28,7 +28,6 @@ internal class ProcessorNodeMetrics internal static readonly string RETRY_AVG_DESCRIPTION = StreamMetricsRegistry.RATE_DESCRIPTION_PREFIX + RETRY_DESCRIPTION; internal static readonly string RETRY_MAX_DESCRIPTION = StreamMetricsRegistry.TOTAL_DESCRIPTION + RATE_DESCRIPTION_SUFFIX; - // NOT USE FOR MOMENT public static Sensor SuppressionEmitSensor(string threadId, TaskId taskId, string processorNodeId, diff --git a/core/Processors/KTableSuppressProcessor.cs b/core/Processors/KTableSuppressProcessor.cs new file mode 100644 index 00000000..8b210113 --- /dev/null +++ b/core/Processors/KTableSuppressProcessor.cs @@ -0,0 +1,107 @@ +using System; +using System.Threading; +using Streamiz.Kafka.Net.Errors; +using Streamiz.Kafka.Net.Metrics; +using Streamiz.Kafka.Net.Metrics.Internal; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.State.Suppress; +using Streamiz.Kafka.Net.Table; +using Streamiz.Kafka.Net.Table.Internal; + +namespace Streamiz.Kafka.Net.Processors +{ + internal class KTableSuppressProcessor : AbstractProcessor> + { + private readonly long maxRecords; + private readonly long maxBytes; + private readonly long suppressionDurationMs; + private readonly BUFFER_FULL_STRATEGY bufferFullStrategy; + private readonly bool safeToDropTombstone; + private readonly string storeName; + private readonly Func timeDefinition; + + private Sensor suppressionEmitSensor; + private long observedStreamTime = -1; + private ITimeOrderedKeyValueBuffer> buffer; + private readonly ISerDes keySerdes; + private readonly ISerDes valueSerdes; + + private bool OverCapacity => buffer?.BufferSize > maxBytes || buffer?.NumRecords > maxRecords; + + public KTableSuppressProcessor(Suppressed suppressed, string storeName) + { + maxRecords = suppressed.BufferConfig.MaxRecords; + maxBytes = suppressed.BufferConfig.MaxBytes; + bufferFullStrategy = suppressed.BufferConfig.BufferFullStrategy; + suppressionDurationMs = (long)suppressed.SuppressionTime.TotalMilliseconds; + safeToDropTombstone = suppressed.SafeToDropTombstones; + this.storeName = storeName; + timeDefinition = suppressed.TimeDefinition; + keySerdes = suppressed.KeySerdes; + valueSerdes = suppressed.ValueSerdes; + } + + public override void Init(ProcessorContext context) + { + base.Init(context); + suppressionEmitSensor = ProcessorNodeMetrics.SuppressionEmitSensor( + Thread.CurrentThread.Name, + context.Id, + Name, + context.Metrics); + + buffer = (ITimeOrderedKeyValueBuffer>)context.GetStateStore(storeName); + buffer.SetSerdesIfNull(keySerdes, valueSerdes); + } + + public override void Process(K key, Change value) + { + observedStreamTime = Math.Max(observedStreamTime, Context.Timestamp); + Buffer(key, value); + EnforceConstraints(); + } + + private void Buffer(K key, Change value) + { + var bufferTime = timeDefinition(Context, key); + buffer.Put(bufferTime, key, value, Context.RecordContext); + } + + private void EnforceConstraints() + { + long expiryTime = observedStreamTime - suppressionDurationMs; + buffer.EvictWhile(() => buffer.MinTimestamp <= expiryTime, Emit); + + if (OverCapacity) + { + switch (bufferFullStrategy) + { + case BUFFER_FULL_STRATEGY.EMIT: + buffer.EvictWhile(() => OverCapacity, Emit); + break; + case BUFFER_FULL_STRATEGY.SHUTDOWN: + throw new StreamsException( + $"{Name} buffer exceeded its max capacity. Currently [{buffer.NumRecords}/{maxRecords}] records and [{buffer.BufferSize}/{maxBytes}] bytes."); + } + } + } + + private void Emit(K key, Change value, IRecordContext context) + { + if (value.NewValue != null || !safeToDropTombstone) + { + var currentRecordContext = Context.RecordContext; + try + { + Context.SetRecordMetaData(context); + Forward(key, value); + suppressionEmitSensor.Record(); + } + finally + { + Context.SetRecordMetaData(currentRecordContext); + } + } + } + } +} \ No newline at end of file diff --git a/core/State/Suppress/ITimeOrderedKeyValueBuffer.cs b/core/State/Suppress/ITimeOrderedKeyValueBuffer.cs index 72a8137d..6e228bb8 100644 --- a/core/State/Suppress/ITimeOrderedKeyValueBuffer.cs +++ b/core/State/Suppress/ITimeOrderedKeyValueBuffer.cs @@ -1,5 +1,6 @@ using System; using Streamiz.Kafka.Net.Processors; +using Streamiz.Kafka.Net.SerDes; namespace Streamiz.Kafka.Net.State.Suppress { @@ -12,5 +13,6 @@ internal interface ITimeOrderedKeyValueBuffer : IStateStore bool Put(long timestamp, K key, T value, IRecordContext recordContext); ValueAndTimestamp PriorValueForBuffered(K key); void EvictWhile(Func predicate, Action evictHandler); + void SetSerdesIfNull(ISerDes contextKeySerdes, ISerDes contextValueSerdes); } } \ No newline at end of file diff --git a/core/State/Suppress/Internal/InMemoryTimeOrderedKeyValueChangeBuffer.cs b/core/State/Suppress/Internal/InMemoryTimeOrderedKeyValueChangeBuffer.cs index 5875f0ab..e9f5ad45 100644 --- a/core/State/Suppress/Internal/InMemoryTimeOrderedKeyValueChangeBuffer.cs +++ b/core/State/Suppress/Internal/InMemoryTimeOrderedKeyValueChangeBuffer.cs @@ -20,7 +20,7 @@ internal class InMemoryTimeOrderedKeyValueChangeBuffer : ITimeOrderedKeyVa public long MinTimestamp { get; private set; } = Int64.MaxValue; public string Name { get; } public bool Persistent => false; - public bool IsLocally { get; } + public bool IsLocally => true; public bool IsOpen { get; private set; } private Sensor bufferSizeSensor; @@ -88,7 +88,7 @@ public InMemoryTimeOrderedKeyValueChangeBuffer( }; } - internal void SetSerdesIfNull(ISerDes contextKeySerdes, ISerDes contextValueSerdes) { + public void SetSerdesIfNull(ISerDes contextKeySerdes, ISerDes contextValueSerdes) { keySerdes ??= contextKeySerdes; valueSerdes ??= contextValueSerdes; } diff --git a/core/State/Windowed.cs b/core/State/Windowed.cs index a8cfe34e..1b06ec28 100644 --- a/core/State/Windowed.cs +++ b/core/State/Windowed.cs @@ -12,7 +12,23 @@ namespace Streamiz.Kafka.Net.State /// Thus, a windowed has type , V. /// /// type of key - public class Windowed + public abstract class Windowed + { + /// + /// Return the window containing the values associated with this key. + /// + public abstract Window Window { get; } + } + + /// + /// The result key type of a windowed stream aggregation. + /// If a gets grouped and aggregated using a window-aggregation the resulting is a + /// so-called "windowed " with a combined key type that encodes the corresponding aggregation window and + /// the original record key. + /// Thus, a windowed has type , V. + /// + /// type of key + public class Windowed : Windowed { /// /// Constructor with Key and Window data @@ -33,7 +49,7 @@ public Windowed(K key, Window window) /// /// Return the window containing the values associated with this key. /// - public Window Window { get; } + public override Window Window { get; } /// /// diff --git a/core/Table/IKTable.cs b/core/Table/IKTable.cs index 785d5dbc..6fe4fc70 100644 --- a/core/Table/IKTable.cs +++ b/core/Table/IKTable.cs @@ -957,6 +957,6 @@ public interface IKTable /// Configuration object determining what, if any, updates to suppress /// A config used to name the processor in the topology. Default : null /// A new KTable with the desired suppression characteristics. - IKTable Suppress(Suppressed suppressed, string named = null); + IKTable Suppress(Suppressed suppressed, string named = null); } } \ No newline at end of file diff --git a/core/Table/Internal/Graph/KTableSuppress.cs b/core/Table/Internal/Graph/KTableSuppress.cs index 5eace2d6..87dc3f4b 100644 --- a/core/Table/Internal/Graph/KTableSuppress.cs +++ b/core/Table/Internal/Graph/KTableSuppress.cs @@ -1,14 +1,16 @@ using Streamiz.Kafka.Net.Processors; using Streamiz.Kafka.Net.State; +using Streamiz.Kafka.Net.State.Suppress; namespace Streamiz.Kafka.Net.Table.Internal.Graph { internal class KTableSuppress : IKTableProcessorSupplier { - internal class KTableSuppressValueGetter : IKTableValueGetter + private class KTableSuppressValueGetter : IKTableValueGetter { private readonly IKTableValueGetter _parentKTableValueGetter; private readonly string _storeName; + private ITimeOrderedKeyValueBuffer> buffer; public KTableSuppressValueGetter(IKTableValueGetterSupplier parentKTableValueGetterSupplier, string storeName) { @@ -19,12 +21,13 @@ public KTableSuppressValueGetter(IKTableValueGetterSupplier parentKTableVal public void Init(ProcessorContext context) { _parentKTableValueGetter.Init(context); - context.GetStateStore(_storeName); + buffer = (ITimeOrderedKeyValueBuffer>)context.GetStateStore(_storeName); } public ValueAndTimestamp Get(K key) { - throw new System.NotImplementedException(); + var value = buffer.PriorValueForBuffered(key); + return value ?? _parentKTableValueGetter.Get(key); } public void Close() @@ -33,12 +36,12 @@ public void Close() } } - private readonly Suppressed _suppressed; + private readonly Suppressed _suppressed; private readonly string _storeName; private readonly KTable _parent; public KTableSuppress( - Suppressed suppressed, + Suppressed suppressed, string storeName, KTable parentTable) { @@ -48,8 +51,7 @@ public KTableSuppress( // The suppress buffer requires seeing the old values, to support the prior value view. _parent.EnableSendingOldValues(); } - - + public void EnableSendingOldValues() => _parent.EnableSendingOldValues(); diff --git a/core/Table/Internal/KTable.cs b/core/Table/Internal/KTable.cs index 23a7ab63..7524fba5 100644 --- a/core/Table/Internal/KTable.cs +++ b/core/Table/Internal/KTable.cs @@ -242,7 +242,7 @@ public IKTable OuterJoin(IKTable table, Func va #region Suppress - public IKTable Suppress(Suppressed suppressed, string named = null) + public IKTable Suppress(Suppressed suppressed, string named = null) { var name = new Named(named).OrElseGenerateWithPrefix(builder, KTable.SUPPRESS_NAME); suppressed.Name = name; diff --git a/core/Table/Suppressed.cs b/core/Table/Suppressed.cs index 874f14a5..5918fda9 100644 --- a/core/Table/Suppressed.cs +++ b/core/Table/Suppressed.cs @@ -1,20 +1,21 @@ using System; using System.Collections.Generic; +using Streamiz.Kafka.Net.SerDes; using Streamiz.Kafka.Net.State; namespace Streamiz.Kafka.Net.Table { public static class SuppressedBuilder { - public static Suppressed UntilWindowClose(TimeSpan gracePeriod, IBufferConfig bufferConfig) - where K : Windowed => new( + public static Suppressed UntilWindowClose(TimeSpan gracePeriod, IBufferConfig bufferConfig) + where K : Windowed => new( null, gracePeriod, bufferConfig, (_, key) => key.Window.EndMs, true); - public static Suppressed UntilTimeLimit(TimeSpan timeToWaitMoreEvents, IBufferConfig bufferConfig) + public static Suppressed UntilTimeLimit(TimeSpan timeToWaitMoreEvents, IBufferConfig bufferConfig) => new( null, timeToWaitMoreEvents, @@ -116,7 +117,7 @@ public EagerConfig WithLoggingDisabled() } } - public class Suppressed + public class Suppressed { public TimeSpan SuppressionTime { get; } public IBufferConfig BufferConfig { get; } @@ -124,6 +125,9 @@ public class Suppressed public bool SafeToDropTombstones { get; } public string Name { get; internal set; } + public ISerDes KeySerdes { get; private set; } + public ISerDes ValueSerdes { get; private set; } + internal Suppressed( string name, TimeSpan suppressionTime, @@ -137,5 +141,17 @@ internal Suppressed( TimeDefinition = timeDefinition; SafeToDropTombstones = safeToDropTombstones; } + + public Suppressed WithKeySerdes(ISerDes keySerdes) + { + KeySerdes = keySerdes; + return this; + } + + public Suppressed WithValueSerdes(ISerDes valueSerdes) + { + ValueSerdes = valueSerdes; + return this; + } } } \ No newline at end of file diff --git a/launcher/sample-stream/Program.cs b/launcher/sample-stream/Program.cs index f34e4e83..332f4bd4 100644 --- a/launcher/sample-stream/Program.cs +++ b/launcher/sample-stream/Program.cs @@ -47,8 +47,9 @@ private static Topology BuildTopology() builder.Stream("input") .GroupByKey() + .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1))) .Count() - .Suppress(SuppressedBuilder.UntilTimeLimit(TimeSpan.FromMinutes(1), StrictBufferConfig.Unbounded())) + .Suppress(SuppressedBuilder.UntilWindowClose, long>(TimeSpan.FromMinutes(1), StrictBufferConfig.Unbounded())) .ToStream() .To("output");