diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index 28f85b382..d66c72f13 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -11,13 +11,13 @@ internal static class OpenTelemetryConsumerEventsHandler { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - private static readonly string ProcessString = "process"; + private const string ProcessString = "process"; public static Task OnConsumeStarted(IMessageContext context) { try { - var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString; + var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString; // Extract the PropagationContext of the upstream parent from the message headers. var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties); @@ -33,7 +33,7 @@ public static Task OnConsumeStarted(IMessageContext context) activity.AddBaggage(item.Key, item.Value); } - context.Items.Add(KafkaFlowActivitySourceHelper.ActivityString, activity); + context?.Items.Add(KafkaFlowActivitySourceHelper.ActivityString, activity); KafkaFlowActivitySourceHelper.SetGenericTags(activity); @@ -44,6 +44,7 @@ public static Task OnConsumeStarted(IMessageContext context) } catch { + // If there is any failure, do not propagate the context. } return Task.CompletedTask;