diff --git a/src/Particular.LicensingComponent/AuditThroughput/AuditThroughputCollectorHostedService.cs b/src/Particular.LicensingComponent/AuditThroughput/AuditThroughputCollectorHostedService.cs index fdba4d6a81..e60a4531db 100644 --- a/src/Particular.LicensingComponent/AuditThroughput/AuditThroughputCollectorHostedService.cs +++ b/src/Particular.LicensingComponent/AuditThroughput/AuditThroughputCollectorHostedService.cs @@ -40,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } } while (await timer.WaitForNextTickAsync(cancellationToken)); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogInformation($"Stopping {nameof(AuditThroughputCollectorHostedService)}"); } diff --git a/src/Particular.LicensingComponent/BrokerThroughput/BrokerThroughputCollectorHostedService.cs b/src/Particular.LicensingComponent/BrokerThroughput/BrokerThroughputCollectorHostedService.cs index c61740ce97..8bb88190fe 100644 --- a/src/Particular.LicensingComponent/BrokerThroughput/BrokerThroughputCollectorHostedService.cs +++ b/src/Particular.LicensingComponent/BrokerThroughput/BrokerThroughputCollectorHostedService.cs @@ -53,7 +53,7 @@ static ReadOnlyDictionary LoadBrokerSettingValues(IEnumerable Loop(), CancellationToken.None); - } - - public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication()); - - public async Task StopAsync(CancellationToken cancellationToken) - { - await watchdog.Stop(); - channel.Writer.Complete(); - await ingestionWorker; - - if (transportInfrastructure != null) - { - await transportInfrastructure.Shutdown(cancellationToken); - } + watchdog = new Watchdog( + "audit message ingestion", + EnsureStarted, + EnsureStopped, + ingestionState.ReportError, + ingestionState.Clear, + settings.TimeToRestartAuditIngestionAfterFailure, + logger + ); } Task OnCriticalError(string failure, Exception exception) 
@@ -132,7 +124,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) } catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) { - // ignored + logger.Info("StopReceive cancelled"); } } @@ -170,7 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) } catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) { - // ignored + logger.Info("StopReceive cancelled"); } finally { @@ -200,57 +192,92 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati } } - async Task Loop() + public override async Task StartAsync(CancellationToken cancellationToken) { - var contexts = new List(transportSettings.MaxConcurrency.Value); + await watchdog.Start(() => applicationLifetime.StopApplication()); + await base.StartAsync(cancellationToken); + } - while (await channel.Reader.WaitToReadAsync()) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + try { - // will only enter here if there is something to read. - try + var contexts = new List(transportSettings.MaxConcurrency.Value); + + while (await channel.Reader.WaitToReadAsync(stoppingToken)) { - // as long as there is something to read this will fetch up to MaximumConcurrency items - while (channel.Reader.TryRead(out var context)) + // will only enter here if there is something to read. 
+ try { - contexts.Add(context); + // as long as there is something to read this will fetch up to MaximumConcurrency items + while (channel.Reader.TryRead(out var context)) + { + contexts.Add(context); + } + + auditBatchSize.Record(contexts.Count); + + using (new DurationRecorder(auditBatchDuration)) + { + await auditIngestor.Ingest(contexts); + } + + consecutiveBatchFailuresCounter.Record(0); } + catch (Exception e) + { + // signal all message handling tasks to terminate + foreach (var context in contexts) + { + _ = context.GetTaskCompletionSource().TrySetException(e); + } + + if (e is OperationCanceledException && stoppingToken.IsCancellationRequested) + { + logger.Info("Batch cancelled", e); + break; + } - auditBatchSize.Record(contexts.Count); + logger.Info("Ingesting messages failed", e); - using (new DurationRecorder(auditBatchDuration)) + // no need to do interlocked increment since this is running sequential + consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++); + } + finally { - await auditIngestor.Ingest(contexts); + contexts.Clear(); } - - consecutiveBatchFailuresCounter.Record(0); } - catch (OperationCanceledException) - { - //Do nothing as we are shutting down - continue; - } - catch (Exception e) // show must go on + // will fall out here when writer is completed + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // ExecuteAsync cancelled + } + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + try + { + await watchdog.Stop(); + channel.Writer.Complete(); + await base.StopAsync(cancellationToken); + } + finally + { + if (transportInfrastructure != null) { - if (logger.IsInfoEnabled) + try { - logger.Info("Ingesting messages failed", e); + await transportInfrastructure.Shutdown(cancellationToken); } - - // signal all message handling tasks to terminate - foreach (var context in contexts) + catch (OperationCanceledException e) when 
(cancellationToken.IsCancellationRequested) { - context.GetTaskCompletionSource().TrySetException(e); + logger.Info("Shutdown cancelled", e); } - - // no need to do interlocked increment since this is running sequential - consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++); - } - finally - { - contexts.Clear(); } } - // will fall out here when writer is completed } TransportInfrastructure transportInfrastructure; @@ -273,7 +300,6 @@ async Task Loop() readonly Histogram consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure"); readonly Histogram ingestionDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration"); readonly Watchdog watchdog; - readonly Task ingestionWorker; readonly IHostApplicationLifetime applicationLifetime; static readonly ILog logger = LogManager.GetLogger(); diff --git a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs index 7da46df703..5e51add78a 100644 --- a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs +++ b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs @@ -48,9 +48,9 @@ await failedAuditStore.ProcessFailedMessages( Logger.Debug($"Successfully re-imported failed audit message {transportMessage.Id}."); } } - catch (OperationCanceledException) + catch (OperationCanceledException e) when (token.IsCancellationRequested) { - // no-op + Logger.Info("Cancelled", e); } catch (Exception e) { diff --git a/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/ImportFailedAuditsCommand.cs b/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/ImportFailedAuditsCommand.cs index a3ef6d289e..3ddc7ef055 100644 --- 
a/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/ImportFailedAuditsCommand.cs +++ b/src/ServiceControl.Audit/Infrastructure/Hosting/Commands/ImportFailedAuditsCommand.cs @@ -7,10 +7,13 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NServiceBus; + using NServiceBus.Logging; using Settings; class ImportFailedAuditsCommand : AbstractCommand { + readonly ILog logger = LogManager.GetLogger(); + public override async Task Execute(HostArguments args, Settings settings) { settings.IngestAuditMessages = false; @@ -37,9 +40,9 @@ public override async Task Execute(HostArguments args, Settings settings) { await importer.Run(tokenSource.Token); } - catch (OperationCanceledException) + catch (OperationCanceledException e) when (tokenSource.IsCancellationRequested) { - // no op + logger.Info("Cancelled", e); } finally { diff --git a/src/ServiceControl.Infrastructure.Metrics/MetricsReporter.cs b/src/ServiceControl.Infrastructure.Metrics/MetricsReporter.cs index 2b9e1ec97f..73dd600550 100644 --- a/src/ServiceControl.Infrastructure.Metrics/MetricsReporter.cs +++ b/src/ServiceControl.Infrastructure.Metrics/MetricsReporter.cs @@ -32,7 +32,7 @@ public void Start() await Task.Delay(interval, tokenSource.Token).ConfigureAwait(false); } } - catch (OperationCanceledException) + catch (OperationCanceledException) when (tokenSource.IsCancellationRequested) { //no-op } diff --git a/src/ServiceControl.Infrastructure/AsyncTimer.cs b/src/ServiceControl.Infrastructure/AsyncTimer.cs index 73dc2cf8be..34aa3b62b9 100644 --- a/src/ServiceControl.Infrastructure/AsyncTimer.cs +++ b/src/ServiceControl.Infrastructure/AsyncTimer.cs @@ -40,9 +40,9 @@ public TimerJob(Func> callback, //Otherwise execute immediately } - catch (OperationCanceledException) + catch (OperationCanceledException) when (token.IsCancellationRequested) { - // no-op + break; } catch (Exception ex) { @@ -50,7 +50,7 @@ public TimerJob(Func> callback, } } } - catch 
(OperationCanceledException) + catch (OperationCanceledException) when (token.IsCancellationRequested) { // no-op } @@ -64,7 +64,7 @@ public async Task Stop() return; } - tokenSource.Cancel(); + await tokenSource.CancelAsync().ConfigureAwait(false); tokenSource.Dispose(); if (task != null) @@ -73,7 +73,7 @@ public async Task Stop() { await task.ConfigureAwait(false); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (tokenSource.IsCancellationRequested) { //NOOP } diff --git a/src/ServiceControl.Infrastructure/ReadOnlyStream.cs b/src/ServiceControl.Infrastructure/ReadOnlyStream.cs index fc6b57ee94..6c5983bfae 100644 --- a/src/ServiceControl.Infrastructure/ReadOnlyStream.cs +++ b/src/ServiceControl.Infrastructure/ReadOnlyStream.cs @@ -51,7 +51,7 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio return Task.CompletedTask; } - catch (OperationCanceledException e) + catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested) { return Task.FromCanceled(e.CancellationToken); } @@ -113,7 +113,7 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel return Task.FromResult(result); } - catch (OperationCanceledException e) + catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested) { return Task.FromCanceled(e.CancellationToken); } @@ -136,7 +136,7 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken return new ValueTask(result); } - catch (OperationCanceledException e) + catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested) { return new ValueTask(Task.FromCanceled(e.CancellationToken)); } diff --git a/src/ServiceControl.Infrastructure/Watchdog.cs b/src/ServiceControl.Infrastructure/Watchdog.cs index 0fe0641892..c91ae2b3bf 100644 --- a/src/ServiceControl.Infrastructure/Watchdog.cs +++ b/src/ServiceControl.Infrastructure/Watchdog.cs @@ -54,9 +54,10 @@ public Task Start(Action 
onFailedOnStartup) failedOnStartup ??= false; } - catch (OperationCanceledException) + catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested) { - //Do not Delay + // Continue, as OCE is not from caller + log.Info("Start cancelled, retrying...", e); continue; } catch (Exception e) @@ -81,9 +82,9 @@ public Task Start(Action onFailedOnStartup) { await Task.Delay(timeToWaitBetweenStartupAttempts, shutdownTokenSource.Token).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (shutdownTokenSource.IsCancellationRequested) { - //Ignore + //Ignore, no need to log cancellation of delay } } try diff --git a/src/ServiceControl.Monitoring/Infrastructure/RemoveExpiredEndpointInstances.cs b/src/ServiceControl.Monitoring/Infrastructure/RemoveExpiredEndpointInstances.cs index d23460a26e..991cf534e3 100644 --- a/src/ServiceControl.Monitoring/Infrastructure/RemoveExpiredEndpointInstances.cs +++ b/src/ServiceControl.Monitoring/Infrastructure/RemoveExpiredEndpointInstances.cs @@ -48,7 +48,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } } while (await timer.WaitForNextTickAsync(cancellationToken)); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogInformation($"Stopping {nameof(RemoveExpiredEndpointInstances)} timer"); } diff --git a/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs b/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs index 70606ed4c0..fb26a708c3 100644 --- a/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs +++ b/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs @@ -42,7 +42,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } } while (await timer.WaitForNextTickAsync(cancellationToken)); } - catch 
(OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogInformation($"Stopping {nameof(ReportThroughputHostedService)} timer"); } diff --git a/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs b/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs index 0535dc05b2..ba8f8ec646 100644 --- a/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs @@ -82,7 +82,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken) await signal.WaitHandle.WaitOneAsync(cancellationToken); signal.Reset(); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { break; } @@ -91,7 +91,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken) } while (!cancellationToken.IsCancellationRequested); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // ignore } diff --git a/src/ServiceControl.Persistence.RavenDB/FailedErrorImportDataStore.cs b/src/ServiceControl.Persistence.RavenDB/FailedErrorImportDataStore.cs index d1723adaa2..440468a790 100644 --- a/src/ServiceControl.Persistence.RavenDB/FailedErrorImportDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDB/FailedErrorImportDataStore.cs @@ -35,9 +35,9 @@ public async Task ProcessFailedErrorImports(Func p Logger.Debug($"Successfully re-imported failed error message {transportMessage.Id}."); } } - catch (OperationCanceledException) + catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested) { - // no-op + Logger.Info("Cancelled", e); } catch (Exception e) { diff --git a/src/ServiceControl.RavenDB/EmbeddedDatabase.cs b/src/ServiceControl.RavenDB/EmbeddedDatabase.cs index 9e4b711b0b..afadda76ea 100644 --- 
a/src/ServiceControl.RavenDB/EmbeddedDatabase.cs +++ b/src/ServiceControl.RavenDB/EmbeddedDatabase.cs @@ -110,7 +110,7 @@ void Start(ServerOptions serverOptions) Logger.Info("RavenDB server process restarted successfully."); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (shutdownCancellationToken.IsCancellationRequested) { //no-op } diff --git a/src/ServiceControl.Transports.ASBS/QueueLengthProvider.cs b/src/ServiceControl.Transports.ASBS/QueueLengthProvider.cs index df19845a11..24021da49a 100644 --- a/src/ServiceControl.Transports.ASBS/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.ASBS/QueueLengthProvider.cs @@ -43,7 +43,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) UpdateAllQueueLengths(queueRuntimeInfos); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // no-op } diff --git a/src/ServiceControl.Transports.ASQ/QueueLengthProvider.cs b/src/ServiceControl.Transports.ASQ/QueueLengthProvider.cs index 32e55e3bfd..58a7b5783d 100644 --- a/src/ServiceControl.Transports.ASQ/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.ASQ/QueueLengthProvider.cs @@ -42,7 +42,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.Delay(QueryDelayInterval, stoppingToken); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // no-op } @@ -67,7 +67,7 @@ async Task FetchLength(QueueLengthValue queueLength, CancellationToken cancellat problematicQueuesNames.TryRemove(queueLength.QueueName, out _); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // no-op } diff --git a/src/ServiceControl.Transports.Learning/QueueLengthProvider.cs b/src/ServiceControl.Transports.Learning/QueueLengthProvider.cs index 931c989f20..68959ada38 100644 --- 
a/src/ServiceControl.Transports.Learning/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.Learning/QueueLengthProvider.cs @@ -30,7 +30,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.Delay(QueryDelayInterval, stoppingToken); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // It's OK. We're shutting down } diff --git a/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs b/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs index 743f31e7e1..d58b5c526c 100644 --- a/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs @@ -49,7 +49,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) UpdateQueueLengthStore(); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // no-op } diff --git a/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs b/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs index 9656ef9065..e7e97025c7 100644 --- a/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs @@ -39,7 +39,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.Delay(QueryDelayInterval, stoppingToken); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // no-op } @@ -141,7 +141,7 @@ public async Task Execute(Action action, CancellationToken cancellationT action(model); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // no-op } diff --git a/src/ServiceControl.Transports.SQS/QueueLengthProvider.cs b/src/ServiceControl.Transports.SQS/QueueLengthProvider.cs index 
cc733d2eed..431d5358ac 100644 --- a/src/ServiceControl.Transports.SQS/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.SQS/QueueLengthProvider.cs @@ -65,7 +65,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.Delay(QueryDelayInterval, stoppingToken); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // no-op } @@ -104,7 +104,7 @@ async Task FetchLength(string queue, IAmazonSQS client, QueueAttributesRequestCa var response = await client.GetQueueAttributesAsync(attReq, cancellationToken); sizes[queue] = response.ApproximateNumberOfMessages; } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // no-op } diff --git a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs index bfcd15062e..196905a678 100644 --- a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs @@ -47,7 +47,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) UpdateQueueLengthStore(); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // no-op } diff --git a/src/ServiceControl/CompositeViews/Messages/ScatterGatherApi.cs b/src/ServiceControl/CompositeViews/Messages/ScatterGatherApi.cs index a3b60f9a8b..57735cd247 100644 --- a/src/ServiceControl/CompositeViews/Messages/ScatterGatherApi.cs +++ b/src/ServiceControl/CompositeViews/Messages/ScatterGatherApi.cs @@ -125,7 +125,7 @@ async Task> FetchAndParse(HttpClient httpClient, string pathAn httpRequestException); return QueryResult.Empty(); } - catch (OperationCanceledException) + catch (OperationCanceledException) // Intentional, used to gracefully handle timeout { logger.Warn($"Failed to query remote 
instance at {remoteInstanceSetting.BaseAddress} due to a timeout"); return QueryResult.Empty(); diff --git a/src/ServiceControl/CustomChecks/InternalCustomChecks/InternalCustomCheckManager.cs b/src/ServiceControl/CustomChecks/InternalCustomChecks/InternalCustomCheckManager.cs index 2ed431e753..38853a667c 100644 --- a/src/ServiceControl/CustomChecks/InternalCustomChecks/InternalCustomCheckManager.cs +++ b/src/ServiceControl/CustomChecks/InternalCustomChecks/InternalCustomCheckManager.cs @@ -40,9 +40,9 @@ async Task Run(CancellationToken cancellationToken) { result = await check.PerformCheck(cancellationToken); } - catch (OperationCanceledException) + catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested) { - //Do nothing as we are shutting down + Logger.Info("Cancelled", e); } catch (Exception ex) { diff --git a/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs b/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs index 585711b497..e9b89213e0 100644 --- a/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs +++ b/src/ServiceControl/Hosting/Commands/ImportFailedErrorsCommand.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NServiceBus; + using NServiceBus.Logging; using Operations; using Particular.ServiceControl; using Particular.ServiceControl.Hosting; @@ -14,6 +15,8 @@ class ImportFailedErrorsCommand : AbstractCommand { + readonly ILog Log = LogManager.GetLogger(); + public override async Task Execute(HostArguments args, Settings settings) { settings.IngestErrorMessages = false; @@ -38,9 +41,9 @@ public override async Task Execute(HostArguments args, Settings settings) { await importFailedErrors.Run(tokenSource.Token); } - catch (OperationCanceledException) + catch (OperationCanceledException e) when (tokenSource.IsCancellationRequested) { - // no-op + Log.Info("Cancelled", e); } finally { diff --git 
a/src/ServiceControl/Infrastructure/Metrics/MetricsReporterHostedService.cs b/src/ServiceControl/Infrastructure/Metrics/MetricsReporterHostedService.cs index 96e1822cf4..bf886d712f 100644 --- a/src/ServiceControl/Infrastructure/Metrics/MetricsReporterHostedService.cs +++ b/src/ServiceControl/Infrastructure/Metrics/MetricsReporterHostedService.cs @@ -30,7 +30,7 @@ public async Task StopAsync(CancellationToken cancellationToken) { await reporter.Stop(); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { //NOOP } diff --git a/src/ServiceControl/Monitoring/HeartbeatEndpointSettingsSyncHostedService.cs b/src/ServiceControl/Monitoring/HeartbeatEndpointSettingsSyncHostedService.cs index 12d79c00a3..31898ecbbb 100644 --- a/src/ServiceControl/Monitoring/HeartbeatEndpointSettingsSyncHostedService.cs +++ b/src/ServiceControl/Monitoring/HeartbeatEndpointSettingsSyncHostedService.cs @@ -45,7 +45,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } } while (await timer.WaitForNextTickAsync(cancellationToken)); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogInformation($"Stopping {nameof(HeartbeatEndpointSettingsSyncHostedService)}"); } diff --git a/src/ServiceControl/Monitoring/HeartbeatMonitoringHostedService.cs b/src/ServiceControl/Monitoring/HeartbeatMonitoringHostedService.cs index be11f2f486..a3d86a01ba 100644 --- a/src/ServiceControl/Monitoring/HeartbeatMonitoringHostedService.cs +++ b/src/ServiceControl/Monitoring/HeartbeatMonitoringHostedService.cs @@ -30,9 +30,9 @@ public async Task StopAsync(CancellationToken cancellationToken) { await timer.Stop(); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { - //NOOP + //NOOP, invoked Stop does not accept a cancellation token } } diff --git
a/src/ServiceControl/Operations/CheckRemotes.cs b/src/ServiceControl/Operations/CheckRemotes.cs index 508bb0dee7..1aad310ba5 100644 --- a/src/ServiceControl/Operations/CheckRemotes.cs +++ b/src/ServiceControl/Operations/CheckRemotes.cs @@ -73,7 +73,11 @@ static async Task CheckSuccessStatusCode(IHttpClientFactory httpClientFactory, R remoteSettings.TemporarilyUnavailable = true; throw new TimeoutException($"The remote instance at '{remoteSettings.BaseAddress}' doesn't seem to be available. It will be temporarily disabled. Reason: {e.Message}", e); } - catch (OperationCanceledException e) + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Cancelled, noop + } + catch (OperationCanceledException e) // Intentional, OCE gracefully handled by other catch { remoteSettings.TemporarilyUnavailable = true; throw new TimeoutException($"The remote at '{remoteSettings.BaseAddress}' did not respond within the allotted time of '{queryTimeout}'. It will be temporarily disabled.", e); diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index f7416c8cb8..00676b95a7 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -17,7 +17,7 @@ using ServiceBus.Management.Infrastructure.Settings; using Transports; - class ErrorIngestion : IHostedService + class ErrorIngestion : BackgroundService { static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; @@ -59,69 +59,97 @@ public ErrorIngestion( errorHandlingPolicy = new ErrorIngestionFaultPolicy(dataStore, settings.LoggingSettings, OnCriticalError); - watchdog = new Watchdog("failed message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartErrorIngestionAfterFailure, Logger); - - ingestionWorker = Task.Run(() => Loop(), CancellationToken.None); + watchdog = new Watchdog( + "failed message ingestion", 
+ EnsureStarted, + EnsureStopped, + ingestionState.ReportError, + ingestionState.Clear, + settings.TimeToRestartErrorIngestionAfterFailure, + Logger + ); } - public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication()); - - public async Task StopAsync(CancellationToken cancellationToken) + public override async Task StartAsync(CancellationToken cancellationToken) { - await watchdog.Stop(); - channel.Writer.Complete(); - await ingestionWorker; - if (transportInfrastructure != null) - { - await transportInfrastructure.Shutdown(cancellationToken); - } + await watchdog.Start(() => applicationLifetime.StopApplication()); + await base.StartAsync(cancellationToken); } - async Task Loop() + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - var contexts = new List(transportSettings.MaxConcurrency.Value); - - while (await channel.Reader.WaitToReadAsync()) + try { - // will only enter here if there is something to read. - try + var contexts = new List(transportSettings.MaxConcurrency.Value); + + while (await channel.Reader.WaitToReadAsync(stoppingToken)) { - // as long as there is something to read this will fetch up to MaximumConcurrency items - while (channel.Reader.TryRead(out var context)) + // will only enter here if there is something to read. 
+ try { - contexts.Add(context); + // as long as there is something to read this will fetch up to MaximumConcurrency items + while (channel.Reader.TryRead(out var context)) + { + contexts.Add(context); + } + + batchSizeMeter.Mark(contexts.Count); + using (batchDurationMeter.Measure()) + { + await ingestor.Ingest(contexts); + } } + catch (Exception e) + { + // signal all message handling tasks to terminate + foreach (var context in contexts) + { + _ = context.GetTaskCompletionSource().TrySetException(e); + } + + if (e is OperationCanceledException && stoppingToken.IsCancellationRequested) + { + Logger.Info("Batch cancelled", e); + break; + } - batchSizeMeter.Mark(contexts.Count); - using (batchDurationMeter.Measure()) + Logger.Info("Ingesting messages failed", e); + } + finally { - await ingestor.Ingest(contexts); + contexts.Clear(); } } - catch (OperationCanceledException) - { - //Do nothing as we are shutting down - continue; - } - catch (Exception e) // show must go on + // will fall out here when writer is completed + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // ExecuteAsync cancelled + } + } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + try + { + await watchdog.Stop(); + channel.Writer.Complete(); + await base.StopAsync(cancellationToken); + } + finally + { + if (transportInfrastructure != null) { - if (Logger.IsInfoEnabled) + try { - Logger.Info("Ingesting messages failed", e); + await transportInfrastructure.Shutdown(cancellationToken); } - - // signal all message handling tasks to terminate - foreach (var context in contexts) + catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested) { - context.GetTaskCompletionSource().TrySetException(e); + Logger.Info("Shutdown cancelled", e); } } - finally - { - contexts.Clear(); - } } - // will fall out here when writer is completed } async Task EnsureStarted(CancellationToken cancellationToken = default) 
@@ -236,13 +264,13 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) readonly TransportSettings transportSettings; readonly Watchdog watchdog; readonly Channel channel; - readonly Task ingestionWorker; readonly Meter batchDurationMeter; readonly Meter batchSizeMeter; readonly Counter receivedMeter; readonly ErrorIngestor ingestor; readonly IIngestionUnitOfWorkFactory unitOfWorkFactory; readonly IHostApplicationLifetime applicationLifetime; + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file