Skip to content

Commit

Permalink
Batch ingestion not correctly handling OperationCancelledException wh…
Browse files Browse the repository at this point in the history
…ich can cause the ingestion to never used incoming context tasks and hang. (#4791)

Backport of #4780
  • Loading branch information
ramonsmits authored Feb 10, 2025
1 parent 2aac1b5 commit 62c9b20
Show file tree
Hide file tree
Showing 30 changed files with 206 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
} while (await timer.WaitForNextTickAsync(cancellationToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(AuditThroughputCollectorHostedService)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static ReadOnlyDictionary<string, string> LoadBrokerSettingValues(IEnumerable<Ke
}
} while (await timer.WaitForNextTickAsync(stoppingToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(BrokerThroughputCollectorHostedService)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ public override async Task Stop(CancellationToken cancellationToken = default)
{
await checkTask;
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
//Swallow
// Even though we are stopping, ONLY swallow when OCE from callee to not hide any ungraceful stop errors
}
finally
{
Expand Down
124 changes: 75 additions & 49 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
using ServiceControl.Infrastructure.Metrics;
using Transports;

class AuditIngestion : IHostedService
class AuditIngestion : BackgroundService
{
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;

Expand Down Expand Up @@ -59,23 +59,15 @@ public AuditIngestion(

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
}

public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());

public async Task StopAsync(CancellationToken cancellationToken)
{
await watchdog.Stop();
channel.Writer.Complete();
await ingestionWorker;

if (transportInfrastructure != null)
{
await transportInfrastructure.Shutdown(cancellationToken);
}
watchdog = new Watchdog(
"audit message ingestion",
EnsureStarted,
EnsureStopped,
ingestionState.ReportError,
ingestionState.Clear,
settings.TimeToRestartAuditIngestionAfterFailure,
logger
);
}

Task OnCriticalError(string failure, Exception exception)
Expand Down Expand Up @@ -139,7 +131,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
// ignored
logger.Info("StopReceive cancelled");
}
}

Expand Down Expand Up @@ -176,7 +168,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
// ignored
logger.Info("StopReceive cancelled");
}
finally
{
Expand All @@ -202,51 +194,86 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
await taskCompletionSource.Task;
}

async Task Loop()
public override async Task StartAsync(CancellationToken cancellationToken)
{
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
await watchdog.Start(() => applicationLifetime.StopApplication());
await base.StartAsync(cancellationToken);
}

while (await channel.Reader.WaitToReadAsync())
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
// will only enter here if there is something to read.
try
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);

while (await channel.Reader.WaitToReadAsync(stoppingToken))
{
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
// will only enter here if there is something to read.
try
{
contexts.Add(context);
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
{
await auditIngestor.Ingest(contexts);
}
}
catch (Exception e)
{
// signal all message handling tasks to terminate
foreach (var context in contexts)
{
_ = context.GetTaskCompletionSource().TrySetException(e);
}

if (e is OperationCanceledException && stoppingToken.IsCancellationRequested)
{
logger.Info("Batch cancelled", e);
break;
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
logger.Info("Ingesting messages failed", e);
}
finally
{
await auditIngestor.Ingest(contexts);
contexts.Clear();
}
}
catch (OperationCanceledException)
{
//Do nothing as we are shutting down
continue;
}
catch (Exception e) // show must go on
// will fall out here when writer is completed
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// ExecuteAsync cancelled
}
}

public override async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await watchdog.Stop();
channel.Writer.Complete();
await base.StopAsync(cancellationToken);
}
finally
{
if (transportInfrastructure != null)
{
if (logger.IsInfoEnabled)
try
{
logger.Info("Ingesting messages failed", e);
await transportInfrastructure.Shutdown(cancellationToken);
}

// signal all message handling tasks to terminate
foreach (var context in contexts)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
context.GetTaskCompletionSource().TrySetException(e);
logger.Info("Shutdown cancelled", e);
}
}
finally
{
contexts.Clear();
}
}
// will fall out here when writer is completed
}

TransportInfrastructure transportInfrastructure;
Expand All @@ -265,7 +292,6 @@ async Task Loop()
readonly Meter batchDurationMeter;
readonly Counter receivedMeter;
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
Expand Down
4 changes: 2 additions & 2 deletions src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ await failedAuditStore.ProcessFailedMessages(
Logger.Debug($"Successfully re-imported failed audit message {transportMessage.Id}.");
}
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (token.IsCancellationRequested)
{
// no-op
Logger.Info("Cancelled", e);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NServiceBus;
using NServiceBus.Logging;
using Settings;

class ImportFailedAuditsCommand : AbstractCommand
{
readonly ILog logger = LogManager.GetLogger<ImportFailedAuditsCommand>();

public override async Task Execute(HostArguments args, Settings settings)
{
settings.IngestAuditMessages = false;
Expand All @@ -37,9 +40,9 @@ public override async Task Execute(HostArguments args, Settings settings)
{
await importer.Run(tokenSource.Token);
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (tokenSource.IsCancellationRequested)
{
// no op
logger.Info("Cancelled", e);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void Start()
await Task.Delay(interval, tokenSource.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
{
//no-op
}
Expand Down
10 changes: 5 additions & 5 deletions src/ServiceControl.Infrastructure/AsyncTimer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ public TimerJob(Func<CancellationToken, Task<TimerJobExecutionResult>> callback,

//Otherwise execute immediately
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// no-op
break;
}
catch (Exception ex)
{
errorCallback(ex);
}
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// no-op
}
Expand All @@ -64,7 +64,7 @@ public async Task Stop()
return;
}

tokenSource.Cancel();
await tokenSource.CancelAsync().ConfigureAwait(false);
tokenSource.Dispose();

if (task != null)
Expand All @@ -73,7 +73,7 @@ public async Task Stop()
{
await task.ConfigureAwait(false);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
{
//NOOP
}
Expand Down
6 changes: 3 additions & 3 deletions src/ServiceControl.Infrastructure/ReadOnlyStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio

return Task.CompletedTask;
}
catch (OperationCanceledException e)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(e.CancellationToken);
}
Expand Down Expand Up @@ -113,7 +113,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel

return Task.FromResult(result);
}
catch (OperationCanceledException e)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(e.CancellationToken);
}
Expand All @@ -136,7 +136,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken

return new ValueTask<int>(result);
}
catch (OperationCanceledException e)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
return new ValueTask<int>(Task.FromCanceled<int>(e.CancellationToken));
}
Expand Down
9 changes: 5 additions & 4 deletions src/ServiceControl.Infrastructure/Watchdog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ public Task Start(Action onFailedOnStartup)

failedOnStartup ??= false;
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested)
{
//Do not Delay
// Continue, as OCE is not from caller
log.Info("Start cancelled, retrying...", e);
continue;
}
catch (Exception e)
Expand All @@ -81,9 +82,9 @@ public Task Start(Action onFailedOnStartup)
{
await Task.Delay(timeToWaitBetweenStartupAttempts, shutdownTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (shutdownTokenSource.IsCancellationRequested)
{
//Ignore
//Ignore, no need to log cancellation of delay
}
}
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
} while (await timer.WaitForNextTickAsync(cancellationToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(RemoveExpiredEndpointInstances)} timer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
} while (await timer.WaitForNextTickAsync(cancellationToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(ReportThroughputHostedService)} timer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken)
await signal.WaitHandle.WaitOneAsync(cancellationToken);
signal.Reset();
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
Expand All @@ -91,7 +91,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken)
}
while (!cancellationToken.IsCancellationRequested);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// ignore
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public async Task ProcessFailedErrorImports(Func<FailedTransportMessage, Task> p
Logger.Debug($"Successfully re-imported failed error message {transportMessage.Id}.");
}
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
// no-op
Logger.Info("Cancelled", e);
}
catch (Exception e)
{
Expand Down
Loading

0 comments on commit 62c9b20

Please sign in to comment.