Skip to content

Commit

Permalink
feat: Change consumer operation name
Browse files Browse the repository at this point in the history
From receive to process as the middleware will be included at
the beginning
  • Loading branch information
simaoribeiro committed Sep 14, 2023
1 parent ee41c3c commit 95f25e7
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
internal class TracerConsumerMiddleware : IMessageMiddleware
{
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
private static readonly string ReceiveString = "receive";
private static readonly string ProcessString = "process";

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ReceiveString}" : ReceiveString;
var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString;

// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties);
Expand Down Expand Up @@ -59,7 +59,7 @@ private IEnumerable<string> ExtractTraceContextIntoBasicProperties(IMessageConte

private void SetConsumerTags(IMessageContext context, Activity activity)
{
activity.SetTag("messaging.operation", ReceiveString);
activity.SetTag("messaging.operation", ProcessString);
activity.SetTag("messaging.source.name", context.ConsumerContext.Topic);
activity.SetTag("messaging.kafka.consumer.group", context.ConsumerContext.GroupId);
activity.SetTag("messaging.kafka.message.key", context.Message.Key);
Expand Down

0 comments on commit 95f25e7

Please sign in to comment.