Merge pull request #352 from LGouellec/reproducer/328
Fix issue 328
LGouellec authored Jul 31, 2024
2 parents dbe7eb6 + c8356d7 commit a7082a6
Showing 16 changed files with 282 additions and 121 deletions.
21 changes: 12 additions & 9 deletions core/Kafka/Internal/StreamsRebalanceListener.cs
@@ -55,16 +55,19 @@ public void PartitionsRevoked(IConsumer<byte[], byte[]> consumer, List<TopicPart
DateTime start = DateTime.Now;
lock (manager._lock)
{
manager.RebalanceInProgress = true;
manager.RevokeTasks(new List<TopicPartition>(partitions.Select(p => p.TopicPartition)));
Thread.SetState(ThreadState.PARTITIONS_REVOKED);
manager.RebalanceInProgress = false;
if (Thread.IsRunning)
{
manager.RebalanceInProgress = true;
manager.RevokeTasks(new List<TopicPartition>(partitions.Select(p => p.TopicPartition)));
Thread.SetState(ThreadState.PARTITIONS_REVOKED);
manager.RebalanceInProgress = false;

StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms");
sb.AppendLine(
$"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}");
log.LogInformation(sb.ToString());
StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms");
sb.AppendLine(
$"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}");
log.LogInformation(sb.ToString());
}
}
}

34 changes: 14 additions & 20 deletions core/KafkaStream.cs
@@ -260,6 +260,7 @@ public override string ToString()
private readonly StreamMetricsRegistry metricsRegistry;

private readonly CancellationTokenSource _cancelSource = new();
private readonly SequentiallyGracefullyShutdownHook shutdownHook;

internal State StreamState { get; private set; }

@@ -402,6 +403,13 @@ string Protect(string str)
queryableStoreProvider = new QueryableStoreProvider(stateStoreProviders, globalStateStoreProvider);

StreamState = State.CREATED;

shutdownHook = new SequentiallyGracefullyShutdownHook(
threads,
globalStreamThread,
externalStreamThread,
_cancelSource
);
}

/// <summary>
Expand Down Expand Up @@ -456,7 +464,7 @@ await Task.Factory.StartNew(() =>

RunMiddleware(true, true);

globalStreamThread?.Start(_cancelSource.Token);
globalStreamThread?.Start();
externalStreamThread?.Start(_cancelSource.Token);

foreach (var t in threads)
@@ -474,8 +482,8 @@ await Task.Factory.StartNew(() =>
try
{
// Allow time for streams thread to run
await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs),
token ?? _cancelSource.Token);
// await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs),
// token ?? _cancelSource.Token);
}
catch
{
@@ -489,16 +497,8 @@ await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs),
/// </summary>
public void Dispose()
{
Task.Factory.StartNew(() =>
{
if (!_cancelSource.IsCancellationRequested)
{
_cancelSource.Cancel();
}

Close();
_cancelSource.Dispose();
}).Wait(TimeSpan.FromSeconds(30));
Close();
_cancelSource.Dispose();
}

/// <summary>
@@ -517,13 +517,7 @@ private void Close()
{
RunMiddleware(true, false);

foreach (var t in threads)
{
t.Dispose();
}

externalStreamThread?.Dispose();
globalStreamThread?.Dispose();
shutdownHook.Shutdown();

RunMiddleware(false, false);
metricsRegistry.RemoveClientSensors();
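For readers tracking the lifecycle change above: Dispose() no longer spins up a background task with a 30-second wait; it now calls Close() directly, and Close() delegates thread teardown to the new SequentiallyGracefullyShutdownHook. The minimal sketch below shows where that path is triggered from application code. It assumes the usual Streamiz entry points (StreamBuilder, KafkaStream.StartAsync, Dispose); the application id, broker address, and topic names are placeholders, not part of this commit.

    using System;
    using System.Threading.Tasks;
    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.SerDes;

    class Program
    {
        static async Task Main()
        {
            var config = new StreamConfig<StringSerDes, StringSerDes>
            {
                ApplicationId = "sample-app",          // placeholder
                BootstrapServers = "localhost:9092"    // placeholder
            };

            var builder = new StreamBuilder();
            builder.Stream<string, string>("input-topic").To("output-topic");

            var stream = new KafkaStream(builder.Build(), config);

            // Dispose() -> Close() -> shutdownHook.Shutdown(): cancels the token source,
            // then disposes the stream threads, the external thread and the global thread in order.
            Console.CancelKeyPress += (o, e) => stream.Dispose();

            await stream.StartAsync();
        }
    }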
2 changes: 1 addition & 1 deletion core/Mock/ClusterInMemoryTopologyDriver.cs
@@ -212,7 +212,7 @@ void stateChangedHandeler(IThread thread, ThreadStateTransitionValidator old,

InitializeInternalTopicManager();

globalStreamThread?.Start(token);
globalStreamThread?.Start();
externalStreamThread?.Start(token);

threadTopology.Start(token);
21 changes: 12 additions & 9 deletions core/Processors/GlobalStreamThread.cs
@@ -109,8 +109,7 @@ public void Close()
private readonly string logPrefix;
private readonly string threadClientId;
private readonly IConsumer<byte[], byte[]> globalConsumer;
private CancellationToken token;
private readonly object stateLock = new object();
private readonly object stateLock = new();
private readonly IStreamConfig configuration;
private StateConsumer stateConsumer;
private readonly IGlobalStateMaintainer globalStateMaintainer;
@@ -140,7 +139,7 @@ private void Run()
SetState(GlobalThreadState.RUNNING);
try
{
while (!token.IsCancellationRequested && State.IsRunning())
while (State.IsRunning())
{
stateConsumer.PollAndUpdate();

@@ -166,11 +165,11 @@ private void Run()
// https://docs.microsoft.com/en-us/visualstudio/code-quality/ca1065
}

Dispose(false);
//Dispose(false);
}
}

public void Start(CancellationToken token)
public void Start()
{
log.LogInformation("{LogPrefix}Starting", logPrefix);

@@ -184,9 +183,7 @@ public void Start(CancellationToken token)
$"{logPrefix}Error happened during initialization of the global state store; this thread has shutdown : {e}");
throw;
}

this.token = token;


thread.Start();
}

@@ -273,7 +270,13 @@ protected virtual void Dispose(bool waitForThread)

if (waitForThread)
{
thread.Join();
try
{
thread.Join();
}
catch (ThreadStateException)
{
}
}

SetState(GlobalThreadState.DEAD);
1 change: 0 additions & 1 deletion core/Processors/IThread.cs
@@ -14,7 +14,6 @@ interface IThread : IDisposable
void Run();
void Start(CancellationToken token);
IEnumerable<ITask> ActiveTasks { get; }

event ThreadStateListener StateChanged;
}
}
44 changes: 44 additions & 0 deletions core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs
@@ -0,0 +1,44 @@
using System.Threading;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Processors.Internal
{
internal class SequentiallyGracefullyShutdownHook
{
private readonly IThread[] _streamThreads;
private readonly GlobalStreamThread _globalStreamThread;
private readonly IThread _externalStreamThread;
private readonly CancellationTokenSource _tokenSource;
private readonly ILogger log = Logger.GetLogger(typeof(SequentiallyGracefullyShutdownHook));

public SequentiallyGracefullyShutdownHook(
IThread [] streamThreads,
GlobalStreamThread globalStreamThread,
IThread externalStreamThread,
CancellationTokenSource tokenSource
)
{
_streamThreads = streamThreads;
_globalStreamThread = globalStreamThread;
_externalStreamThread = externalStreamThread;
_tokenSource = tokenSource;
}

public void Shutdown()
{
log.LogInformation($"Request shutdown gracefully");
_tokenSource.Cancel();

foreach (var t in _streamThreads)
{
t.Dispose();
}

_externalStreamThread?.Dispose();
_globalStreamThread?.Dispose();

log.LogInformation($"Shutdown gracefully successful");
}
}
}
5 changes: 4 additions & 1 deletion core/StreamConfig.cs
@@ -2736,9 +2736,12 @@ public MetricsRecordingLevel MetricsRecording
}

/// <summary>
/// Time wait before completing the start task of <see cref="KafkaStream"/>. (default: 5000)
/// Time wait before completing the start task of <see cref="KafkaStream"/>.
/// Should be removed in the next release.
/// (default: 5000)
/// </summary>
[StreamConfigProperty("" + startTaskDelayMsCst)]
[Obsolete]
public long StartTaskDelayMs
{
get => configProperties[startTaskDelayMsCst];
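The deprecation above is worth a concrete illustration. The short configuration sketch below assumes the standard StreamConfig surface; the application id and broker address are placeholder values, not taken from this commit.

    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.SerDes;

    var config = new StreamConfig<StringSerDes, StringSerDes>
    {
        ApplicationId = "reproducer-328",     // placeholder
        BootstrapServers = "localhost:9092"   // placeholder
    };

    // StartTaskDelayMs is now marked [Obsolete]: the Task.Delay in KafkaStream.StartAsync
    // that consumed it is commented out in this commit, so new code can simply drop the
    // setting (the former default of 5000 ms is shown here only for reference).
    // config.StartTaskDelayMs = 5000;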
40 changes: 40 additions & 0 deletions environment/confs/order.avsc
@@ -0,0 +1,40 @@
{
"namespace": "ksql",
"name": "product",
"type": "record",
"fields": [
{
"name": "category",
"type": {
"type": "string",
"arg.properties": {
"options": ["1", "2", "3"]
}
}
},
{"name": "name", "type": {
"type": "string",
"arg.properties": {
"iteration": {
"start": 0
}
}
}},
{"name": "description", "type": {
"type": "string",
"arg.properties": {
"iteration": {
"start": 0
}
}
}},
{"name": "price", "type": {
"type": "double",
"arg.properties": {
"iteration": {
"start": 0
}
}
}}
]
}
6 changes: 4 additions & 2 deletions environment/datagen_connector.json
@@ -1,5 +1,5 @@
{
"name": "datagen-users",
"name": "datagen-products",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "users",
@@ -9,6 +9,8 @@
"value.converter.schemas.enable": "false",
"max.interval": 50,
"iterations": 10000000,
"tasks.max": "1"
"tasks.max": "1",
"schema.filename": "/home/appuser/order.avsc",
"schema.keyfield": "name"
}
}
2 changes: 2 additions & 0 deletions environment/docker-compose-with-connect.yml
@@ -78,6 +78,8 @@ services:
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
volumes:
- ./confs/order.avsc:/home/appuser/order.avsc

akhq:
image: tchiotludo/akhq:latest
39 changes: 15 additions & 24 deletions environment/start.sh
@@ -1,29 +1,20 @@
#!/bin/bash

curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
curl -i -X PUT http://localhost:8083/connectors/datagen_product/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"kafka.topic": "shoes",
"quickstart": "shoes",
"max.interval": 100,
"iterations": 10000000,
"tasks.max": "1"
}'
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "product3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 50,
"iterations": 10000000,
"tasks.max": "1",
"schema.filename": "/home/appuser/order.avsc",
"schema.keyfield": "name"
}'

curl -i -X PUT http://localhost:8083/connectors/datagen_local_02/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"kafka.topic": "orders",
"quickstart": "shoe_orders",
"max.interval": 100,
"iterations": 10000000,
"tasks.max": "1"
}'

# curl -i -X PUT http://localhost:8083/connectors/datagen_product/pause
# curl -i -X PUT http://localhost:8083/connectors/datagen_product/resume
@@ -1,10 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Globalization;
using System.Linq;
using System.Runtime.CompilerServices;
using Streamiz.Kafka.Net.Mock;

namespace Streamiz.Kafka.Net.Metrics.OpenTelemetry
{
12 changes: 12 additions & 0 deletions samples/sample-stream/log4net.config
@@ -5,8 +5,20 @@
<param name="ConversionPattern" value="%d [%t] %-5p %c - %m%n" />
</layout>
</appender>
<appender name="file" type="log4net.Appender.RollingFileAppender">
<file value="myapp.log" />
<appendToFile value="true" />
<rollingStyle value="Size" />
<maxSizeRollBackups value="5" />
<maximumFileSize value="10MB" />
<staticLogFileName value="true" />
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%date [%thread] %level %logger - %message%newline" />
</layout>
</appender>
<root>
<level value="DEBUG" />
<appender-ref ref="ConsoleAppender" />
<appender-ref ref="file" />
</root>
</log4net>
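The new rolling file appender only takes effect once the sample loads this configuration into its logger factory. A hedged sketch of that wiring follows; it assumes the Microsoft.Extensions.Logging.Log4Net.AspNetCore adapter (AddLog4Net) and the StreamConfig.Logger hook used elsewhere in the Streamiz samples, neither of which appears in this diff.

    using Microsoft.Extensions.Logging;
    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.SerDes;

    var config = new StreamConfig<StringSerDes, StringSerDes>();
    config.Logger = LoggerFactory.Create(logBuilder =>
    {
        logBuilder.SetMinimumLevel(LogLevel.Debug);   // matches the DEBUG root level above
        logBuilder.AddLog4Net("log4net.config");      // picks up ConsoleAppender plus the new "file" appender
    });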