Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-328 : Feature/suppress #377

Merged
merged 10 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ static async System.Threading.Tasks.Task Main(string[] args)
| Sliding window | X | | No plan for now |
| Session window | X | | No plan for now |
| Cache | X | X | EA 1.6.0 |
| Suppress(..) | X | | No plan for now |
| Suppress(..) | X | X | Plan for 1.7.0 |
| Interactive Queries | X | | No plan for now |
| State store batch restoring | X | | No plan for now |
| Exactly Once (v1 and v2) | X | X | EOS V1 supported, EOS V2 not supported yet |
| Exactly Once v2 | X | X | |

# Community Support

Expand Down
32 changes: 32 additions & 0 deletions core/Crosscutting/ByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,31 @@ public ByteBuffer PutInt(int @int)

#region ReadOperation

/// <summary>
/// Read the next 8 bytes from the underlying reader and decode them as a signed 64-bit integer,
/// honoring the buffer's configured endianness.
/// </summary>
public long GetLong()
{
    var raw = reader.ReadBytes(sizeof(long));

    // BitConverter decodes in host order; flip the bytes when the buffer is
    // big-endian but the host is little-endian.
    if (bigEndian && BitConverter.IsLittleEndian)
        Array.Reverse(raw);

    return BitConverter.ToInt64(raw, 0);
}

/// <summary>
/// Read the next 4 bytes from the underlying reader and decode them as a signed 32-bit integer,
/// honoring the buffer's configured endianness.
/// </summary>
public int GetInt()
{
    var raw = reader.ReadBytes(sizeof(int));

    // BitConverter decodes in host order; flip the bytes when the buffer is
    // big-endian but the host is little-endian.
    if (bigEndian && BitConverter.IsLittleEndian)
        Array.Reverse(raw);

    return BitConverter.ToInt32(raw, 0);
}

/// <summary>
/// Read the next <paramref name="size"/> bytes from the current position of the underlying reader.
/// </summary>
/// <param name="size">Number of bytes to read</param>
/// <returns>The bytes read (may be shorter than requested if the stream ends first)</returns>
public byte[] GetBytes(int size) => reader.ReadBytes(size);

public long GetLong(int offset)
{
reader.BaseStream.Seek(offset, SeekOrigin.Begin);
Expand Down Expand Up @@ -116,6 +141,13 @@ public byte[] GetBytes(int offset, int size)
return reader.ReadBytes(size);
}

/// <summary>
/// Read a size-prefixed byte array where a length prefix of -1 is the null sentinel.
/// A prefix of 0 yields an empty (non-null) array.
/// </summary>
/// <returns>The decoded array, or null when the prefix is -1</returns>
public byte[] GetNullableSizePrefixedArray()
{
    var length = GetInt();
    return length == -1 ? null : GetBytes(length);
}
#endregion

public byte[] ToArray()
Expand Down
1 change: 0 additions & 1 deletion core/Metrics/Internal/ProcessorNodeMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal class ProcessorNodeMetrics
internal static readonly string RETRY_AVG_DESCRIPTION = StreamMetricsRegistry.RATE_DESCRIPTION_PREFIX + RETRY_DESCRIPTION;
internal static readonly string RETRY_MAX_DESCRIPTION = StreamMetricsRegistry.TOTAL_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

// NOT USE FOR MOMENT
public static Sensor SuppressionEmitSensor(string threadId,
TaskId taskId,
string processorNodeId,
Expand Down
2 changes: 1 addition & 1 deletion core/ProcessorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ProcessorContext
internal virtual SerDesContext SerDesContext { get; }
internal virtual IStreamConfig Configuration { get; }
internal virtual IRecordContext RecordContext { get; private set; }
internal IRecordCollector RecordCollector { get; private set; }
internal virtual IRecordCollector RecordCollector { get; private set; }
internal virtual IStateManager States { get; }
internal virtual bool FollowMetadata { get; set; }

Expand Down
11 changes: 11 additions & 0 deletions core/Processors/IRecordContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,16 @@ public interface IRecordContext
/// </summary>
/// <param name="headers">new headers</param>
void SetHeaders(Headers headers);

/// <summary>
/// Estimate amount of memory used to store this record context
/// </summary>
long MemorySizeEstimate { get; }

/// <summary>
/// Serialize all the metadata into a byte array
/// </summary>
/// <returns>Return a byte array which contains all the metadata fields serialized</returns>
byte[] Serialize();
}
}
77 changes: 76 additions & 1 deletion core/Processors/Internal/RecordContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using Confluent.Kafka;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Processors.Internal
{
Expand Down Expand Up @@ -42,5 +45,77 @@ public void SetHeaders(Headers headers)
{
Headers = headers;
}

/// <summary>
/// Estimated number of bytes used to keep this record context in memory:
/// fixed-width metadata (partition, offset, timestamp) plus the variable-size
/// topic name and headers.
/// </summary>
public long MemorySizeEstimate
{
    get
    {
        long estimate = sizeof(int);             // partition
        estimate += sizeof(long);                // offset
        estimate += sizeof(long);                // timestamp
        estimate += Topic.Length;                // topic (char count; NOTE(review): may differ from UTF-8 byte count — estimate only)
        estimate += Headers.GetEstimatedSize();  // headers
        return estimate;
    }
}

/// <summary>
/// Serialize all record metadata (partition, offset, timestamp, topic, headers)
/// into a length-prefixed binary layout that <see cref="Deserialize"/> reads back
/// in the same order. A null header value is encoded with a -1 length prefix.
/// </summary>
/// <returns>A byte array containing every metadata field</returns>
public byte[] Serialize()
{
    // Encode the topic once and use its UTF-8 byte length everywhere.
    // BUG FIX: the previous code sized and prefixed with Topic.Length (char count),
    // which undersizes the buffer and desynchronizes Deserialize for non-ASCII topics.
    var topicBytes = Encoding.UTF8.GetBytes(Topic);

    var size = sizeof(int) +        // partition
               sizeof(long) +       // offset
               sizeof(long) +       // timestamp
               sizeof(int) +        // topic length prefix
               topicBytes.Length +  // topic bytes
               sizeof(int);         // number of headers

    List<(byte[], byte[])> byteHeaders = new();

    foreach (var header in Headers)
    {
        var keyBytes = Encoding.UTF8.GetBytes(header.Key);
        var valueBytes = header.GetValueBytes();

        size += 2 * sizeof(int);    // length prefixes for key and value (value prefix is -1 when null)
        size += keyBytes.Length;    // UTF-8 byte count, not char count
        if (valueBytes != null)
            size += valueBytes.Length;

        byteHeaders.Add((keyBytes, valueBytes));
    }

    var buffer = ByteBuffer.Build(size, true);
    buffer.PutInt(Partition)
        .PutLong(Offset)
        .PutLong(Timestamp)
        .PutInt(topicBytes.Length)  // byte length, so Deserialize reads the exact span
        .Put(topicBytes)
        .PutInt(byteHeaders.Count);

    foreach (var (keyBytes, valueBytes) in byteHeaders)
    {
        buffer.PutInt(keyBytes.Length).Put(keyBytes);
        if (valueBytes != null)
            buffer.PutInt(valueBytes.Length).Put(valueBytes);
        else
            buffer.PutInt(-1); // null sentinel for a header without a value
    }

    return buffer.ToArray();
}

/// <summary>
/// Rebuild a record context from the binary layout produced by <see cref="Serialize"/>,
/// reading the fields in exactly the order they were written.
/// </summary>
/// <param name="buffer">Buffer positioned at the start of a serialized record context</param>
/// <returns>The reconstructed record context</returns>
internal static IRecordContext Deserialize(ByteBuffer buffer)
{
    var partition = buffer.GetInt();
    var offset = buffer.GetLong();
    var timestamp = buffer.GetLong();
    var topic = Encoding.UTF8.GetString(buffer.GetBytes(buffer.GetInt()));
    var headerCount = buffer.GetInt();
    Headers headers = new Headers();

    for (int i = 0; i < headerCount; ++i)
    {
        var headerKey = buffer.GetBytes(buffer.GetInt());

        // BUG FIX: the previous 'length > 0' check turned a legitimate empty
        // (0-length) header value into null, breaking the round-trip with
        // Serialize, which writes -1 only for null values. The nullable
        // size-prefixed read is the exact inverse of the write.
        var headerValue = buffer.GetNullableSizePrefixedArray();

        headers.Add(Encoding.UTF8.GetString(headerKey), headerValue);
    }

    return new RecordContext(headers, offset, timestamp, partition, topic);
}
}
}
107 changes: 107 additions & 0 deletions core/Processors/KTableSuppressProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Threading;
using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Internal;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State.Suppress;
using Streamiz.Kafka.Net.Table;
using Streamiz.Kafka.Net.Table.Internal;

namespace Streamiz.Kafka.Net.Processors
{
/// <summary>
/// KIP-328 suppression processor: holds KTable change records in a time-ordered
/// buffer and only forwards them downstream once the suppression duration has
/// elapsed (measured in stream time) or the buffer exceeds its capacity limits.
/// </summary>
internal class KTableSuppressProcessor<K, V> : AbstractProcessor<K, Change<V>>
{
    private readonly long recordLimit;
    private readonly long byteLimit;
    private readonly long suppressDurationMs;
    private readonly BUFFER_FULL_STRATEGY fullStrategy;
    private readonly bool tombstonesSafeToDrop;
    private readonly string bufferStoreName;
    private readonly Func<ProcessorContext, K, long> bufferTimeDefinition;
    private readonly ISerDes<K> keySerdes;
    private readonly ISerDes<V> valueSerdes;

    private Sensor emitSensor;
    private long maxObservedTimestamp = -1;
    private ITimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;

    // Over capacity when either the record-count or the byte-size limit is exceeded.
    private bool OverCapacity => buffer.BufferSize > byteLimit || buffer.NumRecords > recordLimit;

    public KTableSuppressProcessor(Suppressed<K, V> suppressed, string storeName)
    {
        recordLimit = suppressed.BufferConfig.MaxRecords;
        byteLimit = suppressed.BufferConfig.MaxBytes;
        fullStrategy = suppressed.BufferConfig.BufferFullStrategy;
        suppressDurationMs = (long)suppressed.SuppressionTime.TotalMilliseconds;
        tombstonesSafeToDrop = suppressed.SafeToDropTombstones;
        bufferStoreName = storeName;
        bufferTimeDefinition = suppressed.TimeDefinition;
        keySerdes = suppressed.KeySerdes;
        valueSerdes = suppressed.ValueSerdes;
    }

    public override void Init(ProcessorContext context)
    {
        base.Init(context);

        emitSensor = ProcessorNodeMetrics.SuppressionEmitSensor(
            Thread.CurrentThread.Name,
            context.Id,
            Name,
            context.Metrics);

        buffer = (ITimeOrderedKeyValueBuffer<K, V, Change<V>>)context.GetStateStore(bufferStoreName);
        buffer.SetSerdesIfNull(keySerdes, valueSerdes);
    }

    public override void Process(K key, Change<V> value)
    {
        // Eviction is driven by stream time (highest timestamp observed so far),
        // not wall-clock time.
        maxObservedTimestamp = Math.Max(maxObservedTimestamp, Context.Timestamp);
        buffer.Put(bufferTimeDefinition(Context, key), key, value, Context.RecordContext);
        EnforceConstraints();
    }

    /// <summary>
    /// Evict expired records, then apply the configured full-buffer strategy
    /// if the buffer is still over its capacity limits.
    /// </summary>
    private void EnforceConstraints()
    {
        var expiryTime = maxObservedTimestamp - suppressDurationMs;
        buffer.EvictWhile(() => buffer.MinTimestamp <= expiryTime, Emit);

        if (!OverCapacity)
            return;

        if (fullStrategy == BUFFER_FULL_STRATEGY.EMIT)
        {
            // Early-emit the oldest records until back under the limits.
            buffer.EvictWhile(() => OverCapacity, Emit);
        }
        else if (fullStrategy == BUFFER_FULL_STRATEGY.SHUTDOWN)
        {
            throw new StreamsException(
                $"{Name} buffer exceeded its max capacity. Currently [{buffer.NumRecords}/{recordLimit}] records and [{buffer.BufferSize}/{byteLimit}] bytes.");
        }
    }

    /// <summary>
    /// Forward an evicted record downstream with the metadata captured when it
    /// was buffered, restoring the current record context afterwards.
    /// </summary>
    private void Emit(K key, Change<V> value, IRecordContext context)
    {
        // Tombstones (null new value) may be dropped entirely when suppression
        // is declared safe for it.
        if (value.NewValue == null && tombstonesSafeToDrop)
            return;

        var previousContext = Context.RecordContext;
        try
        {
            Context.SetRecordMetaData(context);
            Forward(key, value);
            emitSensor.Record();
        }
        finally
        {
            // Always restore the caller's record context, even if Forward throws.
            Context.SetRecordMetaData(previousContext);
        }
    }
}
}
6 changes: 1 addition & 5 deletions core/State/Cache/CacheEntryValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ internal CacheEntryValue(byte[] value, Headers headers, long offset, long timest

public long Size =>
(Value != null ? Value.LongLength : 0) +
sizeof(int) + // partition
sizeof(long) + //offset
sizeof(long) + // timestamp
Context.Topic.Length + // topic length
Context.Headers.GetEstimatedSize(); // headers size
Context.MemorySizeEstimate;
}
}
19 changes: 19 additions & 0 deletions core/State/Suppress/ITimeOrderedKeyValueBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using Streamiz.Kafka.Net.Processors;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State.Suppress.Internal;

namespace Streamiz.Kafka.Net.State.Suppress
{
/// <summary>
/// State store contract for a time-ordered key/value buffer used by the
/// suppression feature: records are inserted with a timestamp and evicted
/// in time order while a caller-supplied predicate holds.
/// </summary>
internal interface ITimeOrderedKeyValueBuffer<K, V, T> : IStateStore
{
    /// <summary>Number of records currently held in the buffer.</summary>
    long NumRecords { get; }
    /// <summary>Current size of the buffer in bytes.</summary>
    long BufferSize { get; }
    /// <summary>Smallest buffered timestamp (eviction front of the buffer).</summary>
    long MinTimestamp { get; }

    /// <summary>
    /// Insert or update a record with its timestamp and originating record context.
    /// NOTE(review): the meaning of the bool return is not visible here — presumably
    /// whether the record was newly added — confirm against the implementation.
    /// </summary>
    bool Put(long timestamp, K key, T value, IRecordContext recordContext);
    /// <summary>Look up the value previously buffered for <paramref name="key"/>, if any.</summary>
    Maybe<ValueAndTimestamp<V>> PriorValueForBuffered(K key);
    /// <summary>
    /// Evict records in time order, invoking <paramref name="evictHandler"/> for each,
    /// for as long as <paramref name="predicate"/> returns true.
    /// </summary>
    void EvictWhile(Func<bool> predicate, Action<K, T, IRecordContext> evictHandler);
    /// <summary>Assign the given serdes only where the buffer has none configured yet.</summary>
    void SetSerdesIfNull(ISerDes<K> contextKeySerdes, ISerDes<V> contextValueSerdes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.State.Suppress.Internal;

namespace Streamiz.Kafka.Net.State.Suppress
{
/// <summary>
/// Store builder for the in-memory time-ordered change buffer backing the
/// suppression feature. Caching is always disabled: the buffer manages its
/// own in-memory state and must not be wrapped in a cache layer.
/// </summary>
internal class InMemoryTimeOrderedKeyValueChangeBufferBuilder<K, V>
    : AbstractStoreBuilder<K, V, InMemoryTimeOrderedKeyValueChangeBuffer<K, V>>
{
    public InMemoryTimeOrderedKeyValueChangeBufferBuilder(string name, ISerDes<K> keySerde, ISerDes<V> valueSerde)
        : base(name, keySerde, valueSerde)
    {
        // Explicitly disable the cache for this store.
        WithCachingDisabled();
    }

    // Window-store concepts do not apply to a suppression buffer.
    public override bool IsWindowStore => false;
    public override long RetentionMs => -1;

    public override InMemoryTimeOrderedKeyValueChangeBuffer<K, V> Build()
    {
        return new InMemoryTimeOrderedKeyValueChangeBuffer<K, V>(
            Name,
            LoggingEnabled,
            keySerdes,
            valueSerdes);
    }
}
}
30 changes: 30 additions & 0 deletions core/State/Suppress/Internal/BufferKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.State.Suppress.Internal
{
/// <summary>
/// IComparer adapter that delegates to <see cref="BufferKey.CompareTo"/>,
/// for use with sorted collections keyed by <see cref="BufferKey"/>.
/// </summary>
internal class BufferKeyComparer : IComparer<BufferKey>
{
    public int Compare(BufferKey x, BufferKey y)
    {
        return x.CompareTo(y);
    }
}

/// <summary>
/// Composite key for the suppression buffer: a record timestamp plus its
/// serialized key. Ordered primarily by time, with the serialized key as
/// tie-breaker, so iteration yields records in time order.
/// </summary>
internal class BufferKey : IComparable<BufferKey>
{
    public long Time { get; }
    public Bytes Key { get; }

    public BufferKey(long time, Bytes key)
    {
        Time = time;
        Key = key;
    }

    public int CompareTo(BufferKey other)
    {
        // Primary ordering: time. Equal times fall back to the key bytes.
        var byTime = Time.CompareTo(other.Time);
        if (byTime != 0)
            return byTime;
        return Key.CompareTo(other.Key);
    }
}
}
Loading
Loading