From 0f5fd68831f7795d823addcb683647b357a70f8b Mon Sep 17 00:00:00 2001 From: j-xinminjian-jk Date: Fri, 10 May 2024 18:06:22 +0800 Subject: [PATCH] add TracedCompletableFuture and TracedForkJoinPool --- .../concurrent/TracedCompletableFuture.java | 76 ++++++++++++++++ .../concurrent/TracedForkJoinPool.java | 86 +++++++++++++++++++ .../TracedCompletableFutureTest.java | 38 ++++++++ .../concurrent/TracedForkJoinPoolTest.java | 67 +++++++++++++++ 4 files changed, 267 insertions(+) create mode 100644 src/main/java/io/opentracing/contrib/concurrent/TracedCompletableFuture.java create mode 100644 src/main/java/io/opentracing/contrib/concurrent/TracedForkJoinPool.java create mode 100644 src/test/java/io/opentracing/contrib/concurrent/TracedCompletableFutureTest.java create mode 100644 src/test/java/io/opentracing/contrib/concurrent/TracedForkJoinPoolTest.java diff --git a/src/main/java/io/opentracing/contrib/concurrent/TracedCompletableFuture.java b/src/main/java/io/opentracing/contrib/concurrent/TracedCompletableFuture.java new file mode 100644 index 0000000..1815aad --- /dev/null +++ b/src/main/java/io/opentracing/contrib/concurrent/TracedCompletableFuture.java @@ -0,0 +1,76 @@ +package io.opentracing.contrib.concurrent; + +import io.opentracing.Tracer; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * @author chanfan + * @date 2024-05-09 17:40 + */ +public class TracedCompletableFuture extends CompletableFuture { + private final boolean traceWithActiveSpanOnly; + protected static Tracer tracer; + + + public TracedCompletableFuture(Tracer tracer) { + super(); + this.traceWithActiveSpanOnly = true; + this.tracer = tracer; + } + + public static TracedCompletableFuture buildTrace(Tracer tracer){ + return new TracedCompletableFuture(tracer); + } + + public static CompletableFuture supplyAsync(Supplier supplier) { + return CompletableFuture.supplyAsync(supplier, asyncPool); + } + + public static CompletableFuture supplyAsync(Supplier supplier, + Executor executor) { + TracedExecutor tracedExecutor = new TracedExecutor(executor, tracer); + return CompletableFuture.supplyAsync(supplier, tracedExecutor); + } + + public static CompletableFuture thenApplyAsync(CompletableFuture future, Function function) { + + return future.thenApplyAsync(function, asyncPool); + } + public static CompletableFuture thenApplyAsync(CompletableFuture future, Function function, Executor executor) { + + TracedExecutor tracedExecutor = new TracedExecutor(executor, tracer); + return future.thenApplyAsync(function, tracedExecutor); + } + + + public static CompletableFuture runAsync(Runnable runnable) { + + return CompletableFuture.runAsync(runnable, asyncPool); + } + + public static CompletableFuture runAsync(Runnable runnable, + Executor executor) { + + TracedExecutor tracedExecutor = new TracedExecutor(executor, tracer); + return CompletableFuture.runAsync(runnable, tracedExecutor); + } + + + private static final boolean useCommonPool = + (TracedForkJoinPool.getCommonPoolParallelism() > 1); + + private static final Executor asyncPool = useCommonPool ? + TracedForkJoinPool.commonPool(tracer) : new ThreadPerTaskExecutor(); + + final static class ThreadPerTaskExecutor implements Executor { + public void execute(Runnable r) { + Runnable runnableTask = new TracedRunnable(r, tracer); + new Thread(runnableTask).start(); + } + } + +} \ No newline at end of file diff --git a/src/main/java/io/opentracing/contrib/concurrent/TracedForkJoinPool.java b/src/main/java/io/opentracing/contrib/concurrent/TracedForkJoinPool.java new file mode 100644 index 0000000..0f6cc62 --- /dev/null +++ b/src/main/java/io/opentracing/contrib/concurrent/TracedForkJoinPool.java @@ -0,0 +1,86 @@ +package io.opentracing.contrib.concurrent; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; + +/** + * @author chanfan + * @date 2024-05-09 18:43 + */ +public class TracedForkJoinPool extends ForkJoinPool { + private final Tracer tracer; + private final boolean traceWithActiveSpanOnly; + public TracedForkJoinPool(Tracer tracer) { + super(); + this.traceWithActiveSpanOnly = true; + this.tracer = tracer; + } + + public TracedForkJoinPool(Tracer tracer, boolean traceWithActiveSpanOnly) { + super(); + this.traceWithActiveSpanOnly = traceWithActiveSpanOnly; + this.tracer = tracer; + } + + public TracedForkJoinPool(int parallelism, Tracer tracer, boolean traceWithActiveSpanOnly) { + super(parallelism); + this.traceWithActiveSpanOnly = traceWithActiveSpanOnly; + this.tracer = tracer; + } + + + @Override + public void execute(ForkJoinTask task) { + Span span = createSpan("execute"); + Span toActivate = span != null ? span : tracer.activeSpan(); + try (Scope scope = tracer.activateSpan(toActivate)) { + super.execute(task); + } finally { + if (span != null) { + span.finish(); + } + } + } + + @Override + public ForkJoinTask submit(ForkJoinTask task) { + Span span = createSpan("submit"); + Span toActivate = span != null ? span : tracer.activeSpan(); + try (Scope scope = tracer.activateSpan(toActivate)) { + return super.submit(task); + } finally { + if (span != null) { + span.finish(); + } + } + } + + public static TracedForkJoinPool commonPool(Tracer tracer) { + return new TracedForkJoinPool(Runtime.getRuntime().availableProcessors(), tracer, true); + } + + @Override + public T invoke(ForkJoinTask task) { + Span span = createSpan("invoke"); + Span toActivate = span != null ? span : tracer.activeSpan(); + try (Scope scope = tracer.activateSpan(toActivate)) { + return super.invoke(task); + } finally { + if (span != null) { + span.finish(); + } + } + } + + private Span createSpan(String operationName) { + if (tracer.activeSpan() == null && !traceWithActiveSpanOnly) { + return tracer.buildSpan(operationName).start(); + } + return null; + } + +} diff --git a/src/test/java/io/opentracing/contrib/concurrent/TracedCompletableFutureTest.java b/src/test/java/io/opentracing/contrib/concurrent/TracedCompletableFutureTest.java new file mode 100644 index 0000000..343401e --- /dev/null +++ b/src/test/java/io/opentracing/contrib/concurrent/TracedCompletableFutureTest.java @@ -0,0 +1,38 @@ +package io.opentracing.contrib.concurrent; + +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import org.junit.Test; + +import java.util.concurrent.*; + +import static org.junit.Assert.assertEquals; + +/** + * @author Pavol Loffay + */ +public class TracedCompletableFutureTest extends AbstractConcurrentTest { + + protected TracedCompletableFuture toTraced() { + return TracedCompletableFuture.buildTrace(mockTracer); + } + protected FutureTask toFutureTask(Callable callable) { + return new FutureTask(callable); + } + + @Test + public void testRunAsync() { + TracedCompletableFuture tracedCompletableFuture = toTraced(); + + MockSpan parentSpan = mockTracer.buildSpan("foo").start(); + Scope scope = mockTracer.scopeManager().activate(parentSpan); + + tracedCompletableFuture.runAsync(new TestRunnable()); + + scope.close(); + + assertParentSpan(parentSpan); + assertEquals(0, mockTracer.finishedSpans().size()); + } + +} diff --git a/src/test/java/io/opentracing/contrib/concurrent/TracedForkJoinPoolTest.java b/src/test/java/io/opentracing/contrib/concurrent/TracedForkJoinPoolTest.java new file mode 100644 index 0000000..41d8896 --- /dev/null +++ b/src/test/java/io/opentracing/contrib/concurrent/TracedForkJoinPoolTest.java @@ -0,0 +1,67 @@ +package io.opentracing.contrib.concurrent; + +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import org.junit.Test; + +import java.util.concurrent.*; + +import static org.junit.Assert.assertEquals; + +/** + * @author Pavol Loffay + */ +public class TracedForkJoinPoolTest extends AbstractConcurrentTest { + + protected TracedForkJoinPool toTraced() { + return TracedForkJoinPool.commonPool(mockTracer); + } + protected FutureTask toFutureTask(Callable callable) { + return new FutureTask(callable); + } + + @Test + public void testExecute() throws InterruptedException, ExecutionException { + TracedForkJoinPool tracedForkJoinPool = toTraced(); + + RecursiveTask task = new RecursiveTask() { + @Override + protected Integer compute() { + System.out.println("Task running in TracedForkJoinPool"); + return 42; + } + }; + MockSpan parentSpan = mockTracer.buildSpan("foo").start(); + Scope scope = mockTracer.scopeManager().activate(parentSpan); + + tracedForkJoinPool.execute(task); + task.get(); + + scope.close(); + + assertParentSpan(parentSpan); + assertEquals(0, mockTracer.finishedSpans().size()); + } + + @Test + public void testSubmit() throws InterruptedException, ExecutionException { + TracedForkJoinPool tracedForkJoinPool = toTraced(); + RecursiveTask task = new RecursiveTask() { + @Override + protected Integer compute() { + System.out.println("Task running in TracedForkJoinPool"); + return 42; + } + }; + MockSpan parentSpan = mockTracer.buildSpan("foo").start(); + Scope scope = mockTracer.scopeManager().activate(parentSpan); + ForkJoinTask result = tracedForkJoinPool.submit(task); + result.get(); + scope.close(); + + countDownLatch.await(); + assertParentSpan(parentSpan); + assertEquals(0, mockTracer.finishedSpans().size()); + } + +}