Skip to content

Commit

Permalink
[Tracer] Add API to extract SpanContext (#2694)
Browse files Browse the repository at this point in the history
* Add API to extract SpanContext

* Rename and add kafka tests

* Renaming and RabbitMQTests

* fix tests

* Apply suggestions from code review

Co-authored-by: Zach Montoya <[email protected]>

Co-authored-by: Zach Montoya <[email protected]>
  • Loading branch information
pierotibou and zacharycmontoya authored Apr 21, 2022
1 parent f74af6a commit a5656c4
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 41 deletions.
29 changes: 29 additions & 0 deletions tracer/src/Datadog.Trace/ISpanContextExtractor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// <copyright file="ISpanContextExtractor.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

using System;
using System.Collections.Generic;

#nullable enable

namespace Datadog.Trace
{
    /// <summary>
    /// The ISpanContextExtractor is responsible for extracting a SpanContext in the rare cases
    /// where the Tracer could not propagate it itself. This can happen, for instance, when
    /// libraries add an extra layer above the instrumented ones
    /// (e.g. consuming Kafka messages and enqueuing them prior to generating a span).
    /// </summary>
    public interface ISpanContextExtractor
    {
        /// <summary>
        /// Given a SpanContext carrier and a function to access its values, extracts the SpanContext if one is present
        /// </summary>
        /// <param name="carrier">The carrier of the SpanContext. Often a set of headers (http, Kafka message headers...)</param>
        /// <param name="getter">Given a key name, returns the matching values from the carrier (zero or more)</param>
        /// <typeparam name="TCarrier">Type of the carrier</typeparam>
        /// <returns>The extracted Datadog SpanContext, or null when the carrier holds no valid context</returns>
        public ISpanContext? Extract<TCarrier>(TCarrier carrier, Func<TCarrier, string, IEnumerable<string?>> getter);
    }
}
21 changes: 21 additions & 0 deletions tracer/src/Datadog.Trace/SpanContextExtractor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// <copyright file="SpanContextExtractor.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

using System;
using System.Collections.Generic;
using Datadog.Trace.Propagators;

#nullable enable

namespace Datadog.Trace
{
    /// <summary>
    /// Default implementation of <see cref="ISpanContextExtractor"/>. It delegates the
    /// actual extraction work to the tracer's configured <c>SpanContextPropagator</c>,
    /// so all enabled propagation styles are honored.
    /// </summary>
    public class SpanContextExtractor : ISpanContextExtractor
    {
        /// <inheritdoc />
        public ISpanContext? Extract<TCarrier>(TCarrier carrier, Func<TCarrier, string, IEnumerable<string?>> getter)
        {
            // Delegate to the shared propagator instance; it returns null when no context is found.
            return SpanContextPropagator.Instance.Extract(carrier, getter);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void SubmitsTraces(string packageVersion)
}
#endif

var expectedSpanCount = 26;
var expectedSpanCount = 52;

int basicPublishCount = 0;
int basicGetCount = 0;
Expand Down Expand Up @@ -200,23 +200,24 @@ public void SubmitsTraces(string packageVersion)
{
Assert.Equal("Samples.RabbitMQ", span.Service);
Assert.Equal("1.0.0", span.Tags[Tags.Version]);
Assert.True(rabbitmqSpans.Count(s => s.TraceId == span.TraceId) > 0);
}
}

// Assert that all empty get results are expected
Assert.Equal(2, emptyBasicGetCount);
Assert.Equal(4, emptyBasicGetCount);

// Assert that each span that started a distributed trace (basic.publish)
// has only one child span (basic.deliver or basic.get)
Assert.All(distributedParentSpans, kvp => Assert.Equal(1, kvp.Value));

Assert.Equal(5, basicPublishCount);
Assert.Equal(4, basicGetCount);
Assert.Equal(3, basicDeliverCount);
Assert.Equal(10, basicPublishCount);
Assert.Equal(8, basicGetCount);
Assert.Equal(6, basicDeliverCount);

Assert.Equal(1, exchangeDeclareCount);
Assert.Equal(1, queueBindCount);
Assert.Equal(4, queueDeclareCount);
Assert.Equal(2, exchangeDeclareCount);
Assert.Equal(2, queueBindCount);
Assert.Equal(8, queueDeclareCount);
telemetry.AssertIntegrationEnabled(IntegrationId.RabbitMQ);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ namespace Datadog.Trace
ulong SpanId { get; }
ulong TraceId { get; }
}
public interface ISpanContextExtractor
{
Datadog.Trace.ISpanContext? Extract<TCarrier>(TCarrier carrier, System.Func<TCarrier, string, System.Collections.Generic.IEnumerable<string?>> getter);
}
public interface ITracer
{
Datadog.Trace.IScope ActiveScope { get; }
Expand All @@ -244,6 +248,11 @@ namespace Datadog.Trace
public ulong SpanId { get; }
public ulong TraceId { get; }
}
public class SpanContextExtractor : Datadog.Trace.ISpanContextExtractor
{
public SpanContextExtractor() { }
public Datadog.Trace.ISpanContext? Extract<TCarrier>(TCarrier carrier, System.Func<TCarrier, string, System.Collections.Generic.IEnumerable<string?>> getter) { }
}
public struct SpanCreationSettings
{
public bool? FinishOnClose { get; set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Datadog.Trace;
using Newtonsoft.Json;

namespace Samples.Kafka
Expand Down Expand Up @@ -144,26 +147,36 @@ private void HandleMessage(ConsumeResult<string, string> consumeResult)
var kafkaMessage = consumeResult.Message;
Console.WriteLine($"{_consumerName}: Consuming {kafkaMessage.Key}, {consumeResult.TopicPartitionOffset}");

var headers = kafkaMessage.Headers;
ulong? traceId = headers.TryGetLastBytes("x-datadog-trace-id", out var traceIdBytes)
&& ulong.TryParse(Encoding.UTF8.GetString(traceIdBytes), out var extractedTraceId)
? extractedTraceId
: null;
var messageHeaders = kafkaMessage.Headers;
var contextPropagator = new SpanContextExtractor();
var spanContext = contextPropagator.Extract(messageHeaders, (h, s) => GetValues(messageHeaders, s));

ulong? parentId = headers.TryGetLastBytes("x-datadog-parent-id", out var parentBytes)
&& ulong.TryParse(Encoding.UTF8.GetString(parentBytes), out var extractedParentId)
? extractedParentId
: null;
IEnumerable<string> GetValues(Headers headers, string name)
{
if (headers.TryGetLastBytes(name, out var bytes))
{
try
{
return new[] { Encoding.UTF8.GetString(bytes) };
}
catch (Exception)
{
// ignored
}
}

return Enumerable.Empty<string>();
}

if (traceId is null || parentId is null)
if (spanContext is null || spanContext.TraceId is 0 || spanContext.SpanId is 0)
{
// For kafka brokers < 0.11.0, we can't inject custom headers, so context will not be propagated
var errorMessage = $"Error extracting trace context for {kafkaMessage.Key}, {consumeResult.TopicPartitionOffset}";
Console.WriteLine(errorMessage);
}
else
{
Console.WriteLine($"Successfully extracted trace context from message: {traceId}, {parentId}");
Console.WriteLine($"Successfully extracted trace context from message: {spanContext.TraceId}, {spanContext.SpanId}");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@
<PackageReference Include="Confluent.Kafka" Version="$(ApiVersion)" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Datadog.Trace\Datadog.Trace.csproj" />
</ItemGroup>
</Project>
111 changes: 89 additions & 22 deletions tracer/test/test-applications/integrations/Samples.RabbitMQ/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Datadog.Trace;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

Expand All @@ -15,35 +19,46 @@ public static class Program
private static readonly string routingKey = "test-routing-key";
private static readonly string queueName = "test-queue-name";

private static readonly ConcurrentQueue<BasicDeliverEventArgs> _queue = new ();
private static readonly Thread DequeueThread = new Thread(ConsumeFromQueue);
// Resolves the RabbitMQ host from the environment, falling back to localhost.
private static string Host() => Environment.GetEnvironmentVariable("RABBITMQ_HOST") ?? "localhost";

/// <summary>
/// Sample entry point. The former "prefix" command-line argument is no longer used
/// (RunRabbitMQ() stopped taking parameters in this refactor), so the dead argument
/// parsing has been removed; the signature is kept for compatibility with the runner.
/// </summary>
public static void Main(string[] args)
{
    RunRabbitMQ();
}

private static void RunRabbitMQ(string prefix)
// Runs the full publish/consume scenario twice:
//   round 1 (Receive(false)): messages are traced directly in the Received handler,
//     exercising the tracer's automatic context propagation;
//   round 2 (Receive(true)): messages are enqueued to an in-memory queue and traced
//     later on a dedicated thread, exercising manual propagation via SpanContextExtractor.
// The integration test asserts exact span counts, so the sequence and the number of
// publish/consume operations here must not change.
private static void RunRabbitMQ()
{
    PublishAndGet();
    PublishAndGetDefault();

    var sendThread = new Thread(Send);
    sendThread.Start();

    // Round 1: consume inline (no manual extraction).
    var receiveThread = new Thread(o => Receive(false));
    receiveThread.Start();

    sendThread.Join();
    receiveThread.Join();

    // Doing the test twice to make sure that both our context propagation works but also manual propagation (when users enqueue messages for instance)
    PublishAndGet();
    PublishAndGetDefault();

    sendThread = new Thread(Send);
    sendThread.Start();

    // Round 2: consume via the in-memory queue; Receive(true) starts DequeueThread.
    receiveThread = new Thread(o => Receive(true));
    receiveThread.Start();

    sendThread.Join();
    receiveThread.Join();
    // DequeueThread is only started inside Receive(true) — joined here after round 2.
    DequeueThread.Join();
}

private static void PublishAndGet()
Expand Down Expand Up @@ -171,7 +186,7 @@ private static void Send()
Console.WriteLine("[Send] Exiting Thread.");
}

private static void Receive()
private static void Receive(bool useQueue)
{
// Let's just wait for all sending activity to finish before doing any work
_sendFinished.WaitOne();
Expand All @@ -190,31 +205,83 @@ private static void Receive()
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
using (SampleHelpers.CreateScope("consumer.Received event"))
if (useQueue)
{
#if RABBITMQ_6_0
var body = ea.Body.ToArray();
#else
var body = ea.Body;
#endif
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[Receive] - [x] Received {0}", message);
_messageCount -= 1;
_queue.Enqueue(ea);
}
else
{
TraceOnTheReceivingEnd(ea);
}
};
channel.BasicConsume("hello",
true,
consumer);

if (useQueue)
{
DequeueThread.Start();
}

while (_messageCount != 0)
{
Thread.Sleep(1000);
Thread.Sleep(100);
}

Console.WriteLine("[Receive] Exiting Thread.");
}
}

// Drains BasicDeliverEventArgs enqueued by the Received handler (round 2) and traces
// each one manually via TraceOnTheReceivingEnd.
// NOTE(review): the loop exits as soon as _queue is observed empty; if this thread is
// scheduled before the consumer has enqueued anything, messages enqueued afterwards
// would never be dequeued — presumably the Received handler wins this race in practice,
// but confirm (e.g. by gating on the expected message count instead of _queue.Count).
private static void ConsumeFromQueue()
{
    while (_queue.Count > 0 )
    {
        if (_queue.TryDequeue(out var ea))
        {
            TraceOnTheReceivingEnd(ea);
        }
        // Small pause between polls to avoid spinning.
        Thread.Sleep(100);
    }
}

/// <summary>
/// Manually extracts the Datadog trace context from a delivered message's headers and
/// opens a consumer span parented to it. Used in round 2, where messages were enqueued
/// and are traced outside the tracer's automatic instrumentation.
/// </summary>
/// <param name="ea">The delivery whose body and headers are processed.</param>
private static void TraceOnTheReceivingEnd(BasicDeliverEventArgs ea)
{
#if RABBITMQ_6_0
    var body = ea.Body.ToArray();
#else
    var body = ea.Body;
#endif
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine("[Receive] - [x] Received {0}", message);
    _messageCount -= 1;

    // BasicProperties and its Headers can both be null when the publisher set none,
    // so the carrier itself may be null — the getter below must guard against that.
    var messageHeaders = ea.BasicProperties?.Headers;
    var contextPropagator = new SpanContextExtractor();
    // Fix: use the carrier parameter `h` handed back by the extractor instead of
    // re-capturing messageHeaders.
    var spanContext = contextPropagator.Extract(messageHeaders, (h, s) => GetValues(h, s));
    var spanCreationSettings = new SpanCreationSettings() { Parent = spanContext };

    if (spanContext is null || spanContext.TraceId is 0 || spanContext.SpanId is 0)
    {
        // Context may be missing when the publisher could not inject headers
        // (the original comment mentioned Kafka by mistake — this is the RabbitMQ sample).
        var errorMessage = $"Error extracting trace context for {message}";
        Console.WriteLine(errorMessage);
    }
    else
    {
        Console.WriteLine($"Successfully extracted trace context from message: {spanContext.TraceId}, {spanContext.SpanId}");
    }

    IEnumerable<string> GetValues(IDictionary<string, object> headers, string name)
    {
        // Fix: guard against a null carrier (no basic properties / no headers) — the
        // original would throw NullReferenceException here. Only byte[] values are
        // decoded; anything else yields no values for that key.
        if (headers is not null && headers.TryGetValue(name, out object value) && value is byte[] bytes)
        {
            return new[] { Encoding.UTF8.GetString(bytes) };
        }

        return Enumerable.Empty<string>();
    }

    // NOTE(review): the scope is opened (and disposed) at the very end, so the span
    // covers none of the processing above — presumably intentional so the integration
    // test's span count and parenting stay deterministic; confirm before moving it.
    using var scope = Tracer.Instance.StartActive("consumer.Received event", spanCreationSettings);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@
<PackageReference Include="System.Net.NameResolution" Version="4.3.0" />
<PackageReference Include="System.Net.Primitives" Version="4.3.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Datadog.Trace\Datadog.Trace.csproj" />
</ItemGroup>

</Project>

0 comments on commit a5656c4

Please sign in to comment.