Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java 21] Fix server hanging issue with isolated strand #43457

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package io.ballerina.runtime.internal.launch;

import io.ballerina.runtime.api.Module;
import io.ballerina.runtime.api.launch.LaunchListener;
import io.ballerina.runtime.internal.configurable.ConfigMap;
import io.ballerina.runtime.internal.configurable.ConfigProvider;
import io.ballerina.runtime.internal.configurable.ConfigResolver;
Expand All @@ -43,7 +42,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;

import static io.ballerina.runtime.api.constants.RuntimeConstants.DOT;
Expand All @@ -66,14 +64,10 @@ public class LaunchUtils {
private LaunchUtils() {
}

public static void startListenersAndSignalHandler(boolean isService) {
// starts all listeners
startListeners(isService);

// start TRAP signal handler which produces the strand dump
startTrapSignalHandler();
}

@SuppressWarnings("unused")
/*
* Used for codegen. This will handle trap signals for strand dump.
*/
public static void startTrapSignalHandler() {
try {
Signal.handle(new Signal("TRAP"), signal -> outStream.println(StrandDump.getStrandDump()));
Expand All @@ -83,16 +77,10 @@ public static void startTrapSignalHandler() {
}
}

public static void startListeners(boolean isService) {
ServiceLoader<LaunchListener> listeners = ServiceLoader.load(LaunchListener.class);
listeners.forEach(listener -> listener.beforeRunProgram(isService));
}

public static void stopListeners(boolean isService) {
ServiceLoader<LaunchListener> listeners = ServiceLoader.load(LaunchListener.class);
listeners.forEach(listener -> listener.afterRunProgram(isService));
}

@SuppressWarnings("unused")
/*
* Used for codegen adding module configurable data.
*/
public static void addModuleConfigData(Map<Module, VariableKey[]> configurationData, Module m,
VariableKey[] variableKeys) {
VariableKey[] currKeys = configurationData.get(m);
Expand All @@ -107,6 +95,10 @@ public static void addModuleConfigData(Map<Module, VariableKey[]> configurationD
configurationData.put(m, mergedKeyArray);
}

@SuppressWarnings("unused")
/*
* Used for codegen initialize configurable variables.
*/
public static void initConfigurableVariables(Module rootModule, Map<Module, VariableKey[]> configurationData,
String[] args, Path[] configFilePaths, String configContent) {

Expand Down Expand Up @@ -159,6 +151,10 @@ private static String populateConfigDetails(List<Path> paths, Map<String, String
return null;
}

@SuppressWarnings("unused")
/*
* Used for codegen to get configurable input paths.
*/
public static ConfigDetails getTestConfigPaths(Module module, String pkgName, String sourceRoot) {
String moduleName = module.getName();
Path testConfigPath = Paths.get(sourceRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,17 @@
*/
public class AsyncUtils {

public static Object handleNonIsolatedStrand(Strand strand, Supplier<Boolean> conditionSupplier,
Supplier<?> resultSupplier) {
boolean waitDone = false;
while (!waitDone) {
try {
strand.yield();
waitDone = conditionSupplier.get();
} finally {
strand.resume();
}
public static Object handleNonIsolatedStrand(Strand strand, Supplier<?> resultSupplier) {
boolean runnable = strand.isRunnable();
if (runnable) {
strand.yield();

}
Object result = resultSupplier.get();
if (runnable) {
strand.resume();
}
return resultSupplier.get();
return result;
}

@SuppressWarnings("unused")
Expand All @@ -69,7 +68,7 @@ public static Object handleWait(Strand strand, CompletableFuture<Object> complet
if (strand.isIsolated) {
return getFutureResult(completableFuture);
}
return handleNonIsolatedStrand(strand, completableFuture::isDone, () -> getFutureResult(completableFuture));
return handleNonIsolatedStrand(strand, () -> getFutureResult(completableFuture));
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -112,16 +111,7 @@ public static void handleWaitMultiple(Strand strand, Map<String, FutureValue> fu
getAllFutureResult(futureMap, alreadyWaitedKeys, target);
}
handleNonIsolatedStrand(strand, () -> {
for (CompletableFuture<?> cFuture : cFutures) {
if (cFuture.isCompletedExceptionally()) {
getFutureResult(cFuture);
}
if (!cFuture.isDone()) {
return false;
}
}
return true;
}, () -> {
waitForAllFutureResult(cFutures.toArray(new CompletableFuture[0]));
getAllFutureResult(futureMap, alreadyWaitedKeys, target);
return null;
});
Expand All @@ -132,14 +122,7 @@ public static Object handleWaitAny(Strand strand, CompletableFuture<?>[] cFuture
if (strand.isIsolated) {
result = getAnyFutureResult(cFutures);
} else {
result = handleNonIsolatedStrand(strand, () -> {
for (CompletableFuture<?> completableFuture : cFutures) {
if (completableFuture.isDone()) {
return true;
}
}
return false;
}, () -> getAnyFutureResult(cFutures));
result = handleNonIsolatedStrand(strand, () -> getAnyFutureResult(cFutures));
}

if (cFutures.length > 1 && result instanceof BError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,48 @@ public static Strand getStrand() {
return daemonStrand;
}
public Object call(Module module, String functionName, Strand parentStrand, Object... args) {
boolean runnable = parentStrand.isRunnable();
if (!runnable) {
parentStrand.resume();
}
ValueCreatorAndFunctionType functionType = getGetValueCreatorAndFunctionType(module, functionName);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(functionType.valueCreator(),
functionType.functionType(), parentStrand, args);
return functionType.valueCreator().call(parentStrand, functionName, argsWithDefaultValues);
Object result = functionType.valueCreator().call(parentStrand, functionName, argsWithDefaultValues);
if (!runnable) {
parentStrand.yield();
}
return result;
}

public Object call(BObject object, String methodName, Strand parentStrand, Object... args) {
boolean runnable = parentStrand.isRunnable();
if (!runnable) {
parentStrand.resume();
}
ObjectType objectType = (ObjectType) TypeUtils.getImpliedType(object.getOriginalType());
MethodType methodType = getObjectMethodType(methodName, objectType);
Object[] argsWithDefaultValues = getArgsWithDefaultValues(objectType, methodType, parentStrand, args);
return ((ObjectValue) object).call(parentStrand, methodName, argsWithDefaultValues);
Object result = ((ObjectValue) object).call(parentStrand, methodName, argsWithDefaultValues);
if (!runnable) {
parentStrand.yield();
}
return result;
}

public Object call(FPValue fp, Strand parentStrand, Object... args) {
boolean runnable = parentStrand.isRunnable();
if (!runnable) {
parentStrand.resume();
}
FunctionType functionType = (FunctionType) TypeUtils.getImpliedType(TypeUtils.getType(fp));
Object[] argsWithDefaultValues = getArgsWithDefaultValues(parentStrand, args, functionType);
Object[] argsWithStrand = getArgsWithStrand(parentStrand, argsWithDefaultValues);
return fp.function.apply(argsWithStrand);
Object result = fp.function.apply(argsWithStrand);
if (!runnable) {
parentStrand.yield();
}
return result;
}

public FutureValue startIsolatedWorker(String functionName, Module module, Strand parentStrand, String strandName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class Strand {
private Map<String, Object> globalProps;

public final boolean isIsolated;
public State state = State.YIELD;
public boolean cancelled;
public Scheduler scheduler;
public Strand parent;
public TransactionLocalContext currentTrxContext;
Expand Down Expand Up @@ -89,26 +89,30 @@ public Strand(String name, StrandMetadata metadata, Scheduler scheduler, Strand

public void resume() {
checkStrandCancelled();
if (!this.isIsolated && this.state == State.YIELD) {
if (!this.isIsolated && !scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
this.scheduler.globalNonIsolatedLock.lock();
this.state = State.RUNNABLE;

}
}

public void yield() {
checkStrandCancelled();
if (!this.isIsolated && this.state == State.RUNNABLE) {
if (!this.isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
scheduler.globalNonIsolatedLock.unlock();
this.state = State.YIELD;
}
}

public void done() {
if (!isIsolated && this.state == State.RUNNABLE) {
if (!isIsolated && scheduler.globalNonIsolatedLock.isHeldByCurrentThread()) {
scheduler.globalNonIsolatedLock.unlock();
}
}

public boolean isRunnable() {
return isIsolated || this.scheduler.globalNonIsolatedLock.isHeldByCurrentThread();
}


private TransactionLocalContext createTrxContextBranch(TransactionLocalContext currentTrxContext,
int strandName) {
TransactionLocalContext trxCtx = TransactionLocalContext
Expand Down Expand Up @@ -181,20 +185,8 @@ public StrandMetadata getMetadata() {
}

public void checkStrandCancelled() {
if (this.state == State.CANCELLED) {
if (this.cancelled) {
throw ErrorUtils.createCancelledFutureError();
}
}

/**
* Maintains the Strand state.
*
* @since 2201.11.0
*/
public enum State {
RUNNABLE,
YIELD,
CANCELLED
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ public static Object flush(Strand strand, WorkerChannelMap workerChannelMap, Str
if (strand.isIsolated) {
AsyncUtils.waitForAllFutureResult(futures);
} else {
AsyncUtils.handleNonIsolatedStrand(strand, () -> isAllFuturesCompleted(futures), () -> null);
AsyncUtils.handleNonIsolatedStrand(strand, () -> {
AsyncUtils.waitForAllFutureResult(futures);
return null;
});
}

for (WorkerChannel channel : channels) {
Expand All @@ -95,7 +98,7 @@ public static Object receive(Strand strand, WorkerChannelMap workerChannelMap, S
if (strand.isIsolated) {
return channel.read();
}
return AsyncUtils.handleNonIsolatedStrand(strand, channel.getResultFuture()::isDone, channel::read);
return AsyncUtils.handleNonIsolatedStrand(strand, channel::read);
}

/*
Expand Down Expand Up @@ -135,8 +138,11 @@ public static BMap<BString, Object> multipleReceive(Strand strand, WorkerChannel
AsyncUtils.waitForAllFutureResult(futures);
return getMultipleReceiveResult(workerChannelMap, channelFieldNameMap, targetType, channels);
}
return (BMap<BString, Object>) AsyncUtils.handleNonIsolatedStrand(strand, () -> isAllFuturesCompleted(futures),
() -> getMultipleReceiveResult(workerChannelMap, channelFieldNameMap, targetType, channels));
return (BMap<BString, Object>) AsyncUtils.handleNonIsolatedStrand(strand,
() -> {
AsyncUtils.waitForAllFutureResult(futures);
return getMultipleReceiveResult(workerChannelMap, channelFieldNameMap, targetType, channels);
});
}

/*
Expand Down Expand Up @@ -207,18 +213,6 @@ private static Object getAlternativeReceiveResult(Strand strand, CompletableFutu
return result;
}

private static Boolean isAllFuturesCompleted(CompletableFuture<?>[] futures) {
for (CompletableFuture<?> future : futures) {
if (future.isCompletedExceptionally()) {
AsyncUtils.getFutureResult(future);
}
if (!future.isDone()) {
return false;
}
}
return true;
}

private static BMap<BString, Object> getMultipleReceiveResult(WorkerChannelMap workerChannelMap,
Map<String, String> channelFieldNameMap,
Type targetType, WorkerChannel[] channels) {
Expand Down
Loading
Loading