From 1cadb61e566c3fc9f7cd6665113704db95312d2f Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 30 Aug 2023 11:51:22 +0200 Subject: [PATCH 1/3] Add APM server ingestion channel Added `ApmChannel` class that extends the `TransportChannelBase` to provide event ingestion to APM server, which sends V2 intake API data. Also updated the implementation of `IMetricSet`, `ITransaction`, `IError` and `ISpan` to include an `IIntakeRoot` interface which is used to define the incoming root JSON structure to APM server. Updated several dependency versions in some projects. Also added reference to `Elastic.Ingest.Transport` in `Elastic.Apm.csproj`, enabling data transport using our shared infrastructure. Added new files `ApmChannelOptions` and `IIntakeRoot` to manage channel options and declare the interface for incoming JSON data structure respectively. `IntakeErrorItem` and `EventIntakeResponse` are created to handle responses from ingestion processes. --- build/scripts/Build.fs | 1 + src/Elastic.Apm/Api/IError.cs | 2 +- src/Elastic.Apm/Api/IIntakeRoot.cs | 8 + src/Elastic.Apm/Api/IMetricSet.cs | 2 +- src/Elastic.Apm/Api/ISpan.cs | 2 +- src/Elastic.Apm/Api/ITransaction.cs | 2 +- src/Elastic.Apm/Elastic.Apm.csproj | 4 +- src/Elastic.Apm/Ingest/ApmChannel.cs | 137 ++++++++++++++++++ src/Elastic.Apm/Ingest/ApmChannelOptions.cs | 18 +++ src/Elastic.Apm/Ingest/IngestResponse.cs | 34 +++++ .../Elastic.Ingest.Apm.csproj | 21 +++ .../Elastic.Apm.StartupHook.Loader.csproj | 7 +- .../Elastic.AzureFunctionApp.InProcess.csproj | 2 +- .../Elastic.AzureFunctionApp.Isolated.csproj | 8 +- 14 files changed, 236 insertions(+), 12 deletions(-) create mode 100644 src/Elastic.Apm/Api/IIntakeRoot.cs create mode 100644 src/Elastic.Apm/Ingest/ApmChannel.cs create mode 100644 src/Elastic.Apm/Ingest/ApmChannelOptions.cs create mode 100644 src/Elastic.Apm/Ingest/IngestResponse.cs create mode 100644 src/Elastic.Ingest.Apm/Elastic.Ingest.Apm.csproj diff --git a/build/scripts/Build.fs b/build/scripts/Build.fs index 79442de1b..bddb5dd61 100644 --- a/build/scripts/Build.fs +++ b/build/scripts/Build.fs @@ -21,6 +21,7 @@ open Tooling module Build = + //TODO remove oldDiagnosticSourceVersion let private oldDiagnosticSourceVersion = SemVer.parse "4.6.0" let private diagnosticSourceVersion6 = SemVer.parse "6.0.0" diff --git a/src/Elastic.Apm/Api/IError.cs b/src/Elastic.Apm/Api/IError.cs index 5bee5569b..d20e5942c 100644 --- a/src/Elastic.Apm/Api/IError.cs +++ b/src/Elastic.Apm/Api/IError.cs @@ -12,7 +12,7 @@ namespace Elastic.Apm.Api /// Represents an error which was captured by the agent. /// [Specification("error.json")] - public interface IError + public interface IError : IIntakeRoot { /// /// The culprit that caused this error. diff --git a/src/Elastic.Apm/Api/IIntakeRoot.cs b/src/Elastic.Apm/Api/IIntakeRoot.cs new file mode 100644 index 000000000..0da34b588 --- /dev/null +++ b/src/Elastic.Apm/Api/IIntakeRoot.cs @@ -0,0 +1,8 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Apm.Api; + +public interface IIntakeRoot { } diff --git a/src/Elastic.Apm/Api/IMetricSet.cs b/src/Elastic.Apm/Api/IMetricSet.cs index 3cc25c7e8..ee862cfeb 100644 --- a/src/Elastic.Apm/Api/IMetricSet.cs +++ b/src/Elastic.Apm/Api/IMetricSet.cs @@ -12,7 +12,7 @@ namespace Elastic.Apm.Api /// Data captured by the agent representing a metric occurring in a monitored service /// [Specification("metricset.json")] - public interface IMetricSet + public interface IMetricSet : IIntakeRoot { /// /// List of captured metrics as key - value pairs diff --git a/src/Elastic.Apm/Api/ISpan.cs b/src/Elastic.Apm/Api/ISpan.cs index 6c42f2bab..902fee066 100644 --- a/src/Elastic.Apm/Api/ISpan.cs +++ b/src/Elastic.Apm/Api/ISpan.cs @@ -10,7 +10,7 @@ namespace Elastic.Apm.Api /// An event captured by an agent occurring in a monitored service /// [Specification("span.json")] - public interface ISpan : IExecutionSegment + public interface ISpan : IExecutionSegment, IIntakeRoot { /// /// The action of the span. diff --git a/src/Elastic.Apm/Api/ITransaction.cs b/src/Elastic.Apm/Api/ITransaction.cs index 14cd5b61e..8ca4171f0 100644 --- a/src/Elastic.Apm/Api/ITransaction.cs +++ b/src/Elastic.Apm/Api/ITransaction.cs @@ -19,7 +19,7 @@ namespace Elastic.Apm.Api /// provide different transaction implementations. /// [Specification("transaction.json")] - public interface ITransaction : IExecutionSegment + public interface ITransaction : IExecutionSegment, IIntakeRoot { /// /// Contains data related to FaaS (Function as a Service) events. diff --git a/src/Elastic.Apm/Elastic.Apm.csproj b/src/Elastic.Apm/Elastic.Apm.csproj index 3570a8165..7844238ba 100644 --- a/src/Elastic.Apm/Elastic.Apm.csproj +++ b/src/Elastic.Apm/Elastic.Apm.csproj @@ -68,7 +68,8 @@ - + + diff --git a/src/Elastic.Apm/Ingest/ApmChannel.cs b/src/Elastic.Apm/Ingest/ApmChannel.cs new file mode 100644 index 000000000..c9ba16295 --- /dev/null +++ b/src/Elastic.Apm/Ingest/ApmChannel.cs @@ -0,0 +1,137 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Text.Encodings.Web; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Apm.Api; +using Elastic.Apm.Report; +using Elastic.Channels; +using Elastic.Ingest.Transport; +using Elastic.Transport; + +namespace Elastic.Apm.Ingest; + +internal static class ApmChannelStatics +{ + public static readonly byte[] LineFeed = { (byte)'\n' }; + + public static readonly DefaultRequestParameters RequestParams = new() + { + RequestConfiguration = new RequestConfiguration { ContentType = "application/x-ndjson" } + }; + + public static readonly JsonSerializerOptions SerializerOptions = new() + { + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, MaxDepth = 64, Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, + }; +} + +/// +/// An implementation that sends V2 intake API data +/// to APM server. +/// +public class ApmChannel + : TransportChannelBase + , IPayloadSender +{ + /// + public ApmChannel(ApmChannelOptions options) : base(options) { } + + void IPayloadSender.QueueError(IError error) => TryWrite(error); + + void IPayloadSender.QueueMetrics(IMetricSet metrics) => TryWrite(metrics); + + void IPayloadSender.QueueSpan(ISpan span) => TryWrite(span); + + void IPayloadSender.QueueTransaction(ITransaction transaction) => TryWrite(transaction); + + //retry if APM server returns 429 + /// + protected override bool Retry(EventIntakeResponse response) => response.ApiCallDetails.HttpStatusCode == 429; + + /// + protected override bool RetryAllItems(EventIntakeResponse response) => response.ApiCallDetails.HttpStatusCode == 429; + + //APM does not return the status for all events sent. Therefor we always return an empty set for individual items to retry + /// + protected override List<(IIntakeRoot, IntakeErrorItem)> Zip(EventIntakeResponse response, IReadOnlyCollection page) => + _emptyZip; + + private readonly List<(IIntakeRoot, IntakeErrorItem)> _emptyZip = new(); + + /// + protected override bool RetryEvent((IIntakeRoot, IntakeErrorItem) @event) => false; + + /// + protected override bool RejectEvent((IIntakeRoot, IntakeErrorItem) @event) => false; + + /// + protected override Task ExportAsync(HttpTransport transport, ArraySegment page, CancellationToken ctx = default) => + transport.RequestAsync(HttpMethod.POST, "/intake/v2/events", + PostData.StreamHandler(page, + (_, _) => + { + /* NOT USED */ + }, + async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, ctx).ConfigureAwait(false); }) + , ApmChannelStatics.RequestParams, ctx); + + private async Task WriteStanzaToStreamAsync(Stream stream, CancellationToken ctx) + { + // {"metadata":{"process":{"pid":1234,"title":"/usr/lib/jvm/java-10-openjdk-amd64/bin/java","ppid":1,"argv":["-v"]}, + // "system":{"architecture":"amd64","detected_hostname":"8ec7ceb99074","configured_hostname":"host1","platform":"Linux","container":{"id":"8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4"}, + // "kubernetes":{"namespace":"default","pod":{"uid":"b17f231da0ad128dc6c6c0b2e82f6f303d3893e3","name":"instrumented-java-service"},"node":{"name":"node-name"}}}, + // "service":{"name":"1234_service-12a3","version":"4.3.0","node":{"configured_name":"8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4"},"environment":"production","language":{"name":"Java","version":"10.0.2"}, + // "agent":{"version":"1.10.0","name":"java","ephemeral_id":"e71be9ac-93b0-44b9-a997-5638f6ccfc36"},"framework":{"name":"spring","version":"5.0.0"},"runtime":{"name":"Java","version":"10.0.2"}},"labels":{"group":"experimental","ab_testing":true,"segment":5}}} + // TODO cache + var p = Process.GetCurrentProcess(); + var metadata = new + { + metadata = new + { + process = new { pid = p.Id, title = p.ProcessName }, + service = new + { + name = System.Text.RegularExpressions.Regex.Replace(p.ProcessName, "[^a-zA-Z0-9 _-]", "_"), + version = "1.0.0", + agent = new { name = "dotnet", version = "0.0.1" } + } + } + }; + await JsonSerializer.SerializeAsync(stream, metadata, metadata.GetType(), ApmChannelStatics.SerializerOptions, ctx) + .ConfigureAwait(false); + await stream.WriteAsync(ApmChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false); + } + + private async Task WriteBufferToStreamAsync(IReadOnlyCollection b, Stream stream, CancellationToken ctx) + { + await WriteStanzaToStreamAsync(stream, ctx).ConfigureAwait(false); + foreach (var @event in b) + { + if (@event == null) continue; + + var type = @event switch + { + ITransaction => "transaction", + ISpan => "span", + IError => "error", + IMetricSet => "metricset", + _ => "unknown" + }; + var dictionary = new Dictionary() { { type, @event } }; + + await JsonSerializer.SerializeAsync(stream, dictionary, dictionary.GetType(), ApmChannelStatics.SerializerOptions, ctx) + .ConfigureAwait(false); + + await stream.WriteAsync(ApmChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false); + } + } +} diff --git a/src/Elastic.Apm/Ingest/ApmChannelOptions.cs b/src/Elastic.Apm/Ingest/ApmChannelOptions.cs new file mode 100644 index 000000000..835b24ca2 --- /dev/null +++ b/src/Elastic.Apm/Ingest/ApmChannelOptions.cs @@ -0,0 +1,18 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.Apm.Api; +using Elastic.Ingest.Transport; +using Elastic.Transport; + +namespace Elastic.Apm.Ingest; + +/// +/// Channel options for +/// +public class ApmChannelOptions : TransportChannelOptionsBase +{ + /// + public ApmChannelOptions(HttpTransport transport) : base(transport) { } +} diff --git a/src/Elastic.Apm/Ingest/IngestResponse.cs b/src/Elastic.Apm/Ingest/IngestResponse.cs new file mode 100644 index 000000000..c8321af9e --- /dev/null +++ b/src/Elastic.Apm/Ingest/IngestResponse.cs @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Collections.Generic; +using System.Text.Json.Serialization; +using Elastic.Transport; + +namespace Elastic.Apm.Ingest; + +/// +public class EventIntakeResponse : TransportResponse +{ + /// + [JsonPropertyName("accepted")] + public long Accepted { get; set; } + + /// + [JsonPropertyName("errors")] + //[JsonConverter(typeof(ResponseItemsConverter))] + public IReadOnlyCollection Errors { get; set; } = null!; +} + +/// +public class IntakeErrorItem +{ + /// + [JsonPropertyName("message")] + public string Message { get; set; } = null!; + + /// + [JsonPropertyName("document")] + public string Document { get; set; } = null!; +} diff --git a/src/Elastic.Ingest.Apm/Elastic.Ingest.Apm.csproj b/src/Elastic.Ingest.Apm/Elastic.Ingest.Apm.csproj new file mode 100644 index 000000000..fe7a4ff17 --- /dev/null +++ b/src/Elastic.Ingest.Apm/Elastic.Ingest.Apm.csproj @@ -0,0 +1,21 @@ + + + + net7.0 + enable + enable + + + + + + + + + + + + + + + diff --git a/src/startuphook/Elastic.Apm.StartupHook.Loader/Elastic.Apm.StartupHook.Loader.csproj b/src/startuphook/Elastic.Apm.StartupHook.Loader/Elastic.Apm.StartupHook.Loader.csproj index 109cdb28d..6fa722a09 100644 --- a/src/startuphook/Elastic.Apm.StartupHook.Loader/Elastic.Apm.StartupHook.Loader.csproj +++ b/src/startuphook/Elastic.Apm.StartupHook.Loader/Elastic.Apm.StartupHook.Loader.csproj @@ -1,8 +1,7 @@ - - netcoreapp2.2 + netstandard2.0 false @@ -19,5 +18,9 @@ StartupHookLogger.cs + + + + diff --git a/test/azure/applications/Elastic.AzureFunctionApp.InProcess/Elastic.AzureFunctionApp.InProcess.csproj b/test/azure/applications/Elastic.AzureFunctionApp.InProcess/Elastic.AzureFunctionApp.InProcess.csproj index a6d937ee1..c7cf2039b 100644 --- a/test/azure/applications/Elastic.AzureFunctionApp.InProcess/Elastic.AzureFunctionApp.InProcess.csproj +++ b/test/azure/applications/Elastic.AzureFunctionApp.InProcess/Elastic.AzureFunctionApp.InProcess.csproj @@ -8,7 +8,7 @@ false - + diff --git a/test/azure/applications/Elastic.AzureFunctionApp.Isolated/Elastic.AzureFunctionApp.Isolated.csproj b/test/azure/applications/Elastic.AzureFunctionApp.Isolated/Elastic.AzureFunctionApp.Isolated.csproj index 766752f4b..1b477734a 100644 --- a/test/azure/applications/Elastic.AzureFunctionApp.Isolated/Elastic.AzureFunctionApp.Isolated.csproj +++ b/test/azure/applications/Elastic.AzureFunctionApp.Isolated/Elastic.AzureFunctionApp.Isolated.csproj @@ -8,11 +8,11 @@ false - - + + - - + + From df0e53344c3c248d597345218c799a749680fa60 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 30 Aug 2023 16:14:36 +0200 Subject: [PATCH 2/3] Make MockPayloadSender a subclass of ApmChannel to assert tests --- src/Elastic.Apm/Ingest/ApmChannel.cs | 57 ++++++- src/Elastic.Apm/Ingest/ApmChannelOptions.cs | 9 +- .../Utilities/NullableAttributes.cs | 2 + .../MockPayloadSender.cs | 145 ++++++++++-------- .../OpenTelemetryTests.cs | 26 ++-- 5 files changed, 150 insertions(+), 89 deletions(-) diff --git a/src/Elastic.Apm/Ingest/ApmChannel.cs b/src/Elastic.Apm/Ingest/ApmChannel.cs index c9ba16295..f8d080c98 100644 --- a/src/Elastic.Apm/Ingest/ApmChannel.cs +++ b/src/Elastic.Apm/Ingest/ApmChannel.cs @@ -5,13 +5,16 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.IO; +using System.Linq; using System.Text.Encodings.Web; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; using Elastic.Apm.Api; +using Elastic.Apm.Logging; using Elastic.Apm.Report; using Elastic.Channels; using Elastic.Ingest.Transport; @@ -19,6 +22,7 @@ namespace Elastic.Apm.Ingest; +#nullable enable internal static class ApmChannelStatics { public static readonly byte[] LineFeed = { (byte)'\n' }; @@ -40,18 +44,56 @@ internal static class ApmChannelStatics /// public class ApmChannel : TransportChannelBase - , IPayloadSender + , IPayloadSender { + private readonly List> _transactionFilters = new(); + private readonly List> _spanFilters = new(); + private readonly List> _errorFilters = new(); + /// - public ApmChannel(ApmChannelOptions options) : base(options) { } + public ApmChannel(ApmChannelOptions options, IApmLogger? logger = null) : base(options) => + PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, null, logger ?? new TraceLogger(LogLevel.Trace)); + + public IError? Filter(IError error) => _errorFilters.Aggregate(error, (current, filter) => filter(current)!); + + public ISpan? Filter(ISpan span) => _spanFilters.Aggregate(span, (current, filter) => filter(current)!); - void IPayloadSender.QueueError(IError error) => TryWrite(error); + public ITransaction? Filter(ITransaction span) => _transactionFilters.Aggregate(span, (current, filter) => filter(current)!); + + public bool TryFilter(IError error, [NotNullWhen(true)] out IError? filtered) + { + filtered = _errorFilters.Select(f => f(error)).TakeWhile(e => e != null).LastOrDefault(); + return filtered != null; + } - void IPayloadSender.QueueMetrics(IMetricSet metrics) => TryWrite(metrics); + public bool TryFilter(ISpan span, [NotNullWhen(true)] out ISpan? filtered) + { + filtered = _spanFilters.Select(f => f(span)).TakeWhile(e => e != null).LastOrDefault(); + return filtered != null; + } - void IPayloadSender.QueueSpan(ISpan span) => TryWrite(span); + public bool TryFilter(ITransaction transaction, [NotNullWhen(true)] out ITransaction? filtered) + { + filtered = _transactionFilters.Select(f => f(transaction)).TakeWhile(e => e != null).LastOrDefault(); + return filtered != null; + } - void IPayloadSender.QueueTransaction(ITransaction transaction) => TryWrite(transaction); + public virtual void QueueMetrics(IMetricSet metrics) => TryWrite(metrics); + + public virtual void QueueError(IError error) + { + if (TryFilter(error, out var e)) TryWrite(e); + } + + public virtual void QueueSpan(ISpan span) + { + if (TryFilter(span, out var s)) TryWrite(s); + } + + public virtual void QueueTransaction(ITransaction transaction) + { + if (TryFilter(transaction, out var t)) TryWrite(t); + } //retry if APM server returns 429 /// @@ -74,7 +116,8 @@ public ApmChannel(ApmChannelOptions options) : base(options) { } protected override bool RejectEvent((IIntakeRoot, IntakeErrorItem) @event) => false; /// - protected override Task ExportAsync(HttpTransport transport, ArraySegment page, CancellationToken ctx = default) => + protected override Task + ExportAsync(HttpTransport transport, ArraySegment page, CancellationToken ctx = default) => transport.RequestAsync(HttpMethod.POST, "/intake/v2/events", PostData.StreamHandler(page, (_, _) => diff --git a/src/Elastic.Apm/Ingest/ApmChannelOptions.cs b/src/Elastic.Apm/Ingest/ApmChannelOptions.cs index 835b24ca2..f86e52490 100644 --- a/src/Elastic.Apm/Ingest/ApmChannelOptions.cs +++ b/src/Elastic.Apm/Ingest/ApmChannelOptions.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System; using Elastic.Apm.Api; using Elastic.Ingest.Transport; using Elastic.Transport; @@ -14,5 +15,11 @@ namespace Elastic.Apm.Ingest; public class ApmChannelOptions : TransportChannelOptionsBase { /// - public ApmChannelOptions(HttpTransport transport) : base(transport) { } + private ApmChannelOptions(HttpTransport transport) : base(transport) { } + + public ApmChannelOptions(Uri serverEndpoint, TransportClient transportClient = null) + : this(new DefaultHttpTransport(new TransportConfiguration(new SingleNodePool(serverEndpoint), connection: transportClient!))) + { + + } } diff --git a/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs b/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs index 0816c1e8c..560d01ade 100644 --- a/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs +++ b/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs @@ -26,6 +26,7 @@ #endregion #nullable enable +#if !NET6_0_OR_GREATER namespace System.Diagnostics.CodeAnalysis { /// Specifies that an output will not be null even if the corresponding type allows it. @@ -78,3 +79,4 @@ internal class DoesNotReturnIfAttribute : Attribute public bool ParameterValue { get; } } } +#endif diff --git a/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs b/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs index 715ecf737..e3cfe610f 100644 --- a/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs +++ b/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs @@ -10,33 +10,34 @@ using System.Linq; using System.Threading; using Elastic.Apm.Api; +using Elastic.Apm.Ingest; using Elastic.Apm.Libraries.Newtonsoft.Json.Linq; using Elastic.Apm.Logging; using Elastic.Apm.Metrics; using Elastic.Apm.Model; using Elastic.Apm.Report; +using Elastic.Transport; using FluentAssertions; +#nullable enable namespace Elastic.Apm.Tests.Utilities { - internal class MockPayloadSender : IPayloadSender + internal class MockPayloadSender : ApmChannel { private static readonly JObject JsonSpanTypesData = JObject.Parse(File.ReadAllText("./TestResources/json-specs/span_types.json")); - private readonly List _errors = new List(); - private readonly List> _errorFilters = new List>(); - private readonly object _spanLock = new object(); - private readonly object _transactionLock = new object(); - private readonly object _metricsLock = new object(); - private readonly object _errorLock = new object(); - private readonly List _metrics = new List(); - private readonly List> _spanFilters = new List>(); - private readonly List _spans = new List(); - private readonly List> _transactionFilters = new List>(); - private readonly List _transactions = new List(); - - public MockPayloadSender(IApmLogger logger = null) + private readonly object _spanLock = new(); + private readonly object _transactionLock = new(); + private readonly object _metricsLock = new(); + private readonly object _errorLock = new(); + private readonly List _metrics = new(); + private readonly List _errors = new(); + private readonly List _spans = new(); + private readonly List _transactions = new(); + + public MockPayloadSender(IApmLogger? logger = null) + : base(new ApmChannelOptions(new Uri("http://localhost:8080"), transportClient: new InMemoryConnection()), logger) { _waitHandles = new[] { new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false) }; @@ -45,7 +46,6 @@ public MockPayloadSender(IApmLogger logger = null) _errorWaitHandle = _waitHandles[2]; _metricSetWaitHandle = _waitHandles[3]; - PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, MockApmServerInfo.Version710, logger ?? new NoopLogger()); } /// @@ -61,6 +61,54 @@ public MockPayloadSender(IApmLogger logger = null) private readonly AutoResetEvent[] _waitHandles; private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1); + public override bool TryWrite(IIntakeRoot item) + { + var written = base.TryWrite(item); + switch (item) + { + case IError error: + _errors.Add(error); + _errorWaitHandle.Set(); + break; + case ITransaction transaction: + _transactions.Add(transaction); + _transactionWaitHandle.Set(); + break; + case ISpan span: + _spans.Add(span); + _spanWaitHandle.Set(); + break; + case IMetricSet metricSet: + _metrics.Add(metricSet); + _metricSetWaitHandle.Set(); + break; + } + return written; + } + + public override void QueueError(IError error) + { + lock (_errorLock) base.QueueError(error); + } + + public override void QueueTransaction(ITransaction transaction) + { + lock (_transactionLock) base.QueueTransaction(transaction); + } + + public override void QueueSpan(ISpan span) + { + VerifySpan(span); + lock (_spanLock) base.QueueSpan(span); + } + + public override void QueueMetrics(IMetricSet metricSet) + { + lock (_metricsLock) base.QueueMetrics(metricSet); + } + + + /// /// Waits for any events to be queued /// @@ -191,19 +239,19 @@ public IReadOnlyList Errors get { lock (_errorLock) - return CreateImmutableSnapshot(_errors); + return CreateImmutableSnapshot(_errors); } } - public Error FirstError => Errors.FirstOrDefault() as Error; - public MetricSet FirstMetric => Metrics.FirstOrDefault() as MetricSet; + public Error? FirstError => Errors.FirstOrDefault() as Error; + public MetricSet? FirstMetric => Metrics.FirstOrDefault() as MetricSet; /// /// The 1. Span on the 1. Transaction /// - public Span FirstSpan => Spans.FirstOrDefault() as Span; + public Span? FirstSpan => Spans.FirstOrDefault() as Span; - public Transaction FirstTransaction => + public Transaction? FirstTransaction => Transactions.FirstOrDefault() as Transaction; public IReadOnlyList Metrics @@ -211,7 +259,7 @@ public IReadOnlyList Metrics get { lock (_metricsLock) - return CreateImmutableSnapshot(_metrics); + return CreateImmutableSnapshot(_metrics); } } @@ -220,7 +268,7 @@ public IReadOnlyList Spans get { lock (_spanLock) - return CreateImmutableSnapshot(_spans); + return CreateImmutableSnapshot(_spans); } } @@ -229,45 +277,15 @@ public IReadOnlyList Transactions get { lock (_transactionLock) - return CreateImmutableSnapshot(_transactions); + return CreateImmutableSnapshot(_transactions); } } public Span[] SpansOnFirstTransaction => - Spans.Where(n => n.TransactionId == Transactions.First().Id).Select(n => n as Span).ToArray(); - - public void QueueError(IError error) - { - lock (_errorLock) - { - error = _errorFilters.Aggregate(error, - (current, filter) => filter(current)); - _errors.Add(error); - _errorWaitHandle.Set(); - } - } - - public virtual void QueueTransaction(ITransaction transaction) - { - lock (_transactionLock) - { - transaction = _transactionFilters.Aggregate(transaction, - (current, filter) => filter(current)); - _transactions.Add(transaction); - _transactionWaitHandle.Set(); - } - } - - public void QueueSpan(ISpan span) - { - VerifySpan(span); - lock (_spanLock) - { - span = _spanFilters.Aggregate(span, (current, filter) => filter(current)); - _spans.Add(span); - _spanWaitHandle.Set(); - } - } + Spans + .Where(n => n.TransactionId == Transactions.First().Id) + .Select(n => (Span)n) + .ToArray(); private void VerifySpan(ISpan span) { @@ -279,7 +297,7 @@ private void VerifySpan(ISpan span) var spanTypeInfo = JsonSpanTypesData[type] as JObject; spanTypeInfo.Should().NotBeNull($"span type '{type}' is not allowed by the spec"); - var allowNullSubtype = spanTypeInfo["allow_null_subtype"]?.Value(); + var allowNullSubtype = spanTypeInfo!["allow_null_subtype"]?.Value(); var allowUnlistedSubtype = spanTypeInfo["allow_unlisted_subtype"]?.Value(); var subTypes = spanTypeInfo["subtypes"]; var hasSubtypes = subTypes != null && subTypes.Any(); @@ -289,7 +307,7 @@ private void VerifySpan(ISpan span) { if (!allowUnlistedSubtype.GetValueOrDefault() && hasSubtypes) { - var subTypeInfo = subTypes[subType]; + var subTypeInfo = subTypes![subType]; subTypeInfo.Should() .NotBeNull($"span subtype '{subType}' is not allowed by the spec for type '{type}'"); } @@ -305,15 +323,6 @@ private void VerifySpan(ISpan span) } } - public void QueueMetrics(IMetricSet metricSet) - { - lock (_metricsLock) - { - _metrics.Add(metricSet); - _metricSetWaitHandle.Set(); - } - } - public void Clear() { lock (_spanLock) diff --git a/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs b/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs index 1537eb25d..da3c2103c 100644 --- a/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs +++ b/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs @@ -24,10 +24,10 @@ public void MixApisTest1() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.Sample2(agent.Tracer); - payloadSender.FirstTransaction.Name.Should().Be("Sample2"); + payloadSender.FirstTransaction!.Name.Should().Be("Sample2"); payloadSender.Spans.Should().HaveCount(2); - payloadSender.FirstSpan.Name.Should().Be("foo"); + payloadSender.FirstSpan!.Name.Should().Be("foo"); payloadSender.Spans.ElementAt(1).Name.Should().Be("ElasticApmSpan"); payloadSender.FirstSpan.ParentId.Should().Be(payloadSender.FirstTransaction.Id); @@ -39,7 +39,7 @@ public void MixApisTest1() private void AssertOnTraceIds(MockPayloadSender payloadSender) { foreach (var span in payloadSender.Spans) - span.TraceId.Should().Be(payloadSender.FirstTransaction.TraceId); + span.TraceId.Should().Be(payloadSender.FirstTransaction!.TraceId); } [Fact] @@ -50,10 +50,10 @@ public void MixApisTest2() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.Sample3(agent.Tracer); - payloadSender.FirstTransaction.Name.Should().Be("Sample3"); + payloadSender.FirstTransaction!.Name.Should().Be("Sample3"); payloadSender.Spans.Should().HaveCount(2); - payloadSender.FirstSpan.Name.Should().Be("ElasticApmSpan"); + payloadSender.FirstSpan!.Name.Should().Be("ElasticApmSpan"); payloadSender.Spans.ElementAt(1).Name.Should().Be("foo"); payloadSender.Spans.ElementAt(1).ParentId.Should().Be(payloadSender.FirstTransaction.Id); @@ -72,10 +72,10 @@ public void MixApisTest3() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.Sample4(agent.Tracer); - payloadSender.FirstTransaction.Name.Should().Be("Sample4"); + payloadSender.FirstTransaction!.Name.Should().Be("Sample4"); payloadSender.Spans.Should().HaveCount(2); - payloadSender.FirstSpan.Name.Should().Be("ElasticApmSpan"); + payloadSender.FirstSpan!.Name.Should().Be("ElasticApmSpan"); payloadSender.Spans.ElementAt(1).Name.Should().Be("foo"); payloadSender.Spans.ElementAt(1).ParentId.Should().Be(payloadSender.FirstTransaction.Id); @@ -92,7 +92,7 @@ public void TestOtelFieldsWith1Span() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.OneSpanWithAttributes(); - payloadSender.FirstTransaction.Name.Should().Be("foo"); + payloadSender.FirstTransaction!.Name.Should().Be("foo"); payloadSender.FirstTransaction.Otel.Should().NotBeNull(); payloadSender.FirstTransaction.Otel.SpanKind.Should().Be("Server"); payloadSender.FirstTransaction.Otel.Attributes.Should().NotBeNull(); @@ -107,13 +107,13 @@ public void TestOtelFieldsWith3Spans() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.TwoSpansWithAttributes(); - payloadSender.FirstTransaction.Name.Should().Be("foo"); + payloadSender.FirstTransaction!.Name.Should().Be("foo"); payloadSender.FirstTransaction.Otel.Should().NotBeNull(); payloadSender.FirstTransaction.Otel.SpanKind.Should().Be("Server"); payloadSender.FirstTransaction.Otel.Attributes.Should().NotBeNull(); payloadSender.FirstTransaction.Otel.Attributes.Should().Contain("foo1", "bar1"); - payloadSender.FirstSpan.Name.Should().Be("bar"); + payloadSender.FirstSpan!.Name.Should().Be("bar"); payloadSender.FirstSpan.Otel.Should().NotBeNull(); payloadSender.FirstSpan.Otel.SpanKind.Should().Be("Internal"); payloadSender.FirstSpan.Otel.Attributes.Should().NotBeNull(); @@ -128,7 +128,7 @@ public void SpanKindTests() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.SpanKindSample(); - payloadSender.FirstSpan.Type.Should().Be(ApiConstants.TypeExternal); + payloadSender.FirstSpan!.Type.Should().Be(ApiConstants.TypeExternal); payloadSender.FirstSpan.Subtype.Should().Be(ApiConstants.SubtypeHttp); payloadSender.Spans.ElementAt(1).Type.Should().Be(ApiConstants.TypeDb); @@ -178,7 +178,7 @@ public void DistributedTracingTest() payloadSender.WaitForTransactions(count: 2); - payloadSender.FirstTransaction.TraceId.Should() + payloadSender.FirstTransaction!.TraceId.Should() .Be(payloadSender.Transactions[1].TraceId, because: "The transactions should be under the same trace."); } @@ -196,7 +196,7 @@ public void ActivityAndTransactionTraceIdSynced() using (var activity = src.StartActivity("foo", ActivityKind.Server)) traceId = activity?.TraceId.ToString(); traceId.Should().NotBeNull(); - payloadSender.FirstTransaction.TraceId.Should().Be(traceId); + payloadSender.FirstTransaction!.TraceId.Should().Be(traceId); } [Fact] From 83e8d95b2c6963af361c0b78907d4e328438a430 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Mon, 11 Sep 2023 10:55:41 +0200 Subject: [PATCH 3/3] dotnet format --- src/Elastic.Apm/Ingest/ApmChannel.cs | 16 +++++--- .../MockPayloadSender.cs | 38 ++++++++++--------- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/Elastic.Apm/Ingest/ApmChannel.cs b/src/Elastic.Apm/Ingest/ApmChannel.cs index f8d080c98..d78780210 100644 --- a/src/Elastic.Apm/Ingest/ApmChannel.cs +++ b/src/Elastic.Apm/Ingest/ApmChannel.cs @@ -34,7 +34,9 @@ internal static class ApmChannelStatics public static readonly JsonSerializerOptions SerializerOptions = new() { - DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, MaxDepth = 64, Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + MaxDepth = 64, + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, }; } @@ -82,17 +84,20 @@ public bool TryFilter(ITransaction transaction, [NotNullWhen(true)] out ITransac public virtual void QueueError(IError error) { - if (TryFilter(error, out var e)) TryWrite(e); + if (TryFilter(error, out var e)) + TryWrite(e); } public virtual void QueueSpan(ISpan span) { - if (TryFilter(span, out var s)) TryWrite(s); + if (TryFilter(span, out var s)) + TryWrite(s); } public virtual void QueueTransaction(ITransaction transaction) { - if (TryFilter(transaction, out var t)) TryWrite(t); + if (TryFilter(transaction, out var t)) + TryWrite(t); } //retry if APM server returns 429 @@ -159,7 +164,8 @@ private async Task WriteBufferToStreamAsync(IReadOnlyCollection b, await WriteStanzaToStreamAsync(stream, ctx).ConfigureAwait(false); foreach (var @event in b) { - if (@event == null) continue; + if (@event == null) + continue; var type = @event switch { diff --git a/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs b/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs index e3cfe610f..8a5dfd87f 100644 --- a/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs +++ b/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs @@ -19,7 +19,7 @@ using Elastic.Transport; using FluentAssertions; -#nullable enable +#nullable enable namespace Elastic.Apm.Tests.Utilities { internal class MockPayloadSender : ApmChannel @@ -67,44 +67,48 @@ public override bool TryWrite(IIntakeRoot item) switch (item) { case IError error: - _errors.Add(error); - _errorWaitHandle.Set(); - break; + _errors.Add(error); + _errorWaitHandle.Set(); + break; case ITransaction transaction: - _transactions.Add(transaction); - _transactionWaitHandle.Set(); - break; + _transactions.Add(transaction); + _transactionWaitHandle.Set(); + break; case ISpan span: - _spans.Add(span); - _spanWaitHandle.Set(); - break; + _spans.Add(span); + _spanWaitHandle.Set(); + break; case IMetricSet metricSet: - _metrics.Add(metricSet); - _metricSetWaitHandle.Set(); - break; + _metrics.Add(metricSet); + _metricSetWaitHandle.Set(); + break; } return written; } public override void QueueError(IError error) { - lock (_errorLock) base.QueueError(error); + lock (_errorLock) + base.QueueError(error); } public override void QueueTransaction(ITransaction transaction) { - lock (_transactionLock) base.QueueTransaction(transaction); + lock (_transactionLock) + base.QueueTransaction(transaction); } public override void QueueSpan(ISpan span) { VerifySpan(span); - lock (_spanLock) base.QueueSpan(span); + lock (_spanLock) + base.QueueSpan(span); } public override void QueueMetrics(IMetricSet metricSet) { - lock (_metricsLock) base.QueueMetrics(metricSet); + lock (_metricsLock) + base.QueueMetrics(metricSet); }