Skip to content

Commit

Permalink
feat: support comparable runnable (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
YongwuHe authored Jun 28, 2024
1 parent a61784b commit 14687f7
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

import io.arex.agent.bootstrap.TraceContextManager;

import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;


public class RunnableWrapper implements Runnable {
private static final ConcurrentHashMap<String, Optional<Field>> COMPARABLE_RUNNABLE_FIELD_MAP = new ConcurrentHashMap<>();
private static final AtomicBoolean EXECEPTION_LOGGED = new AtomicBoolean(false);
private final Runnable runnable;
private final TraceTransmitter traceTransmitter;

Expand Down Expand Up @@ -48,6 +54,50 @@ public static Runnable get(Runnable runnable) {
if (runnable instanceof RunnableWrapper || runnable instanceof ForkJoinTask) {
return runnable;
}

if (runnable instanceof Comparable) {
wrapRunnableField(runnable);
return runnable;
}
return new RunnableWrapper(runnable);
}

/**
* issue: https://github.com/arextest/arex-agent-java/issues/516
* executor with PriorityQueue
* ex: class PriorityRunnable extends Runnable implements Comparable<PriorityRunnable> {
* private final Runnable run;
* private final int priority;
* }
* can't wrap the PriorityRunnable because queue need runnable implement Comparable,
* so we need to wrap the original runnable field.
*/
private static void wrapRunnableField(Runnable runnable) {
try {
Class<? extends Runnable> runnableClass = runnable.getClass();
Optional<Field> originalRunnableFieldOpt = COMPARABLE_RUNNABLE_FIELD_MAP.computeIfAbsent(runnableClass.getName(), k -> {
for (Field declaredField : runnableClass.getDeclaredFields()) {
Class<?> declaredFieldType = declaredField.getType();
if (declaredFieldType.isAssignableFrom(Runnable.class) && !declaredFieldType.isAssignableFrom(Comparable.class)) {
declaredField.setAccessible(true);
return Optional.of(declaredField);
}
}
return Optional.empty();
});
if (originalRunnableFieldOpt.isPresent()) {
Field originalRunnableField = originalRunnableFieldOpt.get();
Runnable originalRunnable = (Runnable) originalRunnableField.get(runnable);
if (originalRunnable instanceof RunnableWrapper) {
return;
}
RunnableWrapper runnableWrapper = new RunnableWrapper(originalRunnable);
originalRunnableField.set(runnable, runnableWrapper);
}
} catch (Exception e) {
if (EXECEPTION_LOGGED.compareAndSet(false, true)) {
System.err.printf("wrap original runnable %s failed.%n", runnable.getClass().getName());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import io.arex.agent.bootstrap.TraceContextManager;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RunnableFuture;

Expand All @@ -11,7 +14,7 @@
class RunnableWrapperTest {

@Test
void get() {
void get() throws Exception {
assertNull(RunnableWrapper.get(null));
TraceContextManager.set("mock");
Runnable objectRunnable = RunnableWrapper.get(new RunnableTest<>());
Expand All @@ -20,7 +23,29 @@ void get() {
assertDoesNotThrow(emptyRunnable::run);
assertNotNull(emptyRunnable.toString());
assertTrue(emptyRunnable.hashCode() > 0);
assertFalse(emptyRunnable.equals(objectRunnable));
assertNotEquals(emptyRunnable, objectRunnable);

NoRunnableFieldPriorityRunnable noRunnableFieldComparableRunnable = new NoRunnableFieldPriorityRunnable(1);
RunnableWrapper.get(noRunnableFieldComparableRunnable);
Field comparableRunnableFieldMap = RunnableWrapper.class.getDeclaredField("COMPARABLE_RUNNABLE_FIELD_MAP");
comparableRunnableFieldMap.setAccessible(true);
Map<String, Optional<Field>> map = (Map<String, Optional<Field>>) comparableRunnableFieldMap.get(null);
assertNotNull(map);
assertFalse(map.get(noRunnableFieldComparableRunnable.getClass().getName()).isPresent());
Runnable originalRunnable = () -> {};
PriorityRunnable priorityRunnable = new PriorityRunnable(originalRunnable, 1);
Runnable runnable2 = RunnableWrapper.get(priorityRunnable);
assertSame(priorityRunnable, runnable2);
assertEquals("run", map.get(priorityRunnable.getClass().getName()).get().getName());
assertInstanceOf(RunnableWrapper.class, priorityRunnable.getRun());
// runnable field already wrapped
RunnableWrapper.get(priorityRunnable);
Runnable run = priorityRunnable.getRun();
assertInstanceOf(RunnableWrapper.class, run);
Field runnable = RunnableWrapper.class.getDeclaredField("runnable");
runnable.setAccessible(true);
assertFalse(runnable.get(run) instanceof RunnableWrapper);

TraceContextManager.remove();
}

Expand All @@ -30,4 +55,54 @@ public final void setRawResult(T v) {}
public final boolean exec() { return true; }
public final void run() {}
}

public static class NoRunnableFieldPriorityRunnable implements Runnable, Comparable<NoRunnableFieldPriorityRunnable> {
private final int priority;

public NoRunnableFieldPriorityRunnable(int priority) {
this.priority = priority;
}

@Override
public int compareTo(NoRunnableFieldPriorityRunnable other) {
int res = 0;
if (this.priority != other.priority) {
res = this.priority > other.priority ? -1 : 1;
}
return res;
}

@Override
public void run() {
System.out.println("PriorityRunnable.run");
}
}

static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
private final Runnable run;
private final int priority;

public PriorityRunnable(Runnable run, int priority) {
this.run = run;
this.priority = priority;
}

@Override
public int compareTo(PriorityRunnable other) {
int res = 0;
if (this.priority != other.priority) {
res = this.priority > other.priority ? -1 : 1;
}
return res;
}

@Override
public void run() {
this.run.run();
}

public Runnable getRun() {
return run;
}
}
}

0 comments on commit 14687f7

Please sign in to comment.