Skip to content

Commit

Permalink
Makes ScopePassingSpanSubscriber aware of pending spans
Browse files Browse the repository at this point in the history
without this change e.g. Reactor Netty spans are not treated as current in the reactor flow

with this change once Reactor Netty instrumentation creates a pending span it will be treated as a parent one in the reactor flow

fixes gh-2282
  • Loading branch information
marcingrzejszczak committed Apr 24, 2023
1 parent 3d19830 commit 45bdd9a
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -64,6 +65,8 @@ public abstract class ReactorSleuth {

private static final Log log = LogFactory.getLog(ReactorSleuth.class);

private static final String PENDING_SPAN_KEY = "sleuth.pending-span";

private ReactorSleuth() {
}

Expand Down Expand Up @@ -607,6 +610,47 @@ public static Context wrapContext(Context context) {
return contextWrappingFunction.apply(context);
}

/**
* Retreives the {@link TraceContext} from the current context.
* @param context Reactor context
* @return {@link TraceContext} or {@code null} if none present
*/
@SuppressWarnings("unchecked")
public static TraceContext getParentTraceContext(Context context, TraceContext fallback) {
AtomicReference<Span> pendingSpanRef = getPendingSpan(context);
if (pendingSpanRef == null || pendingSpanRef.get() == null) {
return fallback;
}
return pendingSpanRef.get().context();
}

/**
* Retreives the pending span from the current context.
* @param context Reactor context
* @return {@code AtomicReference} to span or {@code null} if none present
* @see ReactorSleuth#putPendingSpan(Context, AtomicReference)
*/
@SuppressWarnings("unchecked")
public static AtomicReference<Span> getPendingSpan(ContextView context) {
Object objectSpan = context.getOrDefault(ReactorSleuth.PENDING_SPAN_KEY, null);
if ((objectSpan instanceof AtomicReference)) {
return ((AtomicReference<Span>) objectSpan);
}
return null;
}

/**
* Mutates the {@link Context} to include a mutable reference to a span. Can be used
* when you need to mutate the parent operator context with a span created by a child
* operator.
* @param context Reactor context
* @param span atomic reference of a span
* @return mutated context
*/
public static Context putPendingSpan(Context context, AtomicReference<Span> span) {
return context.put(PENDING_SPAN_KEY, span);
}

/**
* Retrieves span from Reactor context.
* @param tracer tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ final class ScopePassingSpanSubscriber<T> implements SpanSubscription<T>, Scanna
@Nullable TraceContext parent) {
this.subscriber = subscriber;
this.currentTraceContext = currentTraceContext;
this.parent = parent;
Context context = parent != null && !parent.equals(ctx.getOrDefault(TraceContext.class, null))
? ctx.put(TraceContext.class, parent) : ctx;
this.parent = ReactorSleuth.getParentTraceContext(ctx, parent);
Context context = this.parent != null && !this.parent.equals(ctx.getOrDefault(TraceContext.class, null))
? ctx.put(TraceContext.class, this.parent) : ctx;
this.context = ReactorSleuth.wrapContext(context);
if (log.isTraceEnabled()) {
log.trace("Parent span [" + parent + "], context [" + this.context + "]");
log.trace("Parent span [" + this.parent + "], context [" + this.context + "]");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
// Read in this processor and also in ScopePassingSpanSubscriber
context = ReactorSleuth.wrapContext(context.put(TraceContext.class, invocationContext));
}
return context.put(PendingSpan.class, pendingSpan);
return ReactorSleuth.putPendingSpan(context, pendingSpan);
}).doOnCancel(() -> {
// Check to see if Subscription.cancel() happened before another signal,
// like onComplete() completed the span (clearing the reference).
Expand Down Expand Up @@ -153,7 +153,7 @@ HttpClientHandler handler() {

@Override
public void accept(HttpClientRequest req, Connection connection) {
PendingSpan pendingSpan = req.currentContextView().getOrDefault(PendingSpan.class, null);
AtomicReference<Span> pendingSpan = ReactorSleuth.getPendingSpan(req.currentContextView());
if (pendingSpan == null) {
return; // Somehow TracingMapConnect was not invoked.. skip out
}
Expand Down Expand Up @@ -240,7 +240,7 @@ HttpClientHandler handler() {
}

void handle(Context context, @Nullable HttpClientResponse resp, @Nullable Throwable error) {
PendingSpan pendingSpan = context.getOrDefault(PendingSpan.class, null);
AtomicReference<Span> pendingSpan = ReactorSleuth.getPendingSpan(context);
if (pendingSpan == null) {
return; // Somehow TracingMapConnect was not invoked.. skip out
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package org.springframework.cloud.sleuth.instrument.web.client;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

import io.netty.bootstrap.Bootstrap;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -32,7 +35,6 @@
import reactor.netty.Connection;

import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.instrument.web.client.HttpClientBeanPostProcessor.PendingSpan;
import org.springframework.cloud.sleuth.instrument.web.client.HttpClientBeanPostProcessor.TracingMapConnect;

@ExtendWith(MockitoExtension.class)
Expand All @@ -58,35 +60,51 @@ public void setup() {
@Test
void mapConnect_should_setup_reactor_context_currentTraceContext() {
TracingMapConnect tracingMapConnect = new TracingMapConnect(() -> traceContext);
AtomicBoolean assertionPassed = new AtomicBoolean();

Mono<Connection> original = Mono.just(connection)
.handle(new BiConsumer<Connection, SynchronousSink<Connection>>() {
@Override
public void accept(Connection t, SynchronousSink<Connection> ctx) {
Assertions.assertThat(ctx.currentContext().get(TraceContext.class)).isSameAs(traceContext);
Assertions.assertThat(ctx.currentContext().get(PendingSpan.class)).isNotNull();
try {
Assertions.assertThat(ctx.currentContext().get(TraceContext.class)).isSameAs(traceContext);
Assertions.assertThat((Object) ctx.currentContext().get("sleuth.pending-span")).isNotNull();
assertionPassed.set(true);
}
catch (AssertionError ae) {
}
}
});

// Wrap and run the assertions
tracingMapConnect.apply(original).log().subscribe();

Awaitility.await().atMost(1, TimeUnit.SECONDS).untilTrue(assertionPassed);
}

@Test
void mapConnect_should_setup_reactor_context_no_currentTraceContext() {
TracingMapConnect tracingMapConnect = new TracingMapConnect(() -> null);
AtomicBoolean assertionPassed = new AtomicBoolean();

Mono<Connection> original = Mono.just(connection)
.handle(new BiConsumer<Connection, SynchronousSink<Connection>>() {
@Override
public void accept(Connection t, SynchronousSink<Connection> ctx) {
Assertions.assertThat(ctx.currentContext().getOrEmpty(TraceContext.class)).isEmpty();
Assertions.assertThat(ctx.currentContext().get(PendingSpan.class)).isNotNull();
try {
Assertions.assertThat(ctx.currentContext().getOrEmpty(TraceContext.class)).isEmpty();
Assertions.assertThat((Object) ctx.currentContext().get("sleuth.pending-span")).isNotNull();
assertionPassed.set(true);
}
catch (AssertionError ae) {
}
}
});

// Wrap and run the assertions
tracingMapConnect.apply(original).log().subscribe();

Awaitility.await().atMost(1, TimeUnit.SECONDS).untilTrue(assertionPassed);
}

}

0 comments on commit 45bdd9a

Please sign in to comment.