Skip to content

Commit

Permalink
sequentially close the external stream thread
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Oct 21, 2024
1 parent 33fad96 commit d36208e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
19 changes: 9 additions & 10 deletions core/Processors/ExternalStreamThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ internal class ExternalStreamThread : IThread
private IAdminClient adminClient;
private readonly string logPrefix;
private readonly Thread thread;
private CancellationToken token;

private DateTime lastCommit = DateTime.Now;
private DateTime lastMetrics = DateTime.Now;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void Run()
{
SetState(ThreadState.RUNNING);

while (!token.IsCancellationRequested)
while (State.IsRunning())
{
if (exception != null)
{
Expand Down Expand Up @@ -205,13 +204,11 @@ private void CompleteShutdown()
{
if (!IsDisposable)
{

log.LogInformation($"{logPrefix}Shutting down");

SetState(ThreadState.PENDING_SHUTDOWN);

IsRunning = false;


if (State != ThreadState.PENDING_SHUTDOWN)
SetState(ThreadState.PENDING_SHUTDOWN);

CommitOffsets(true);

var consumer = GetConsumer();
Expand Down Expand Up @@ -308,6 +305,9 @@ private void CloseThread()
{
try
{
log.LogInformation($"{logPrefix}Shutting down");
SetState(ThreadState.PENDING_SHUTDOWN);

thread.Join();
}
catch (Exception e)
Expand All @@ -326,8 +326,7 @@ public void Start(CancellationToken token)
IsRunning = false;
return;
}

this.token = token;

IsRunning = true;

if(configuration.Guarantee == ProcessingGuarantee.EXACTLY_ONCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ public void Shutdown()
_tokenSource.Cancel();

foreach (var t in _streamThreads)
{
t.Dispose();
}

_externalStreamThread?.Dispose();
_globalStreamThread?.Dispose();
Expand Down

0 comments on commit d36208e

Please sign in to comment.