Skip to content

Commit

Permalink
applied changes to fix issue LGouellec#328
Browse files Browse the repository at this point in the history
  • Loading branch information
emaalbero committed Jul 26, 2024
1 parent 9cfac50 commit fc44224
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 66 deletions.
21 changes: 12 additions & 9 deletions core/Kafka/Internal/StreamsRebalanceListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,19 @@ public void PartitionsRevoked(IConsumer<byte[], byte[]> consumer, List<TopicPart
DateTime start = DateTime.Now;
lock (manager._lock)
{
manager.RebalanceInProgress = true;
manager.RevokeTasks(new List<TopicPartition>(partitions.Select(p => p.TopicPartition)));
Thread.SetState(ThreadState.PARTITIONS_REVOKED);
manager.RebalanceInProgress = false;
if (Thread.IsRunning)
{
manager.RebalanceInProgress = true;
manager.RevokeTasks(new List<TopicPartition>(partitions.Select(p => p.TopicPartition)));
Thread.SetState(ThreadState.PARTITIONS_REVOKED);
manager.RebalanceInProgress = false;

StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms");
sb.AppendLine(
$"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}");
log.LogInformation(sb.ToString());
StringBuilder sb = new StringBuilder();
sb.AppendLine($"Partition revocation took {DateTime.Now - start} ms");
sb.AppendLine(
$"\tCurrent suspended active tasks: {string.Join(",", partitions.Select(p => $"{p.Topic}-{p.Partition}"))}");
log.LogInformation(sb.ToString());
}
}
}

Expand Down
49 changes: 21 additions & 28 deletions core/KafkaStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public override string ToString()
private readonly StreamMetricsRegistry metricsRegistry;

private readonly CancellationTokenSource _cancelSource = new();
private readonly SequentiallyGracefullyShutdownHook shutdownHook;

internal State StreamState { get; private set; }

Expand Down Expand Up @@ -401,6 +402,13 @@ string Protect(string str)
queryableStoreProvider = new QueryableStoreProvider(stateStoreProviders, globalStateStoreProvider);

StreamState = State.CREATED;

shutdownHook = new SequentiallyGracefullyShutdownHook(
threads,
globalStreamThread,
externalStreamThread,
_cancelSource
);
}

/// <summary>
Expand All @@ -424,13 +432,9 @@ public void Start(CancellationToken? token = null)
public async Task StartAsync(CancellationToken? token = null)
{
if (token.HasValue)
{
token.Value.Register(() => {
_cancelSource.Cancel();
Dispose();
});
}
await Task.Factory.StartNew(async () =>
token.Value.Register(Dispose);

await Task.Factory.StartNew(() =>
{
if (SetState(State.REBALANCING))
{
Expand All @@ -448,12 +452,12 @@ await Task.Factory.StartNew(async () =>
SetState(State.PENDING_SHUTDOWN);
SetState(State.ERROR);
}
return;
return Task.CompletedTask;
}

RunMiddleware(true, true);

globalStreamThread?.Start(_cancelSource.Token);
globalStreamThread?.Start();
externalStreamThread?.Start(_cancelSource.Token);

foreach (var t in threads)
Expand All @@ -463,14 +467,17 @@ await Task.Factory.StartNew(async () =>

RunMiddleware(false, true);
}


return Task.CompletedTask;
}, token ?? _cancelSource.Token);


try
{
// Allow time for streams thread to run
await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs),
token ?? _cancelSource.Token);
//await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs),
// token ?? _cancelSource.Token);
}
catch
{
Expand All @@ -484,16 +491,8 @@ await Task.Delay(TimeSpan.FromMilliseconds(configuration.StartTaskDelayMs),
/// </summary>
public void Dispose()
{
Task.Factory.StartNew(() =>
{
if (!_cancelSource.IsCancellationRequested)
{
_cancelSource.Cancel();
}

Close();
_cancelSource.Dispose();
}).Wait(TimeSpan.FromSeconds(30));
Close();
_cancelSource.Dispose();
}

/// <summary>
Expand All @@ -511,14 +510,8 @@ private void Close()
else
{
RunMiddleware(true, false);

foreach (var t in threads)
{
t.Dispose();
}

externalStreamThread?.Dispose();
globalStreamThread?.Dispose();
shutdownHook.Shutdown();

RunMiddleware(false, false);
metricsRegistry.RemoveClientSensors();
Expand Down
2 changes: 1 addition & 1 deletion core/Mock/ClusterInMemoryTopologyDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ void stateChangedHandeler(IThread thread, ThreadStateTransitionValidator old,

InitializeInternalTopicManager();

globalStreamThread?.Start(token);
globalStreamThread?.Start();
externalStreamThread?.Start(token);

threadTopology.Start(token);
Expand Down
11 changes: 6 additions & 5 deletions core/Mock/Kafka/MockCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,8 @@ internal ConsumeResult<byte[], byte[]> Consume(MockConsumer mockConsumer, TimeSp
Key = record.Key,
Value = record.Value,
Timestamp = new Timestamp(
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable)
record.Timestamp ?? DateTime.Now, TimestampType.NotAvailable),
Headers = record.Headers
}
};
++offset.OffsetConsumed;
Expand Down Expand Up @@ -799,8 +800,8 @@ internal DeliveryReport<byte[], byte[]> Produce(string topic, Message<byte[], by
partition = RandomGenerator.GetInt32(topics[topic].PartitionNumber);
else
partition = Math.Abs(MurMurHash3.Hash(new MemoryStream(message.Key))) % topics[topic].PartitionNumber;
topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs);

topics[topic].AddMessage(message.Key, message.Value, partition, message.Timestamp.UnixTimestampMs, message.Headers);

r.Message = message;
r.Partition = partition;
Expand All @@ -822,13 +823,13 @@ internal DeliveryReport<byte[], byte[]> Produce(TopicPartition topicPartition, M
CreateTopic(topicPartition.Topic);
if (topics[topicPartition.Topic].PartitionNumber > topicPartition.Partition)
{
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}
else
{
topics[topicPartition.Topic].CreateNewPartitions(topicPartition.Partition);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs);
topics[topicPartition.Topic].AddMessage(message.Key, message.Value, topicPartition.Partition, message.Timestamp.UnixTimestampMs, message.Headers);
r.Status = PersistenceStatus.Persisted;
}

Expand Down
9 changes: 5 additions & 4 deletions core/Mock/Kafka/MockPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Streamiz.Kafka.Net.Mock.Kafka
{
internal class MockPartition
{
private readonly List<(byte[], byte[], long)> log = new();
private readonly List<(byte[], byte[], long, Headers)> log = new();
private readonly Dictionary<long, long> mappingOffsets = new();

public MockPartition(int indice)
Expand All @@ -21,10 +21,10 @@ public MockPartition(int indice)
public long LowOffset { get; private set; } = Offset.Unset;
public long HighOffset { get; private set; } = Offset.Unset;

internal void AddMessageInLog(byte[] key, byte[] value, long timestamp)
internal void AddMessageInLog(byte[] key, byte[] value, long timestamp, Headers headers)
{
mappingOffsets.Add(Size, log.Count);
log.Add((key, value, timestamp));
log.Add((key, value, timestamp, headers));
++Size;
UpdateOffset();
}
Expand All @@ -47,7 +47,8 @@ internal TestRecord<byte[], byte[]> GetMessage(long offset)
{
Key = record.Item1,
Value = record.Item2,
Timestamp = record.Item3.FromMilliseconds()
Timestamp = record.Item3.FromMilliseconds(),
Headers = record.Item4
};
}

Expand Down
4 changes: 2 additions & 2 deletions core/Mock/Kafka/MockTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ public MockTopic(string topic, int part)
public int PartitionNumber { get; private set; }
public IEnumerable<MockPartition> Partitions => partitions.AsReadOnly();

public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0)
public void AddMessage(byte[] key, byte[] value, int partition, long timestamp = 0, Headers headers = null)
{
partitions[partition].AddMessageInLog(key, value, timestamp);
partitions[partition].AddMessageInLog(key, value, timestamp, headers);
}

public TestRecord<byte[], byte[]> GetMessage(int partition, long consumerOffset)
Expand Down
2 changes: 2 additions & 0 deletions core/Mock/Sync/SyncPipeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public void PublishRecord(string topic, byte[] key, byte[] value, DateTime times
public void Flush()
{
long now = DateTime.Now.GetMilliseconds();
TaskManager.CurrentTask = task;
while (task.CanProcess(now))
task.Process();

task.PunctuateStreamTime();
task.PunctuateSystemTime();
TaskManager.CurrentTask = null;
}

public void Close()
Expand Down
11 changes: 4 additions & 7 deletions core/Processors/GlobalStreamThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ public void Close()
private readonly string logPrefix;
private readonly string threadClientId;
private readonly IConsumer<byte[], byte[]> globalConsumer;
private CancellationToken token;
private readonly object stateLock = new object();
private readonly object stateLock = new();
private readonly IStreamConfig configuration;
private StateConsumer stateConsumer;
private readonly IGlobalStateMaintainer globalStateMaintainer;
Expand Down Expand Up @@ -140,7 +139,7 @@ private void Run()
SetState(GlobalThreadState.RUNNING);
try
{
while (!token.IsCancellationRequested && State.IsRunning())
while (State.IsRunning())
{
stateConsumer.PollAndUpdate();

Expand All @@ -166,11 +165,11 @@ private void Run()
// https://docs.microsoft.com/en-us/visualstudio/code-quality/ca1065
}

Dispose(false);
//Dispose(false);
}
}

public void Start(CancellationToken token)
public void Start()
{
log.LogInformation("{LogPrefix}Starting", logPrefix);

Expand All @@ -185,8 +184,6 @@ public void Start(CancellationToken token)
throw;
}

this.token = token;

thread.Start();
}

Expand Down
44 changes: 44 additions & 0 deletions core/Processors/Internal/SequentiallyGracefullyShutdownHook.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System.Threading;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Crosscutting;

namespace Streamiz.Kafka.Net.Processors.Internal
{
internal class SequentiallyGracefullyShutdownHook
{
private readonly IThread[] _streamThreads;
private readonly GlobalStreamThread _globalStreamThread;
private readonly IThread _externalStreamThread;
private readonly CancellationTokenSource _tokenSource;
private readonly ILogger log = Logger.GetLogger(typeof(SequentiallyGracefullyShutdownHook));

public SequentiallyGracefullyShutdownHook(
IThread[] streamThreads,
GlobalStreamThread globalStreamThread,
IThread externalStreamThread,
CancellationTokenSource tokenSource
)
{
_streamThreads = streamThreads;
_globalStreamThread = globalStreamThread;
_externalStreamThread = externalStreamThread;
_tokenSource = tokenSource;
}

public void Shutdown()
{
log.LogInformation($"Request shutdown gracefully");
_tokenSource.Cancel();

foreach (var t in _streamThreads)
{
t.Dispose();
}

_externalStreamThread?.Dispose();
_globalStreamThread?.Dispose();

log.LogInformation($"Shutdown gracefully successful");
}
}
}
1 change: 1 addition & 0 deletions core/StreamConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2719,6 +2719,7 @@ public MetricsRecordingLevel MetricsRecording
/// Time wait before completing the start task of <see cref="KafkaStream"/>. (default: 5000)
/// </summary>
[StreamConfigProperty("" + startTaskDelayMsCst)]
[Obsolete]
public long StartTaskDelayMs
{
get => configProperties[startTaskDelayMsCst];
Expand Down
Loading

0 comments on commit fc44224

Please sign in to comment.