Skip to content

Commit

Permalink
Remove usages of Unsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
dmlloyd committed Jan 16, 2025
1 parent 018b7ae commit 6280dae
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 159 deletions.
147 changes: 60 additions & 87 deletions src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.Thread.currentThread;
import static java.lang.invoke.MethodHandles.lookup;
import static java.security.AccessController.doPrivileged;
import static java.security.AccessController.getContext;
import static java.util.concurrent.locks.LockSupport.parkNanos;
import static java.util.concurrent.locks.LockSupport.unpark;
import static org.jboss.threads.JBossExecutors.unsafe;

import java.lang.invoke.ConstantBootstraps;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.management.ManagementFactory;
import java.security.AccessControlContext;
import java.security.PrivilegedAction;
Expand Down Expand Up @@ -72,6 +75,15 @@
public final class EnhancedQueueExecutor extends AbstractExecutorService implements ManageableThreadPoolExecutorService, ScheduledExecutorService {
private static final Thread[] NO_THREADS = new Thread[0];

private static final VarHandle objArrayHandle = MethodHandles.arrayElementVarHandle(TaskNode[].class);
private static final VarHandle longArrayHandle = MethodHandles.arrayElementVarHandle(long[].class);

private static final VarHandle terminationWaitersHandle = ConstantBootstraps.fieldVarHandle(lookup(), "terminationWaiters", VarHandle.class, EnhancedQueueExecutor.class, Waiter.class);

private static final VarHandle peakThreadCountHandle = ConstantBootstraps.fieldVarHandle(lookup(), "peakThreadCount", VarHandle.class, EnhancedQueueExecutor.class, int.class);
private static final VarHandle activeCountHandle = ConstantBootstraps.fieldVarHandle(lookup(), "activeCount", VarHandle.class, EnhancedQueueExecutor.class, int.class);
private static final VarHandle peakQueueSizeHandle = ConstantBootstraps.fieldVarHandle(lookup(), "peakQueueSize", VarHandle.class, EnhancedQueueExecutor.class, int.class);

static {
Version.getVersionString();
MBeanUnregisterAction.forceInit();
Expand Down Expand Up @@ -316,12 +328,6 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme
private static final int numUnsharedLongs = 2;
private static final int numUnsharedObjects = 2;

private static final long terminationWaitersOffset;

private static final long peakThreadCountOffset;
private static final long activeCountOffset;
private static final long peakQueueSizeOffset;

// GraalVM should initialize this class at run time, which we instruct it to do in
// src/main/resources/META-INF/native-image/org.jboss.threads/jboss-threads/native-image.properties
// Please make sure to update that file if you remove or rename this class, or if runtime
Expand All @@ -330,11 +336,11 @@ private static final class RuntimeFields {
private static final int unsharedTaskNodesSize;
private static final int unsharedLongsSize;

private static final long headOffset;
private static final long tailOffset;
private static final int headIndex;
private static final int tailIndex;

private static final long threadStatusOffset;
private static final long queueSizeOffset;
private static final int threadStatusIndex;
private static final int queueSizeIndex;

static {
int cacheLine = CacheInfo.getSmallestDataCacheLineSize();
Expand All @@ -344,28 +350,16 @@ private static final class RuntimeFields {
}
// cpu spatial prefetcher can drag 2 cache-lines at once into L2
int pad = cacheLine > 128 ? cacheLine : 128;
int longScale = unsafe.arrayIndexScale(long[].class);
int taskNodeScale = unsafe.arrayIndexScale(TaskNode[].class);
// make some assumptions about array scale
int longScale = 8;
int taskNodeScale = 8;
// these fields are in units of array scale
unsharedTaskNodesSize = pad / taskNodeScale * (numUnsharedObjects + 1);
unsharedLongsSize = pad / longScale * (numUnsharedLongs + 1);
// these fields are in bytes
headOffset = unsafe.arrayBaseOffset(TaskNode[].class) + pad;
tailOffset = unsafe.arrayBaseOffset(TaskNode[].class) + pad * 2;
threadStatusOffset = unsafe.arrayBaseOffset(long[].class) + pad;
queueSizeOffset = unsafe.arrayBaseOffset(long[].class) + pad * 2;
}
}

static {
try {
terminationWaitersOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("terminationWaiters"));

peakThreadCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakThreadCount"));
activeCountOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("activeCount"));
peakQueueSizeOffset = unsafe.objectFieldOffset(EnhancedQueueExecutor.class.getDeclaredField("peakQueueSize"));
} catch (NoSuchFieldException e) {
throw new NoSuchFieldError(e.getMessage());
headIndex = pad / taskNodeScale;
tailIndex = pad / taskNodeScale * 2;
threadStatusIndex = pad / longScale;
queueSizeIndex = pad / longScale * 2;
}
}

Expand All @@ -383,7 +377,6 @@ private static final class RuntimeFields {
private static final long TS_ALLOW_CORE_TIMEOUT = 1L << 60;
private static final long TS_SHUTDOWN_REQUESTED = 1L << 61;
private static final long TS_SHUTDOWN_INTERRUPT = 1L << 62;
@SuppressWarnings("NumericOverflow")
private static final long TS_SHUTDOWN_COMPLETE = 1L << 63;

private static final int EXE_OK = 0;
Expand Down Expand Up @@ -929,8 +922,7 @@ public List<Runnable> shutdownNow() {
head = getHead(unsharedTaskNodes);
continue;
}
if (headNext instanceof TaskNode) {
TaskNode taskNode = (TaskNode) headNext;
if (headNext instanceof TaskNode taskNode) {
if (compareAndSetHead(unsharedTaskNodes, head, taskNode)) {
// save from GC nepotism
head.setNextOrdered(head);
Expand Down Expand Up @@ -1941,8 +1933,7 @@ private int tryExecute(final Task runnable) {
}
// otherwise the consumer gave up or was exited already, so fall out and...
}
} else if (tailNext instanceof TaskNode) {
TaskNode tailNextTaskNode = (TaskNode) tailNext;
} else if (tailNext instanceof TaskNode tailNextTaskNode) {
// Opportunistically update tail to the next node. If this operation has been handled by
// another thread we fall back to the loop and try again instead of duplicating effort.
if (compareAndSetTail(tail, tailNextTaskNode)) {
Expand Down Expand Up @@ -2013,79 +2004,79 @@ void completeTermination() {
// =======================================================

TaskNode getTail() {
return (TaskNode) unsafe.getObjectVolatile(unsharedTaskNodes, RuntimeFields.tailOffset);
return (TaskNode) objArrayHandle.getVolatile(unsharedTaskNodes, RuntimeFields.tailIndex);
}

TaskNode setTailPlain(TaskNode tail) {
unsafe.putObject(unsharedTaskNodes, RuntimeFields.tailOffset, tail);
unsharedTaskNodes[RuntimeFields.tailIndex] = tail;
return tail;
}

boolean compareAndSetTail(final EnhancedQueueExecutor.TaskNode expect, final EnhancedQueueExecutor.TaskNode update) {
return getTail() == expect && unsafe.compareAndSwapObject(unsharedTaskNodes, RuntimeFields.tailOffset, expect, update);
return getTail() == expect && objArrayHandle.compareAndSet(unsharedTaskNodes, RuntimeFields.tailIndex, expect, update);
}

TaskNode getHead(final TaskNode[] unsharedTaskNodes) {
return (TaskNode) unsafe.getObjectVolatile(unsharedTaskNodes, RuntimeFields.headOffset);
static TaskNode getHead(final TaskNode[] unsharedTaskNodes) {
return (TaskNode) objArrayHandle.getVolatile(unsharedTaskNodes, RuntimeFields.headIndex);
}

TaskNode setHeadPlain(TaskNode head) {
unsafe.putObject(unsharedTaskNodes, RuntimeFields.headOffset, head);
unsharedTaskNodes[RuntimeFields.headIndex] = head;
return head;
}

boolean compareAndSetHead(final TaskNode[] unsharedTaskNodes, final TaskNode expect, final TaskNode update) {
return unsafe.compareAndSwapObject(unsharedTaskNodes, RuntimeFields.headOffset, expect, update);
static boolean compareAndSetHead(final TaskNode[] unsharedTaskNodes, final TaskNode expect, final TaskNode update) {
return objArrayHandle.compareAndSet(unsharedTaskNodes, RuntimeFields.headIndex, expect, update);
}

long getThreadStatus() {
return unsafe.getLongVolatile(unsharedLongs, RuntimeFields.threadStatusOffset);
return (long) longArrayHandle.getVolatile(unsharedLongs, RuntimeFields.threadStatusIndex);
}

long setThreadStatusPlain(long status) {
unsafe.putLong(unsharedLongs, RuntimeFields.threadStatusOffset, status);
unsharedLongs[RuntimeFields.threadStatusIndex] = status;
return status;
}

boolean compareAndSetThreadStatus(final long expect, final long update) {
return unsafe.compareAndSwapLong(unsharedLongs, RuntimeFields.threadStatusOffset, expect, update);
return longArrayHandle.compareAndSet(unsharedLongs, RuntimeFields.threadStatusIndex, expect, update);
}

void incrementActiveCount() {
unsafe.getAndAddInt(this, activeCountOffset, 1);
activeCountHandle.getAndAdd(this, 1);
}

void decrementActiveCount() {
unsafe.getAndAddInt(this, activeCountOffset, -1);
activeCountHandle.getAndAdd(this, -1);
}

boolean compareAndSetPeakThreadCount(final int expect, final int update) {
return unsafe.compareAndSwapInt(this, peakThreadCountOffset, expect, update);
return peakThreadCountHandle.compareAndSet(this, expect, update);
}

boolean compareAndSetPeakQueueSize(final int expect, final int update) {
return unsafe.compareAndSwapInt(this, peakQueueSizeOffset, expect, update);
return peakQueueSizeHandle.compareAndSet(this, expect, update);
}

long getQueueSizeVolatile() {
return unsafe.getLongVolatile(unsharedLongs, RuntimeFields.queueSizeOffset);
return (long) longArrayHandle.getVolatile(unsharedLongs, RuntimeFields.queueSizeIndex);
}

long setQueueSizePlain(long queueSize) {
unsafe.putLong(unsharedLongs, RuntimeFields.queueSizeOffset, queueSize);
unsharedLongs[RuntimeFields.queueSizeIndex] = queueSize;
return queueSize;
}

boolean compareAndSetQueueSize(final long expect, final long update) {
return unsafe.compareAndSwapLong(unsharedLongs, RuntimeFields.queueSizeOffset, expect, update);
return longArrayHandle.compareAndSet(unsharedLongs, RuntimeFields.queueSizeIndex, expect, update);
}

boolean compareAndSetTerminationWaiters(final Waiter expect, final Waiter update) {
return unsafe.compareAndSwapObject(this, terminationWaitersOffset, expect, update);
return terminationWaitersHandle.compareAndSet(this, expect, update);
}

Waiter getAndSetTerminationWaiters(final Waiter update) {
return (Waiter) unsafe.getAndSetObject(this, terminationWaitersOffset, update);
return (Waiter) terminationWaitersHandle.getAndSet(this, update);
}

// =======================================================
Expand Down Expand Up @@ -2294,21 +2285,13 @@ void rejectShutdown(final Task task) {
// =======================================================

abstract static class QNode {
private static final long nextOffset;

static {
try {
nextOffset = unsafe.objectFieldOffset(QNode.class.getDeclaredField("next"));
} catch (NoSuchFieldException e) {
throw new NoSuchFieldError(e.getMessage());
}
}
private static final VarHandle nextHandle = ConstantBootstraps.fieldVarHandle(lookup(), "next", VarHandle.class, QNode.class, QNode.class);

@SuppressWarnings("unused")
private volatile QNode next;

boolean compareAndSetNext(QNode expect, QNode update) {
return unsafe.compareAndSwapObject(this, nextOffset, expect, update);
return nextHandle.compareAndSet(this, expect, update);
}

QNode getNext() {
Expand All @@ -2320,11 +2303,11 @@ void setNext(final QNode node) {
}

void setNextRelaxed(final QNode node) {
unsafe.putObject(this, nextOffset, node);
nextHandle.set(this, node);
}

void setNextOrdered(final QNode node) {
unsafe.putOrderedObject(this, nextOffset, node);
nextHandle.setOpaque(this, node);
}
}

Expand Down Expand Up @@ -2360,18 +2343,8 @@ static final class PoolThreadNode extends PoolThreadNodeBase {
*/
private static final int STATE_UNPARKED = 2;


private static final long taskOffset;
private static final long parkedOffset;

static {
try {
taskOffset = unsafe.objectFieldOffset(PoolThreadNode.class.getDeclaredField("task"));
parkedOffset = unsafe.objectFieldOffset(PoolThreadNode.class.getDeclaredField("parked"));
} catch (NoSuchFieldException e) {
throw new NoSuchFieldError(e.getMessage());
}
}
private static final VarHandle taskHandle = ConstantBootstraps.fieldVarHandle(lookup(), "task", VarHandle.class, PoolThreadNode.class, Runnable.class);
private static final VarHandle parkedHandle = ConstantBootstraps.fieldVarHandle(lookup(), "parked", VarHandle.class, PoolThreadNode.class, int.class);

private final Thread thread;

Expand All @@ -2390,7 +2363,7 @@ static final class PoolThreadNode extends PoolThreadNodeBase {
}

boolean compareAndSetTask(final Runnable expect, final Runnable update) {
return task == expect && unsafe.compareAndSwapObject(this, taskOffset, expect, update);
return task == expect && taskHandle.compareAndSet(this, expect, update);
}

Runnable getTask() {
Expand All @@ -2403,7 +2376,7 @@ PoolThreadNode getNext() {

void park(EnhancedQueueExecutor enhancedQueueExecutor) {
int spins = PARK_SPINS;
if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
if (parked == STATE_UNPARKED && parkedHandle.compareAndSet(this, STATE_UNPARKED, STATE_NORMAL)) {
return;
}
while (spins > 0) {
Expand All @@ -2413,11 +2386,11 @@ void park(EnhancedQueueExecutor enhancedQueueExecutor) {
Thread.onSpinWait();
}
spins--;
if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
if (parked == STATE_UNPARKED && parkedHandle.compareAndSet(this, STATE_UNPARKED, STATE_NORMAL)) {
return;
}
}
if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_PARKED)) try {
if (parked == STATE_NORMAL && parkedHandle.compareAndSet(this, STATE_NORMAL, STATE_PARKED)) try {
LockSupport.park(enhancedQueueExecutor);
} finally {
// parked can be STATE_PARKED or STATE_UNPARKED, cannot possibly be STATE_NORMAL.
Expand All @@ -2436,7 +2409,7 @@ void park(EnhancedQueueExecutor enhancedQueueExecutor, long nanos) {
//as spin time is short and for our use cases it does not matter if the time
//overruns a bit (as the nano time is for thread timeout) we just spin then check
//to keep performance consistent between the two versions.
if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
if (parked == STATE_UNPARKED && parkedHandle.compareAndSet(this, STATE_UNPARKED, STATE_NORMAL)) {
return;
}
while (spins > 0) {
Expand All @@ -2445,7 +2418,7 @@ void park(EnhancedQueueExecutor enhancedQueueExecutor, long nanos) {
} else {
Thread.onSpinWait();
}
if (parked == STATE_UNPARKED && unsafe.compareAndSwapInt(this, parkedOffset, STATE_UNPARKED, STATE_NORMAL)) {
if (parked == STATE_UNPARKED && parkedHandle.compareAndSet(this, STATE_UNPARKED, STATE_NORMAL)) {
return;
}
spins--;
Expand All @@ -2457,7 +2430,7 @@ void park(EnhancedQueueExecutor enhancedQueueExecutor, long nanos) {
} else {
remaining = nanos;
}
if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_PARKED)) try {
if (parked == STATE_NORMAL && parkedHandle.compareAndSet(this, STATE_NORMAL, STATE_PARKED)) try {
LockSupport.parkNanos(enhancedQueueExecutor, remaining);
} finally {
// parked can be STATE_PARKED or STATE_UNPARKED, cannot possibly be STATE_NORMAL.
Expand All @@ -2468,7 +2441,7 @@ void park(EnhancedQueueExecutor enhancedQueueExecutor, long nanos) {
}

void unpark() {
if (parked == STATE_NORMAL && unsafe.compareAndSwapInt(this, parkedOffset, STATE_NORMAL, STATE_UNPARKED)) {
if (parked == STATE_NORMAL && parkedHandle.compareAndSet(this, STATE_NORMAL, STATE_UNPARKED)) {
return;
}
LockSupport.unpark(thread);
Expand Down
15 changes: 5 additions & 10 deletions src/main/java/org/jboss/threads/EnhancedViewExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.smallrye.common.constraint.Nullable;
import io.smallrye.common.cpu.ProcessorInfo;

import java.lang.invoke.ConstantBootstraps;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -18,7 +20,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.jboss.threads.JBossExecutors.unsafe;
import static java.lang.invoke.MethodHandles.lookup;

/**
* A View Executor implementation which avoids lock contention in the common path. This allows us to
Expand All @@ -29,14 +31,7 @@
*/
final class EnhancedViewExecutor extends ViewExecutor {
private static final Logger log = Logger.getLogger("org.jboss.threads.view-executor");
private static final long stateOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset(EnhancedViewExecutor.class.getDeclaredField("state"));
} catch (NoSuchFieldException e) {
throw new NoSuchFieldError(e.getMessage());
}
}
private static final VarHandle stateHandle = ConstantBootstraps.fieldVarHandle(lookup(), "state", VarHandle.class, EnhancedViewExecutor.class, long.class);

private static final int QUEUE_FAILURE_LOG_INTERVAL =
readIntPropertyPrefixed("queue.failure.log.interval", 1_000_000);
Expand Down Expand Up @@ -429,7 +424,7 @@ private static int getQueueSize(long state) {
}

private boolean compareAndSwapState(long expected, long update) {
return unsafe.compareAndSwapLong(this, stateOffset, expected, update);
return stateHandle.compareAndSet(this, expected, update);
}

private Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
Expand Down
Loading

0 comments on commit 6280dae

Please sign in to comment.