Emit metrics via otel instead of custom format (#4762)
* Emit metrics via otel instead of custom format

* Minimize diff by reverting whitespace changes

* Approvals

* Fix formatting

* Apply suggestions from code review

Co-authored-by: Ramon Smits <[email protected]>

* fix formatting

* Only use standard otel

* Set instance id

* Set unit

* Better metrics names

* Emit body size

* Fix meter name

* Log that OpenTelemetry metrics exporter is enabled

* Stop using prefixes

* Better name

* Go back to using instance name as service name

* better metrics names

* More cleanup

* Revert app.config

* Add duration and failure counters

* Add consecutive batch failures

* Fix namespace for metrics

* Skip interlocked increment

* Added descriptions for metric instruments

* Metric setup moved to extension method

---------

Co-authored-by: Ramon Smits <[email protected]>
andreasohlund and ramonsmits authored Feb 10, 2025
1 parent c9a8215 commit b8c6fbf
Showing 15 changed files with 130 additions and 189 deletions.
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
@@ -50,6 +50,8 @@
<PackageVersion Include="NUnit" Version="4.3.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.6.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="5.0.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageVersion Include="Particular.Approvals" Version="2.0.1" />
<PackageVersion Include="Particular.Licensing.Sources" Version="6.0.1" />
<PackageVersion Include="Particular.LicensingComponent.Report" Version="1.0.0" />
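These two packages cover the wiring the commit message describes ("Metric setup moved to extension method", "Log that OpenTelemetry metrics exporter is enabled"); the extension method itself is not part of this excerpt. A minimal sketch of how such a setup is typically wired with OpenTelemetry.Extensions.Hosting and the OTLP exporter follows — the method name, parameter names, and logging are assumptions, not the actual ServiceControl code:

using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;

static class OpenTelemetrySetupExtensions
{
    // Hypothetical extension method; the real one lives in a file not shown in this diff.
    public static IServiceCollection AddOtlpMetrics(this IServiceCollection services, string instanceName, string otlpEndpointUrl, ILogger logger)
    {
        if (string.IsNullOrWhiteSpace(otlpEndpointUrl))
        {
            return services; // No OTLP endpoint configured, exporter stays off
        }

        logger.LogInformation("OpenTelemetry metrics exporter enabled, sending to {Endpoint}", otlpEndpointUrl);

        services.AddOpenTelemetry()
            .ConfigureResource(resource => resource.AddService(serviceName: instanceName))
            .WithMetrics(metrics => metrics
                .AddMeter(instanceName) // Subscribe to the meters the instance creates
                .AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpointUrl)));

        return services;
    }
}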
@@ -55,7 +55,6 @@ async Task InitializeServiceControl(ScenarioContext context)
{
var id = messageContext.NativeMessageId;
var headers = messageContext.Headers;

var log = NServiceBus.Logging.LogManager.GetLogger<ServiceControlComponentRunner>();
headers.TryGetValue(Headers.MessageId, out var originalMessageId);
log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty}).");
@@ -12,6 +12,7 @@
"ApiUrl": "http://localhost:8888/api",
"Port": 8888,
"PrintMetrics": false,
"OtlpEndpointUrl": null,
"Hostname": "localhost",
"VirtualDirectory": "",
"TransportType": "LearningTransport",
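The new OtlpEndpointUrl setting defaults to null in this approval snapshot. Presumably pointing it at an OTLP collector address (for example http://localhost:4317, the standard OTLP/gRPC port) is what switches the exporter on, while leaving it null keeps metrics export disabled.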
1 change: 0 additions & 1 deletion src/ServiceControl.Audit/App.config
@@ -8,7 +8,6 @@ These settings are only here so that we can debug ServiceControl while developin
<add key="ServiceControl.Audit/ServiceControlQueueAddress" value="Particular.ServiceControl" />
<add key="ServiceControl.Audit/HostName" value="localhost" />
<add key="ServiceControl.Audit/DatabaseMaintenancePort" value="44445" />

<!-- DEVS - Pick a transport to run Auditing instance on -->
<add key="ServiceControl.Audit/TransportType" value="LearningTransport" />
<!--<add key="ServiceControl.Audit/TransportType" value="AmazonSQS" />-->
54 changes: 31 additions & 23 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
@@ -2,7 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -14,18 +14,14 @@
using Persistence;
using Persistence.UnitOfWork;
using ServiceControl.Infrastructure;
using ServiceControl.Infrastructure.Metrics;
using Transports;

class AuditIngestion : IHostedService
{
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;

public AuditIngestion(
Settings settings,
ITransportCustomization transportCustomization,
TransportSettings transportSettings,
Metrics metrics,
IFailedAuditStorage failedImportsStorage,
AuditIngestionCustomCheck.State ingestionState,
AuditIngestor auditIngestor,
Expand All @@ -40,10 +36,6 @@ public AuditIngestion(
this.settings = settings;
this.applicationLifetime = applicationLifetime;

batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size");
batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds);
receivedMeter = metrics.GetCounter("Audit ingestion - received");

if (!transportSettings.MaxConcurrency.HasValue)
{
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
@@ -102,6 +94,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
}

return;
}

@@ -168,6 +161,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
logger.Info("Shutting down. Already stopped, skipping shut down");
return; //Already stopped
}

var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down. Infrastructure shut down commencing");
@@ -188,18 +182,22 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)

async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken)
{
if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
using (new DurationRecorder(ingestionDuration))
{
return;
}
if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
{
return;
}

var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);

receivedMeter.Mark();
await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;

await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;
ingestedMessagesCounter.Add(1);
messageSize.Record(messageContext.Body.Length / 1024.0);
}
}

async Task Loop()
@@ -217,11 +215,14 @@ async Task Loop()
contexts.Add(context);
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
auditBatchSize.Record(contexts.Count);

using (new DurationRecorder(auditBatchDuration))
{
await auditIngestor.Ingest(contexts);
}

consecutiveBatchFailuresCounter.Record(0);
}
catch (OperationCanceledException)
{
@@ -240,6 +241,9 @@ async Task Loop()
{
context.GetTaskCompletionSource().TrySetException(e);
}

// no need to do interlocked increment since this is running sequential
consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
}
finally
{
@@ -251,8 +255,9 @@

TransportInfrastructure transportInfrastructure;
IMessageReceiver queueIngestor;
long consecutiveBatchFailures = 0;

readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
readonly ITransportCustomization transportCustomization;
readonly TransportSettings transportSettings;
Expand All @@ -261,9 +266,12 @@ async Task Loop()
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel<MessageContext> channel;
readonly Meter batchSizeMeter;
readonly Meter batchDurationMeter;
readonly Counter receivedMeter;
readonly Histogram<long> auditBatchSize = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size");
readonly Histogram<double> auditBatchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
readonly Histogram<double> messageSize = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size");
readonly Counter<long> ingestedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "count"), description: "Successful ingested audit message count");
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;
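The instruments above are created through a shared Telemetry class and timed via a DurationRecorder helper, neither of which appears in this excerpt. A rough sketch of the shape their call sites imply — the meter name, naming scheme, and implementation details are assumptions:

using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;

static class Telemetry
{
    // Single shared Meter for the instance; the actual meter name is not visible in this diff.
    public static readonly Meter Meter = new("Particular.ServiceControl.Audit");

    // Combines a context ("ingestion") and an instrument name ("batch_size") into one metric name.
    public static string CreateInstrumentName(string context, string name) => $"{context}.{name}";
}

// Records elapsed milliseconds into the given histogram when disposed, which is why
// the ingestion code wraps work in `using (new DurationRecorder(...))` blocks.
sealed class DurationRecorder(Histogram<double> histogram) : IDisposable
{
    readonly long start = Stopwatch.GetTimestamp();

    public void Dispose() => histogram.Record(Stopwatch.GetElapsedTime(start).TotalMilliseconds);
}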
@@ -2,6 +2,7 @@
{
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.IO;
using System.Runtime.InteropServices;
using System.Runtime.Versioning;
@@ -37,10 +38,13 @@ public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, Cancella
//Same as recoverability policy in NServiceBusFactory
if (errorContext.ImmediateProcessingFailures < 3)
{
retryCounter.Add(1);
return ErrorHandleResult.RetryRequired;
}

await StoreFailedMessageDocument(errorContext, cancellationToken);

failedCounter.Add(1);
return ErrorHandleResult.Handled;
}

@@ -100,6 +104,9 @@ void WriteToEventLog(string message)
EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error);
}

readonly Counter<long> retryCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "retry"), description: "Audit ingestion retries count");
readonly Counter<long> failedCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "failed"), description: "Audit ingestion failure count");

static readonly ILog log = LogManager.GetLogger<AuditIngestionFaultPolicy>();
}
}
49 changes: 11 additions & 38 deletions src/ServiceControl.Audit/Auditing/AuditIngestor.cs
@@ -2,7 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading.Tasks;
using Infrastructure.Settings;
@@ -14,13 +14,11 @@
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
using ServiceControl.Infrastructure.Metrics;
using ServiceControl.Transports;

public class AuditIngestor
{
public AuditIngestor(
Metrics metrics,
Settings settings,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
EndpointInstanceMonitoring endpointInstanceMonitoring,
@@ -32,50 +30,28 @@ ITransportCustomization transportCustomization
{
this.settings = settings;
this.messageDispatcher = messageDispatcher;

var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit");
var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit");
var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds);
var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds);
var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds);

var enrichers = new IEnrichImportedAuditMessages[]
{
new MessageTypeEnricher(),
new EnrichWithTrackingIds(),
new ProcessingStatisticsEnricher(),
new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring),
new DetectSuccessfulRetriesEnricher(),
new SagaRelationshipsEnricher()
}.Concat(auditEnrichers).ToArray();
var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray();

logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);

auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
auditPersister = new AuditPersister(
unitOfWorkFactory,
enrichers,
messageSession,
messageDispatcher
);
}

public async Task Ingest(List<MessageContext> contexts)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Ingesting {contexts.Count} message contexts");
}

var stored = await auditPersister.Persist(contexts);

try
{
if (settings.ForwardAuditMessages)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Forwarding {stored.Count} messages");
}
await Forward(stored, logQueueAddress);
if (Log.IsDebugEnabled)
{
Log.Debug("Forwarded messages");
}
forwardedMessagesCounter.Add(stored.Count);
}

foreach (var context in contexts)
@@ -85,10 +61,7 @@ public async Task Ingest(List<MessageContext> contexts)
}
catch (Exception e)
{
if (Log.IsWarnEnabled)
{
Log.Warn("Forwarding messages failed", e);
}
Log.Warn("Forwarding messages failed", e);

// making sure to rethrow so that all messages get marked as failed
throw;
@@ -158,8 +131,8 @@ public async Task VerifyCanReachForwardingAddress()
readonly Settings settings;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;
readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count");

static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
}