Skip to content

Commit

Permalink
KIP-328 - Fix Sonar + add Docs
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Oct 16, 2024
1 parent fad0c7b commit b85d9b4
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/Processors/KTableSuppressProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class KTableSuppressProcessor<K, V> : AbstractProcessor<K, Change<V>>
private readonly ISerDes<K> keySerdes;
private readonly ISerDes<V> valueSerdes;

private bool OverCapacity => buffer?.BufferSize > maxBytes || buffer?.NumRecords > maxRecords;
private bool OverCapacity => buffer.BufferSize > maxBytes || buffer.NumRecords > maxRecords;

public KTableSuppressProcessor(Suppressed<K, V> suppressed, string storeName)
{
Expand Down
5 changes: 1 addition & 4 deletions core/State/Suppress/Internal/BufferKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public int Compare(BufferKey x, BufferKey y)
=> x.CompareTo(y);
}

internal class BufferKey : IEquatable<BufferKey>, IComparable<BufferKey>
internal class BufferKey : IComparable<BufferKey>
{
public long Time { get; }
public Bytes Key { get; }
Expand All @@ -21,9 +21,6 @@ public BufferKey(long time, Bytes key)
Key = key;
}

public bool Equals(BufferKey other)
=> Time.Equals(other.Time) && Key.Equals(other.Key);

public int CompareTo(BufferKey other)
{
var compared = Time.CompareTo(other.Time);
Expand Down
12 changes: 12 additions & 0 deletions core/State/Suppress/Internal/BufferValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ public override bool Equals(object obj)
((NewValue == null && value.NewValue == null) || NewValue.SequenceEqual(value.NewValue));
}

public override int GetHashCode()
{
#if NETSTANDARD2_0
uint hashCode1 = NewValue != null ? (uint) NewValue.GetHashCode() : 0U;
uint hashCode2 = PriorValue != null ? (uint) PriorValue.GetHashCode() : 0U;
uint hashCode3 = OldValue != null ? (uint) OldValue.GetHashCode() : 0U;
return (int)(hashCode1 & hashCode2 & hashCode3);
#else
return HashCode.Combine(NewValue, PriorValue, OldValue);
#endif
}

public long MemoryEstimatedSize()
{
return (PriorValue?.Length ?? 0) +
Expand Down
201 changes: 201 additions & 0 deletions core/Table/Suppressed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,36 @@

namespace Streamiz.Kafka.Net.Table
{
/// <summary>
/// <see cref="Suppressed{K,V}"/> builder
/// </summary>
public static class SuppressedBuilder
{
/// <summary>
/// Configure the suppression to emit only the "final results" from the window.
/// <para>
/// By default all Streams operators emit results whenever new results are available. This includes windowed operations.
/// </para>
/// <para>
/// Using a <see cref="StrictBufferConfig"/> configuration will instead emit just one result per key for each window, guaranteeing
/// to deliver only the final result. This option is suitable for use cases in which the business logic
/// requires a hard guarantee that only the final result is propagated. For example, sending alerts.
/// </para>
/// To accomplish this, the operator will buffer events from the window until the window close (that is,
/// until the end-time passes, and additionally until the grace period expires). Since windowed operators
/// are required to reject out-of-order events for a window whose grace period is expired, there is an additional
/// guarantee that the final results emitted from this suppression will match any queryable state upstream.
/// <para>
/// Using a <see cref="EagerConfig"/> configuration will define how much space to use for buffering intermediate results.
/// It may results multiple values for the same window depending your throughput (# of keys or # of bytes).
/// In this case, it can be appropriate to limit the number of downstream records and limiting the memory usage of your application.
/// </para>
/// </summary>
/// <param name="gracePeriod">Define the grace period where your window is considered expired</param>
/// <param name="bufferConfig">A configuration specifying how much space to use for buffering intermediate results.</param>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
/// <returns>a suppression configuration</returns>
public static Suppressed<K, V> UntilWindowClose<K, V>(TimeSpan gracePeriod, IBufferConfig bufferConfig)
where K : Windowed => new(
null,
Expand All @@ -15,6 +43,15 @@ public static Suppressed<K, V> UntilWindowClose<K, V>(TimeSpan gracePeriod, IBuf
(_, key) => key.Window.EndMs,
true);

/// <summary>
/// Configure the suppression to wait a specific amount of time after receiving a record before emitting it further downstream.
/// If another record for the same key arrives in the mean time, it replaces the first record in the buffer but does not re-start the timer.
/// </summary>
/// <param name="timeToWaitMoreEvents">The amount of time to wait, per record, for new events.</param>
/// <param name="bufferConfig">A configuration specifying how much space to use for buffering intermediate results.</param>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
/// <returns>a suppression configuration</returns>
public static Suppressed<K, V> UntilTimeLimit<K, V>(TimeSpan timeToWaitMoreEvents, IBufferConfig bufferConfig)
=> new(
null,
Expand All @@ -24,27 +61,72 @@ public static Suppressed<K, V> UntilTimeLimit<K, V>(TimeSpan timeToWaitMoreEvent
false);
}

/// <summary>
/// Buffer Full Strategyy
/// </summary>
public enum BUFFER_FULL_STRATEGY
{
/// <summary>
/// Emit early the results
/// </summary>
EMIT,
/// <summary>
/// Stop the stream
/// </summary>
SHUTDOWN
}

/// <summary>
/// Determine the buffer rules for the suppress processor in term of records, cache size and the strategy in case the byffer is full
/// </summary>
public interface IBufferConfig
{
/// <summary>
/// Define how many records the buffer can accept
/// </summary>
long MaxRecords{ get; }
/// <summary>
/// Define how many bytes the buffer can store
/// </summary>
long MaxBytes { get; }
/// <summary>
/// Define the buffer full strategy policy
/// </summary>
BUFFER_FULL_STRATEGY BufferFullStrategy { get; }
/// <summary>
/// Enable the change logging
/// </summary>
bool LoggingEnabled { get; }
/// <summary>
/// Explicit config for the backup changelog topic
/// </summary>
IDictionary<string, string> Config { get; }
}

/// <summary>
/// Marker class for a buffer configuration that is "strict" in the sense that it will strictly enforce the time bound and never emit early.
/// </summary>
public class StrictBufferConfig : IBufferConfig
{
/// <summary>
/// Define how many records the buffer can accept
/// </summary>
public long MaxRecords { get; }
/// <summary>
/// Define how many bytes the buffer can store
/// </summary>
public long MaxBytes { get; }
/// <summary>
/// Define the buffer full strategy policy as "SHUTDOWN"
/// </summary>
public BUFFER_FULL_STRATEGY BufferFullStrategy => BUFFER_FULL_STRATEGY.SHUTDOWN;
/// <summary>
/// Enable the change logging
/// </summary>
public bool LoggingEnabled { get; private set; }
/// <summary>
/// Explicit config for the backup changelog topic
/// </summary>
public IDictionary<string, string> Config { get; private set;}

private StrictBufferConfig(long maxRecords, long maxBytes)
Expand All @@ -63,17 +145,49 @@ private StrictBufferConfig()
Config = new Dictionary<string, string>();
}

/// <summary>
/// Create a buffer unconstrained by size (either keys or bytes).
/// As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
/// If there isn't enough resource available to meet the demand, the application will encounter an
/// <see cref="OutOfMemoryException"/> and shut down (not guaranteed to be a graceful exit).
/// This is a convenient option if you doubt that your buffer will be that large, but also don't
/// wish to pick particular constraints, such as in testing.
/// This buffer is "strict" in the sense that it will enforce the time bound or crash.
/// It will never emit early.
/// </summary>
/// <returns></returns>
public static StrictBufferConfig Unbounded() => new();

/// <summary>
/// Create a buffer to gracefully shut down the application when the number of different keys exceed the maximum.
/// </summary>
/// <param name="maxRecords">Maximum number of different keys</param>
/// <returns></returns>
public static StrictBufferConfig Bounded(long maxRecords) => new(maxRecords, Int64.MaxValue);

/// <summary>
/// Create a buffer to gracefully shut down the application when the number of bytes used by the buffer is exceed.
/// </summary>
/// <param name="maxBytes">Maximum number of bytes</param>
/// <returns></returns>
public static StrictBufferConfig Bounded(CacheSize maxBytes) => new(Int64.MaxValue, maxBytes.CacheSizeBytes);

/// <summary>
/// Enable the logging feature
/// </summary>
/// <param name="config">Optional configuration for the changelog topic</param>
/// <returns></returns>
public StrictBufferConfig WithLoggingEnabled(IDictionary<string, string> config)
{
Config = config ?? new Dictionary<string, string>();
LoggingEnabled = true;
return this;
}

/// <summary>
/// Disable the logging feature
/// </summary>
/// <returns></returns>
public StrictBufferConfig WithLoggingDisabled()
{
Config = null;
Expand All @@ -82,12 +196,31 @@ public StrictBufferConfig WithLoggingDisabled()
}
}

/// <summary>
/// Marker class for a buffer configuration that will strictly enforce size constraints (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
/// results downstream, but does not promise to eliminate them entirely.
/// </summary>
public class EagerConfig : IBufferConfig
{
/// <summary>
/// Define how many records the buffer can accept
/// </summary>
public long MaxRecords { get; }
/// <summary>
/// Define how many bytes the buffer can store
/// </summary>
public long MaxBytes { get; }
/// <summary>
/// Define the buffer full strategy policy as "EMIT"
/// </summary>
public BUFFER_FULL_STRATEGY BufferFullStrategy => BUFFER_FULL_STRATEGY.EMIT;
/// <summary>
/// Enable the change logging
/// </summary>
public bool LoggingEnabled { get; private set; }
/// <summary>
/// Explicit config for the backup changelog topic
/// </summary>
public IDictionary<string, string> Config { get; private set; }

private EagerConfig(long maxRecords, long maxBytes)
Expand All @@ -98,17 +231,44 @@ private EagerConfig(long maxRecords, long maxBytes)
Config = new Dictionary<string, string>();
}

/// <summary>
/// Create a size-constrained buffer in terms of the maximum number of keys it will store.
/// </summary>
/// <param name="maxRecords">Maximum number of keys</param>
/// <returns></returns>
public static EagerConfig EmitEarlyWhenFull(long maxRecords) => new(maxRecords, Int64.MaxValue);

/// <summary>
/// Create a size-constrained buffer in terms of the maximum number of bytes it will use.
/// </summary>
/// <param name="maxBytes">Maximum number of bytes</param>
/// <returns></returns>
public static EagerConfig EmitEarlyWhenFull(CacheSize maxBytes) => new(Int64.MaxValue, maxBytes.CacheSizeBytes);

/// <summary>
/// Create a size-constrained buffer in terms of the maximum number of keys it will store OR maximum number of bytes it will use.
/// </summary>
/// <param name="maxRecords">Maximum number of keys</param>
/// <param name="maxBytes">Maximum number of bytes</param>
/// <returns></returns>
public static EagerConfig EmitEarlyWhenFull(long maxRecords, CacheSize maxBytes) => new(maxRecords, maxBytes.CacheSizeBytes);

/// <summary>
/// Enable the logging feature
/// </summary>
/// <param name="config">Optional configuration for the changelog topic</param>
/// <returns></returns>
public EagerConfig WithLoggingEnabled(IDictionary<string, string> config)
{
Config = config ?? new Dictionary<string, string>();
LoggingEnabled = true;
return this;
}

/// <summary>
/// Disable the logging feature
/// </summary>
/// <returns></returns>
public EagerConfig WithLoggingDisabled()
{
Config = null;
Expand All @@ -117,15 +277,46 @@ public EagerConfig WithLoggingDisabled()
}
}

/// <summary>
/// Define the behavior of the suppress processor used in <see cref="IKTable{K,V}.Suppress"/>.
/// </summary>
/// <typeparam name="K">Type of key</typeparam>
/// <typeparam name="V">Type of value</typeparam>
public class Suppressed<K,V>
{
/// <summary>
/// Used as a grace period
/// </summary>
public TimeSpan SuppressionTime { get; }

/// <summary>
/// Define the buffer config policy
/// </summary>
public IBufferConfig BufferConfig { get; }

/// <summary>
/// Define the buffer time for each record
/// </summary>
public Func<ProcessorContext, K, long> TimeDefinition { get; }

/// <summary>
/// Be careful. It's only safe to drop tombstones for windowed KTables in "final results" mode.
/// </summary>
public bool SafeToDropTombstones { get; }

/// <summary>
/// Name of the suppress processor
/// </summary>
public string Name { get; internal set; }

/// <summary>
/// (Optional) Key serdes
/// </summary>
public ISerDes<K> KeySerdes { get; private set; }

/// <summary>
/// (Optional) Value serdes
/// </summary>
public ISerDes<V> ValueSerdes { get; private set; }

internal Suppressed(
Expand All @@ -142,12 +333,22 @@ internal Suppressed(
SafeToDropTombstones = safeToDropTombstones;
}

/// <summary>
/// Define the key serdes
/// </summary>
/// <param name="keySerdes">key serdes</param>
/// <returns></returns>
public Suppressed<K, V> WithKeySerdes(ISerDes<K> keySerdes)
{
KeySerdes = keySerdes;
return this;
}

/// <summary>
/// Define the value serdes
/// </summary>
/// <param name="valueSerdes">value serdes</param>
/// <returns></returns>
public Suppressed<K, V> WithValueSerdes(ISerDes<V> valueSerdes)
{
ValueSerdes = valueSerdes;
Expand Down
Loading

0 comments on commit b85d9b4

Please sign in to comment.