diff --git a/.circleci/config.yml b/.circleci/config.yml
index 9fc6e226..65c38f79 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -15,7 +15,7 @@
- run: echo "deb http://ftp.debian.org/debian stable main contrib non-free" >> /etc/apt/sources.list
- run: apt update
- run: apt install -y unzip
- # Install JAVA 11
+ # Install JAVA 17
- run: mkdir -p /usr/share/man/man1 # FIX https://github.com/geerlingguy/ansible-role-java/issues/64
- run: apt install -y openjdk-17-jdk
# BEGIN Dependencies for RocksDB
@@ -24,7 +24,7 @@
# END Dependencies for RocksDB
- run: set JAVA_HOME /usr/lib/jvm/java-17-openjdk-amd64/
- run: export JAVA_HOME
- - run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs"
+ - run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs,**State/Cache/Internal/*.cs"
- run: dotnet build
- run: dotnet test --no-restore --no-build --verbosity normal -f net6.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj
- run: dotnet sonarscanner end /d:sonar.login=${SONAR_TOKEN}
diff --git a/README.md b/README.md
index 22a65cac..a5130497 100644
--- a/README.md
+++ b/README.md
@@ -104,22 +104,22 @@ static async System.Threading.Tasks.Task Main(string[] args)
|:------------------------------------------------------------:|:----------------------------------:|:----------------------:|:------------------------------------------:|
| Stateless processors | X | X | |
| RocksDb store | X | X | |
-| Standby replicas | X | | No plan for now |
+| Standby replicas | X | | No plan for now |
| InMemory store | X | X | |
| Transformer, Processor API | X | X | |
-| Punctuate | X | X | |
+| Punctuate | X | X | |
| KStream-KStream Join | X | X | |
| KTable-KTable Join | X | X | |
-| KTable-KTable FK Join | X | | Plan for 1.6.0 |
+| KTable-KTable FK Join | X | | Plan for future |
| KStream-KTable Join | X | X | |
| KStream-GlobalKTable Join | X | X | |
-| KStream Async Processing (external call inside the topology) | | X | Not supported in Kafka Streams JAVA |
+| KStream Async Processing (external call inside the topology) | | X | Not supported in Kafka Streams JAVA |
| Hopping window | X | X | |
| Tumbling window | X | X | |
| Sliding window | X | | No plan for now |
| Session window | X | | No plan for now |
-| Cache | X | | Plan for 1.6.0 |
-| Suppress(..) | X | | No plan for now |
+| Cache | X | X | EA 1.6.0 |
+| Suppress(..) | X | | No plan for now |
| Interactive Queries | X | | No plan for now |
| State store batch restoring | X | | No plan for now |
| Exactly Once (v1 and v2) | X | X | EOS V1 supported, EOS V2 not supported yet |
@@ -137,3 +137,13 @@ When adding or changing a service please add tests and documentations.
# Support
You can found support [here](https://discord.gg/J7Jtxum)
+
+# Star History
+
+
+
+
diff --git a/core/Crosscutting/DictionaryExtensions.cs b/core/Crosscutting/DictionaryExtensions.cs
index f28d3b5e..6a3cd1f1 100644
--- a/core/Crosscutting/DictionaryExtensions.cs
+++ b/core/Crosscutting/DictionaryExtensions.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
using Confluent.Kafka;
namespace Streamiz.Kafka.Net.Crosscutting
@@ -26,11 +27,9 @@ public static bool AddOrUpdate(this IDictionary map, K key, V value)
map[key] = value;
return false;
}
- else
- {
- map.Add(key, value);
- return true;
- }
+
+ map.Add(key, value);
+ return true;
}
///
@@ -134,5 +133,32 @@ public static void CreateListOrAdd(this IDictionary> source, K
source.Add(key, new List{value});
}
+ #if NETSTANDARD2_0
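+ // netstandard2.0 does not ship the TryAdd / Remove(key, out value) dictionary extensions, so equivalent helpers are provided here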
+ public static bool TryAdd<K, V>(this IDictionary<K, V> dictionary,
+ K key,
+ V value){
+ if (dictionary == null)
+ throw new ArgumentNullException(nameof(dictionary));
+ if (dictionary.ContainsKey(key))
+ return false;
+ dictionary.Add(key, value);
+ return true;
+ }
+
+ public static bool Remove<K, V>(this IDictionary<K, V> dictionary,
+ K key,
+ out V value){
+ bool result = dictionary.TryGetValue(key, out V valueTmp);
+ if (result)
+ {
+ value = valueTmp;
+ dictionary.Remove(key);
+ return true;
+ }
+ value = default(V);
+ return false;
+ }
+ #endif
+
}
}
diff --git a/core/Crosscutting/KafkaExtensions.cs b/core/Crosscutting/KafkaExtensions.cs
index 553b4132..758471f3 100644
--- a/core/Crosscutting/KafkaExtensions.cs
+++ b/core/Crosscutting/KafkaExtensions.cs
@@ -86,6 +86,9 @@ internal static Headers Clone(this Headers headers)
originHeader.ForEach(h => copyHeaders.Add(h.Key, h.Item2));
return copyHeaders;
}
+
+ internal static long GetEstimatedSize(this Headers headers)
+ => headers.Sum(header => header.Key.Length + header.GetValueBytes().LongLength);
internal static Headers AddOrUpdate(this Headers headers, string key, byte[] value)
{
diff --git a/core/Crosscutting/SortedDictionaryExtensions.cs b/core/Crosscutting/SortedDictionaryExtensions.cs
index 4d2b05be..d946f324 100644
--- a/core/Crosscutting/SortedDictionaryExtensions.cs
+++ b/core/Crosscutting/SortedDictionaryExtensions.cs
@@ -33,5 +33,19 @@ internal static IEnumerable> SubMap(this SortedDictiona
}
}
}
+
+ internal static IEnumerable<KeyValuePair<K, V>> TailMap<K, V>(this SortedDictionary<K, V> sortedDic, K keyFrom,
+ bool inclusive)
+ {
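+ // yields the entries whose key is greater than keyFrom (or equal, when inclusive), in the dictionary's comparer order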
+ foreach (K k in sortedDic.Keys)
+ {
+ int rT = sortedDic.Comparer.Compare(keyFrom, k);
+
+ if ((inclusive && rT <= 0) || (!inclusive && rT < 0))
+ {
+ yield return new KeyValuePair<K, V>(k, sortedDic[k]);
+ }
+ }
+ }
}
}
diff --git a/core/Crosscutting/Utils.cs b/core/Crosscutting/Utils.cs
index 357ea00d..4996d7fc 100644
--- a/core/Crosscutting/Utils.cs
+++ b/core/Crosscutting/Utils.cs
@@ -42,5 +42,11 @@ public static bool IsNumeric(object expression, out Double number)
, NumberFormatInfo.InvariantInfo
, out number);
}
+
+ public static void CheckIfNotNull(object parameter, string nameAccessor)
+ {
+ if(parameter == null)
+ throw new ArgumentException($"{nameAccessor} must not be null");
+ }
}
}
diff --git a/core/KafkaStream.cs b/core/KafkaStream.cs
index 662bba83..babef907 100644
--- a/core/KafkaStream.cs
+++ b/core/KafkaStream.cs
@@ -324,7 +324,7 @@ string Protect(string str)
() => StreamState != null && StreamState.IsRunning() ? 1 : 0,
() => threads.Count(t => t.State != ThreadState.DEAD && t.State != ThreadState.PENDING_SHUTDOWN),
metricsRegistry);
-
+
threads = new IThread[numStreamThreads];
var threadState = new Dictionary();
diff --git a/core/Metrics/Internal/CachingMetrics.cs b/core/Metrics/Internal/CachingMetrics.cs
new file mode 100644
index 00000000..3683c887
--- /dev/null
+++ b/core/Metrics/Internal/CachingMetrics.cs
@@ -0,0 +1,69 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Streamiz.Kafka.Net.Processors.Internal;
+
+namespace Streamiz.Kafka.Net.Metrics.Internal
+{
+ internal class CachingMetrics
+ {
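+ // Store-level sensors for the caching layer: total cache size in bytes and read hit ratio (avg/min/max), both recorded at DEBUG level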
+ internal static string CACHE_SIZE_BYTES_TOTAL = "cache-size-bytes-total";
+ private static string CACHE_SIZE_BYTES_TOTAL_DESCRIPTION = "The total size in bytes of this cache state store.";
+
+ internal static string HIT_RATIO = "hit-ratio";
+ private static string HIT_RATIO_DESCRIPTION = "The hit ratio defined as the ratio of cache read hits over the total cache read requests.";
+ private static string HIT_RATIO_AVG_DESCRIPTION = "The average cache hit ratio";
+ private static string HIT_RATIO_MIN_DESCRIPTION = "The minimum cache hit ratio";
+ private static string HIT_RATIO_MAX_DESCRIPTION = "The maximum cache hit ratio";
+
+ public static Sensor HitRatioSensor(
+ TaskId taskId,
+ string storeType,
+ string storeName,
+ StreamMetricsRegistry streamsMetrics) {
+
+ Sensor sensor;
+ string hitMetricName = HIT_RATIO;
+ IDictionary<string, string> tags =
+ streamsMetrics.StoreLevelTags(GetThreadId(), taskId.ToString(), storeName, storeType);
+
+ sensor = streamsMetrics.StoreLevelSensor(GetThreadId(), taskId, storeName, hitMetricName, HIT_RATIO_DESCRIPTION, MetricsRecordingLevel.DEBUG);
+
+ SensorHelper.AddAvgAndMinAndMaxToSensor(
+ sensor,
+ StreamMetricsRegistry.STATE_STORE_LEVEL_GROUP,
+ tags,
+ hitMetricName,
+ HIT_RATIO_AVG_DESCRIPTION,
+ HIT_RATIO_MAX_DESCRIPTION,
+ HIT_RATIO_MIN_DESCRIPTION);
+
+ return sensor;
+ }
+
+ public static Sensor TotalCacheSizeBytesSensor(
+ TaskId taskId,
+ string storeType,
+ string storeName,
+ StreamMetricsRegistry streamsMetrics) {
+
+ Sensor sensor;
+ string totalCacheMetricName = CACHE_SIZE_BYTES_TOTAL;
+ IDictionary<string, string> tags =
+ streamsMetrics.StoreLevelTags(GetThreadId(), taskId.ToString(), storeName, storeType);
+
+ sensor = streamsMetrics.StoreLevelSensor(GetThreadId(), taskId, storeName, totalCacheMetricName, CACHE_SIZE_BYTES_TOTAL_DESCRIPTION, MetricsRecordingLevel.DEBUG);
+
+ SensorHelper.AddValueMetricToSensor(
+ sensor,
+ StreamMetricsRegistry.STATE_STORE_LEVEL_GROUP,
+ tags,
+ totalCacheMetricName,
+ CACHE_SIZE_BYTES_TOTAL_DESCRIPTION);
+
+ return sensor;
+ }
+
+ private static string GetThreadId() => Thread.CurrentThread.Name;
+ }
+}
\ No newline at end of file
diff --git a/core/Metrics/Internal/SensorHelper.cs b/core/Metrics/Internal/SensorHelper.cs
index a104e032..13c532ac 100644
--- a/core/Metrics/Internal/SensorHelper.cs
+++ b/core/Metrics/Internal/SensorHelper.cs
@@ -125,6 +125,40 @@ internal static void AddAvgAndMaxToSensor(Sensor sensor,
);
}
+ internal static void AddAvgAndMinAndMaxToSensor(Sensor sensor,
+ string group,
+ IDictionary<string, string> tags,
+ string operation,
+ string descriptionOfAvg,
+ string descriptionOfMax,
+ string descriptionOfMin) {
+
+ sensor.AddStatMetric(
+ new MetricName(
+ operation + StreamMetricsRegistry.AVG_SUFFIX,
+ group,
+ descriptionOfAvg,
+ tags),
+ new Avg()
+ );
+ sensor.AddStatMetric(
+ new MetricName(
+ operation + StreamMetricsRegistry.MAX_SUFFIX,
+ group,
+ descriptionOfMax,
+ tags),
+ new Max()
+ );
+ sensor.AddStatMetric(
+ new MetricName(
+ operation + StreamMetricsRegistry.MIN_SUFFIX,
+ group,
+ descriptionOfMin,
+ tags),
+ new Min()
+ );
+ }
+
internal static void AddRateOfSumAndSumMetricsToSensor(Sensor sensor,
string group,
IDictionary tags,
diff --git a/core/Metrics/Sensor.cs b/core/Metrics/Sensor.cs
index d5c2c00d..5b1a4d0b 100644
--- a/core/Metrics/Sensor.cs
+++ b/core/Metrics/Sensor.cs
@@ -154,6 +154,9 @@ internal void Record()
internal void Record(long value)
=> Record(value, DateTime.Now.GetMilliseconds());
+ internal void Record(double value)
+ => Record(value, DateTime.Now.GetMilliseconds());
+
internal virtual void Record(double value, long timeMs)
=> RecordInternal(value, timeMs);
diff --git a/core/Mock/ClusterInMemoryTopologyDriver.cs b/core/Mock/ClusterInMemoryTopologyDriver.cs
index 61f4b63d..e47fcff7 100644
--- a/core/Mock/ClusterInMemoryTopologyDriver.cs
+++ b/core/Mock/ClusterInMemoryTopologyDriver.cs
@@ -226,6 +226,12 @@ void stateChangedHandeler(IThread thread, ThreadStateTransitionValidator old,
}
}
}
+
+ public void TriggerCommit()
+ {
+ throw new NotImplementedException();
+ //((StreamThread)threadTopology)?.Manager.CommitAll();
+ }
#endregion
}
diff --git a/core/Mock/IBehaviorTopologyTestDriver.cs b/core/Mock/IBehaviorTopologyTestDriver.cs
index 40cd5b92..68958831 100644
--- a/core/Mock/IBehaviorTopologyTestDriver.cs
+++ b/core/Mock/IBehaviorTopologyTestDriver.cs
@@ -14,5 +14,6 @@ internal interface IBehaviorTopologyTestDriver : IDisposable
TestOutputTopic CreateOutputTopic(string topicName, TimeSpan consumeTimeout, ISerDes keySerdes = null, ISerDes valueSerdes = null);
TestMultiInputTopic CreateMultiInputTopic(string[] topics, ISerDes keySerdes = null, ISerDes valueSerdes = null);
IStateStore GetStateStore(string name);
+ void TriggerCommit();
}
}
diff --git a/core/Mock/TaskSynchronousTopologyDriver.cs b/core/Mock/TaskSynchronousTopologyDriver.cs
index 75d9828c..df9b06a4 100644
--- a/core/Mock/TaskSynchronousTopologyDriver.cs
+++ b/core/Mock/TaskSynchronousTopologyDriver.cs
@@ -237,7 +237,10 @@ public TestInputTopic CreateInputTopic(string topicName, ISerDes
null);
foreach (var topic in topicsLink)
+ {
+ GetTask(topic);
pipeInput.Flushed += () => ForwardRepartitionTopic(consumer, topic);
+ }
return new TestInputTopic(pipeInput, configuration, keySerdes, valueSerdes);
}
@@ -296,6 +299,21 @@ public IStateStore GetStateStore(string name)
return hasGlobalTopology ? globalProcessorContext.GetStateStore(name) : null;
}
+ public void TriggerCommit()
+ {
+ foreach(var extProcessor in externalProcessorTopologies.Values)
+ extProcessor.Flush();
+
+ foreach (var task in tasks.Values)
+ {
+ var consumer = supplier.GetConsumer(topicConfiguration.ToConsumerConfig("consumer-repartition-forwarder"),
+ null);
+ task.Commit();
+ }
+
+ globalTask?.FlushState();
+ }
+
#endregion
}
}
\ No newline at end of file
diff --git a/core/Mock/TopologyTestDriver.cs b/core/Mock/TopologyTestDriver.cs
index 168c6bf7..44c16e08 100644
--- a/core/Mock/TopologyTestDriver.cs
+++ b/core/Mock/TopologyTestDriver.cs
@@ -165,6 +165,14 @@ public void Dispose()
behavior.Dispose();
}
+ /// <summary>
+ /// Trigger the driver to commit, especially needed if you use caching
+ /// </summary>
+ public void Commit()
+ {
+ behavior.TriggerCommit();
+ }
+
#region Create Input Topic
///
diff --git a/core/ProcessorContext.cs b/core/ProcessorContext.cs
index 64b2d766..5d1033d5 100644
--- a/core/ProcessorContext.cs
+++ b/core/ProcessorContext.cs
@@ -68,6 +68,8 @@ public class ProcessorContext
///
public virtual string StateDir => $"{Path.Combine(Configuration.StateDir, Configuration.ApplicationId, Id.ToString())}";
+ internal bool ConfigEnableCache => Configuration.StateStoreCacheMaxBytes > 0;
+
// FOR TESTING
internal ProcessorContext()
{
@@ -91,6 +93,11 @@ internal ProcessorContext UseRecordCollector(IRecordCollector collector)
return this;
}
+ internal void SetRecordMetaData(IRecordContext context)
+ {
+ RecordContext = context;
+ }
+
internal void SetRecordMetaData(ConsumeResult result)
{
RecordContext = new RecordContext(result);
diff --git a/core/Processors/AbstractKTableProcessor.cs b/core/Processors/AbstractKTableProcessor.cs
index f19641b2..c1eb653e 100644
--- a/core/Processors/AbstractKTableProcessor.cs
+++ b/core/Processors/AbstractKTableProcessor.cs
@@ -1,4 +1,5 @@
-using Streamiz.Kafka.Net.Errors;
+using Confluent.Kafka;
+using Streamiz.Kafka.Net.Errors;
using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.State;
using Streamiz.Kafka.Net.Table.Internal;
@@ -11,7 +12,7 @@ internal abstract class AbstractKTableProcessor : AbstractProcesso
protected readonly bool sendOldValues;
private readonly bool throwException = false;
protected ITimestampedKeyValueStore store;
- protected TimestampedTupleForwarder tupleForwarder;
+ protected TimestampedTupleForwarder tupleForwarder;
protected AbstractKTableProcessor(string queryableStoreName, bool sendOldValues, bool throwExceptionStateNull = false)
{
@@ -27,7 +28,16 @@ public override void Init(ProcessorContext context)
if (queryableStoreName != null)
{
store = (ITimestampedKeyValueStore)context.GetStateStore(queryableStoreName);
- tupleForwarder = new TimestampedTupleForwarder(this, sendOldValues);
+ tupleForwarder = new TimestampedTupleForwarder(
+ store,
+ this,kv => {
+ context.CurrentProcessor = this;
+ Forward(kv.Key,
+ new Change(sendOldValues ? kv.Value.OldValue.Value : default, kv.Value.NewValue.Value),
+ kv.Value.NewValue.Timestamp);
+ },
+ sendOldValues,
+ context.ConfigEnableCache);
}
if (throwException && (queryableStoreName == null || store == null || tupleForwarder == null))
diff --git a/core/Processors/Internal/RecordContext.cs b/core/Processors/Internal/RecordContext.cs
index ca8bc278..4aa76a05 100644
--- a/core/Processors/Internal/RecordContext.cs
+++ b/core/Processors/Internal/RecordContext.cs
@@ -5,21 +5,22 @@ namespace Streamiz.Kafka.Net.Processors.Internal
internal class RecordContext : IRecordContext
{
public RecordContext()
+ : this(new Headers(), -1, -1, -1, "")
{
- Offset = -1;
- Timestamp = -1;
- Topic = "";
- Partition = -1;
- Headers = new Headers();
+ }
+
+ public RecordContext(Headers headers, long offset, long timestamp, int partition, string topic)
+ {
+ Offset = offset;
+ Timestamp = timestamp;
+ Topic = topic;
+ Partition = partition;
+ Headers = headers;
}
public RecordContext(ConsumeResult result)
+ : this(result.Message.Headers, result.Offset, result.Message.Timestamp.UnixTimestampMs, result.Partition, result.Topic)
{
- Offset = result.Offset;
- Timestamp = result.Message.Timestamp.UnixTimestampMs;
- Topic = result.Topic;
- Partition = result.Partition;
- Headers = result.Message.Headers;
}
public long Offset { get; }
diff --git a/core/Processors/Internal/StoreChangelogReader.cs b/core/Processors/Internal/StoreChangelogReader.cs
index 249593b5..d8ca3925 100644
--- a/core/Processors/Internal/StoreChangelogReader.cs
+++ b/core/Processors/Internal/StoreChangelogReader.cs
@@ -203,7 +203,7 @@ private void RestoreChangelog(ChangelogMetadata changelogMetadata)
log.LogDebug($"Restored {numRecords} records from " +
$"changelog {changelogMetadata.StoreMetadata.Store.Name} " +
$"to store {changelogMetadata.StoreMetadata.ChangelogTopicPartition}, " +
- $"end offset is {(changelogMetadata.RestoreEndOffset.HasValue ? changelogMetadata.RestoreEndOffset.Value : "unknown")}, " +
+ $"end offset is {(changelogMetadata.RestoreEndOffset.HasValue ? changelogMetadata.RestoreEndOffset.Value.ToString() : "unknown")}, " +
$"current offset is {currentOffset}");
changelogMetadata.BufferedLimit = 0;
diff --git a/core/Processors/Internal/TimestampedTupleForwarder.cs b/core/Processors/Internal/TimestampedTupleForwarder.cs
index a3bc488a..51958ada 100644
--- a/core/Processors/Internal/TimestampedTupleForwarder.cs
+++ b/core/Processors/Internal/TimestampedTupleForwarder.cs
@@ -1,19 +1,30 @@
-using Streamiz.Kafka.Net.Table.Internal;
+using System;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.State;
+using Streamiz.Kafka.Net.State.Cache;
+using Streamiz.Kafka.Net.State.Internal;
+using Streamiz.Kafka.Net.Table.Internal;
namespace Streamiz.Kafka.Net.Processors.Internal
{
- // TODO REFACTOR
internal class TimestampedTupleForwarder
{
private readonly IProcessor processor;
private readonly bool sendOldValues;
private readonly bool cachingEnabled;
- public TimestampedTupleForwarder(IProcessor processor, bool sendOldValues)
+ public TimestampedTupleForwarder(
+ IStateStore store,
+ IProcessor processor,
+ Action>>> listener,
+ bool sendOldValues,
+ bool configCachingEnabled)
{
this.processor = processor;
this.sendOldValues = sendOldValues;
- cachingEnabled = false;
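+ // caching is active only when the configured cache size is > 0, the store is cache-wrapped and the flush listener registration succeeds; records are then forwarded downstream on cache flush/eviction instead of on every update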
+ cachingEnabled = configCachingEnabled &&
+ ((IWrappedStateStore)store).IsCachedStore &&
+ ((ICachedStateStore>)store).SetFlushListener(listener, sendOldValues);
}
public void MaybeForward(K key, V newValue, V oldValue)
@@ -30,7 +41,8 @@ public void MaybeForward(K key, VR newValue, VR oldValue)
public void MaybeForward(K key, V newValue, V oldValue, long timestamp)
{
- processor?.Forward(key, new Change(sendOldValues ? oldValue : default, newValue), timestamp);
+ if (!cachingEnabled)
+ processor?.Forward(key, new Change(sendOldValues ? oldValue : default, newValue), timestamp);
}
public void MaybeForward(K key, VR newValue, VR oldValue, long timestamp)
diff --git a/core/Processors/KStreamWindowAggregateProcessor.cs b/core/Processors/KStreamWindowAggregateProcessor.cs
index 1470041f..2afce3da 100644
--- a/core/Processors/KStreamWindowAggregateProcessor.cs
+++ b/core/Processors/KStreamWindowAggregateProcessor.cs
@@ -3,6 +3,7 @@
using Streamiz.Kafka.Net.Stream;
using System;
using Microsoft.Extensions.Logging;
+using Streamiz.Kafka.Net.Table.Internal;
namespace Streamiz.Kafka.Net.Processors
{
@@ -36,7 +37,18 @@ public override void Init(ProcessorContext context)
{
base.Init(context);
windowStore = (ITimestampedWindowStore)context.GetStateStore(storeName);
- tupleForwarder = new TimestampedTupleForwarder, Agg>(this, sendOldValues);
+ tupleForwarder = new TimestampedTupleForwarder, Agg>(
+ windowStore,
+ this,
+ kv =>
+ {
+ context.CurrentProcessor = this;
+ Forward(kv.Key,
+ new Change(sendOldValues ? kv.Value.OldValue.Value : default, kv.Value.NewValue.Value),
+ kv.Value.NewValue.Timestamp);
+ },
+ sendOldValues,
+ context.ConfigEnableCache);
}
public override void Process(K key, V value)
diff --git a/core/Processors/KTableMapProcessor.cs b/core/Processors/KTableMapProcessor.cs
index 8b83b396..b73ea082 100644
--- a/core/Processors/KTableMapProcessor.cs
+++ b/core/Processors/KTableMapProcessor.cs
@@ -27,14 +27,23 @@ public override void Process(K key, Change value)
KeyValuePair oldPair = value.OldValue == null ? default : mapper.Apply(key, value.OldValue);
KeyValuePair newPair = value.NewValue == null ? default : mapper.Apply(key, value.NewValue);
- // if the value is null, we do not need to forward its selected key-value further
+ bool oldPairNotNull = value.OldValue != null;
+ bool newPairNotNull = value.NewValue != null;
+
// if the selected repartition key or value is null, skip
// forward oldPair first, to be consistent with reduce and aggregate
- if (oldPair.Key != null && oldPair.Value != null)
- Forward(oldPair.Key, new Change(oldPair.Value, default));
-
- if (newPair.Key != null && newPair.Value != null)
- Forward(newPair.Key, new Change(default, newPair.Value));
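+ // when the old and new values map to the same key, forward them together as a single Change instead of two separate records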
+ if (oldPairNotNull && newPairNotNull && oldPair.Key.Equals(newPair.Key))
+ {
+ Forward(oldPair.Key, new Change(oldPair.Value, newPair.Value));
+ }
+ else
+ {
+ if(oldPairNotNull)
+ Forward(oldPair.Key, new Change(oldPair.Value, default));
+
+ if(newPairNotNull)
+ Forward(newPair.Key, new Change(default, newPair.Value));
+ }
}
}
}
diff --git a/core/Processors/KTableSourceProcessor.cs b/core/Processors/KTableSourceProcessor.cs
index c75bfc72..d7002e0c 100644
--- a/core/Processors/KTableSourceProcessor.cs
+++ b/core/Processors/KTableSourceProcessor.cs
@@ -28,7 +28,17 @@ public override void Init(ProcessorContext context)
if (this.queryableName != null)
{
store = (ITimestampedKeyValueStore)context.GetStateStore(queryableName);
- tupleForwarder = new TimestampedTupleForwarder(this, sendOldValues);
+ tupleForwarder = new TimestampedTupleForwarder(
+ store,
+ this,
+ kv => {
+ context.CurrentProcessor = this;
+ context.CurrentProcessor.Forward(kv.Key,
+ new Change(sendOldValues ? (kv.Value.OldValue != null ? kv.Value.OldValue.Value : default) : default, kv.Value.NewValue.Value),
+ kv.Value.NewValue.Timestamp);
+ },
+ sendOldValues,
+ context.ConfigEnableCache);
}
}
diff --git a/core/Processors/StatefullProcessor.cs b/core/Processors/StatefullProcessor.cs
index ae972237..1ffde514 100644
--- a/core/Processors/StatefullProcessor.cs
+++ b/core/Processors/StatefullProcessor.cs
@@ -1,5 +1,7 @@
-using Streamiz.Kafka.Net.Processors.Internal;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Processors.Internal;
using Streamiz.Kafka.Net.State;
+using Streamiz.Kafka.Net.Table.Internal;
namespace Streamiz.Kafka.Net.Processors
{
@@ -8,7 +10,7 @@ internal abstract class StatefullProcessor : AbstractProcessor store;
- protected TimestampedTupleForwarder tupleForwarder;
+ protected TimestampedTupleForwarder tupleForwarder;
protected StatefullProcessor(string storeName, bool sendOldValues)
{
@@ -20,7 +22,18 @@ public override void Init(ProcessorContext context)
{
base.Init(context);
store = (ITimestampedKeyValueStore)context.GetStateStore(storeName);
- tupleForwarder = new TimestampedTupleForwarder(this, sendOldValues);
+ tupleForwarder = new TimestampedTupleForwarder(
+ store,
+ this,
+ kv =>
+ {
+ context.CurrentProcessor = this;
+ Forward(kv.Key,
+ new Change(sendOldValues ? kv.Value.OldValue.Value : default, kv.Value.NewValue.Value),
+ kv.Value.NewValue.Timestamp);
+ },
+ sendOldValues,
+ context.ConfigEnableCache);
}
}
}
diff --git a/core/Processors/StreamThread.cs b/core/Processors/StreamThread.cs
index 9bb6e1c0..8574e6c9 100644
--- a/core/Processors/StreamThread.cs
+++ b/core/Processors/StreamThread.cs
@@ -106,7 +106,7 @@ internal static IThread Create(string threadId, string clientId, InternalTopolog
private readonly ILogger log = Logger.GetLogger(typeof(StreamThread));
private readonly Thread thread;
private readonly IConsumer consumer;
- private readonly TaskManager manager;
+ private TaskManager Manager { get; }
private readonly InternalTopologyBuilder builder;
private readonly TimeSpan consumeTimeout;
private readonly string threadId;
@@ -155,7 +155,7 @@ private StreamThread(string threadId, string clientId, TaskManager manager, ICon
InternalTopologyBuilder builder, IChangelogReader storeChangelogReader,
StreamMetricsRegistry streamMetricsRegistry, TimeSpan timeSpan, long commitInterval)
{
- this.manager = manager;
+ this.Manager = manager;
this.consumer = consumer;
this.builder = builder;
consumeTimeout = timeSpan;
@@ -234,7 +234,7 @@ public void Run()
try
{
- if (!manager.RebalanceInProgress)
+ if (!Manager.RebalanceInProgress)
{
RestorePhase();
@@ -268,10 +268,10 @@ public void Run()
{
long processLatency = 0;
- if (!manager.RebalanceInProgress)
+ if (!Manager.RebalanceInProgress)
processLatency = ActionHelper.MeasureLatency(() =>
{
- processed = manager.Process(now);
+ processed = Manager.Process(now);
});
else
processed = 0;
@@ -296,7 +296,7 @@ public void Run()
int punctuated = 0;
var punctuateLatency = ActionHelper.MeasureLatency(() =>
{
- punctuated = manager.Punctuate();
+ punctuated = Manager.Punctuate();
});
totalPunctuateLatency += punctuateLatency;
summaryPunctuated += punctuated;
@@ -399,16 +399,16 @@ public void Run()
private void RestorePhase()
{
- if (State == ThreadState.PARTITIONS_ASSIGNED || State == ThreadState.RUNNING && manager.NeedRestoration())
+ if (State == ThreadState.PARTITIONS_ASSIGNED || State == ThreadState.RUNNING && Manager.NeedRestoration())
{
log.LogDebug($"{logPrefix} State is {State}, initializing and restoring tasks if necessary");
restorationInProgress = true;
- if (manager.TryToCompleteRestoration())
+ if (Manager.TryToCompleteRestoration())
{
restorationInProgress = false;
log.LogInformation(
- $"Restoration took {DateTime.Now.GetMilliseconds() - LastPartitionAssignedTime}ms for all tasks {string.Join(",", manager.ActiveTaskIds)}");
+ $"Restoration took {DateTime.Now.GetMilliseconds() - LastPartitionAssignedTime}ms for all tasks {string.Join(",", Manager.ActiveTaskIds)}");
if (State == ThreadState.PARTITIONS_ASSIGNED)
SetState(ThreadState.RUNNING);
}
@@ -435,7 +435,7 @@ private int AddToTasks(IEnumerable> records)
foreach (var record in records)
{
count++;
- var task = manager.ActiveTaskFor(record.TopicPartition);
+ var task = Manager.ActiveTaskFor(record.TopicPartition);
if (task != null)
{
if (task.IsClosed)
@@ -482,7 +482,7 @@ public void Start(CancellationToken token)
ThreadMetrics.CreateStartThreadSensor(threadId, DateTime.Now.GetMilliseconds(), streamMetricsRegistry);
}
- public IEnumerable ActiveTasks => manager.ActiveTasks;
+ public IEnumerable ActiveTasks => Manager.ActiveTasks;
public long LastPartitionAssignedTime { get; internal set; }
@@ -501,7 +501,7 @@ private void HandleTaskMigrated(TaskMigratedException e)
"{LogPrefix}Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group",
logPrefix);
- manager.HandleLostAll();
+ Manager.HandleLostAll();
consumer.Unsubscribe();
consumer.Subscribe(builder.GetSourceTopics());
}
@@ -512,12 +512,12 @@ private void HandleInnerException()
"{LogPrefix}Detected that the thread throw an inner exception. Your configuration manager has decided to continue running stream processing. So will close out all assigned tasks and rejoin the consumer group",
logPrefix);
- manager.HandleLostAll();
+ Manager.HandleLostAll();
consumer.Unsubscribe();
consumer.Subscribe(builder.GetSourceTopics());
}
- private int Commit()
+ internal int Commit() // for testing
{
int committed = 0;
if (DateTime.Now - lastCommit > TimeSpan.FromMilliseconds(commitTimeMs))
@@ -525,12 +525,12 @@ private int Commit()
DateTime beginCommit = DateTime.Now;
log.LogDebug(
"Committing all active tasks {TaskIDs} since {DateTime}ms has elapsed (commit interval is {CommitTime}ms)",
- string.Join(",", manager.ActiveTaskIds), (DateTime.Now - lastCommit).TotalMilliseconds,
+ string.Join(",", Manager.ActiveTaskIds), (DateTime.Now - lastCommit).TotalMilliseconds,
commitTimeMs);
- committed = manager.CommitAll();
+ committed = Manager.CommitAll();
if (committed > 0)
log.LogDebug("Committed all active tasks {TaskIDs} in {TimeElapsed}ms",
- string.Join(",", manager.ActiveTaskIds), (DateTime.Now - beginCommit).TotalMilliseconds);
+ string.Join(",", Manager.ActiveTaskIds), (DateTime.Now - beginCommit).TotalMilliseconds);
if (committed == -1)
{
@@ -571,7 +571,7 @@ private void CompleteShutdown()
IsRunning = false;
- manager.Close();
+ Manager.Close();
consumer.Unsubscribe();
consumer.Close();
@@ -595,7 +595,7 @@ private void CompleteShutdown()
private IEnumerable> PollRequest(TimeSpan ts)
{
- if (!restorationInProgress && !manager.RebalanceInProgress)
+ if (!restorationInProgress && !Manager.RebalanceInProgress)
{
lastPollMs = DateTime.Now.GetMilliseconds();
return consumer.ConsumeRecords(ts, streamConfig.MaxPollRecords);
diff --git a/core/SerDes/Internal/BytesSerDes.cs b/core/SerDes/Internal/BytesSerDes.cs
index 0917b884..de9884e7 100644
--- a/core/SerDes/Internal/BytesSerDes.cs
+++ b/core/SerDes/Internal/BytesSerDes.cs
@@ -9,6 +9,6 @@ public override Bytes Deserialize(byte[] data, SerializationContext context)
=> Bytes.Wrap(data);
public override byte[] Serialize(Bytes data, SerializationContext context)
- => data != null ? data.Get : null;
+ => data?.Get;
}
}
diff --git a/core/State/AbstractStoreBuilder.cs b/core/State/AbstractStoreBuilder.cs
index 1a0b619a..72169347 100644
--- a/core/State/AbstractStoreBuilder.cs
+++ b/core/State/AbstractStoreBuilder.cs
@@ -30,8 +30,8 @@ public abstract class AbstractStoreBuilder : IStoreBuilder
/// Value serdes
///
protected readonly ISerDes valueSerdes;
-
- // private bool enableCaching;
+
+ private bool enableCaching = false;
private bool enableLogging = true;
///
@@ -59,6 +59,11 @@ public abstract class AbstractStoreBuilder : IStoreBuilder
///
public bool LoggingEnabled => enableLogging;
+ /// <summary>
+ /// Caching enabled or not
+ /// </summary>
+ public bool CachingEnabled => enableCaching;
+
///
///
///
@@ -78,7 +83,7 @@ protected AbstractStoreBuilder(String name, ISerDes keySerde, ISerDes valu
///
public IStoreBuilder WithCachingEnabled()
{
- //enableCaching = true;
+ enableCaching = true;
return this;
}
@@ -88,7 +93,7 @@ public IStoreBuilder WithCachingEnabled()
///
public IStoreBuilder WithCachingDisabled()
{
- //enableCaching = false;
+ enableCaching = false;
return this;
}
diff --git a/core/State/Cache/CacheEntryValue.cs b/core/State/Cache/CacheEntryValue.cs
new file mode 100644
index 00000000..e5d32313
--- /dev/null
+++ b/core/State/Cache/CacheEntryValue.cs
@@ -0,0 +1,33 @@
+using Confluent.Kafka;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.Processors.Internal;
+
+namespace Streamiz.Kafka.Net.State.Cache
+{
+ internal class CacheEntryValue
+ {
+ public byte[] Value { get; }
+ public IRecordContext Context { get; }
+
+ internal CacheEntryValue(byte[] value)
+ {
+ Context = new RecordContext();
+ Value = value;
+ }
+
+ internal CacheEntryValue(byte[] value, Headers headers, long offset, long timestamp, int partition, string topic)
+ {
+ Context = new RecordContext(headers.Clone(), offset, timestamp, partition, topic);
+ Value = value;
+ }
+
+ public long Size =>
+ (Value != null ? Value.LongLength : 0) +
+ sizeof(int) + // partition
+ sizeof(long) + //offset
+ sizeof(long) + // timestamp
+ Context.Topic.Length + // topic length
+ Context.Headers.GetEstimatedSize(); // headers size
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/CachingKeyValueStore.cs b/core/State/Cache/CachingKeyValueStore.cs
new file mode 100644
index 00000000..7ea8e479
--- /dev/null
+++ b/core/State/Cache/CachingKeyValueStore.cs
@@ -0,0 +1,287 @@
+using System;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.Metrics;
+using Streamiz.Kafka.Net.Metrics.Internal;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.State.Cache.Enumerator;
+using Streamiz.Kafka.Net.State.Cache.Internal;
+using Streamiz.Kafka.Net.State.Enumerator;
+using Streamiz.Kafka.Net.State.Internal;
+using Streamiz.Kafka.Net.Table.Internal;
+
+namespace Streamiz.Kafka.Net.State.Cache
+{
+ internal class CachingKeyValueStore :
+ WrappedStateStore>,
+ IKeyValueStore,
+ ICachedStateStore
+ {
+ private MemoryCache cache;
+ private Action>> flushListener;
+ private bool sendOldValue;
+ private bool cachingEnabled;
+
+ private Sensor hitRatioSensor = NoRunnableSensor.Empty;
+ private Sensor totalCacheSizeSensor = NoRunnableSensor.Empty;
+
+ public CachingKeyValueStore(IKeyValueStore wrapped)
+ : base(wrapped)
+ { }
+
+ protected virtual void RegisterMetrics()
+ {
+ if (cachingEnabled)
+ {
+ hitRatioSensor = CachingMetrics.HitRatioSensor(context.Id, "cache-store", Name, context.Metrics);
+ totalCacheSizeSensor =
+ CachingMetrics.TotalCacheSizeBytesSensor(context.Id, "cache-store", Name, context.Metrics);
+ }
+ }
+
+ public override bool IsCachedStore => true;
+
+ public bool SetFlushListener(Action>> listener, bool sendOldChanges)
+ {
+ flushListener = listener;
+ sendOldValue = sendOldChanges;
+ return true;
+ }
+
+ // Only for testing
+ internal void CreateCache(ProcessorContext context)
+ {
+ cachingEnabled = context.Configuration.StateStoreCacheMaxBytes > 0;
+ if(cachingEnabled)
+ cache = new MemoryCache(new MemoryCacheOptions {
+ SizeLimit = context.Configuration.StateStoreCacheMaxBytes,
+ CompactionPercentage = .20
+ }, new BytesComparer());
+ }
+
+ private byte[] GetInternal(Bytes key)
+ {
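+ // read-through: serve hits from the cache; on a miss, load from the underlying store and cache the non-null value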
+ if (cachingEnabled)
+ {
+ byte[] value;
+
+ if (cache.TryGetValue(key, out CacheEntryValue priorEntry))
+ value = priorEntry.Value;
+ else
+ {
+ value = wrapped.Get(key);
+ if(value != null)
+ PutInternal(key, new CacheEntryValue(value), true);
+ }
+
+ var currentStat = cache.GetCurrentStatistics();
+ hitRatioSensor.Record((double)currentStat.TotalHits / (currentStat.TotalMisses + currentStat.TotalHits));
+
+ return value;
+ }
+
+ return wrapped.Get(key);
+ }
+
+ public override void Init(ProcessorContext context, IStateStore root)
+ {
+ base.Init(context, root);
+ CreateCache(context);
+ RegisterMetrics();
+ }
+
+ private void UpdateRatioSensor()
+ {
+ var currentStat = cache.GetCurrentStatistics();
+ hitRatioSensor.Record((double)currentStat.TotalHits / (currentStat.TotalMisses + currentStat.TotalHits));
+ }
+
+ private void CacheEntryEviction(Bytes key, CacheEntryValue value, EvictionReason reason, MemoryCache state)
+ {
+ if (reason is EvictionReason.Replaced or EvictionReason.None) return;
+
+ if (flushListener != null)
+ {
+ byte[] rawNewValue = value.Value;
+ byte[] rawOldValue = rawNewValue == null || sendOldValue ? wrapped.Get(key) : null;
+
+ // this is an optimization: if this key did not exist in underlying store and also not in the cache,
+ // we can skip flushing to downstream as well as writing to underlying store
+ if (rawNewValue != null || rawOldValue != null)
+ {
+ var currentContext = context.RecordContext;
+ context.SetRecordMetaData(value.Context);
+ wrapped.Put(key, rawNewValue);
+ flushListener(new KeyValuePair>(
+ key.Get,
+ new Change(sendOldValue ? rawOldValue : null, rawNewValue)));
+ context.SetRecordMetaData(currentContext);
+ }
+ }
+ else
+ {
+ var currentContext = context.RecordContext;
+ context.SetRecordMetaData(value.Context);
+ wrapped.Put(key, value.Value);
+ context.SetRecordMetaData(currentContext);
+ }
+
+ totalCacheSizeSensor.Record(cache.Size);
+ }
+
+ public override void Flush()
+ {
+ if (cachingEnabled)
+ {
+ cache.Compact(1); // Compact 100% of the cache
+ base.Flush();
+ }
+ else
+ wrapped.Flush();
+ }
+
+ public byte[] Get(Bytes key)
+ => GetInternal(key);
+
+ public IKeyValueEnumerator Range(Bytes from, Bytes to)
+ {
+ if (cachingEnabled)
+ {
+ var storeEnumerator = wrapped.Range(from, to);
+ var cacheEnumerator =
+ new CacheEnumerator(
+ cache.KeyRange(from, to, true, true),
+ cache,
+ UpdateRatioSensor);
+
+ return new MergedStoredCacheKeyValueEnumerator(cacheEnumerator, storeEnumerator, true);
+ }
+ return wrapped.Range(from, to);
+ }
+
+ public IKeyValueEnumerator ReverseRange(Bytes from, Bytes to)
+ {
+ if (cachingEnabled)
+ {
+ var storeEnumerator = wrapped.ReverseRange(from, to);
+ var cacheEnumerator =
+ new CacheEnumerator(
+ cache.KeyRange(from, to, true, false),
+ cache,
+ UpdateRatioSensor);
+
+ return new MergedStoredCacheKeyValueEnumerator(cacheEnumerator, storeEnumerator, false);
+ }
+
+ return wrapped.ReverseRange(from, to);
+ }
+
+ private IEnumerable> InternalAll(bool reverse)
+ {
+ var storeEnumerator = new WrapEnumerableKeyValueEnumerator(wrapped.All());
+ var cacheEnumerator = new CacheEnumerator(
+ cache.KeySetEnumerable(reverse),
+ cache,
+ UpdateRatioSensor);
+
+ var mergedEnumerator = new MergedStoredCacheKeyValueEnumerator(cacheEnumerator, storeEnumerator, reverse);
+ while (mergedEnumerator.MoveNext())
+ if (mergedEnumerator.Current != null)
+ yield return mergedEnumerator.Current.Value;
+ }
+
+ public IEnumerable> All()
+ {
+ if (cachingEnabled)
+ return InternalAll(true);
+ return wrapped.All();
+ }
+
+ public IEnumerable> ReverseAll()
+ {
+ if (cachingEnabled)
+ return InternalAll(false);
+
+ return wrapped.ReverseAll();
+ }
+
+ public long ApproximateNumEntries() => cachingEnabled ? cache.Count : wrapped.ApproximateNumEntries();
+
+ public void Put(Bytes key, byte[] value)
+ {
+ if (cachingEnabled)
+ {
+ var cacheEntry = new CacheEntryValue(
+ value,
+ context.RecordContext.Headers,
+ context.Offset,
+ context.Timestamp,
+ context.Partition,
+ context.Topic);
+
+ PutInternal(key, cacheEntry);
+ }
+ else
+ wrapped.Put(key, value);
+ }
+
+ private void PutInternal(Bytes key, CacheEntryValue entry, bool fromWrappedCache = false)
+ {
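+ // the cache entry is weighted by key + value + record-context size; the eviction callback later writes it down to the underlying store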
+ long totalSize = key.Get.LongLength + entry.Size;
+
+ var memoryCacheEntryOptions = new MemoryCacheEntryOptions()
+ .SetSize(totalSize)
+ .RegisterPostEvictionCallback(CacheEntryEviction, cache);
+
+ cache.Set(key, entry, memoryCacheEntryOptions, fromWrappedCache ? EvictionReason.None : EvictionReason.Setted);
+ totalCacheSizeSensor.Record(cache.Size);
+ }
+
+ public byte[] PutIfAbsent(Bytes key, byte[] value)
+ {
+ if (cachingEnabled)
+ {
+ var v = GetInternal(key);
+ if (v == null)
+ Put(key, value);
+ return v;
+ }
+
+ return wrapped.PutIfAbsent(key, value);
+ }
+
+ public void PutAll(IEnumerable> entries)
+ {
+ if (cachingEnabled)
+ {
+ foreach (var entry in entries)
+ Put(entry.Key, entry.Value);
+ }
+ else
+ wrapped.PutAll(entries);
+ }
+
+ public byte[] Delete(Bytes key)
+ {
+ if (cachingEnabled)
+ {
+ var rawValue = Get(key);
+ Put(key, null);
+ return rawValue;
+ }
+
+ return wrapped.Delete(key);
+ }
+
+ public new void Close()
+ {
+ if (cachingEnabled)
+ {
+ cache.Dispose();
+ base.Close();
+ }
+ else
+ wrapped.Close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/CachingWindowStore.cs b/core/State/Cache/CachingWindowStore.cs
new file mode 100644
index 00000000..05997ea4
--- /dev/null
+++ b/core/State/Cache/CachingWindowStore.cs
@@ -0,0 +1,289 @@
+using System;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.Metrics;
+using Streamiz.Kafka.Net.Metrics.Internal;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.SerDes.Internal;
+using Streamiz.Kafka.Net.State.Cache.Enumerator;
+using Streamiz.Kafka.Net.State.Cache.Internal;
+using Streamiz.Kafka.Net.State.Enumerator;
+using Streamiz.Kafka.Net.State.Helper;
+using Streamiz.Kafka.Net.State.Internal;
+using Streamiz.Kafka.Net.Table.Internal;
+
+namespace Streamiz.Kafka.Net.State.Cache
+{
+ // TODO : involve hit Ratio
+ internal class CachingWindowStore :
+ WrappedStateStore>,
+ IWindowStore,
+ ICachedStateStore
+ {
+ private readonly long _windowSize;
+
+ private MemoryCache cache;
+ private bool cachingEnabled;
+ private bool sendOldValue;
+
+ private Action>> flushListener;
+
+ private Sensor hitRatioSensor;
+ private Sensor totalCacheSizeSensor;
+
+ public CachingWindowStore(
+ IWindowStore wrapped,
+ long windowSize,
+ long segmentInterval,
+ IKeySchema keySchema)
+ : base(wrapped)
+ {
+ _windowSize = windowSize;
+ SegmentInterval = segmentInterval;
+ KeySchema = keySchema;
+ SegmentCacheFunction = new SegmentedCacheFunction(KeySchema, segmentInterval);
+ MaxObservedTimestamp = -1;
+ }
+
+ internal MemoryCache Cache => cache;
+ internal long MaxObservedTimestamp { get; set; }
+ internal long SegmentInterval { get; }
+ internal ICacheFunction SegmentCacheFunction { get; }
+ internal IKeySchema KeySchema { get; }
+
+ public override bool IsCachedStore => true;
+
+ public override void Init(ProcessorContext context, IStateStore root)
+ {
+ base.Init(context, root);
+ CreateCache(context);
+ RegisterMetrics();
+ }
+
+ protected virtual void RegisterMetrics()
+ {
+ if (cachingEnabled)
+ {
+ hitRatioSensor = CachingMetrics.HitRatioSensor(context.Id, "cache-window-store", Name, context.Metrics);
+ totalCacheSizeSensor =
+ CachingMetrics.TotalCacheSizeBytesSensor(context.Id, "cache-window-store", Name, context.Metrics);
+ }
+ }
+
+ // Only for testing
+ internal void CreateCache(ProcessorContext context)
+ {
+ cachingEnabled = context.Configuration.StateStoreCacheMaxBytes > 0;
+ if(cachingEnabled)
+ cache = new MemoryCache(new MemoryCacheOptions {
+ SizeLimit = context.Configuration.StateStoreCacheMaxBytes,
+ CompactionPercentage = .20
+ }, new BytesComparer());
+ }
+
+ public byte[] Fetch(Bytes key, long time)
+ {
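+ // single-point fetch: check the cache under the segmented cache key first, then fall back to the underlying store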
+ if (cachingEnabled)
+ {
+ var bytesKey = WindowKeyHelper.ToStoreKeyBinary(key, time, 0);
+ var cacheKey = SegmentCacheFunction.CacheKey(bytesKey);
+
+ byte[] value = null;
+
+ if (cache.TryGetValue(cacheKey, out CacheEntryValue priorEntry))
+ value = priorEntry.Value;
+
+ var currentStat = cache.GetCurrentStatistics();
+ hitRatioSensor.Record((double)currentStat.TotalHits / (currentStat.TotalMisses + currentStat.TotalHits));
+
+ return value ?? wrapped.Fetch(key, time);
+ }
+
+ return wrapped.Fetch(key, time);
+ }
+
+ public IWindowStoreEnumerator Fetch(Bytes key, DateTime from, DateTime to)
+ => Fetch(key, from.GetMilliseconds(), to.GetMilliseconds());
+
+ public IWindowStoreEnumerator Fetch(Bytes key, long from, long to)
+ {
+ if (cachingEnabled)
+ {
+ var wrappedEnumerator = wrapped.Fetch(key, from, to);
+ IKeyValueEnumerator cacheEnumerator = wrapped.Persistent
+ ? new CacheEnumeratorWrapper(this, key, from, to, true)
+ : FetchInternal(
+ SegmentCacheFunction.CacheKey(KeySchema.LowerRangeFixedSize(key, from)),
+ SegmentCacheFunction.CacheKey(KeySchema.UpperRangeFixedSize(key, to)), true);
+
+ var hasNextCondition = KeySchema.HasNextCondition(key, key, from, to);
+ var filteredCacheEnumerator =
+ new FilteredCacheEnumerator(cacheEnumerator, hasNextCondition, SegmentCacheFunction);
+
+ return new MergedSortedCacheWindowStoreEnumerator(
+ SegmentCacheFunction,
+ filteredCacheEnumerator,
+ wrappedEnumerator,
+ true);
+ }
+ return wrapped.Fetch(key, from, to);
+ }
+
+ public IKeyValueEnumerator, byte[]> All()
+ {
+ if (cachingEnabled)
+ {
+ var wrappedEnumerator = wrapped.All();
+ var cacheEnumerator = new CacheEnumerator(
+ cache.KeySetEnumerable(true),
+ cache,
+ UpdateRatioSensor);
+
+ return new MergedSortedCacheWindowStoreKeyValueEnumerator(
+ cacheEnumerator,
+ wrappedEnumerator,
+ _windowSize,
+ SegmentCacheFunction,
+ new BytesSerDes(),
+ changelogTopic,
+ WindowKeyHelper.FromStoreKey,
+ WindowKeyHelper.ToStoreKeyBinary,
+ true);
+ }
+ return wrapped.All();
+ }
+
+ public IKeyValueEnumerator, byte[]> FetchAll(DateTime from, DateTime to)
+ {
+ if (cachingEnabled)
+ {
+ var wrappedEnumerator = wrapped.FetchAll(from, to);
+ var cacheEnumerator = new CacheEnumerator(
+ cache.KeySetEnumerable(true),
+ cache,
+ UpdateRatioSensor
+ );
+
+ var hasNextCondition =
+ KeySchema.HasNextCondition(null, null, from.GetMilliseconds(), to.GetMilliseconds());
+ var filteredCacheEnumerator =
+ new FilteredCacheEnumerator(cacheEnumerator, hasNextCondition, SegmentCacheFunction);
+
+ return new MergedSortedCacheWindowStoreKeyValueEnumerator(
+ filteredCacheEnumerator,
+ wrappedEnumerator,
+ _windowSize,
+ SegmentCacheFunction,
+ new BytesSerDes(),
+ changelogTopic,
+ WindowKeyHelper.FromStoreKey,
+ WindowKeyHelper.ToStoreKeyBinary,
+ true);
+ }
+
+ return wrapped.FetchAll(from, to);
+ }
+
+ public void Put(Bytes key, byte[] value, long windowStartTimestamp)
+ {
+ if (cachingEnabled)
+ {
+ var keyBytes = WindowKeyHelper.ToStoreKeyBinary(key, windowStartTimestamp, 0);
+ var cacheEntry = new CacheEntryValue(
+ value,
+ context.RecordContext.Headers,
+ context.Offset,
+ context.Timestamp,
+ context.Partition,
+ context.Topic);
+
+ PutInternal(SegmentCacheFunction.CacheKey(keyBytes), cacheEntry);
+ MaxObservedTimestamp = Math.Max(KeySchema.SegmentTimestamp(keyBytes), MaxObservedTimestamp);
+ }
+ else
+ wrapped.Put(key, value, windowStartTimestamp);
+ }
+
+ public override void Flush()
+ {
+ if (cachingEnabled)
+ {
+ cache.Compact(1); // Compact 100% of the cache
+ base.Flush();
+ }
+ else
+ wrapped.Flush();
+ }
+
+ public bool SetFlushListener(Action>> listener, bool sendOldChanges)
+ {
+ flushListener = listener;
+ sendOldValue = sendOldChanges;
+ return true;
+ }
+
+ private void PutInternal(Bytes key, CacheEntryValue entry, bool fromWrappedCache = false)
+ {
+ long totalSize = key.Get.LongLength + entry.Size;
+
+ var memoryCacheEntryOptions = new MemoryCacheEntryOptions()
+ .SetSize(totalSize)
+ .RegisterPostEvictionCallback(CacheEntryEviction, cache);
+
+ cache.Set(key, entry, memoryCacheEntryOptions, fromWrappedCache ? EvictionReason.None : EvictionReason.Setted);
+ totalCacheSizeSensor.Record(cache.Size);
+ }
+
+ internal CacheEnumerator FetchInternal(Bytes keyFrom, Bytes keyTo, bool forward)
+ {
+ return new CacheEnumerator(
+ cache.KeyRange(keyFrom, keyTo, true, forward),
+ cache,
+ UpdateRatioSensor);
+ }
+
+ private void UpdateRatioSensor()
+ {
+ var currentStat = cache.GetCurrentStatistics();
+ hitRatioSensor.Record((double)currentStat.TotalHits / (currentStat.TotalMisses + currentStat.TotalHits));
+ }
+
+ private void CacheEntryEviction(Bytes key, CacheEntryValue value, EvictionReason reason, MemoryCache state)
+ {
+ if (reason is EvictionReason.Replaced or EvictionReason.None) return;
+
+ var binaryWindowKey = SegmentCacheFunction.Key(key).Get;
+ var windowedKeyBytes = WindowKeyHelper.FromStoreBytesKey(binaryWindowKey, _windowSize);
+ long windowStartTs = windowedKeyBytes.Window.StartMs;
+ var binaryKey = windowedKeyBytes.Key;
+
+ if (flushListener != null)
+ {
+ byte[] rawNewValue = value.Value;
+ byte[] rawOldValue = rawNewValue == null || sendOldValue ? wrapped.Fetch(binaryKey, windowStartTs) : null;
+
+ // this is an optimization: if this key did not exist in underlying store and also not in the cache,
+ // we can skip flushing to downstream as well as writing to underlying store
+ if (rawNewValue != null || rawOldValue != null)
+ {
+ var currentContext = context.RecordContext;
+ context.SetRecordMetaData(value.Context);
+ wrapped.Put(binaryKey, rawNewValue, windowStartTs);
+ flushListener(new KeyValuePair>(
+ binaryWindowKey,
+ new Change(sendOldValue ? rawOldValue : null, rawNewValue)));
+ context.SetRecordMetaData(currentContext);
+ }
+ }
+ else
+ {
+ var currentContext = context.RecordContext;
+ context.SetRecordMetaData(value.Context);
+ wrapped.Put(binaryKey, value.Value, windowStartTs);
+ context.SetRecordMetaData(currentContext);
+ }
+
+ totalCacheSizeSensor.Record(cache.Size);
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Enumerator/AbstractMergedEnumerator.cs b/core/State/Cache/Enumerator/AbstractMergedEnumerator.cs
new file mode 100644
index 00000000..bc08a472
--- /dev/null
+++ b/core/State/Cache/Enumerator/AbstractMergedEnumerator.cs
@@ -0,0 +1,154 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.State.Enumerator;
+
+namespace Streamiz.Kafka.Net.State.Cache.Enumerator
+{
+ internal abstract class AbstractMergedEnumerator : IKeyValueEnumerator
+ {
+ private enum LastChoice
+ {
+ NONE,
+ STORE,
+ CACHE,
+ BOTH
+ };
+
+ private readonly bool forward;
+ private readonly IKeyValueEnumerator storeEnumerator;
+ private readonly IKeyValueEnumerator cacheEnumerator;
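+ // tracks which side (cache, store or both) produced the previous element, so only that side is advanced on the next MoveNext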
+ private LastChoice _lastChoice = LastChoice.NONE;
+
+ protected AbstractMergedEnumerator(
+ IKeyValueEnumerator cacheEnumerator,
+ IKeyValueEnumerator storeEnumerator,
+ bool forward)
+ {
+ this.storeEnumerator = storeEnumerator;
+ this.cacheEnumerator = cacheEnumerator;
+ this.forward = forward;
+ }
+
+ private bool IsDeletedCacheEntry(KeyValuePair? nextFromCache)
+ => nextFromCache?.Value.Value == null;
+
+ public K PeekNextKey() => Current.Value.Key;
+
+ public bool MoveNext()
+ {
+ // advance the store enumerator if the previous element came from the store, from both, or if this is the first call
+ if(_lastChoice is LastChoice.NONE or LastChoice.STORE or LastChoice.BOTH)
+ storeEnumerator.MoveNext();
+
+ if (_lastChoice is LastChoice.NONE or LastChoice.CACHE or LastChoice.BOTH)
+ {
+ // skip over items deleted from cache, and corresponding store items if they have the same key
+ while (cacheEnumerator.MoveNext() && IsDeletedCacheEntry(cacheEnumerator.Current))
+ {
+ var currentKeyStore = storeEnumerator.Current;
+ // advance the store enumerator if the key is the same as the deleted cache key
+ if (currentKeyStore != null &&
+ cacheEnumerator.Current != null &&
+ Compare(cacheEnumerator.Current.Value.Key, currentKeyStore.Value.Key) == 0)
+ storeEnumerator.MoveNext();
+ }
+ }
+
+ Bytes nextCacheKey = cacheEnumerator.Current?.Key;
+ bool nullStoreKey = !storeEnumerator.Current.HasValue;
+
+ if (nextCacheKey == null) {
+ Current = CurrentStoreValue();
+ }
+ else if (nullStoreKey) {
+ Current = CurrentCacheValue();
+ }
+ else
+ {
+ int comparison = Compare(nextCacheKey, storeEnumerator.Current.Value.Key);
+ Current = ChooseCurrentValue(comparison);
+ }
+
+ return Current != null;
+ }
+
+ public void Reset()
+ {
+ Current = null;
+ cacheEnumerator.Reset();
+ storeEnumerator.Reset();
+ }
+
+ public KeyValuePair? Current { get; private set; }
+
+ object IEnumerator.Current => Current;
+
+ public void Dispose()
+ {
+ cacheEnumerator.Dispose();
+ storeEnumerator.Dispose();
+ GC.SuppressFinalize(this);
+ }
+
+ #region Abstract
+
+ protected abstract int Compare(Bytes cacheKey, KS storeKey);
+ protected abstract K DeserializeStoreKey(KS storeKey);
+ protected abstract KeyValuePair DeserializeStorePair(KeyValuePair pair);
+ protected abstract K DeserializeCacheKey(Bytes cacheKey);
+ protected abstract V DeserializeCacheValue(CacheEntryValue cacheEntry);
+
+ #endregion
+
+ private KeyValuePair? ChooseCurrentValue(int comparison)
+ {
+ if (forward)
+ {
+ if (comparison > 0) {
+ return CurrentStoreValue();
+ }
+
+ if (comparison < 0) {
+ return CurrentCacheValue();
+ }
+
+ return CurrentCacheValue(true);
+ }
+
+ if (comparison < 0) {
+ return CurrentStoreValue();
+ }
+
+ if (comparison > 0) {
+ return CurrentCacheValue();
+ }
+
+ return CurrentCacheValue(true);
+ }
+
+ private KeyValuePair? CurrentStoreValue()
+ {
+ if (storeEnumerator.Current is not null)
+ {
+ _lastChoice = LastChoice.STORE;
+ return DeserializeStorePair(storeEnumerator.Current.Value);
+ }
+
+ return null;
+ }
+
+ private KeyValuePair? CurrentCacheValue(bool storeEqual = false)
+ {
+ if (cacheEnumerator.Current is not null)
+ {
+ KeyValuePair next = cacheEnumerator.Current.Value;
+ _lastChoice = !storeEqual ? LastChoice.CACHE : LastChoice.BOTH;
+ return new KeyValuePair(DeserializeCacheKey(next.Key), DeserializeCacheValue(next.Value));
+ }
+
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Enumerator/CacheEnumerator.cs b/core/State/Cache/Enumerator/CacheEnumerator.cs
new file mode 100644
index 00000000..f916de4f
--- /dev/null
+++ b/core/State/Cache/Enumerator/CacheEnumerator.cs
@@ -0,0 +1,57 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.State.Cache.Internal;
+using Streamiz.Kafka.Net.State.Enumerator;
+
+namespace Streamiz.Kafka.Net.State.Cache.Enumerator
+{
+ internal class CacheEnumerator : IKeyValueEnumerator
+ where K : class
+ where V : class
+ {
+ private IEnumerator keys;
+ private readonly MemoryCache cache;
+ private readonly Action _beforeClosing;
+ private KeyValuePair? current;
+
+ public CacheEnumerator(
+ IEnumerable keys,
+ MemoryCache cache,
+ Action beforeClosing)
+ {
+ this.keys = keys.GetEnumerator();
+ this.cache = cache;
+ _beforeClosing = beforeClosing;
+ }
+
+ public K PeekNextKey()
+ => current?.Key;
+
+ public bool MoveNext()
+ {
+ var result = keys.MoveNext();
+ if (result)
+ current = new KeyValuePair(keys.Current, cache.Get(keys.Current));
+ else
+ current = null;
+ return result;
+ }
+
+ public void Reset()
+ {
+ keys.Reset();
+ }
+
+ public KeyValuePair? Current => current;
+
+ object IEnumerator.Current => Current;
+
+ public void Dispose()
+ {
+ current = null;
+ keys.Dispose();
+ _beforeClosing?.Invoke();
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Enumerator/CacheEnumeratorWrapper.cs b/core/State/Cache/Enumerator/CacheEnumeratorWrapper.cs
new file mode 100644
index 00000000..2ddce03c
--- /dev/null
+++ b/core/State/Cache/Enumerator/CacheEnumeratorWrapper.cs
@@ -0,0 +1,163 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.State.Enumerator;
+using Streamiz.Kafka.Net.State.Helper;
+
+namespace Streamiz.Kafka.Net.State.Cache.Enumerator
+{
+ internal class CacheEnumeratorWrapper : IKeyValueEnumerator
+ {
+ private readonly bool forward;
+ private readonly long timeTo;
+ private readonly Bytes keyTo;
+ private readonly CachingWindowStore _windowStore;
+ private readonly Bytes keyFrom;
+ private readonly long segmentInterval;
+
+ private long lastSegmentId;
+ private long currentSegmentId;
+ private Bytes cacheKeyFrom;
+ private Bytes cacheKeyTo;
+
+ private IKeyValueEnumerator current;
+
+ public CacheEnumeratorWrapper(CachingWindowStore windowStore, Bytes key, long timeFrom, long timeTo, bool forward)
+ : this(windowStore, key, key, timeFrom, timeTo, forward)
+ {
+ }
+
+ public CacheEnumeratorWrapper(CachingWindowStore windowStore, Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo, bool forward)
+ {
+ _windowStore = windowStore;
+ this.keyFrom = keyFrom;
+ this.keyTo = keyTo;
+ this.timeTo = timeTo;
+ this.forward = forward;
+
+ segmentInterval = windowStore.SegmentCacheFunction.SegmentInterval;
+
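+ // iterate the cache one segment at a time: forward starts from timeFrom's segment, backward from the latest observed segment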
+ if (forward)
+ {
+ lastSegmentId = windowStore.SegmentCacheFunction.SegmentId(Math.Min(timeTo, windowStore.MaxObservedTimestamp));
+ currentSegmentId = windowStore.SegmentCacheFunction.SegmentId(timeFrom);
+
+ SetCacheKeyRange(timeFrom, CurrentSegmentLastTime());
+ current = windowStore.FetchInternal(cacheKeyFrom, cacheKeyTo, true);
+ }
+ else
+ {
+ currentSegmentId = windowStore.SegmentCacheFunction.SegmentId(Math.Min(timeTo, windowStore.MaxObservedTimestamp));
+ lastSegmentId = windowStore.SegmentCacheFunction.SegmentId(timeFrom);
+
+ SetCacheKeyRange(CurrentSegmentBeginTime(), Math.Min(timeTo, windowStore.MaxObservedTimestamp));
+ current = windowStore.FetchInternal(cacheKeyFrom, cacheKeyTo, false);
+ }
+ }
+
+ #region Private
+
+ private void SetCacheKeyRange(long lowerRangeEndTime, long upperRangeEndTime) {
+
+ if (_windowStore.SegmentCacheFunction.SegmentId(lowerRangeEndTime) != _windowStore.SegmentCacheFunction.SegmentId(upperRangeEndTime)) {
+ throw new ArgumentException("Error iterating over segments: segment interval has changed");
+ }
+
+ if (keyFrom != null && keyTo != null && keyFrom.Equals(keyTo)) {
+ cacheKeyFrom = _windowStore.SegmentCacheFunction.CacheKey(SegmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
+ cacheKeyTo = _windowStore.SegmentCacheFunction.CacheKey(SegmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
+ } else {
+ cacheKeyFrom = keyFrom == null ? null :
+ _windowStore.SegmentCacheFunction.CacheKey(_windowStore.KeySchema.LowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
+ cacheKeyTo = keyTo == null ? null :
+ _windowStore.SegmentCacheFunction.CacheKey(_windowStore.KeySchema.UpperRange(keyTo, timeTo), currentSegmentId);
+ }
+ }
+
+ private Bytes SegmentLowerRangeFixedSize(Bytes key, long segmentBeginTime) {
+ return WindowKeyHelper.ToStoreKeyBinary(key, Math.Max(0, segmentBeginTime), 0);
+ }
+
+ private Bytes SegmentUpperRangeFixedSize(Bytes key, long segmentEndTime) {
+ return WindowKeyHelper.ToStoreKeyBinary(key, segmentEndTime, Int32.MaxValue);
+ }
+
+ private long CurrentSegmentBeginTime() {
+ return currentSegmentId * segmentInterval;
+ }
+
+ private long CurrentSegmentLastTime() {
+ return Math.Min(timeTo, CurrentSegmentBeginTime() + segmentInterval - 1);
+ }
+
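+ // Advances to the next segment (forward) or the previous one (backward), re-opening the cache
+ // enumerator on the new segment's key range; current becomes null once all segments are exhausted.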
+ private void GetNextSegmentIterator() {
+ if (forward) {
+ ++currentSegmentId;
+ // updating as maxObservedTimestamp can change while iterating
+ lastSegmentId =
+ _windowStore.SegmentCacheFunction.SegmentId(Math.Min(timeTo, _windowStore.MaxObservedTimestamp));
+
+ if (currentSegmentId > lastSegmentId) {
+ current = null;
+ return;
+ }
+
+ SetCacheKeyRange(CurrentSegmentBeginTime(), CurrentSegmentLastTime());
+
+ current.Dispose();
+
+ current = _windowStore.FetchInternal(cacheKeyFrom, cacheKeyTo, true);
+ } else {
+ --currentSegmentId;
+
+ // last segment id is stable when iterating backward, therefore no need to update
+ if (currentSegmentId < lastSegmentId) {
+ current = null;
+ return;
+ }
+
+ SetCacheKeyRange(CurrentSegmentBeginTime(), CurrentSegmentLastTime());
+
+ current.Dispose();
+
+ current = _windowStore.FetchInternal(cacheKeyFrom, cacheKeyTo, false);
+ }
+ }
+
+ #endregion
+
+ #region IKeyValueEnumerator impl
+
+ public Bytes PeekNextKey()
+ => current.PeekNextKey();
+
+ public bool MoveNext()
+ {
+ if (current == null) return false;
+
+ if (current.MoveNext())
+ return true;
+
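+ // The current segment enumerator is exhausted: move from segment to segment until an entry
+ // is found or no segment remains.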
+ while (!current.MoveNext())
+ {
+ GetNextSegmentIterator();
+ if (current == null)
+ return false;
+ }
+
+ return true;
+ }
+
+ public void Reset() => throw new NotImplementedException();
+
+ public KeyValuePair<Bytes, CacheEntryValue>? Current => current?.Current;
+
+ object IEnumerator.Current => Current;
+
+ public void Dispose()
+ => current?.Dispose();
+
+ #endregion
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Enumerator/FilteredCacheEnumerator.cs b/core/State/Cache/Enumerator/FilteredCacheEnumerator.cs
new file mode 100644
index 00000000..905b44a2
--- /dev/null
+++ b/core/State/Cache/Enumerator/FilteredCacheEnumerator.cs
@@ -0,0 +1,72 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.State.Enumerator;
+
+namespace Streamiz.Kafka.Net.State.Cache.Enumerator
+{
+ internal class FilteredCacheEnumerator : IKeyValueEnumerator<Bytes, CacheEntryValue>
+ {
+ private class WrappedFilteredCacheEnumerator : IKeyValueEnumerator<Bytes, byte[]>
+ {
+ private readonly ICacheFunction _cacheFunction;
+ private readonly IKeyValueEnumerator<Bytes, CacheEntryValue> _cacheEnumerator;
+
+ public WrappedFilteredCacheEnumerator(
+ ICacheFunction cacheFunction,
+ IKeyValueEnumerator<Bytes, CacheEntryValue> cacheEnumerator)
+ {
+ _cacheFunction = cacheFunction;
+ _cacheEnumerator = cacheEnumerator;
+ }
+
+ private KeyValuePair<Bytes, byte[]> CachedPair(KeyValuePair<Bytes, CacheEntryValue> next){
+ return new KeyValuePair<Bytes, byte[]>(_cacheFunction.Key(next.Key), next.Value.Value);
+ }
+
+ public Bytes PeekNextKey() => _cacheFunction.Key(_cacheEnumerator.PeekNextKey());
+
+ public bool MoveNext() => _cacheEnumerator.MoveNext();
+
+ public void Reset() => _cacheEnumerator.Reset();
+
+ public KeyValuePair<Bytes, byte[]>? Current
+ => _cacheEnumerator.Current is not null
+ ? CachedPair(_cacheEnumerator.Current.Value)
+ : null;
+
+ object IEnumerator.Current => Current;
+
+ public void Dispose() => _cacheEnumerator.Dispose();
+ }
+
+ private readonly IKeyValueEnumerator<Bytes, CacheEntryValue> _cacheEnumerator;
+ private readonly IKeyValueEnumerator<Bytes, byte[]> _wrappedCacheEnumerator;
+ private readonly Func<IKeyValueEnumerator<Bytes, byte[]>, bool> _hasNextCondition;
+
+ internal FilteredCacheEnumerator(
+ IKeyValueEnumerator<Bytes, CacheEntryValue> cacheEnumerator,
+ Func<IKeyValueEnumerator<Bytes, byte[]>, bool> hasNextCondition,
+ ICacheFunction segmentCacheFunction)
+ {
+ _cacheEnumerator = cacheEnumerator;
+ _hasNextCondition = hasNextCondition;
+ _wrappedCacheEnumerator = new WrappedFilteredCacheEnumerator(segmentCacheFunction, _cacheEnumerator);
+ }
+
+ public Bytes PeekNextKey() => _cacheEnumerator.PeekNextKey();
+
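+ // The condition is evaluated against the wrapped enumerator, whose keys have the cache prefix
+ // stripped via ICacheFunction.Key, so it compares store-format keys.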
+ public bool MoveNext() => _hasNextCondition(_wrappedCacheEnumerator);
+
+ public void Reset()
+ => _cacheEnumerator.Reset();
+
+ public KeyValuePair<Bytes, CacheEntryValue>? Current => _cacheEnumerator.Current;
+
+ object IEnumerator.Current => Current;
+
+ public void Dispose()
+ => _cacheEnumerator.Dispose();
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Enumerator/MergedSortedCacheWindowStoreEnumerator.cs b/core/State/Cache/Enumerator/MergedSortedCacheWindowStoreEnumerator.cs
new file mode 100644
index 00000000..99236cc8
--- /dev/null
+++ b/core/State/Cache/Enumerator/MergedSortedCacheWindowStoreEnumerator.cs
@@ -0,0 +1,60 @@
+using System;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.State.Enumerator;
+using Streamiz.Kafka.Net.State.Helper;
+
+namespace Streamiz.Kafka.Net.State.Cache.Enumerator
+{
+ internal class MergedSortedCacheWindowStoreEnumerator :
+ AbstractMergedEnumerator<long, long, byte[], byte[]>,
+ IWindowStoreEnumerator<byte[]>
+ {
+ private readonly ICacheFunction _cacheFunction;
+ private readonly Func<byte[], long> _extractStoreTimestamp;
+
+ public MergedSortedCacheWindowStoreEnumerator(
+ ICacheFunction cacheFunction,
+ IKeyValueEnumerator<Bytes, CacheEntryValue> cacheEnumerator,
+ IKeyValueEnumerator<long, byte[]> storeEnumerator,
+ bool forward)
+ : this(cacheFunction, cacheEnumerator, storeEnumerator, forward, WindowKeyHelper.ExtractStoreTimestamp)
+ {
+
+ }
+
+ private MergedSortedCacheWindowStoreEnumerator(
+ ICacheFunction cacheFunction,
+ IKeyValueEnumerator<Bytes, CacheEntryValue> cacheEnumerator,
+ IKeyValueEnumerator<long, byte[]> storeEnumerator,
+ bool forward,
+ Func<byte[], long> extractStoreTimestamp)
+ : base(cacheEnumerator, storeEnumerator, forward)
+ {
+ _cacheFunction = cacheFunction;
+ _extractStoreTimestamp = extractStoreTimestamp;
+ }
+
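+ // Cache keys are segmented window keys: strip the cache prefix, then compare only on the
+ // embedded timestamp, since this enumerator iterates a single record key.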
+ protected override int Compare(Bytes cacheKey, long storeKey)
+ {
+ byte[] binaryKey = _cacheFunction.BytesFromCacheKey(cacheKey);
+ long cacheTimestamp = _extractStoreTimestamp(binaryKey);
+ return cacheTimestamp.CompareTo(storeKey);
+ }
+
+ protected override long DeserializeStoreKey(long storeKey)
+ => storeKey;
+
+ protected override KeyValuePair<long, byte[]> DeserializeStorePair(KeyValuePair<long, byte[]> pair)
+ => pair;
+
+ protected override long DeserializeCacheKey(Bytes cacheKey)
+ {
+ byte[] binaryKey = _cacheFunction.BytesFromCacheKey(cacheKey);
+ return _extractStoreTimestamp(binaryKey);
+ }
+
+ protected override byte[] DeserializeCacheValue(CacheEntryValue cacheEntry)
+ => cacheEntry.Value;
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Enumerator/MergedSortedCacheWindowStoreKeyValueEnumerator.cs b/core/State/Cache/Enumerator/MergedSortedCacheWindowStoreKeyValueEnumerator.cs
new file mode 100644
index 00000000..b3d3e8b4
--- /dev/null
+++ b/core/State/Cache/Enumerator/MergedSortedCacheWindowStoreKeyValueEnumerator.cs
@@ -0,0 +1,61 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State.Enumerator;
+
+namespace Streamiz.Kafka.Net.State.Cache.Enumerator
+{
+ internal class MergedSortedCacheWindowStoreKeyValueEnumerator :
+ AbstractMergedEnumerator<Windowed<Bytes>, Windowed<Bytes>, byte[], byte[]> {
+
+ private readonly long _windowSize;
+ private readonly ICacheFunction _cacheFunction;
+ private readonly ISerDes<Bytes> _serdes;
+ private readonly string _changelogTopic;
+ private readonly Func<byte[], long, ISerDes<Bytes>, string, Windowed<Bytes>> _storeKeyToWindowKey;
+ private readonly Func<Bytes, long, int, Bytes> _windowKeyToBytes;
+
+ public MergedSortedCacheWindowStoreKeyValueEnumerator(
+ IKeyValueEnumerator<Bytes, CacheEntryValue> cacheEnumerator,
+ IKeyValueEnumerator<Windowed<Bytes>, byte[]> storeEnumerator,
+ long windowSize,
+ ICacheFunction cacheFunction,
+ ISerDes<Bytes> serdes,
+ string changelogTopic,
+ Func<byte[], long, ISerDes<Bytes>, string, Windowed<Bytes>> storeKeyToWindowKey,
+ Func<Bytes, long, int, Bytes> windowKeyToBytes,
+ bool forward)
+ : base(cacheEnumerator, storeEnumerator, forward)
+ {
+ _windowSize = windowSize;
+ _cacheFunction = cacheFunction;
+ _serdes = serdes;
+ _changelogTopic = changelogTopic;
+ _storeKeyToWindowKey = storeKeyToWindowKey;
+ _windowKeyToBytes = windowKeyToBytes;
+ }
+
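+ // Rebuild a binary window key (sequence number 0) from the store key so both sides can be
+ // compared with the cache function's segmented-key comparison.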
+ protected override int Compare(Bytes cacheKey, Windowed<Bytes> storeKey)
+ {
+ var storeKeyBytes = _windowKeyToBytes(storeKey.Key, storeKey.Window.StartMs, 0);
+ return _cacheFunction.CompareSegmentedKeys(cacheKey, storeKeyBytes);
+ }
+
+ protected override Windowed<Bytes> DeserializeStoreKey(Windowed<Bytes> storeKey) => storeKey;
+
+ protected override KeyValuePair<Windowed<Bytes>, byte[]> DeserializeStorePair(
+ KeyValuePair<Windowed<Bytes>, byte[]> pair)
+ => pair;
+
+ protected override Windowed<Bytes> DeserializeCacheKey(Bytes cacheKey)
+ {
+ var byteKey = _cacheFunction.Key(cacheKey).Get;
+ return _storeKeyToWindowKey(byteKey, _windowSize, _serdes, _changelogTopic);
+ }
+
+ protected override byte[] DeserializeCacheValue(CacheEntryValue cacheEntry)
+ => cacheEntry.Value;
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Enumerator/MergedStoredCacheKeyValueEnumerator.cs b/core/State/Cache/Enumerator/MergedStoredCacheKeyValueEnumerator.cs
new file mode 100644
index 00000000..dfba0d07
--- /dev/null
+++ b/core/State/Cache/Enumerator/MergedStoredCacheKeyValueEnumerator.cs
@@ -0,0 +1,33 @@
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.State.Enumerator;
+
+namespace Streamiz.Kafka.Net.State.Cache.Enumerator
+{
+ internal class MergedStoredCacheKeyValueEnumerator :
+ AbstractMergedEnumerator<Bytes, Bytes, byte[], byte[]>
+ {
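+ // Simplest merge case: cache and store both expose Bytes keys and byte[] values, so keys and
+ // pairs pass through unchanged and ordering is plain byte-wise comparison.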
+ public MergedStoredCacheKeyValueEnumerator(
+ IKeyValueEnumerator<Bytes, CacheEntryValue> cacheEnumerator,
+ IKeyValueEnumerator<Bytes, byte[]> storeEnumerator,
+ bool forward)
+ : base(cacheEnumerator, storeEnumerator, forward)
+ {
+ }
+
+ protected override int Compare(Bytes cacheKey, Bytes storeKey)
+ => cacheKey.CompareTo(storeKey);
+
+ protected override Bytes DeserializeStoreKey(Bytes storeKey)
+ => storeKey;
+
+ protected override KeyValuePair<Bytes, byte[]> DeserializeStorePair(KeyValuePair<Bytes, byte[]> pair)
+ => pair;
+
+ protected override Bytes DeserializeCacheKey(Bytes cacheKey)
+ => cacheKey;
+
+ protected override byte[] DeserializeCacheValue(CacheEntryValue cacheEntry)
+ => cacheEntry.Value;
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/ICacheFunction.cs b/core/State/Cache/ICacheFunction.cs
new file mode 100644
index 00000000..a0d19dc9
--- /dev/null
+++ b/core/State/Cache/ICacheFunction.cs
@@ -0,0 +1,16 @@
+using Streamiz.Kafka.Net.Crosscutting;
+
+namespace Streamiz.Kafka.Net.State.Cache
+{
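+ // Maps between store keys and cache keys; in the segmented variant the cache key is the store
+ // key prefixed with a segment id, so entries can be compared and fetched per segment.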
+ internal interface ICacheFunction
+ {
+ byte[] BytesFromCacheKey(Bytes cacheKey);
+ Bytes Key(Bytes cacheKey);
+ Bytes CacheKey(Bytes cacheKey);
+ Bytes CacheKey(Bytes key, long segmentId);
+ long SegmentId(Bytes key);
+ long SegmentId(long timestamp);
+ long SegmentInterval { get; }
+ int CompareSegmentedKeys(Bytes cacheKey, Bytes storeKey);
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/ICachedStateStore.cs b/core/State/Cache/ICachedStateStore.cs
new file mode 100644
index 00000000..2bc20e32
--- /dev/null
+++ b/core/State/Cache/ICachedStateStore.cs
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Table.Internal;
+
+namespace Streamiz.Kafka.Net.State.Cache
+{
+ internal interface ICachedStateStore<K, V>
+ {
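+ // Registers a listener invoked with the batch of changed records when the cache flushes them
+ // downstream; sendOldChanges controls whether the old value is forwarded alongside the new one.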
+ bool SetFlushListener(Action<List<KeyValuePair<K, Change<V>>>> listener, bool sendOldChanges);
+ }
+}
\ No newline at end of file
diff --git a/core/State/Cache/Internal/CacheEntry.CacheEntryTokens.cs b/core/State/Cache/Internal/CacheEntry.CacheEntryTokens.cs
new file mode 100644
index 00000000..048bb0ae
--- /dev/null
+++ b/core/State/Cache/Internal/CacheEntry.CacheEntryTokens.cs
@@ -0,0 +1,60 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// This is a fork of Microsoft.Extensions.Caching.Memory.MemoryCache https://github.com/dotnet/runtime/blob/main/src/libraries/Microsoft.Extensions.Caching.Memory
+// The only difference is that the compaction process and the eviction callbacks are synchronous, whereas they are asynchronous in the .NET repository.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading;
+using Microsoft.Extensions.Logging;
+
+namespace Streamiz.Kafka.Net.State.Cache.Internal
+{
+ internal sealed partial class CacheEntry<K, V>
+ {
+ // this type exists just to reduce average CacheEntry size
+ // which typically is not using expiration tokens or callbacks
+ private sealed class CacheEntryTokens
+ {
+ private List<PostEvictionCallbackRegistration<K, V>>? _postEvictionCallbacks; // this is not really related to tokens, but was moved here to shrink typical CacheEntry size
+
+ internal List<PostEvictionCallbackRegistration<K, V>> PostEvictionCallbacks => _postEvictionCallbacks ??= new List<PostEvictionCallbackRegistration<K, V>>();
+
+
+ internal void InvokeEvictionCallbacks(CacheEntry<K, V> cacheEntry)
+ {
+ if (_postEvictionCallbacks != null)
+ {
+ InvokeCallbacks(cacheEntry);
+ }
+ }
+
+ private void InvokeCallbacks(CacheEntry<K, V> entry)
+ {
+ Debug.Assert(entry._tokens != null);
+ List<PostEvictionCallbackRegistration<K, V>>? callbackRegistrations = Interlocked.Exchange(ref entry._tokens._postEvictionCallbacks, null);
+
+ if (callbackRegistrations == null)
+ {
+ return;
+ }
+
+ for (int i = 0; i < callbackRegistrations.Count; i++)
+ {
+ PostEvictionCallbackRegistration<K, V> registration = callbackRegistrations[i];
+
+ try
+ {
+ registration.EvictionCallback?.Invoke(entry.Key, entry.Value, entry.EvictionReason, registration.State);
+ }
+ catch (Exception e)
+ {
+ // This will be invoked on a background thread, don't let it throw.
+ entry._cache.Logger.LogError(e, "EvictionCallback invocation failed");
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/core/State/Cache/Internal/CacheEntry.cs b/core/State/Cache/Internal/CacheEntry.cs
new file mode 100644
index 00000000..d06ff742
--- /dev/null
+++ b/core/State/Cache/Internal/CacheEntry.cs
@@ -0,0 +1,115 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// This is a fork of Microsoft.Extensions.Caching.Memory.MemoryCache https://github.com/dotnet/runtime/blob/main/src/libraries/Microsoft.Extensions.Caching.Memory
+// The only difference is that the compaction process and the eviction callbacks are synchronous, whereas they are asynchronous in the .NET repository.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace Streamiz.Kafka.Net.State.Cache.Internal
+{
+ internal sealed partial class CacheEntry<K, V> : ICacheEntry<K, V>
+ where K : class
+ where V : class
+ {
+ //private static readonly Action