Skip to content

Commit

Permalink
[Wisp] Port wisp2 shutdown fix to JDK17.
Browse files Browse the repository at this point in the history
Summary:
The change is to port wisp shutdown fix including:
-Throw universal tenant death exception at shutdown
-Delay coroutineSupport destruction and protect runningTaskCount
-Use loop instead of recursion to check wisp critical

Test Plan: jtreg TenantExceptionTest.java PreemptTest.java

Reviewed-by: yulei

Issue:
#101
  • Loading branch information
ZhaiMo15 authored and yuleil committed Jul 24, 2023
1 parent 298b24a commit a7d9d84
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 96 deletions.
8 changes: 5 additions & 3 deletions src/hotspot/share/prims/unsafe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,11 +1010,13 @@ JVM_ENTRY(jboolean, CoroutineSupport_stealCoroutine(JNIEnv* env, jclass klass, j
return true;
JVM_END

JVM_ENTRY (jboolean, CoroutineSupport_isInClinit0(JNIEnv* env, jclass klass, jlong coroPtr))
JVM_ENTRY (void, CoroutineSupport_checkAndThrowException0(JNIEnv* env, jclass klass, jlong coroPtr))
assert(EnableCoroutine, "pre-condition");
Coroutine* coro = (Coroutine*)coroPtr;
assert(coro == thread->current_coroutine(), "invariant");
return coro->clinit_call_count() != 0;
if (!coro->is_yielding() && coro->clinit_call_count() == 0) {
throw_new(env, "java/lang/ThreadDeath");
}
JVM_END

JVM_ENTRY (jobjectArray, CoroutineSupport_getCoroutineStack(JNIEnv* env, jclass klass, jlong coroPtr))
Expand Down Expand Up @@ -1129,7 +1131,7 @@ JNINativeMethod coroutine_support_methods[] = {
{CC"moveCoroutine", CC "(JJ)V", FN_PTR(CoroutineSupport_moveCoroutine)},
{CC"markThreadCoroutine", CC "(J" COBA ")V", FN_PTR(CoroutineSupport_markThreadCoroutine)},
{CC"getCoroutineStack", CC "(J)[" STE, FN_PTR(CoroutineSupport_getCoroutineStack)},
{CC"isInClinit0", CC "(J)Z", FN_PTR(CoroutineSupport_isInClinit0)},
{CC"checkAndThrowException0", CC "(J)V", FN_PTR(CoroutineSupport_checkAndThrowException0)},
};

#define COMPILE_CORO_METHODS_BEFORE (3)
Expand Down
148 changes: 69 additions & 79 deletions src/hotspot/share/runtime/coroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,74 @@ void Coroutine::frames_do(FrameClosure* fc) {
}
}

bool Coroutine::is_coroutine_frame(vframe* f) {
javaVFrame* jvf = javaVFrame::cast(f);
InstanceKlass* k = jvf->method()->method_holder();
return (k == vmClasses::com_alibaba_wisp_engine_WispTask_klass()
|| k == vmClasses::com_alibaba_wisp_engine_WispEngine_klass()
|| k->is_subtype_of(vmClasses::com_alibaba_wisp_engine_WispEngine_klass())
|| k == vmClasses::com_alibaba_wisp_engine_WispEventPump_klass()
|| k == vmClasses::com_alibaba_wisp_engine_WispTask_CacheableCoroutine_klass()
|| k == vmClasses::java_dyn_CoroutineBase_klass());
}

/* a typical wisp stack looks like:
at java.dyn.CoroutineSupport.unsafeSymmetricYieldTo(CoroutineSupport.java:134)
- parking to wait for <0x00000007303e1c28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at com.alibaba.wisp.engine.WispTask.switchTo(WispTask.java:254)
at com.alibaba.wisp.engine.WispEngine.yieldTo(WispEngine.java:613)
at com.alibaba.wisp.engine.Wisp2Engine.yieldToNext(Wisp2Engine.java:211)
at com.alibaba.wisp.engine.WispEngine.yieldOnBlocking(WispEngine.java:574)
at com.alibaba.wisp.engine.WispTask.parkInternal(WispTask.java:350)
at com.alibaba.wisp.engine.WispTask.jdkPark(WispTask.java:403)
at com.alibaba.wisp.engine.WispEngine$4.park(WispEngine.java:224)
at sun.misc.Unsafe.park(Unsafe.java:1029)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:176)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2047)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1077)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:627)
at java.lang.Thread.run(Thread.java:861)
at com.alibaba.wisp.engine.WispTask.coroutineCacheLoop(WispTask.java:213)
at com.alibaba.wisp.engine.WispTask.access$000(WispTask.java:33)
at com.alibaba.wisp.engine.WispTask$CacheableCoroutine.run(WispTask.java:153)
at java.dyn.CoroutineBase.startInternal(CoroutineBase.java:60)
Preempt should only happened when we're executing the non-wisp part.
*/
bool Coroutine::in_critical(JavaThread* thread) {
RegisterMap reg_map(thread);
bool has_wisp_frame = false;
bool has_other_frame = false;
for (vframe* f = thread->last_java_vframe(&reg_map); f; f = f->sender()) {
if (!f->is_java_frame()) {
continue;
}
/*
wisp_frame: wisp internal schedule related frames:
other_frame: non wisp_frame
under these two senarios current stack are consiedered in critical:
1. all frames are wisp_frame
2. a wisp_frame's sender is an other_frame
*/
if (is_coroutine_frame(f)) {
has_wisp_frame = true;
} else {
if (has_wisp_frame) {
if (VerboseWisp) {
tty->print_cr("[WISP] preempt was blocked, because wisp internal method on the stack");
}
return true;
}
has_other_frame = true;
}
}
if (VerboseWisp && has_wisp_frame && !has_other_frame) {
tty->print_cr("[WISP] preempt was blocked, because only wisp method on the stack");
}
return has_wisp_frame && !has_other_frame;
}

class oops_do_Closure: public FrameClosure {
private:
OopClosure* _f;
Expand Down Expand Up @@ -858,84 +926,6 @@ const char* WispThread::print_blocking_status(int status) {
}
}

class WispCriticalVerifier : public StackObj {
friend Coroutine;
/* a typical wisp stack looks like:
at java.dyn.CoroutineSupport.unsafeSymmetricYieldTo(CoroutineSupport.java:134)
- parking to wait for <0x00000007303e1c28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at com.alibaba.wisp.engine.WispTask.switchTo(WispTask.java:254)
at com.alibaba.wisp.engine.WispEngine.yieldTo(WispEngine.java:613)
at com.alibaba.wisp.engine.Wisp2Engine.yieldToNext(Wisp2Engine.java:211)
at com.alibaba.wisp.engine.WispEngine.yieldOnBlocking(WispEngine.java:574)
at com.alibaba.wisp.engine.WispTask.parkInternal(WispTask.java:350)
at com.alibaba.wisp.engine.WispTask.jdkPark(WispTask.java:403)
at com.alibaba.wisp.engine.WispEngine$4.park(WispEngine.java:224)
at sun.misc.Unsafe.park(Unsafe.java:1029)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:176)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2047)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1077)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:627)
at java.lang.Thread.run(Thread.java:861)
at com.alibaba.wisp.engine.WispTask.coroutineCacheLoop(WispTask.java:213)
at com.alibaba.wisp.engine.WispTask.access$000(WispTask.java:33)
at com.alibaba.wisp.engine.WispTask$CacheableCoroutine.run(WispTask.java:153)
at java.dyn.CoroutineBase.startInternal(CoroutineBase.java:60)
Preempt should only happened when we're executing the non-wisp part.
*/

bool _is_wisp_entry;
// see above, all the wisp task stacks are started with 'fixed' wisp frames,
// e.g. java.dyn.CoroutineBase, com.alibaba.wisp.engine.WispTask etc.
bool _found_non_bottom_wisp_frame;

WispCriticalVerifier(JavaThread* thread)
: _is_wisp_entry(true), _found_non_bottom_wisp_frame(false) {
if (!thread->has_last_Java_frame()) {
_is_wisp_entry = false;
return;
}

ResourceMark rm;
HandleMark hm(thread);
RegisterMap reg_map(thread);
transverse_frame(thread->last_java_vframe(&reg_map));
}

bool in_critical() { return _is_wisp_entry || _found_non_bottom_wisp_frame; }

void transverse_frame(vframe* f);
};

void WispCriticalVerifier::transverse_frame(vframe* f) {
if (!f) return;
transverse_frame(f->sender()); // from bottom to top

// 1. skip wisp task entry frames
// 2. if a wisp frame is found, we're in critical
if (f->is_java_frame()) {
javaVFrame* jvf = javaVFrame::cast(f);
InstanceKlass* k = jvf->method()->method_holder();
if (_is_wisp_entry) {
if (k != vmClasses::com_alibaba_wisp_engine_WispTask_klass()
&& k != vmClasses::com_alibaba_wisp_engine_WispTask_CacheableCoroutine_klass()
&& k != vmClasses::java_dyn_CoroutineBase_klass()) {
_is_wisp_entry = false;
}
} else if (k == vmClasses::com_alibaba_wisp_engine_WispTask_klass()
|| k == vmClasses::com_alibaba_wisp_engine_WispEngine_klass()
|| k->is_subtype_of(vmClasses::com_alibaba_wisp_engine_WispEngine_klass())
|| k == vmClasses::com_alibaba_wisp_engine_WispEventPump_klass()) {
_found_non_bottom_wisp_frame = true;
if (VerboseWisp) {
tty->print_cr("[WISP] preempt was blocked, because wisp internal method on the stack");
}
}
}
}

void Coroutine::after_safepoint(JavaThread* thread) {
assert(Thread::current() == thread, "sanity check");

Expand All @@ -948,7 +938,7 @@ void Coroutine::after_safepoint(JavaThread* thread) {
// rather than thread state transition.
coroutine->_is_yielding || !thread->wisp_preempted() ||
thread->has_pending_exception() || thread->has_async_exception_condition() ||
WispCriticalVerifier(thread).in_critical()) {
coroutine->in_critical(thread)) {
return;
}

Expand Down
4 changes: 4 additions & 0 deletions src/hotspot/share/runtime/coroutine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class Coroutine: public CHeapObj<mtThread>, public DoublyLinkedList<Coroutine> {
void set_state(CoroutineState x) { _state = x; }

bool is_thread_coroutine() const { return _is_thread_coroutine; }
bool is_yielding() const { return _is_yielding; }

JavaThread* thread() const { return _thread; }
void set_thread(JavaThread* x) { _thread = x; }
Expand Down Expand Up @@ -213,6 +214,9 @@ class Coroutine: public CHeapObj<mtThread>, public DoublyLinkedList<Coroutine> {
oop print_stack_header_on(outputStream* st);
void print_stack_on(outputStream* st);

bool is_coroutine_frame(vframe* f);
bool in_critical(JavaThread* thread);

// GC support
void oops_do(OopClosure* f, CodeBlobClosure* cf);
void nmethods_do(CodeBlobClosure* cf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.dyn.CoroutineSupport;

/**
* Per-thread self-scheduled implementation of WispEngine.
Expand Down Expand Up @@ -106,8 +107,11 @@ protected void dispatchTask(Runnable target, String name, Thread t) {
return false;
} else {
WispEngine.current().countEnqueueTime(enqueueTime);
runTaskInternal(target, name, t, ctxClassLoader);
return true; // means real switch happened
/*
null is expected when we are trying to add new wispTask to a engine
that has been shut down
*/
return runTaskInternal(target, name, t, ctxClassLoader) != null; // means real switch happened
}
};
wakeupTask(pseudo);
Expand Down Expand Up @@ -601,7 +605,7 @@ void doShutdown() {
thread.getCoroutineSupport().drain();
shutdownFuture.countDown();
} else {
UNSAFE.throwException(new ThreadDeath());
CoroutineSupport.checkAndThrowException(current.ctx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.dyn.CoroutineSupport;

/**
* In wisp1 WispTask could run in ANY user created thread.
Expand Down Expand Up @@ -123,7 +124,16 @@ protected void cancelTimer() {
* @return if success
*/
private static boolean steal(WispTask task, WispEngine current, boolean failOnContention) {
if (current.hasBeenShutdown) {
/* shutdown is an async operation in wisp2, SHUTDOWN task relies on runningTaskCount to
determine whether it's okay to exit the carrier, hence we need to make sure no more new
wispTasks are created or stolen for hasBeenShutdown engines
for example:
1. SHUTDOWN task found runningTaskCount equals 0 and exit
2. carrier's task queue may still has some remaining tasks, when tried to steal these tasks
we may encounter jvm crash.
TODO:// merge shutdown and handoff dependencies check
*/
if (current.hasBeenShutdown || task.engine.hasBeenShutdown) {
return false;
}
assert current == WispEngine.current();
Expand Down Expand Up @@ -367,10 +377,14 @@ private void notifyAndWaitTasksForShutdown(Wisp2Engine engine) {
@Override
public void run() {
// set shutdownFinished true when SHUTDOWN-TASK finished
// SHUTDOWN-TASK would keep interrupting running tasks until
// SHUTDOWN TASK would keep interrupting running tasks until
// it is the only running one
assert runningTaskCount == 0;

try {
group.shutdownBarrier.await();
} catch (Exception e) {
System.out.println("[Wisp] unexpected barrier exception " + e + ", thread: " + thread);
}
thread.getCoroutineSupport().drain();
engine.shutdownFuture.countDown();

Expand All @@ -386,15 +400,12 @@ public boolean isStealEnable() {

@Override
protected void runTaskYieldEpilog() {
if (hasBeenShutdown) {
assert WispEngine.current().current.engine == this;
if (WispTask.SHUTDOWN_TASK_NAME.equals(current.getName())
|| current == threadTask
|| CoroutineSupport.isInClinit(current.ctx)) {
return;
}
UNSAFE.throwException(new ThreadDeath());
if (!hasBeenShutdown || WispTask.SHUTDOWN_TASK_NAME.equals(current.getName())
|| current == threadTask) {
return;
}
assert WispEngine.current().current.engine == this;
CoroutineSupport.checkAndThrowException(current.ctx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class Wisp2Group extends AbstractExecutorService {
final Wisp2Scheduler scheduler;
final Set<Wisp2Engine> carrierEngines;
final Queue<WispTask> groupTaskCache = new ConcurrentLinkedDeque<>();
final CyclicBarrier shutdownBarrier;

/**
* Create a new WispV2Group for executing tasks.
Expand All @@ -53,6 +54,8 @@ public int compare(Wisp2Engine o1, Wisp2Engine o2) {
*/
private Wisp2Group(String name) {
carrierEngines = createEngineSet();
// detached group won't shut down
shutdownBarrier = null;
scheduler = new Wisp2Scheduler(
WispConfiguration.WORKER_COUNT,
WispConfiguration.WISP_SCHEDULE_STEAL_RETRY,
Expand All @@ -72,6 +75,7 @@ public Thread newThread(Runnable r) {

private Wisp2Group(int size, ThreadFactory tf) {
carrierEngines = createEngineSet();
shutdownBarrier = new CyclicBarrier(size);
scheduler = new Wisp2Scheduler(size, tf, this);
}

Expand All @@ -85,6 +89,7 @@ public List<Long> getWispEngineIDs() {

@Override
public void shutdown() {
assert this != Wisp2Group.WISP2_ROOT_GROUP;
for (Wisp2Engine worker : carrierEngines) {
worker.shutdown();
}
Expand Down
10 changes: 10 additions & 0 deletions src/java.base/share/classes/java/dyn/CoroutineSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ Thread getThread() {
return thread;
}

/**
* check if we should throw a TenantDeath or ThreqadDeathException
* @param coroutine the coroutine
*/
public static void checkAndThrowException(Coroutine coroutine) {
checkAndThrowException0(coroutine.nativeCoroutine);
}


/**
* Telling if current coroutine is executing clinit
Expand Down Expand Up @@ -413,4 +421,6 @@ public CoroutineBase getCurrent() {
public static native StackTraceElement[] getCoroutineStack(long coroPtr);

private static native boolean isInClinit0(long coroPtr);

private static native void checkAndThrowException0(long coroPtr);
}
19 changes: 19 additions & 0 deletions test/jdk/com/alibaba/wisp/thread/TestPreempt.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ public class TestPreempt {
public static void main(String[] args) throws Exception {
doTest(TestPreempt::complexLoop);
doTest(TestPreempt::simpleLoop);
doTest(new Runnable() {
@Override
public void run() {
stackTrace(0);
}
});
}

private static void doTest(Runnable r) throws Exception {
Expand Down Expand Up @@ -49,6 +55,19 @@ private static void simpleLoop() {
} while (x == n);
}


private static void stackTrace(int i) {
if (i == 10000) {
int x;
do {
x = n;
} while (x == n);
} else {
stackTrace(i + 1);
}
}


// TODO: handle safepoint consumed by state switch

volatile static int n;
Expand Down

0 comments on commit a7d9d84

Please sign in to comment.