Skip to content

Commit

Permalink
Merge pull request #323 from LGouellec/feature/197
Browse files Browse the repository at this point in the history
Feature/197 - Caching state store
  • Loading branch information
LGouellec authored Jun 20, 2024
2 parents 54608e4 + 7bc1200 commit 0635ab5
Show file tree
Hide file tree
Showing 128 changed files with 5,453 additions and 708 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- run: echo "deb http://ftp.debian.org/debian stable main contrib non-free" >> /etc/apt/sources.list
- run: apt update
- run: apt install -y unzip
# Install JAVA 11
# Install JAVA 17
- run: mkdir -p /usr/share/man/man1 # FIX https://github.com/geerlingguy/ansible-role-java/issues/64
- run: apt install -y openjdk-17-jdk
# BEGIN Dependencies for RocksDB
Expand All @@ -24,7 +24,7 @@
# END Dependencies for RocksDB
- run: set JAVA_HOME /usr/lib/jvm/java-17-openjdk-amd64/
- run: export JAVA_HOME
- run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs"
- run: dotnet sonarscanner begin /k:LGouellec_kafka-streams-dotnet /o:kafka-streams-dotnet /d:sonar.login=${SONAR_TOKEN} /d:sonar.host.url=https://sonarcloud.io /d:sonar.cs.opencover.reportsPaths="**\coverage*.opencover.xml" /d:sonar.coverage.exclusions="**sample*/*.cs,**test*/*.cs,**Tests*.cs,**Mock*.cs,**State/Cache/Internal/*.cs"
- run: dotnet build
- run: dotnet test --no-restore --no-build --verbosity normal -f net6.0 --collect:"XPlat Code Coverage" /p:CollectCoverage=true /p:CoverletOutputFormat=opencover test/Streamiz.Kafka.Net.Tests/Streamiz.Kafka.Net.Tests.csproj
- run: dotnet sonarscanner end /d:sonar.login=${SONAR_TOKEN}
Expand Down
22 changes: 16 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,22 @@ static async System.Threading.Tasks.Task Main(string[] args)
|:------------------------------------------------------------:|:----------------------------------:|:----------------------:|:------------------------------------------:|
| Stateless processors | X | X | |
| RocksDb store | X | X | |
| Standby replicas | X | | No plan for now |
| Standby replicas | X | | No plan for now |
| InMemory store | X | X | |
| Transformer, Processor API | X | X | |
| Punctuate | X | X | |
| Punctuate | X | X | |
| KStream-KStream Join | X | X | |
| KTable-KTable Join | X | X | |
| KTable-KTable FK Join | X | | Plan for 1.6.0 |
| KTable-KTable FK Join | X | | Plan for future |
| KStream-KTable Join | X | X | |
| KStream-GlobalKTable Join | X | X | |
| KStream Async Processing (external call inside the topology) | | X | Not supported in Kafka Streams JAVA |
| KStream Async Processing (external call inside the topology) | | X | Not supported in Kafka Streams JAVA |
| Hopping window | X | X | |
| Tumbling window | X | X | |
| Sliding window | X | | No plan for now |
| Session window | X | | No plan for now |
| Cache | X | | Plan for 1.6.0 |
| Suppress(..) | X | | No plan for now |
| Cache | X | X | EA 1.6.0 |
| Suppress(..) | X | | No plan for now |
| Interactive Queries | X | | No plan for now |
| State store batch restoring | X | | No plan for now |
| Exactly Once (v1 and v2) | X | X | EOS V1 supported, EOS V2 not supported yet |
Expand All @@ -137,3 +137,13 @@ When adding or changing a service please add tests and documentations.
# Support

You can found support [here](https://discord.gg/J7Jtxum)

# Star History

<a href="https://star-history.com/#LGouellec/kafka-streams-dotnet&Date">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=LGouellec/kafka-streams-dotnet&type=Date&theme=dark" />
<source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=LGouellec/kafka-streams-dotnet&type=Date" />
<img alt="Star History Chart" src="https://api.star-history.com/svg?repos=LGouellec/kafka-streams-dotnet&type=Date" />
</picture>
</a>
36 changes: 31 additions & 5 deletions core/Crosscutting/DictionaryExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Confluent.Kafka;

namespace Streamiz.Kafka.Net.Crosscutting
Expand All @@ -26,11 +27,9 @@ public static bool AddOrUpdate<K, V>(this IDictionary<K, V> map, K key, V value)
map[key] = value;
return false;
}
else
{
map.Add(key, value);
return true;
}

map.Add(key, value);
return true;
}

/// <summary>
Expand Down Expand Up @@ -134,5 +133,32 @@ public static void CreateListOrAdd<K, V>(this IDictionary<K, List<V>> source, K
source.Add(key, new List<V>{value});
}

#if NETSTANDARD2_0
public static bool TryAdd<K, V>( this IDictionary<K, V> dictionary,
K key,
V value){
if (dictionary == null)
throw new ArgumentNullException(nameof (dictionary));
if (dictionary.ContainsKey(key))
return false;
dictionary.Add(key, value);
return true;
}

public static bool Remove<K, V>( this IDictionary<K, V> dictionary,
K key,
out V value){
bool result = dictionary.TryGetValue(key, out V valueTmp);
if (result)
{
value = valueTmp;
dictionary.Remove(key);
return true;
}
value = default(V);
return false;
}
#endif

}
}
3 changes: 3 additions & 0 deletions core/Crosscutting/KafkaExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ internal static Headers Clone(this Headers headers)
originHeader.ForEach(h => copyHeaders.Add(h.Key, h.Item2));
return copyHeaders;
}

internal static long GetEstimatedSize(this Headers headers)
=> headers.Sum(header => header.Key.Length + header.GetValueBytes().LongLength);

internal static Headers AddOrUpdate(this Headers headers, string key, byte[] value)
{
Expand Down
14 changes: 14 additions & 0 deletions core/Crosscutting/SortedDictionaryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,19 @@ internal static IEnumerable<KeyValuePair<K, V>> SubMap<K, V>(this SortedDictiona
}
}
}

internal static IEnumerable<KeyValuePair<K, V>> TailMap<K, V>(this SortedDictionary<K, V> sortedDic, K keyFrom,
bool inclusive)
{
foreach (K k in sortedDic.Keys)
{
int rT = sortedDic.Comparer.Compare(keyFrom, k);

if ((inclusive && rT <= 0) || (!inclusive && rT < 0))
{
yield return new KeyValuePair<K, V>(k, sortedDic[k]);
}
}
}
}
}
6 changes: 6 additions & 0 deletions core/Crosscutting/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,11 @@ public static bool IsNumeric(object expression, out Double number)
, NumberFormatInfo.InvariantInfo
, out number);
}

public static void CheckIfNotNull(object parameter, string nameAccessor)
{
if(parameter == null)
throw new ArgumentException($"{nameAccessor} must not be null");
}
}
}
2 changes: 1 addition & 1 deletion core/KafkaStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ string Protect(string str)
() => StreamState != null && StreamState.IsRunning() ? 1 : 0,
() => threads.Count(t => t.State != ThreadState.DEAD && t.State != ThreadState.PENDING_SHUTDOWN),
metricsRegistry);

threads = new IThread[numStreamThreads];
var threadState = new Dictionary<long, Processors.ThreadState>();

Expand Down
69 changes: 69 additions & 0 deletions core/Metrics/Internal/CachingMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Streamiz.Kafka.Net.Processors.Internal;

namespace Streamiz.Kafka.Net.Metrics.Internal
{
internal class CachingMetrics
{
internal static string CACHE_SIZE_BYTES_TOTAL = "cache-size-bytes-total";
private static string CACHE_SIZE_BYTES_TOTAL_DESCRIPTION = "The total size in bytes of this cache state store.";

internal static string HIT_RATIO = "hit-ratio";
private static string HIT_RATIO_DESCRIPTION = "The hit ratio defined as the ratio of cache read hits over the total cache read requests.";
private static string HIT_RATIO_AVG_DESCRIPTION = "The average cache hit ratio";
private static string HIT_RATIO_MIN_DESCRIPTION = "The minimum cache hit ratio";
private static string HIT_RATIO_MAX_DESCRIPTION = "The maximum cache hit ratio";

public static Sensor HitRatioSensor(
TaskId taskId,
string storeType,
string storeName,
StreamMetricsRegistry streamsMetrics) {

Sensor sensor;
string hitMetricName = HIT_RATIO;
IDictionary<string, string> tags =
streamsMetrics.StoreLevelTags(GetThreadId(), taskId.ToString(), storeName, storeType);

sensor = streamsMetrics.StoreLevelSensor(GetThreadId(), taskId, storeName, hitMetricName, HIT_RATIO_DESCRIPTION, MetricsRecordingLevel.DEBUG);

SensorHelper.AddAvgAndMinAndMaxToSensor(
sensor,
StreamMetricsRegistry.STATE_STORE_LEVEL_GROUP,
tags,
hitMetricName,
HIT_RATIO_AVG_DESCRIPTION,
HIT_RATIO_MAX_DESCRIPTION,
HIT_RATIO_MIN_DESCRIPTION);

return sensor;
}

public static Sensor TotalCacheSizeBytesSensor(
TaskId taskId,
string storeType,
string storeName,
StreamMetricsRegistry streamsMetrics) {

Sensor sensor;
string totalCacheMetricName = CACHE_SIZE_BYTES_TOTAL;
IDictionary<string, string> tags =
streamsMetrics.StoreLevelTags(GetThreadId(), taskId.ToString(), storeName, storeType);

sensor = streamsMetrics.StoreLevelSensor(GetThreadId(), taskId, storeName, totalCacheMetricName, CACHE_SIZE_BYTES_TOTAL_DESCRIPTION, MetricsRecordingLevel.DEBUG);

SensorHelper.AddValueMetricToSensor(
sensor,
StreamMetricsRegistry.STATE_STORE_LEVEL_GROUP,
tags,
totalCacheMetricName,
CACHE_SIZE_BYTES_TOTAL_DESCRIPTION);

return sensor;
}

private static string GetThreadId() => Thread.CurrentThread.Name;
}
}
34 changes: 34 additions & 0 deletions core/Metrics/Internal/SensorHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,40 @@ internal static void AddAvgAndMaxToSensor(Sensor sensor,
);
}

internal static void AddAvgAndMinAndMaxToSensor(Sensor sensor,
string group,
IDictionary<string, string> tags,
string operation,
string descriptionOfAvg,
string descriptionOfMax,
string descriptionOfMin) {

sensor.AddStatMetric(
new MetricName(
operation + StreamMetricsRegistry.AVG_SUFFIX,
group,
descriptionOfAvg,
tags),
new Avg()
);
sensor.AddStatMetric(
new MetricName(
operation + StreamMetricsRegistry.MAX_SUFFIX,
group,
descriptionOfMax,
tags),
new Max()
);
sensor.AddStatMetric(
new MetricName(
operation + StreamMetricsRegistry.MIN_SUFFIX,
group,
descriptionOfMin,
tags),
new Min()
);
}

internal static void AddRateOfSumAndSumMetricsToSensor(Sensor sensor,
string group,
IDictionary<string, string> tags,
Expand Down
3 changes: 3 additions & 0 deletions core/Metrics/Sensor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ internal void Record()
internal void Record(long value)
=> Record(value, DateTime.Now.GetMilliseconds());

internal void Record(double value)
=> Record(value, DateTime.Now.GetMilliseconds());

internal virtual void Record(double value, long timeMs)
=> RecordInternal(value, timeMs);

Expand Down
6 changes: 6 additions & 0 deletions core/Mock/ClusterInMemoryTopologyDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ void stateChangedHandeler(IThread thread, ThreadStateTransitionValidator old,
}
}
}

public void TriggerCommit()
{
throw new NotImplementedException();
//((StreamThread)threadTopology)?.Manager.CommitAll();
}

#endregion
}
Expand Down
1 change: 1 addition & 0 deletions core/Mock/IBehaviorTopologyTestDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ internal interface IBehaviorTopologyTestDriver : IDisposable
TestOutputTopic<K, V> CreateOutputTopic<K, V>(string topicName, TimeSpan consumeTimeout, ISerDes<K> keySerdes = null, ISerDes<V> valueSerdes = null);
TestMultiInputTopic<K, V> CreateMultiInputTopic<K, V>(string[] topics, ISerDes<K> keySerdes = null, ISerDes<V> valueSerdes = null);
IStateStore GetStateStore<K, V>(string name);
void TriggerCommit();
}
}
18 changes: 18 additions & 0 deletions core/Mock/TaskSynchronousTopologyDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ public TestInputTopic<K, V> CreateInputTopic<K, V>(string topicName, ISerDes<K>
null);

foreach (var topic in topicsLink)
{
GetTask(topic);
pipeInput.Flushed += () => ForwardRepartitionTopic(consumer, topic);
}

return new TestInputTopic<K, V>(pipeInput, configuration, keySerdes, valueSerdes);
}
Expand Down Expand Up @@ -296,6 +299,21 @@ public IStateStore GetStateStore<K, V>(string name)
return hasGlobalTopology ? globalProcessorContext.GetStateStore(name) : null;
}

public void TriggerCommit()
{
foreach(var extProcessor in externalProcessorTopologies.Values)
extProcessor.Flush();

foreach (var task in tasks.Values)
{
var consumer = supplier.GetConsumer(topicConfiguration.ToConsumerConfig("consumer-repartition-forwarder"),
null);
task.Commit();
}

globalTask?.FlushState();
}

#endregion
}
}
8 changes: 8 additions & 0 deletions core/Mock/TopologyTestDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ public void Dispose()
behavior.Dispose();
}

/// <summary>
/// Trigger the driver to commit, especially needed if you use caching
/// </summary>
public void Commit()
{
behavior.TriggerCommit();
}

#region Create Input Topic

/// <summary>
Expand Down
7 changes: 7 additions & 0 deletions core/ProcessorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class ProcessorContext
/// </summary>
public virtual string StateDir => $"{Path.Combine(Configuration.StateDir, Configuration.ApplicationId, Id.ToString())}";

internal bool ConfigEnableCache => Configuration.StateStoreCacheMaxBytes > 0;

// FOR TESTING
internal ProcessorContext()
{
Expand All @@ -91,6 +93,11 @@ internal ProcessorContext UseRecordCollector(IRecordCollector collector)
return this;
}

internal void SetRecordMetaData(IRecordContext context)
{
RecordContext = context;
}

internal void SetRecordMetaData(ConsumeResult<byte[], byte[]> result)
{
RecordContext = new RecordContext(result);
Expand Down
Loading

0 comments on commit 0635ab5

Please sign in to comment.