Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add TracedCompletableFuture and TracedForkJoinPool #25

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.opentracing.contrib.concurrent;

import io.opentracing.Tracer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* @author chanfan
* @date 2024-05-09 17:40
*/
public class TracedCompletableFuture<T> extends CompletableFuture<T> {
private final boolean traceWithActiveSpanOnly;
protected static Tracer tracer;


public TracedCompletableFuture(Tracer tracer) {
super();
this.traceWithActiveSpanOnly = true;
this.tracer = tracer;
}

public static TracedCompletableFuture buildTrace(Tracer tracer){
return new TracedCompletableFuture(tracer);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return CompletableFuture.supplyAsync(supplier, asyncPool);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
TracedExecutor tracedExecutor = new TracedExecutor(executor, tracer);
return CompletableFuture.supplyAsync(supplier, tracedExecutor);
}

public static <T> CompletableFuture<T> thenApplyAsync(CompletableFuture<T> future, Function<T, T> function) {

return future.thenApplyAsync(function, asyncPool);
}
public static <T> CompletableFuture<T> thenApplyAsync(CompletableFuture<T> future, Function<T, T> function, Executor executor) {

TracedExecutor tracedExecutor = new TracedExecutor(executor, tracer);
return future.thenApplyAsync(function, tracedExecutor);
}


public static CompletableFuture<Void> runAsync(Runnable runnable) {

return CompletableFuture.runAsync(runnable, asyncPool);
}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {

TracedExecutor tracedExecutor = new TracedExecutor(executor, tracer);
return CompletableFuture.runAsync(runnable, tracedExecutor);
}


private static final boolean useCommonPool =
(TracedForkJoinPool.getCommonPoolParallelism() > 1);

private static final Executor asyncPool = useCommonPool ?
TracedForkJoinPool.commonPool(tracer) : new ThreadPerTaskExecutor();

final static class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
Runnable runnableTask = new TracedRunnable(r, tracer);
new Thread(runnableTask).start();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.opentracing.contrib.concurrent;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

/**
* @author chanfan
* @date 2024-05-09 18:43
*/
public class TracedForkJoinPool extends ForkJoinPool {
private final Tracer tracer;
private final boolean traceWithActiveSpanOnly;
public TracedForkJoinPool(Tracer tracer) {
super();
this.traceWithActiveSpanOnly = true;
this.tracer = tracer;
}

public TracedForkJoinPool(Tracer tracer, boolean traceWithActiveSpanOnly) {
super();
this.traceWithActiveSpanOnly = traceWithActiveSpanOnly;
this.tracer = tracer;
}

public TracedForkJoinPool(int parallelism, Tracer tracer, boolean traceWithActiveSpanOnly) {
super(parallelism);
this.traceWithActiveSpanOnly = traceWithActiveSpanOnly;
this.tracer = tracer;
}


@Override
public void execute(ForkJoinTask<?> task) {
Span span = createSpan("execute");
Span toActivate = span != null ? span : tracer.activeSpan();
try (Scope scope = tracer.activateSpan(toActivate)) {
super.execute(task);
} finally {
if (span != null) {
span.finish();
}
}
}

@Override
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
Span span = createSpan("submit");
Span toActivate = span != null ? span : tracer.activeSpan();
try (Scope scope = tracer.activateSpan(toActivate)) {
return super.submit(task);
} finally {
if (span != null) {
span.finish();
}
}
}

public static TracedForkJoinPool commonPool(Tracer tracer) {
return new TracedForkJoinPool(Runtime.getRuntime().availableProcessors(), tracer, true);
}

@Override
public <T> T invoke(ForkJoinTask<T> task) {
Span span = createSpan("invoke");
Span toActivate = span != null ? span : tracer.activeSpan();
try (Scope scope = tracer.activateSpan(toActivate)) {
return super.invoke(task);
} finally {
if (span != null) {
span.finish();
}
}
}

private Span createSpan(String operationName) {
if (tracer.activeSpan() == null && !traceWithActiveSpanOnly) {
return tracer.buildSpan(operationName).start();
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.opentracing.contrib.concurrent;

import io.opentracing.Scope;
import io.opentracing.mock.MockSpan;
import org.junit.Test;

import java.util.concurrent.*;

import static org.junit.Assert.assertEquals;

/**
* @author Pavol Loffay
*/
public class TracedCompletableFutureTest extends AbstractConcurrentTest {

protected TracedCompletableFuture toTraced() {
return TracedCompletableFuture.buildTrace(mockTracer);
}
protected <V> FutureTask<V> toFutureTask(Callable<V> callable) {
return new FutureTask<V>(callable);
}

@Test
public void testRunAsync() {
TracedCompletableFuture tracedCompletableFuture = toTraced();

MockSpan parentSpan = mockTracer.buildSpan("foo").start();
Scope scope = mockTracer.scopeManager().activate(parentSpan);

tracedCompletableFuture.runAsync(new TestRunnable());

scope.close();

assertParentSpan(parentSpan);
assertEquals(0, mockTracer.finishedSpans().size());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.opentracing.contrib.concurrent;

import io.opentracing.Scope;
import io.opentracing.mock.MockSpan;
import org.junit.Test;

import java.util.concurrent.*;

import static org.junit.Assert.assertEquals;

/**
* @author Pavol Loffay
*/
public class TracedForkJoinPoolTest extends AbstractConcurrentTest {

protected TracedForkJoinPool toTraced() {
return TracedForkJoinPool.commonPool(mockTracer);
}
protected <V> FutureTask<V> toFutureTask(Callable<V> callable) {
return new FutureTask<V>(callable);
}

@Test
public void testExecute() throws InterruptedException, ExecutionException {
TracedForkJoinPool tracedForkJoinPool = toTraced();

RecursiveTask<Integer> task = new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
System.out.println("Task running in TracedForkJoinPool");
return 42;
}
};
MockSpan parentSpan = mockTracer.buildSpan("foo").start();
Scope scope = mockTracer.scopeManager().activate(parentSpan);

tracedForkJoinPool.execute(task);
task.get();

scope.close();

assertParentSpan(parentSpan);
assertEquals(0, mockTracer.finishedSpans().size());
}

@Test
public void testSubmit() throws InterruptedException, ExecutionException {
TracedForkJoinPool tracedForkJoinPool = toTraced();
RecursiveTask<Integer> task = new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
System.out.println("Task running in TracedForkJoinPool");
return 42;
}
};
MockSpan parentSpan = mockTracer.buildSpan("foo").start();
Scope scope = mockTracer.scopeManager().activate(parentSpan);
ForkJoinTask<?> result = tracedForkJoinPool.submit(task);
result.get();
scope.close();

countDownLatch.await();
assertParentSpan(parentSpan);
assertEquals(0, mockTracer.finishedSpans().size());
}

}