From 4a82b5ba72f129c59669642de475f29957c38a21 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Thu, 21 Nov 2024 14:39:01 -0800 Subject: [PATCH] #385 - Fix EOS duplicate handle --- core/Kafka/Internal/StreamsProducer.cs | 2 +- .../RocksDBMetricsRecordingTrigger.cs | 9 +++--- launcher/sample-stream/Program.cs | 31 ++++++++++++------- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/core/Kafka/Internal/StreamsProducer.cs b/core/Kafka/Internal/StreamsProducer.cs index 0fe43ac8..a0ea730c 100644 --- a/core/Kafka/Internal/StreamsProducer.cs +++ b/core/Kafka/Internal/StreamsProducer.cs @@ -58,7 +58,7 @@ public StreamsProducer( case ProcessingGuarantee.AT_LEAST_ONCE: break; case ProcessingGuarantee.EXACTLY_ONCE: - _producerConfig.TransactionalId = $"{config.ApplicationId}-{processId}"; + _producerConfig.TransactionalId = $"{config.ApplicationId}-{processId}-{threadId}"; break; default: throw new StreamsException($"Guarantee {config.Guarantee} is not supported yet"); diff --git a/core/Metrics/Internal/RocksDBMetricsRecordingTrigger.cs b/core/Metrics/Internal/RocksDBMetricsRecordingTrigger.cs index 39bcd6ce..2c714d7e 100644 --- a/core/Metrics/Internal/RocksDBMetricsRecordingTrigger.cs +++ b/core/Metrics/Internal/RocksDBMetricsRecordingTrigger.cs @@ -1,18 +1,19 @@ +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Runtime.InteropServices; using Streamiz.Kafka.Net.Errors; namespace Streamiz.Kafka.Net.Metrics.Internal { internal class RocksDbMetricsRecordingTrigger { - private IDictionary MetricsRecorders { get; } - = new Dictionary(); + private ConcurrentDictionary MetricsRecorders { get; } = new(); internal void AddMetricsRecorder(RocksDbMetricsRecorder recorder) { if (!MetricsRecorders.ContainsKey(recorder.Name)) { - MetricsRecorders.Add(recorder.Name, recorder); + MetricsRecorders.TryAdd(recorder.Name, recorder); return; } @@ -21,7 +22,7 @@ internal void AddMetricsRecorder(RocksDbMetricsRecorder recorder) } internal void RemoveMetricsRecorder(RocksDbMetricsRecorder recorder) - => MetricsRecorders.Remove(recorder.Name); + => MetricsRecorders.TryRemove(recorder.Name, out RocksDbMetricsRecorder _); internal void Run(long now) { diff --git a/launcher/sample-stream/Program.cs b/launcher/sample-stream/Program.cs index c35f4564..3b16286a 100644 --- a/launcher/sample-stream/Program.cs +++ b/launcher/sample-stream/Program.cs @@ -20,11 +20,13 @@ public static async Task Main(string[] args) ApplicationId = $"test-app", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest, + Guarantee = ProcessingGuarantee.EXACTLY_ONCE, Logger = LoggerFactory.Create((b) => { b.AddConsole(); - b.SetMinimumLevel(LogLevel.Debug); - }) + b.SetMinimumLevel(LogLevel.Information); + }), + NumStreamThreads = 2 }; var t = BuildTopology(); @@ -40,18 +42,23 @@ public static async Task Main(string[] args) private static Topology BuildTopology() { var builder = new StreamBuilder(); - - builder.Stream("input2") + + builder + .Stream("input") + .Map((k, v, r) => + { + var newKey = Guid.NewGuid().ToString(); + return KeyValuePair.Create(newKey, newKey); + }) .GroupByKey() - .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1))) - .Count() - .Suppress(SuppressedBuilder.UntilWindowClose, long>(TimeSpan.Zero, - StrictBufferConfig.Unbounded()) - .WithKeySerdes(new TimeWindowedSerDes(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds))) + .Aggregate( + () => Guid.NewGuid().ToString(), + (key, curr, acc) => acc, + RocksDb.As("test-ktable") + ) .ToStream() - .Map((k,v, r) => new KeyValuePair(k.Key, v)) - .To("output2"); - + .To("output"); + return builder.Build(); } }