Skip to content

Commit

Permalink
KIP-328 - Implementation done
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Oct 10, 2024
1 parent 059d274 commit 8887009
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 19 deletions.
1 change: 0 additions & 1 deletion core/Metrics/Internal/ProcessorNodeMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal class ProcessorNodeMetrics
internal static readonly string RETRY_AVG_DESCRIPTION = StreamMetricsRegistry.RATE_DESCRIPTION_PREFIX + RETRY_DESCRIPTION;
internal static readonly string RETRY_MAX_DESCRIPTION = StreamMetricsRegistry.TOTAL_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

// NOT USE FOR MOMENT
public static Sensor SuppressionEmitSensor(string threadId,
TaskId taskId,
string processorNodeId,
Expand Down
107 changes: 107 additions & 0 deletions core/Processors/KTableSuppressProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Threading;
using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Internal;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State.Suppress;
using Streamiz.Kafka.Net.Table;
using Streamiz.Kafka.Net.Table.Internal;

namespace Streamiz.Kafka.Net.Processors
{
internal class KTableSuppressProcessor<K, V> : AbstractProcessor<K, Change<V>>
{
private readonly long maxRecords;
private readonly long maxBytes;
private readonly long suppressionDurationMs;
private readonly BUFFER_FULL_STRATEGY bufferFullStrategy;
private readonly bool safeToDropTombstone;
private readonly string storeName;
private readonly Func<ProcessorContext, K, long> timeDefinition;

private Sensor suppressionEmitSensor;
private long observedStreamTime = -1;
private ITimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;
private readonly ISerDes<K> keySerdes;
private readonly ISerDes<V> valueSerdes;

private bool OverCapacity => buffer?.BufferSize > maxBytes || buffer?.NumRecords > maxRecords;

public KTableSuppressProcessor(Suppressed<K, V> suppressed, string storeName)
{
maxRecords = suppressed.BufferConfig.MaxRecords;
maxBytes = suppressed.BufferConfig.MaxBytes;
bufferFullStrategy = suppressed.BufferConfig.BufferFullStrategy;
suppressionDurationMs = (long)suppressed.SuppressionTime.TotalMilliseconds;
safeToDropTombstone = suppressed.SafeToDropTombstones;
this.storeName = storeName;
timeDefinition = suppressed.TimeDefinition;
keySerdes = suppressed.KeySerdes;
valueSerdes = suppressed.ValueSerdes;
}

public override void Init(ProcessorContext context)
{
base.Init(context);
suppressionEmitSensor = ProcessorNodeMetrics.SuppressionEmitSensor(
Thread.CurrentThread.Name,
context.Id,
Name,
context.Metrics);

buffer = (ITimeOrderedKeyValueBuffer<K, V, Change<V>>)context.GetStateStore(storeName);
buffer.SetSerdesIfNull(keySerdes, valueSerdes);
}

public override void Process(K key, Change<V> value)
{
observedStreamTime = Math.Max(observedStreamTime, Context.Timestamp);
Buffer(key, value);
EnforceConstraints();
}

private void Buffer(K key, Change<V> value)
{
var bufferTime = timeDefinition(Context, key);
buffer.Put(bufferTime, key, value, Context.RecordContext);
}

private void EnforceConstraints()
{
long expiryTime = observedStreamTime - suppressionDurationMs;
buffer.EvictWhile(() => buffer.MinTimestamp <= expiryTime, Emit);

if (OverCapacity)
{
switch (bufferFullStrategy)
{
case BUFFER_FULL_STRATEGY.EMIT:
buffer.EvictWhile(() => OverCapacity, Emit);
break;
case BUFFER_FULL_STRATEGY.SHUTDOWN:
throw new StreamsException(
$"{Name} buffer exceeded its max capacity. Currently [{buffer.NumRecords}/{maxRecords}] records and [{buffer.BufferSize}/{maxBytes}] bytes.");
}
}
}

private void Emit(K key, Change<V> value, IRecordContext context)
{
if (value.NewValue != null || !safeToDropTombstone)
{
var currentRecordContext = Context.RecordContext;
try
{
Context.SetRecordMetaData(context);
Forward(key, value);
suppressionEmitSensor.Record();
}
finally
{
Context.SetRecordMetaData(currentRecordContext);
}
}
}
}
}
2 changes: 2 additions & 0 deletions core/State/Suppress/ITimeOrderedKeyValueBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.SerDes;

namespace Streamiz.Kafka.Net.State.Suppress
{
Expand All @@ -12,5 +13,6 @@ internal interface ITimeOrderedKeyValueBuffer<K, V, T> : IStateStore
bool Put(long timestamp, K key, T value, IRecordContext recordContext);
ValueAndTimestamp<V> PriorValueForBuffered(K key);
void EvictWhile(Func<bool> predicate, Action<K, T, IRecordContext> evictHandler);
void SetSerdesIfNull(ISerDes<K> contextKeySerdes, ISerDes<V> contextValueSerdes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal class InMemoryTimeOrderedKeyValueChangeBuffer<K, V> : ITimeOrderedKeyVa
public long MinTimestamp { get; private set; } = Int64.MaxValue;
public string Name { get; }
public bool Persistent => false;
public bool IsLocally { get; }
public bool IsLocally => true;
public bool IsOpen { get; private set; }

private Sensor bufferSizeSensor;
Expand Down Expand Up @@ -88,7 +88,7 @@ public InMemoryTimeOrderedKeyValueChangeBuffer(
};
}

internal void SetSerdesIfNull(ISerDes<K> contextKeySerdes, ISerDes<V> contextValueSerdes) {
public void SetSerdesIfNull(ISerDes<K> contextKeySerdes, ISerDes<V> contextValueSerdes) {
keySerdes ??= contextKeySerdes;
valueSerdes ??= contextValueSerdes;
}
Expand Down
20 changes: 18 additions & 2 deletions core/State/Windowed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,23 @@ namespace Streamiz.Kafka.Net.State
/// Thus, a windowed <see cref="IKTable{K, V}"/> has type <code><see cref="Windowed{K}"/>, V</code>.
/// </summary>
/// <typeparam name="K">type of key</typeparam>
public class Windowed<K>
public abstract class Windowed
{
/// <summary>
/// Return the window containing the values associated with this key.
/// </summary>
public abstract Window Window { get; }
}

/// <summary>
/// The result key type of a windowed stream aggregation.
/// If a <see cref="IKStream{K, V}"/> gets grouped and aggregated using a window-aggregation the resulting <see cref="IKTable{K, V}"/>is a
/// so-called "windowed <see cref="IKTable{K, V}"/>" with a combined key type that encodes the corresponding aggregation window and
/// the original record key.
/// Thus, a windowed <see cref="IKTable{K, V}"/> has type <code><see cref="Windowed{K}"/>, V</code>.
/// </summary>
/// <typeparam name="K">type of key</typeparam>
public class Windowed<K> : Windowed
{
/// <summary>
/// Constructor with Key and Window data
Expand All @@ -33,7 +49,7 @@ public Windowed(K key, Window window)
/// <summary>
/// Return the window containing the values associated with this key.
/// </summary>
public Window Window { get; }
public override Window Window { get; }

/// <summary>
///
Expand Down
2 changes: 1 addition & 1 deletion core/Table/IKTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,6 @@ public interface IKTable<K, V>
/// <param name="suppressed">Configuration object determining what, if any, updates to suppress</param>
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
/// <returns>A new KTable with the desired suppression characteristics.</returns>
IKTable<K, V> Suppress(Suppressed<K> suppressed, string named = null);
IKTable<K, V> Suppress(Suppressed<K, V> suppressed, string named = null);
}
}
16 changes: 9 additions & 7 deletions core/Table/Internal/Graph/KTableSuppress.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.State.Suppress;

namespace Streamiz.Kafka.Net.Table.Internal.Graph
{
internal class KTableSuppress<K, S, V> : IKTableProcessorSupplier<K, V, V>
{
internal class KTableSuppressValueGetter : IKTableValueGetter<K, V>
private class KTableSuppressValueGetter : IKTableValueGetter<K, V>
{
private readonly IKTableValueGetter<K, V> _parentKTableValueGetter;
private readonly string _storeName;
private ITimeOrderedKeyValueBuffer<K,V,Change<V>> buffer;

public KTableSuppressValueGetter(IKTableValueGetterSupplier<K,V> parentKTableValueGetterSupplier, string storeName)
{
Expand All @@ -19,12 +21,13 @@ public KTableSuppressValueGetter(IKTableValueGetterSupplier<K,V> parentKTableVal
public void Init(ProcessorContext context)
{
_parentKTableValueGetter.Init(context);
context.GetStateStore(_storeName);
buffer = (ITimeOrderedKeyValueBuffer<K, V, Change<V>>)context.GetStateStore(_storeName);
}

public ValueAndTimestamp<V> Get(K key)
{
throw new System.NotImplementedException();
var value = buffer.PriorValueForBuffered(key);
return value ?? _parentKTableValueGetter.Get(key);
}

public void Close()
Expand All @@ -33,12 +36,12 @@ public void Close()
}
}

private readonly Suppressed<K> _suppressed;
private readonly Suppressed<K, V> _suppressed;
private readonly string _storeName;
private readonly KTable<K, S, V> _parent;

public KTableSuppress(
Suppressed<K> suppressed,
Suppressed<K, V> suppressed,
string storeName,
KTable<K, S, V> parentTable)
{
Expand All @@ -48,8 +51,7 @@ public KTableSuppress(
// The suppress buffer requires seeing the old values, to support the prior value view.
_parent.EnableSendingOldValues();
}



public void EnableSendingOldValues()
=> _parent.EnableSendingOldValues();

Expand Down
2 changes: 1 addition & 1 deletion core/Table/Internal/KTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public IKTable<K, VR> OuterJoin<VT, VR>(IKTable<K, VT> table, Func<V, VT, VR> va

#region Suppress

public IKTable<K, V> Suppress(Suppressed<K> suppressed, string named = null)
public IKTable<K, V> Suppress(Suppressed<K, V> suppressed, string named = null)
{
var name = new Named(named).OrElseGenerateWithPrefix(builder, KTable.SUPPRESS_NAME);
suppressed.Name = name;
Expand Down
24 changes: 20 additions & 4 deletions core/Table/Suppressed.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
using System;
using System.Collections.Generic;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State;

namespace Streamiz.Kafka.Net.Table
{
public static class SuppressedBuilder
{
public static Suppressed<K> UntilWindowClose<K>(TimeSpan gracePeriod, IBufferConfig bufferConfig)
where K : Windowed<K> => new(
public static Suppressed<K, V> UntilWindowClose<K, V>(TimeSpan gracePeriod, IBufferConfig bufferConfig)
where K : Windowed => new(
null,
gracePeriod,
bufferConfig,
(_, key) => key.Window.EndMs,
true);

public static Suppressed<K> UntilTimeLimit<K>(TimeSpan timeToWaitMoreEvents, IBufferConfig bufferConfig)
public static Suppressed<K, V> UntilTimeLimit<K, V>(TimeSpan timeToWaitMoreEvents, IBufferConfig bufferConfig)
=> new(
null,
timeToWaitMoreEvents,
Expand Down Expand Up @@ -116,14 +117,17 @@ public EagerConfig WithLoggingDisabled()
}
}

public class Suppressed<K>
public class Suppressed<K,V>
{
public TimeSpan SuppressionTime { get; }
public IBufferConfig BufferConfig { get; }
public Func<ProcessorContext, K, long> TimeDefinition { get; }
public bool SafeToDropTombstones { get; }
public string Name { get; internal set; }

public ISerDes<K> KeySerdes { get; private set; }
public ISerDes<V> ValueSerdes { get; private set; }

internal Suppressed(
string name,
TimeSpan suppressionTime,
Expand All @@ -137,5 +141,17 @@ internal Suppressed(
TimeDefinition = timeDefinition;
SafeToDropTombstones = safeToDropTombstones;
}

public Suppressed<K, V> WithKeySerdes(ISerDes<K> keySerdes)
{
KeySerdes = keySerdes;
return this;
}

public Suppressed<K, V> WithValueSerdes(ISerDes<V> valueSerdes)
{
ValueSerdes = valueSerdes;
return this;
}
}
}
3 changes: 2 additions & 1 deletion launcher/sample-stream/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ private static Topology BuildTopology()

builder.Stream<string, string>("input")
.GroupByKey()
.WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1)))
.Count()
.Suppress(SuppressedBuilder.UntilTimeLimit<string>(TimeSpan.FromMinutes(1), StrictBufferConfig.Unbounded()))
.Suppress(SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromMinutes(1), StrictBufferConfig.Unbounded()))
.ToStream()
.To("output");

Expand Down

0 comments on commit 8887009

Please sign in to comment.