From 2a0761636cf6b636a9f8839c8aa12562a9adf402 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Fri, 12 Jul 2024 15:29:00 -0700 Subject: [PATCH 1/4] reproducer 328 --- samples/sample-stream/Reproducer314.cs | 2 +- samples/sample-stream/Reproducer328.cs | 62 ++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 samples/sample-stream/Reproducer328.cs diff --git a/samples/sample-stream/Reproducer314.cs b/samples/sample-stream/Reproducer314.cs index 44230759..9723e5c4 100644 --- a/samples/sample-stream/Reproducer314.cs +++ b/samples/sample-stream/Reproducer314.cs @@ -12,7 +12,7 @@ namespace sample_stream; public class Reproducer314 { - public static async Task Main(string[] args) + public static async Task Main2(string[] args) { Console.WriteLine("Hello Streams"); diff --git a/samples/sample-stream/Reproducer328.cs b/samples/sample-stream/Reproducer328.cs new file mode 100644 index 00000000..8b6e10b8 --- /dev/null +++ b/samples/sample-stream/Reproducer328.cs @@ -0,0 +1,62 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Streamiz.Kafka.Net; +using Streamiz.Kafka.Net.SerDes; +using Streamiz.Kafka.Net.Table; + +namespace sample_stream; + +public class Reproducer328 +{ + public class Product + { + public string Category { get; set; } + public string Name { get; set; } + + public override string ToString() + { + return $"Name:{Name}, Category: {Category}"; + } + } + + public static async Task Main(string[] args) + { + var config = new StreamConfig + { + ApplicationId = $"test-reproducer328", + BootstrapServers = "localhost:9092", + AutoOffsetReset = AutoOffsetReset.Earliest + }; + + var builder = CreateTopology(); + var t = builder.Build(); + var stream = new KafkaStream(t, config); + + Console.CancelKeyPress += (sender, eventArgs) => stream.Dispose(); + + await stream.StartAsync(); + } + + private static StreamBuilder CreateTopology() + { + StreamBuilder builder = new StreamBuilder(); + var global = builder.GlobalTable("product_category", + InMemory.As("product-store")); + + builder.Stream>("product") + .Peek((k, v) => Console.WriteLine($"Product Key : {k} | Product Value : {v}")) + .Join(global, + (s, s1) => s1.Category, + (product, s) => + { + product.Category = s; + return product; + }) + .Peek((k, v) => Console.WriteLine($"Product Key : {k} | Product Value : {v}")) + .To>("product_output"); + + return builder; + } +} \ No newline at end of file From 5f5b13b47d7c15c6859bdffa7205c39f7bae2c20 Mon Sep 17 00:00:00 2001 From: LGouellec Date: Mon, 15 Jul 2024 16:25:55 -0700 Subject: [PATCH 2/4] FIX in progress --- .../Internal/StreamsRebalanceListener.cs | 21 ++++---- core/KafkaStream.cs | 49 ++++++++----------- core/Mock/ClusterInMemoryTopologyDriver.cs | 2 +- core/Processors/GlobalStreamThread.cs | 13 ++--- core/Processors/IThread.cs | 1 - .../SequentiallyGracefullyShutdownHook.cs | 44 +++++++++++++++++ core/StreamConfig.cs | 5 +- environment/confs/order.avsc | 40 +++++++++++++++ environment/datagen_connector.json | 8 +-- environment/docker-compose-with-connect.yml | 2 + environment/start.sh | 39 ++++++--------- samples/sample-stream/Reproducer328.cs | 25 +++++++--- samples/sample-stream/log4net.config | 12 +++++ .../Private/GlobalStreamThreadTests.cs | 24 +++++---- 14 files changed, 192 insertions(+), 93 deletions(-) create mode 100644 core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs create mode 100644 environment/confs/order.avsc diff --git a/core/Kafka/Internal/StreamsRebalanceListener.cs b/core/Kafka/Internal/StreamsRebalanceListener.cs index 41f6a0b1..a8f25f75 100644 --- a/core/Kafka/Internal/StreamsRebalanceListener.cs +++ b/core/Kafka/Internal/StreamsRebalanceListener.cs @@ -55,16 +55,19 @@ public void PartitionsRevoked(IConsumer consumer, List(partitions.Select(p => p.TopicPartition))); - Thread.SetState(ThreadState.PARTITIONS_REVOKED); - manager.RebalanceInProgress = false; + if (Thread.IsRunning) + { + manager.RebalanceInProgress = true; + manager.RevokeTasks(new List(partitions.Select(p => p.TopicPartition))); + Thread.SetState(ThreadState.PARTITIONS_REVOKED); + manager.RebalanceInProgress = false; - StringBuilder sb = new StringBuilder(); - sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms"); - sb.AppendLine( - $"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}"); - log.LogInformation(sb.ToString()); + StringBuilder sb = new StringBuilder(); + sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms"); + sb.AppendLine( + $"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}"); + log.LogInformation(sb.ToString()); + } } } diff --git a/core/KafkaStream.cs b/core/KafkaStream.cs index babef907..f1598feb 100644 --- a/core/KafkaStream.cs +++ b/core/KafkaStream.cs @@ -259,6 +259,7 @@ public override string ToString() private readonly StreamMetricsRegistry metricsRegistry; private readonly CancellationTokenSource _cancelSource = new(); + private readonly SequentiallyGracefullyShutdownHook shutdownHook; internal State StreamState { get; private set; } @@ -401,6 +402,13 @@ string Protect(string str) queryableStoreProvider = new QueryableStoreProvider(stateStoreProviders, globalStateStoreProvider); StreamState = State.CREATED; + + shutdownHook = new SequentiallyGracefullyShutdownHook( + threads, + globalStreamThread, + externalStreamThread, + _cancelSource + ); } /// @@ -424,13 +432,9 @@ public void Start(CancellationToken? token = null) public async Task StartAsync(CancellationToken? token = null) { if (token.HasValue) - { - token.Value.Register(() => { - _cancelSource.Cancel(); - Dispose(); - }); - } - await Task.Factory.StartNew(async () => + token.Value.Register(Dispose); + + await Task.Factory.StartNew(() => { if (SetState(State.REBALANCING)) { @@ -448,12 +452,13 @@ await Task.Factory.StartNew(async () => SetState(State.PENDING_SHUTDOWN); SetState(State.ERROR); } - return; + + return Task.CompletedTask; } RunMiddleware(true, true); - globalStreamThread?.Start(_cancelSource.Token); + globalStreamThread?.Start(); externalStreamThread?.Start(_cancelSource.Token); foreach (var t in threads) @@ -463,14 +468,16 @@ await Task.Factory.StartNew(async () => RunMiddleware(false, true); } + + return Task.CompletedTask; }, token ?? _cancelSource.Token); try { // Allow time for streams thread to run - await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs), - token ?? _cancelSource.Token); + // await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs), + // token ?? _cancelSource.Token); } catch { @@ -484,16 +491,8 @@ await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs), /// public void Dispose() { - Task.Factory.StartNew(() => - { - if (!_cancelSource.IsCancellationRequested) - { - _cancelSource.Cancel(); - } - - Close(); - _cancelSource.Dispose(); - }).Wait(TimeSpan.FromSeconds(30)); + Close(); + _cancelSource.Dispose(); } /// @@ -512,13 +511,7 @@ private void Close() { RunMiddleware(true, false); - foreach (var t in threads) - { - t.Dispose(); - } - - externalStreamThread?.Dispose(); - globalStreamThread?.Dispose(); + shutdownHook.Shutdown(); RunMiddleware(false, false); metricsRegistry.RemoveClientSensors(); diff --git a/core/Mock/ClusterInMemoryTopologyDriver.cs b/core/Mock/ClusterInMemoryTopologyDriver.cs index 8b22280b..63100725 100644 --- a/core/Mock/ClusterInMemoryTopologyDriver.cs +++ b/core/Mock/ClusterInMemoryTopologyDriver.cs @@ -212,7 +212,7 @@ void stateChangedHandeler(IThread thread, ThreadStateTransitionValidator old, InitializeInternalTopicManager(); - globalStreamThread?.Start(token); + globalStreamThread?.Start(); externalStreamThread?.Start(token); threadTopology.Start(token); diff --git a/core/Processors/GlobalStreamThread.cs b/core/Processors/GlobalStreamThread.cs index 60969fe9..59af4920 100644 --- a/core/Processors/GlobalStreamThread.cs +++ b/core/Processors/GlobalStreamThread.cs @@ -109,8 +109,7 @@ public void Close() private readonly string logPrefix; private readonly string threadClientId; private readonly IConsumer globalConsumer; - private CancellationToken token; - private readonly object stateLock = new object(); + private readonly object stateLock = new(); private readonly IStreamConfig configuration; private StateConsumer stateConsumer; private readonly IGlobalStateMaintainer globalStateMaintainer; @@ -140,7 +139,7 @@ private void Run() SetState(GlobalThreadState.RUNNING); try { - while (!token.IsCancellationRequested && State.IsRunning()) + while (State.IsRunning()) { stateConsumer.PollAndUpdate(); @@ -166,11 +165,11 @@ private void Run() // https://docs.microsoft.com/en-us/visualstudio/code-quality/ca1065 } - Dispose(false); + //Dispose(false); } } - public void Start(CancellationToken token) + public void Start() { log.LogInformation("{LogPrefix}Starting", logPrefix); @@ -184,9 +183,7 @@ public void Start(CancellationToken token) $"{logPrefix}Error happened during initialization of the global state store; this thread has shutdown : {e}"); throw; } - - this.token = token; - + thread.Start(); } diff --git a/core/Processors/IThread.cs b/core/Processors/IThread.cs index 6647bfd2..7f55166a 100644 --- a/core/Processors/IThread.cs +++ b/core/Processors/IThread.cs @@ -14,7 +14,6 @@ interface IThread : IDisposable void Run(); void Start(CancellationToken token); IEnumerable ActiveTasks { get; } - event ThreadStateListener StateChanged; } } diff --git a/core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs b/core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs new file mode 100644 index 00000000..fd626179 --- /dev/null +++ b/core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs @@ -0,0 +1,44 @@ +using System.Threading; +using Microsoft.Extensions.Logging; +using Streamiz.Kafka.Net.Crosscutting; + +namespace Streamiz.Kafka.Net.Processors.Internal +{ + internal class SequentiallyGracefullyShutdownHook + { + private readonly IThread[] _streamThreads; + private readonly GlobalStreamThread _globalStreamThread; + private readonly IThread _externalStreamThread; + private readonly CancellationTokenSource _tokenSource; + private readonly ILogger log = Logger.GetLogger(typeof(SequentiallyGracefullyShutdownHook)); + + public SequentiallyGracefullyShutdownHook( + IThread [] streamThreads, + GlobalStreamThread globalStreamThread, + IThread externalStreamThread, + CancellationTokenSource tokenSource + ) + { + _streamThreads = streamThreads; + _globalStreamThread = globalStreamThread; + _externalStreamThread = externalStreamThread; + _tokenSource = tokenSource; + } + + public void Shutdown() + { + log.LogInformation($"Request shutdown gracefully"); + _tokenSource.Cancel(); + + foreach (var t in _streamThreads) + { + t.Dispose(); + } + + _externalStreamThread?.Dispose(); + _globalStreamThread?.Dispose(); + + log.LogInformation($"Shutdown gracefully successful"); + } + } +} \ No newline at end of file diff --git a/core/StreamConfig.cs b/core/StreamConfig.cs index 0e4319a3..10822b67 100644 --- a/core/StreamConfig.cs +++ b/core/StreamConfig.cs @@ -2716,9 +2716,12 @@ public MetricsRecordingLevel MetricsRecording } /// - /// Time wait before completing the start task of . (default: 5000) + /// Time wait before completing the start task of . + /// Should be removed in the next release. + /// (default: 5000) /// [StreamConfigProperty("" + startTaskDelayMsCst)] + [Obsolete] public long StartTaskDelayMs { get => configProperties[startTaskDelayMsCst]; diff --git a/environment/confs/order.avsc b/environment/confs/order.avsc new file mode 100644 index 00000000..b869c2a7 --- /dev/null +++ b/environment/confs/order.avsc @@ -0,0 +1,40 @@ +{ + "namespace": "ksql", + "name": "product", + "type": "record", + "fields": [ + { + "name": "category", + "type": { + "type": "string", + "arg.properties": { + "options": ["1", "2", "3"] + } + } + }, + {"name": "name", "type": { + "type": "string", + "arg.properties": { + "iteration": { + "start": 0 + } + } + }}, + {"name": "description", "type": { + "type": "string", + "arg.properties": { + "iteration": { + "start": 0 + } + } + }}, + {"name": "price", "type": { + "type": "double", + "arg.properties": { + "iteration": { + "start": 0 + } + } + }} + ] +} \ No newline at end of file diff --git a/environment/datagen_connector.json b/environment/datagen_connector.json index 66abd216..cb0eda80 100644 --- a/environment/datagen_connector.json +++ b/environment/datagen_connector.json @@ -1,5 +1,5 @@ { - "name": "datagen-users", + "name": "datagen-products", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "users", @@ -7,8 +7,10 @@ "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", - "max.interval": 1000, + "max.interval": 50, "iterations": 10000000, - "tasks.max": "1" + "tasks.max": "1", + "schema.filename": "/home/appuser/order.avsc", + "schema.keyfield": "name" } } \ No newline at end of file diff --git a/environment/docker-compose-with-connect.yml b/environment/docker-compose-with-connect.yml index 3a4489ca..6e358fe9 100644 --- a/environment/docker-compose-with-connect.yml +++ b/environment/docker-compose-with-connect.yml @@ -78,6 +78,8 @@ services: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" + volumes: + - ./confs/order.avsc:/home/appuser/order.avsc akhq: image: tchiotludo/akhq:latest diff --git a/environment/start.sh b/environment/start.sh index 9263e49c..765f503e 100644 --- a/environment/start.sh +++ b/environment/start.sh @@ -1,29 +1,20 @@ #!/bin/bash -curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \ +curl -i -X PUT http://localhost:8083/connectors/datagen_product/config \ -H "Content-Type: application/json" \ -d '{ - "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": false, - "kafka.topic": "shoes", - "quickstart": "shoes", - "max.interval": 100, - "iterations": 10000000, - "tasks.max": "1" - }' + "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", + "kafka.topic": "product3", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "max.interval": 50, + "iterations": 10000000, + "tasks.max": "1", + "schema.filename": "/home/appuser/order.avsc", + "schema.keyfield": "name" + }' -curl -i -X PUT http://localhost:8083/connectors/datagen_local_02/config \ - -H "Content-Type: application/json" \ - -d '{ - "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": false, - "kafka.topic": "orders", - "quickstart": "shoe_orders", - "max.interval": 100, - "iterations": 10000000, - "tasks.max": "1" - }' \ No newline at end of file + +# curl -i -X PUT http://localhost:8083/connectors/datagen_product/pause +# curl -i -X PUT http://localhost:8083/connectors/datagen_product/resume \ No newline at end of file diff --git a/samples/sample-stream/Reproducer328.cs b/samples/sample-stream/Reproducer328.cs index 8b6e10b8..c6065c8a 100644 --- a/samples/sample-stream/Reproducer328.cs +++ b/samples/sample-stream/Reproducer328.cs @@ -2,6 +2,7 @@ using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; +using Microsoft.Extensions.Logging; using Streamiz.Kafka.Net; using Streamiz.Kafka.Net.SerDes; using Streamiz.Kafka.Net.Table; @@ -12,12 +13,14 @@ public class Reproducer328 { public class Product { - public string Category { get; set; } - public string Name { get; set; } + public string category { get; set; } + public string name { get; set; } + public string description { get; set; } + public double price { get; set; } public override string ToString() { - return $"Name:{Name}, Category: {Category}"; + return $"Name:{name}, Category: {category}, Price: {price}"; } } @@ -27,7 +30,13 @@ public static async Task Main(string[] args) { ApplicationId = $"test-reproducer328", BootstrapServers = "localhost:9092", - AutoOffsetReset = AutoOffsetReset.Earliest + AutoOffsetReset = AutoOffsetReset.Earliest, + MaxPollRecords = 1, + Logger = LoggerFactory.Create(b => + { + b.SetMinimumLevel(LogLevel.Debug); + b.AddLog4Net(); + }) }; var builder = CreateTopology(); @@ -45,17 +54,17 @@ private static StreamBuilder CreateTopology() var global = builder.GlobalTable("product_category", InMemory.As("product-store")); - builder.Stream>("product") + builder.Stream>("product3") .Peek((k, v) => Console.WriteLine($"Product Key : {k} | Product Value : {v}")) .Join(global, - (s, s1) => s1.Category, + (s, s1) => s1.category, (product, s) => { - product.Category = s; + product.category = s; return product; }) .Peek((k, v) => Console.WriteLine($"Product Key : {k} | Product Value : {v}")) - .To>("product_output"); + .To>("product_output3"); return builder; } diff --git a/samples/sample-stream/log4net.config b/samples/sample-stream/log4net.config index 8c6b0b02..0531d3e7 100644 --- a/samples/sample-stream/log4net.config +++ b/samples/sample-stream/log4net.config @@ -5,8 +5,20 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs index 8a42bf26..b92a49f5 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs @@ -47,13 +47,13 @@ public void ShouldConvertExceptionsToStreamsException() { streamConfigMock.Setup(x => x.PollMs).Throws(new Exception("boom")); - Assert.Throws(() => globalStreamThread.Start(cancellationTokenSource.Token)); + Assert.Throws(() => globalStreamThread.Start()); } [Test] public void ShouldBeRunningAfterSuccesfullStart() { - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); // we need to wait for thread to set running state Thread.Sleep(100); @@ -65,8 +65,9 @@ public void ShouldStopRunningWhenClosedByUser() { var token = cancellationTokenSource.Token; - globalStreamThread.Start(token); + globalStreamThread.Start(); cancellationTokenSource.Cancel(); + globalStreamThread.Dispose(); // thread should stop after some time Thread.Sleep(100); @@ -78,8 +79,9 @@ public void ShouldStopGlobalConsumer() { var token = cancellationTokenSource.Token; - globalStreamThread.Start(token); + globalStreamThread.Start(); cancellationTokenSource.Cancel(); + globalStreamThread.Dispose(); // thread should stop after some time Thread.Sleep(100); @@ -91,8 +93,9 @@ public void ShouldStopGlobalStateMaintainer() { var token = cancellationTokenSource.Token; - globalStreamThread.Start(token); + globalStreamThread.Start(); cancellationTokenSource.Cancel(); + globalStreamThread.Dispose(); // thread should stop after some time Thread.Sleep(100); @@ -105,8 +108,9 @@ public void ShouldStopGlobalStateMaintainerEvenIfStoppingConsumerThrows() globalConsumerMock.Setup(x => x.Close()).Throws(new Exception()); var token = cancellationTokenSource.Token; - globalStreamThread.Start(token); + globalStreamThread.Start(); cancellationTokenSource.Cancel(); + globalStreamThread.Dispose(); // thread should stop after some time Thread.Sleep(100); @@ -122,7 +126,7 @@ public void ShouldAssignTopicsToConsumer() }; globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(partitionOffsetDictionary); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); var parts = partitionOffsetDictionary.Keys.Select(o => new TopicPartitionOffset(o, Offset.Beginning)); globalConsumerMock.Verify(x => x.Assign(parts)); @@ -139,7 +143,7 @@ public void ShouldConsumeRecords() .Returns(result2) .Returns((ConsumeResult) null); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); // wait some time so that thread can process data Thread.Sleep(100); @@ -151,7 +155,7 @@ public void ShouldConsumeRecords() public void ShouldNotFlushTooSoon() { streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(100); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); // this should be true as the thread should wait 100ms to flush globalStateMaintainerMock.Verify(x => x.FlushState(), Times.Never); @@ -161,7 +165,7 @@ public void ShouldNotFlushTooSoon() public void ShouldFlush() { streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(10); - globalStreamThread.Start(cancellationTokenSource.Token); + globalStreamThread.Start(); Thread.Sleep(50); // we are waiting longer than CommitIntervalMs so thread should already flush at least once From 3241896fabb76d795f49ccde652df69421cb5fed Mon Sep 17 00:00:00 2001 From: LGouellec Date: Tue, 16 Jul 2024 09:20:59 -0700 Subject: [PATCH 3/4] enable batching example --- samples/sample-stream/Reproducer328.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/sample-stream/Reproducer328.cs b/samples/sample-stream/Reproducer328.cs index c6065c8a..68dc7407 100644 --- a/samples/sample-stream/Reproducer328.cs +++ b/samples/sample-stream/Reproducer328.cs @@ -31,7 +31,6 @@ public static async Task Main(string[] args) ApplicationId = $"test-reproducer328", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest, - MaxPollRecords = 1, Logger = LoggerFactory.Create(b => { b.SetMinimumLevel(LogLevel.Debug); From c8356d726a6bfac8fe0c60272d062a75405231de Mon Sep 17 00:00:00 2001 From: LGouellec Date: Wed, 31 Jul 2024 10:38:54 -0700 Subject: [PATCH 4/4] fix unit tests --- core/KafkaStream.cs | 4 - core/Processors/GlobalStreamThread.cs | 8 +- .../OpenTelemetryMetricsExporter.cs | 4 - samples/sample-stream/Reproducer328.cs | 70 --------- samples/sample-stream/sample-stream.csproj | 6 - .../Private/GlobalStateManagerTests.cs | 6 + .../Private/GlobalStreamThreadTests.cs | 146 +++++++++++++----- 7 files changed, 120 insertions(+), 124 deletions(-) delete mode 100644 samples/sample-stream/Reproducer328.cs diff --git a/core/KafkaStream.cs b/core/KafkaStream.cs index 942d801b..0a850644 100644 --- a/core/KafkaStream.cs +++ b/core/KafkaStream.cs @@ -432,16 +432,12 @@ public void Start(CancellationToken? token = null) public async Task StartAsync(CancellationToken? token = null) { if (token.HasValue) -<<<<<<< HEAD - token.Value.Register(Dispose); -======= { token.Value.Register(() => { _cancelSource.Cancel(); Dispose(); }); } ->>>>>>> develop await Task.Factory.StartNew(() => { diff --git a/core/Processors/GlobalStreamThread.cs b/core/Processors/GlobalStreamThread.cs index 59af4920..dc6de433 100644 --- a/core/Processors/GlobalStreamThread.cs +++ b/core/Processors/GlobalStreamThread.cs @@ -270,7 +270,13 @@ protected virtual void Dispose(bool waitForThread) if (waitForThread) { - thread.Join(); + try + { + thread.Join(); + } + catch (ThreadStateException) + { + } } SetState(GlobalThreadState.DEAD); diff --git a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs index 2f4db88c..f71595d2 100644 --- a/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs +++ b/metrics/Streamiz.Kafka.Net.Metrics.OpenTelemetry/OpenTelemetryMetricsExporter.cs @@ -1,10 +1,6 @@ -using System; using System.Collections.Generic; using System.Diagnostics.Metrics; -using System.Globalization; using System.Linq; -using System.Runtime.CompilerServices; -using Streamiz.Kafka.Net.Mock; namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry { diff --git a/samples/sample-stream/Reproducer328.cs b/samples/sample-stream/Reproducer328.cs deleted file mode 100644 index 68dc7407..00000000 --- a/samples/sample-stream/Reproducer328.cs +++ /dev/null @@ -1,70 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Confluent.Kafka; -using Microsoft.Extensions.Logging; -using Streamiz.Kafka.Net; -using Streamiz.Kafka.Net.SerDes; -using Streamiz.Kafka.Net.Table; - -namespace sample_stream; - -public class Reproducer328 -{ - public class Product - { - public string category { get; set; } - public string name { get; set; } - public string description { get; set; } - public double price { get; set; } - - public override string ToString() - { - return $"Name:{name}, Category: {category}, Price: {price}"; - } - } - - public static async Task Main(string[] args) - { - var config = new StreamConfig - { - ApplicationId = $"test-reproducer328", - BootstrapServers = "localhost:9092", - AutoOffsetReset = AutoOffsetReset.Earliest, - Logger = LoggerFactory.Create(b => - { - b.SetMinimumLevel(LogLevel.Debug); - b.AddLog4Net(); - }) - }; - - var builder = CreateTopology(); - var t = builder.Build(); - var stream = new KafkaStream(t, config); - - Console.CancelKeyPress += (sender, eventArgs) => stream.Dispose(); - - await stream.StartAsync(); - } - - private static StreamBuilder CreateTopology() - { - StreamBuilder builder = new StreamBuilder(); - var global = builder.GlobalTable("product_category", - InMemory.As("product-store")); - - builder.Stream>("product3") - .Peek((k, v) => Console.WriteLine($"Product Key : {k} | Product Value : {v}")) - .Join(global, - (s, s1) => s1.category, - (product, s) => - { - product.category = s; - return product; - }) - .Peek((k, v) => Console.WriteLine($"Product Key : {k} | Product Value : {v}")) - .To>("product_output3"); - - return builder; - } -} \ No newline at end of file diff --git a/samples/sample-stream/sample-stream.csproj b/samples/sample-stream/sample-stream.csproj index 6f41e8e0..1c6eca58 100644 --- a/samples/sample-stream/sample-stream.csproj +++ b/samples/sample-stream/sample-stream.csproj @@ -15,12 +15,6 @@ - - - - - - diff --git a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs index 50300de6..d4fc4c09 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStateManagerTests.cs @@ -87,6 +87,12 @@ public void SetUp() stateManager.SetGlobalProcessorContext(context); } + [TearDown] + public void Dispose() + { + + } + [Test] public void ShouldInitializeStateStores() { diff --git a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs index b92a49f5..c8d6f5dc 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/GlobalStreamThreadTests.cs @@ -14,112 +14,146 @@ namespace Streamiz.Kafka.Net.Tests.Private { public class GlobalStreamThreadTests { - private Mock globalStateMaintainerMock; - private Mock streamConfigMock; - private Mock> globalConsumerMock; - - private GlobalStreamThread globalStreamThread; - private CancellationTokenSource cancellationTokenSource; - - [SetUp] - public void SetUp() + [Test] + public void ShouldConvertExceptionsToStreamsException() { - globalConsumerMock = new Mock>(); - globalStateMaintainerMock = new Mock(); - streamConfigMock = new Mock(); + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); streamConfigMock.Setup(x => x.PollMs).Returns(1); streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); - cancellationTokenSource = new CancellationTokenSource(); - globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, globalStateMaintainerMock.Object, new StreamMetricsRegistry()); - } - - [TearDown] - public void TearDown() - { - cancellationTokenSource.Cancel(); - } - - [Test] - public void ShouldConvertExceptionsToStreamsException() - { + streamConfigMock.Setup(x => x.PollMs).Throws(new Exception("boom")); Assert.Throws(() => globalStreamThread.Start()); + globalStreamThread.Dispose(); } [Test] public void ShouldBeRunningAfterSuccesfullStart() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + globalStreamThread.Start(); // we need to wait for thread to set running state Thread.Sleep(100); Assert.AreEqual(GlobalThreadState.RUNNING, globalStreamThread.State); + + globalStreamThread.Dispose(); } [Test] public void ShouldStopRunningWhenClosedByUser() { - var token = cancellationTokenSource.Token; + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); - globalStreamThread.Start(); - cancellationTokenSource.Cancel(); - globalStreamThread.Dispose(); + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + globalStreamThread.Start(); // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); Assert.AreEqual(GlobalThreadState.DEAD, globalStreamThread.State); } [Test] public void ShouldStopGlobalConsumer() { - var token = cancellationTokenSource.Token; + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); globalStreamThread.Start(); - cancellationTokenSource.Cancel(); - globalStreamThread.Dispose(); // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); globalConsumerMock.Verify(x => x.Close()); } [Test] public void ShouldStopGlobalStateMaintainer() { - var token = cancellationTokenSource.Token; + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + globalStreamThread.Start(); - cancellationTokenSource.Cancel(); - globalStreamThread.Dispose(); - + // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); globalStateMaintainerMock.Verify(x => x.Close()); } [Test] public void ShouldStopGlobalStateMaintainerEvenIfStoppingConsumerThrows() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + globalConsumerMock.Setup(x => x.Close()).Throws(new Exception()); - var token = cancellationTokenSource.Token; globalStreamThread.Start(); - cancellationTokenSource.Cancel(); - globalStreamThread.Dispose(); // thread should stop after some time Thread.Sleep(100); + globalStreamThread.Dispose(); + globalStateMaintainerMock.Verify(x => x.Close()); } [Test] public void ShouldAssignTopicsToConsumer() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + var partitionOffsetDictionary = new Dictionary() { {new TopicPartition("topic", 0), Offset.Beginning} @@ -130,11 +164,22 @@ public void ShouldAssignTopicsToConsumer() var parts = partitionOffsetDictionary.Keys.Select(o => new TopicPartitionOffset(o, Offset.Beginning)); globalConsumerMock.Verify(x => x.Assign(parts)); + globalStreamThread.Dispose(); } [Test] public void ShouldConsumeRecords() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + var result1 = new ConsumeResult(); var result2 = new ConsumeResult(); @@ -149,27 +194,50 @@ public void ShouldConsumeRecords() Thread.Sleep(100); globalStateMaintainerMock.Verify(x => x.Update(result1), Times.Once); globalStateMaintainerMock.Verify(x => x.Update(result2), Times.Once); + globalStreamThread.Dispose(); } [Test] public void ShouldNotFlushTooSoon() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(100); globalStreamThread.Start(); // this should be true as the thread should wait 100ms to flush globalStateMaintainerMock.Verify(x => x.FlushState(), Times.Never); + globalStreamThread.Dispose(); } [Test] public void ShouldFlush() { + var globalConsumerMock = new Mock>(); + var globalStateMaintainerMock = new Mock(); + var streamConfigMock = new Mock(); + streamConfigMock.Setup(x => x.PollMs).Returns(1); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(1); + globalStateMaintainerMock.Setup(x => x.Initialize()).Returns(new Dictionary()); + + var globalStreamThread = new GlobalStreamThread("global", globalConsumerMock.Object, streamConfigMock.Object, + globalStateMaintainerMock.Object, new StreamMetricsRegistry()); + streamConfigMock.Setup(x => x.CommitIntervalMs).Returns(10); globalStreamThread.Start(); Thread.Sleep(50); // we are waiting longer than CommitIntervalMs so thread should already flush at least once globalStateMaintainerMock.Verify(x => x.FlushState()); + globalStreamThread.Dispose(); } } } \ No newline at end of file