Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elastic.Apm.Ingest, new PayloadSender implementation #2171

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/scripts/Build.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ open Tooling

module Build =

//TODO remove oldDiagnosticSourceVersion
let private oldDiagnosticSourceVersion = SemVer.parse "4.6.0"
let private diagnosticSourceVersion6 = SemVer.parse "6.0.0"

Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Apm/Api/IError.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Elastic.Apm.Api
/// Represents an error which was captured by the agent.
/// </summary>
[Specification("error.json")]
public interface IError
public interface IError : IIntakeRoot
{
/// <summary>
/// The culprit that caused this error.
Expand Down
8 changes: 8 additions & 0 deletions src/Elastic.Apm/Api/IIntakeRoot.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Licensed to Elasticsearch B.V under
// one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

namespace Elastic.Apm.Api;

public interface IIntakeRoot { }
2 changes: 1 addition & 1 deletion src/Elastic.Apm/Api/IMetricSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Elastic.Apm.Api
/// Data captured by the agent representing a metric occurring in a monitored service
/// </summary>
[Specification("metricset.json")]
public interface IMetricSet
public interface IMetricSet : IIntakeRoot
{
/// <summary>
/// List of captured metrics as key - value pairs
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Apm/Api/ISpan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Elastic.Apm.Api
/// An event captured by an agent occurring in a monitored service
/// </summary>
[Specification("span.json")]
public interface ISpan : IExecutionSegment
public interface ISpan : IExecutionSegment, IIntakeRoot
{
/// <summary>
/// The action of the span.
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Apm/Api/ITransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace Elastic.Apm.Api
/// provide different transaction implementations.
/// </remarks>
[Specification("transaction.json")]
public interface ITransaction : IExecutionSegment
public interface ITransaction : IExecutionSegment, IIntakeRoot
{
/// <summary>
/// Contains data related to FaaS (Function as a Service) events.
Expand Down
4 changes: 3 additions & 1 deletion src/Elastic.Apm/Elastic.Apm.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
</ItemGroup>

<ItemGroup Condition="'$(DiagnosticSourceVersion)' == ''">
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.0" />
<!--- TODO BUMP Elastic.Transport down to 6.0.0 -->
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
</ItemGroup>
<!-- DiagnosticSourceVersion MsBuild property can be used to compile the agent against a specific version of
System.Diagnostics.DiagnosticSource. Used when creating the ElasticApmAgentStartupHook zip file to
Expand All @@ -84,6 +85,7 @@
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Elastic.Ingest.Transport" Version="0.5.5" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
<!-- Used by Ben.Demystifier -->
<PackageReference Include="System.Reflection.Metadata" Version="5.0.0" />
Expand Down
186 changes: 186 additions & 0 deletions src/Elastic.Apm/Ingest/ApmChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm.Api;
using Elastic.Apm.Logging;
using Elastic.Apm.Report;
using Elastic.Channels;
using Elastic.Ingest.Transport;
using Elastic.Transport;

namespace Elastic.Apm.Ingest;

#nullable enable
internal static class ApmChannelStatics
{
public static readonly byte[] LineFeed = { (byte)'\n' };

public static readonly DefaultRequestParameters RequestParams = new()
{
RequestConfiguration = new RequestConfiguration { ContentType = "application/x-ndjson" }
};

public static readonly JsonSerializerOptions SerializerOptions = new()
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
MaxDepth = 64,
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
};
}

/// <summary>
/// An <see cref="TransportChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/> implementation that sends V2 intake API data
/// to APM server.
/// </summary>
public class ApmChannel
: TransportChannelBase<ApmChannelOptions, IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
, IPayloadSender
{
private readonly List<Func<ITransaction, ITransaction?>> _transactionFilters = new();
private readonly List<Func<ISpan, ISpan?>> _spanFilters = new();
private readonly List<Func<IError, IError?>> _errorFilters = new();

/// <inheritdoc cref="ApmChannel"/>
public ApmChannel(ApmChannelOptions options, IApmLogger? logger = null) : base(options) =>
PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, null, logger ?? new TraceLogger(LogLevel.Trace));

public IError? Filter(IError error) => _errorFilters.Aggregate(error, (current, filter) => filter(current)!);

public ISpan? Filter(ISpan span) => _spanFilters.Aggregate(span, (current, filter) => filter(current)!);

public ITransaction? Filter(ITransaction span) => _transactionFilters.Aggregate(span, (current, filter) => filter(current)!);

public bool TryFilter(IError error, [NotNullWhen(true)] out IError? filtered)
{
filtered = _errorFilters.Select(f => f(error)).TakeWhile(e => e != null).LastOrDefault();
return filtered != null;
}

public bool TryFilter(ISpan span, [NotNullWhen(true)] out ISpan? filtered)
{
filtered = _spanFilters.Select(f => f(span)).TakeWhile(e => e != null).LastOrDefault();
return filtered != null;
}

public bool TryFilter(ITransaction transaction, [NotNullWhen(true)] out ITransaction? filtered)
{
filtered = _transactionFilters.Select(f => f(transaction)).TakeWhile(e => e != null).LastOrDefault();
return filtered != null;
}

public virtual void QueueMetrics(IMetricSet metrics) => TryWrite(metrics);

public virtual void QueueError(IError error)
{
if (TryFilter(error, out var e))
TryWrite(e);
}

public virtual void QueueSpan(ISpan span)
{
if (TryFilter(span, out var s))
TryWrite(s);
}

public virtual void QueueTransaction(ITransaction transaction)
{
if (TryFilter(transaction, out var t))
TryWrite(t);
}

//retry if APM server returns 429
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Retry"/>
protected override bool Retry(EventIntakeResponse response) => response.ApiCallDetails.HttpStatusCode == 429;

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryAllItems"/>
protected override bool RetryAllItems(EventIntakeResponse response) => response.ApiCallDetails.HttpStatusCode == 429;

//APM does not return the status for all events sent. Therefor we always return an empty set for individual items to retry
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Zip"/>
protected override List<(IIntakeRoot, IntakeErrorItem)> Zip(EventIntakeResponse response, IReadOnlyCollection<IIntakeRoot> page) =>
_emptyZip;

private readonly List<(IIntakeRoot, IntakeErrorItem)> _emptyZip = new();

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryEvent"/>
protected override bool RetryEvent((IIntakeRoot, IntakeErrorItem) @event) => false;

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RejectEvent"/>
protected override bool RejectEvent((IIntakeRoot, IntakeErrorItem) @event) => false;

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>
protected override Task<EventIntakeResponse>
ExportAsync(HttpTransport transport, ArraySegment<IIntakeRoot> page, CancellationToken ctx = default) =>
transport.RequestAsync<EventIntakeResponse>(HttpMethod.POST, "/intake/v2/events",
PostData.StreamHandler(page,
(_, _) =>
{
/* NOT USED */
},
async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, ctx).ConfigureAwait(false); })
, ApmChannelStatics.RequestParams, ctx);

private async Task WriteStanzaToStreamAsync(Stream stream, CancellationToken ctx)
{
// {"metadata":{"process":{"pid":1234,"title":"/usr/lib/jvm/java-10-openjdk-amd64/bin/java","ppid":1,"argv":["-v"]},
// "system":{"architecture":"amd64","detected_hostname":"8ec7ceb99074","configured_hostname":"host1","platform":"Linux","container":{"id":"8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4"},
// "kubernetes":{"namespace":"default","pod":{"uid":"b17f231da0ad128dc6c6c0b2e82f6f303d3893e3","name":"instrumented-java-service"},"node":{"name":"node-name"}}},
// "service":{"name":"1234_service-12a3","version":"4.3.0","node":{"configured_name":"8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4"},"environment":"production","language":{"name":"Java","version":"10.0.2"},
// "agent":{"version":"1.10.0","name":"java","ephemeral_id":"e71be9ac-93b0-44b9-a997-5638f6ccfc36"},"framework":{"name":"spring","version":"5.0.0"},"runtime":{"name":"Java","version":"10.0.2"}},"labels":{"group":"experimental","ab_testing":true,"segment":5}}}
// TODO cache
var p = Process.GetCurrentProcess();
var metadata = new
{
metadata = new
{
process = new { pid = p.Id, title = p.ProcessName },
service = new
{
name = System.Text.RegularExpressions.Regex.Replace(p.ProcessName, "[^a-zA-Z0-9 _-]", "_"),
version = "1.0.0",
agent = new { name = "dotnet", version = "0.0.1" }
}
}
};
await JsonSerializer.SerializeAsync(stream, metadata, metadata.GetType(), ApmChannelStatics.SerializerOptions, ctx)
.ConfigureAwait(false);
await stream.WriteAsync(ApmChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
}

private async Task WriteBufferToStreamAsync(IReadOnlyCollection<IIntakeRoot> b, Stream stream, CancellationToken ctx)
{
await WriteStanzaToStreamAsync(stream, ctx).ConfigureAwait(false);
foreach (var @event in b)
{
if (@event == null)
continue;

var type = @event switch
{
ITransaction => "transaction",
ISpan => "span",
IError => "error",
IMetricSet => "metricset",
_ => "unknown"
};
var dictionary = new Dictionary<string, object>() { { type, @event } };

await JsonSerializer.SerializeAsync(stream, dictionary, dictionary.GetType(), ApmChannelStatics.SerializerOptions, ctx)
.ConfigureAwait(false);

await stream.WriteAsync(ApmChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
}
}
}
25 changes: 25 additions & 0 deletions src/Elastic.Apm/Ingest/ApmChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using Elastic.Apm.Api;
using Elastic.Ingest.Transport;
using Elastic.Transport;

namespace Elastic.Apm.Ingest;

/// <summary>
/// Channel options for <see cref="ApmChannel"/>
/// </summary>
public class ApmChannelOptions : TransportChannelOptionsBase<IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
{
/// <inheritdoc cref="ApmChannelOptions"/>
private ApmChannelOptions(HttpTransport transport) : base(transport) { }

public ApmChannelOptions(Uri serverEndpoint, TransportClient transportClient = null)
: this(new DefaultHttpTransport(new TransportConfiguration(new SingleNodePool(serverEndpoint), connection: transportClient!)))
{

}
}
34 changes: 34 additions & 0 deletions src/Elastic.Apm/Ingest/IngestResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Collections.Generic;
using System.Text.Json.Serialization;
using Elastic.Transport;

namespace Elastic.Apm.Ingest;

/// <summary> </summary>
public class EventIntakeResponse : TransportResponse
{
/// <summary> </summary>
[JsonPropertyName("accepted")]
public long Accepted { get; set; }

/// <summary> </summary>
[JsonPropertyName("errors")]
//[JsonConverter(typeof(ResponseItemsConverter))]
public IReadOnlyCollection<IntakeErrorItem> Errors { get; set; } = null!;
}

/// <summary> </summary>
public class IntakeErrorItem
{
/// <summary> </summary>
[JsonPropertyName("message")]
public string Message { get; set; } = null!;

/// <summary> </summary>
[JsonPropertyName("document")]
public string Document { get; set; } = null!;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#endregion

#nullable enable
#if !NET6_0_OR_GREATER
namespace System.Diagnostics.CodeAnalysis
{
/// <summary>Specifies that an output will not be null even if the corresponding type allows it.</summary>
Expand Down Expand Up @@ -78,3 +79,4 @@ internal class DoesNotReturnIfAttribute : Attribute
public bool ParameterValue { get; }
}
}
#endif
21 changes: 21 additions & 0 deletions src/Elastic.Ingest.Apm/Elastic.Ingest.Apm.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Elastic.Ingest.Transport" Version="0.5.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Elastic.Apm\Elastic.Apm.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Model\" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<!-- TODO make this netstandard 2.0 and ref System.Runtime.Loader -->
<TargetFramework>netcoreapp2.2</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>

Expand All @@ -19,5 +18,9 @@
<Link>StartupHookLogger.cs</Link>
</Compile>
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
</ItemGroup>

</Project>
Loading