From e9d76a1d687ba36939502f6e5d179da50709860f Mon Sep 17 00:00:00 2001 From: gabilang Date: Wed, 17 Apr 2024 10:53:27 +0530 Subject: [PATCH 1/4] Fix worker send hangs when worker early return within if --- .../runtime/internal/scheduling/Strand.java | 1 + .../bir/codegen/methodgen/MethodGen.java | 5 +++++ .../langlib/internal/WorkerChannels.java | 2 +- .../worker/WorkerConditionalSendTest.java | 3 ++- .../workers/workers_conditional_send.bal | 19 +++++++++++++++++++ 5 files changed, 28 insertions(+), 2 deletions(-) diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java index df347a897fef..4a967bf8ce36 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java @@ -74,6 +74,7 @@ public class Strand { public Scheduler scheduler; public Strand parent; public WDChannels wdChannels; + public int wdChannelIndex; public FlushDetail flushDetail; public boolean blockedOnExtern; public Set channelDetails; diff --git a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java index cf3003843da8..fbd500862ab3 100644 --- a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java +++ b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java @@ -161,6 +161,7 @@ public class MethodGen { protected static final String FUNCTION_INVOCATION = "functionInvocation"; private static final String INVOCATION_COUNT = "%invocationCount"; private static final String RESUME_INDEX = "resumeIndex"; + private static final String WD_CHANNEL_INDEX = "wdChannelIndex"; private final JvmPackageGen jvmPackageGen; private final SymbolTable symbolTable; private final Types types; @@ -399,6 +400,10 @@ private void setFunctionInvocationVar(int localVarOffset, MethodVisitor mv, int } else { // this means this is a function created for a worker mv.visitVarInsn(ILOAD, invocationCountArgVarIndex); + + mv.visitVarInsn(ALOAD, localVarOffset); + mv.visitVarInsn(ILOAD, invocationCountArgVarIndex); + mv.visitFieldInsn(PUTFIELD, STRAND_CLASS, WD_CHANNEL_INDEX, "I"); } mv.visitVarInsn(ISTORE, invocationVarIndex); } diff --git a/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java b/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java index 537ef1bb2c66..e21196944c9b 100644 --- a/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java +++ b/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java @@ -40,7 +40,7 @@ public static void autoClose(BString[] channelIds) { Strand currentStrand = Scheduler.getStrand(); Strand channelHoldingStrand = Objects.requireNonNullElse(currentStrand.parent, currentStrand); for (BString channelId : channelIds) { - String channelName = channelId.getValue() + ":" + (channelHoldingStrand.functionInvocation - 1); + String channelName = channelId.getValue() + ":" + currentStrand.wdChannelIndex; WorkerDataChannel workerDataChannel = channelHoldingStrand.wdChannels.getWorkerDataChannel(channelName); workerDataChannel.autoClose(); } diff --git a/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/WorkerConditionalSendTest.java b/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/WorkerConditionalSendTest.java index 720441d1454d..c3362ab06417 100644 --- a/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/WorkerConditionalSendTest.java +++ b/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/WorkerConditionalSendTest.java @@ -67,7 +67,8 @@ public static String[] functionProvider() { "multipleReceiveConditional", "multipleReceiveWithNonConditionalSend", "testNonTopLevelSend", - "testSendWithEarlyReturnError" + "testSendWithEarlyReturnError", + "testWorkerEarlyReturnWithinIf" }; } diff --git a/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal b/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal index 34b59c11e8ee..fee19af2d237 100644 --- a/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal +++ b/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal @@ -475,3 +475,22 @@ function testSendWithEarlyReturnError() { test:assertEquals(mapResult["a"], false, "Invalid boolean result"); test:assertEquals(mapResult["b"], 2, "Invalid int result"); } + +public function testWorkerEarlyReturnWithinIf() { + worker w1 { + boolean b = true; + if b { + return; + } else { + 30 -> w2; + } + } + worker w2 { + int|errorLib:NoMessage m = <- w1; + test:assertTrue(m is error); + error err = m; + test:assertEquals(err.message(), "NoMessage", "Invalid error message"); + test:assertEquals(err.detail().toString(), "{\"message\":\"no message received from worker 'w1' to worker 'w2'\"}", "Invalid error detail"); + } + wait w2; +} From 15d5b7e6123890801d167faba91cddcd1e67bb76 Mon Sep 17 00:00:00 2001 From: gabilang Date: Tue, 14 May 2024 10:38:15 +0530 Subject: [PATCH 2/4] Add tests with multiple worker channels --- .../workers/workers_conditional_send.bal | 88 +++++++++++++++++-- 1 file changed, 82 insertions(+), 6 deletions(-) diff --git a/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal b/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal index fee19af2d237..4f8de25f161d 100644 --- a/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal +++ b/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal @@ -482,15 +482,91 @@ public function testWorkerEarlyReturnWithinIf() { if b { return; } else { - 30 -> w2; + 30 -> w4; + if (b) { + return; + } else { + 40 -> w4; + if (b) { + return; + } else { + 50 -> w5; + } + } } } + worker w2 { - int|errorLib:NoMessage m = <- w1; - test:assertTrue(m is error); - error err = m; + boolean b = true; + if (b) { + return; + } else { + 60 -> w4; + } + } + + worker w3 { + boolean b = true; + if (b) { + if (b) { + return; + } else { + 70 -> w4; + } + } else if (b) { + 80 -> w5; + } else { + 90 -> w4; + if (b) { + return; + } else { + 100 -> w5; + } + } + } + + worker w4 { + map m = <- {w1, w2, w3}; + test:assertTrue(m["w1"] is error); + error err = m["w1"]; + test:assertEquals(err.message(), "NoMessage", "Invalid error message"); + test:assertEquals(err.detail().toString(), "{\"message\":\"no message received from worker 'w1' to worker 'w4'\"}", "Invalid error detail"); + test:assertTrue(m["w2"] is error); + err = m["w2"]; + test:assertEquals(err.message(), "NoMessage", "Invalid error message"); + test:assertEquals(err.detail().toString(), "{\"message\":\"no message received from worker 'w2' to worker 'w4'\"}", "Invalid error detail"); + + int|errorLib:NoMessage res1 = <- w1; + test:assertTrue(res1 is error); + error err1 = res1; + test:assertEquals(err1.message(), "NoMessage", "Invalid error message"); + test:assertEquals(err1.detail().toString(), "{\"message\":\"no message received from worker 'w1' to worker 'w4'\"}", "Invalid error detail"); + + int|errorLib:NoMessage res2 = <- w3; + test:assertTrue(res2 is error); + error err2 = res2; + test:assertEquals(err2.message(), "NoMessage", "Invalid error message"); + test:assertEquals(err2.detail().toString(), "{\"message\":\"no message received from worker 'w3' to worker 'w4'\"}", "Invalid error detail"); + } + + worker w5 { + map m = <- {w1, w3}; + test:assertTrue(m["w1"] is error); + error err = m["w1"]; test:assertEquals(err.message(), "NoMessage", "Invalid error message"); - test:assertEquals(err.detail().toString(), "{\"message\":\"no message received from worker 'w1' to worker 'w2'\"}", "Invalid error detail"); + test:assertEquals(err.detail().toString(), "{\"message\":\"no message received from worker 'w1' to worker 'w5'\"}", "Invalid error detail"); + test:assertTrue(m["w3"] is error); + err = m["w3"]; + test:assertEquals(err.message(), "NoMessage", "Invalid error message"); + test:assertEquals(err.detail().toString(), "{\"message\":\"no message received from worker 'w3' to worker 'w5'\"}", "Invalid error detail"); + + int|errorLib:NoMessage res1 = <- w3; + test:assertTrue(res1 is error); + error err1 = res1; + test:assertEquals(err1.message(), "NoMessage", "Invalid error message"); + test:assertEquals(err1.detail().toString(), "{\"message\":\"no message received from worker 'w3' to worker 'w5'\"}", "Invalid error detail"); } - wait w2; + + wait w4; + wait w5; } From 7e130ed83009c0a5e3515ac64c334f760e8d8eb1 Mon Sep 17 00:00:00 2001 From: gabilang Date: Thu, 16 May 2024 17:28:06 +0530 Subject: [PATCH 3/4] Refactor code and add more unit tests --- .../bir/codegen/methodgen/MethodGen.java | 8 +++--- .../test/worker/BasicWorkerTest.java | 3 ++- .../test-src/workers/basic-worker-actions.bal | 27 +++++++++++++++++++ .../workers/workers_conditional_send.bal | 5 ++++ 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java index fbd500862ab3..6d25e56b495c 100644 --- a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java +++ b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java @@ -400,10 +400,6 @@ private void setFunctionInvocationVar(int localVarOffset, MethodVisitor mv, int } else { // this means this is a function created for a worker mv.visitVarInsn(ILOAD, invocationCountArgVarIndex); - - mv.visitVarInsn(ALOAD, localVarOffset); - mv.visitVarInsn(ILOAD, invocationCountArgVarIndex); - mv.visitFieldInsn(PUTFIELD, STRAND_CLASS, WD_CHANNEL_INDEX, "I"); } mv.visitVarInsn(ISTORE, invocationVarIndex); } @@ -442,6 +438,10 @@ private void setChannelDetailsToStrand(BIRFunction func, int localVarOffset, Met JvmCodeGenUtil.loadChannelDetails(mv, Arrays.asList(func.workerChannels), invocationVarIndex); mv.visitMethodInsn(INVOKEVIRTUAL, STRAND_CLASS, "updateChannelDetails", UPDATE_CHANNEL_DETAILS, false); + // Update wdChannelIndex of the strand. + mv.visitVarInsn(ALOAD, localVarOffset); + mv.visitVarInsn(ILOAD, invocationVarIndex); + mv.visitFieldInsn(PUTFIELD, STRAND_CLASS, WD_CHANNEL_INDEX, "I"); } private void checkStrandCancelled(MethodVisitor mv, int localVarOffset) { diff --git a/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/BasicWorkerTest.java b/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/BasicWorkerTest.java index 351655a59661..9b9fa572f6d0 100644 --- a/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/BasicWorkerTest.java +++ b/tests/jballerina-unit-test/src/test/java/org/ballerinalang/test/worker/BasicWorkerTest.java @@ -131,7 +131,8 @@ public void testWorkerMessagePassing(String funcName) { public Object[] workerMessagePassingFunctions() { return new Object[]{ "testWorkerMessagePassingRepeatedly", - "testPanicWithMessagePassing" + "testPanicWithMessagePassing", + "testEarlyReturnWithMessagePassing" }; } diff --git a/tests/jballerina-unit-test/src/test/resources/test-src/workers/basic-worker-actions.bal b/tests/jballerina-unit-test/src/test/resources/test-src/workers/basic-worker-actions.bal index 529d4284bf7c..c592750694cd 100644 --- a/tests/jballerina-unit-test/src/test/resources/test-src/workers/basic-worker-actions.bal +++ b/tests/jballerina-unit-test/src/test/resources/test-src/workers/basic-worker-actions.bal @@ -15,6 +15,7 @@ // under the License. import ballerina/jballerina.java; +import ballerina/lang.'error as errorLib; import ballerina/lang.'value as value; import ballerina/test; @@ -389,6 +390,32 @@ function foo() { } } +function testEarlyReturnWithMessagePassing() { + foreach int i in 0 ... 100 { + func1(); + func2(); + } +} + +function func2() { + func1(); +} + +function func1() { + foo(); + worker w1 { + if (1 > 0) { + return; + } else { + 10 -> w2; + } + } + + worker w2 { + int|errorLib:NoMessage y = <- w1; + } +} + function testWorkerMessagePassingRepeatedly() { future<()> futureResult1 = @strand {thread: "any"} start testSyncSendRecursively(0, 10000); diff --git a/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal b/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal index 4f8de25f161d..da47ccb97d83 100644 --- a/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal +++ b/tests/jballerina-unit-test/src/test/resources/test-src/workers/workers_conditional_send.bal @@ -477,6 +477,7 @@ function testSendWithEarlyReturnError() { } public function testWorkerEarlyReturnWithinIf() { + int _ = foo(1); worker w1 { boolean b = true; if b { @@ -570,3 +571,7 @@ public function testWorkerEarlyReturnWithinIf() { wait w4; wait w5; } + +function foo(int i) returns int { + return i; +} From f50ebd7e947c98ba79da8117a4f531c562a6b757 Mon Sep 17 00:00:00 2001 From: gabilang Date: Tue, 21 May 2024 13:04:48 +0530 Subject: [PATCH 4/4] Get channel name by finding the match --- .../runtime/internal/scheduling/Strand.java | 1 - .../src/main/java/module-info.java | 2 +- .../compiler/bir/codegen/methodgen/MethodGen.java | 5 ----- .../langlib/internal/WorkerChannels.java | 14 +++++++++++++- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java index 4a967bf8ce36..df347a897fef 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/scheduling/Strand.java @@ -74,7 +74,6 @@ public class Strand { public Scheduler scheduler; public Strand parent; public WDChannels wdChannels; - public int wdChannelIndex; public FlushDetail flushDetail; public boolean blockedOnExtern; public Set channelDetails; diff --git a/bvm/ballerina-runtime/src/main/java/module-info.java b/bvm/ballerina-runtime/src/main/java/module-info.java index 857d248a3977..55746db34385 100644 --- a/bvm/ballerina-runtime/src/main/java/module-info.java +++ b/bvm/ballerina-runtime/src/main/java/module-info.java @@ -66,7 +66,7 @@ io.ballerina.lang.regexp; exports io.ballerina.runtime.internal.values to io.ballerina.testerina.core, io.ballerina.testerina.runtime, io.ballerina.lang.xml, org.ballerinalang.debugadapter.runtime, io.ballerina.lang.query, - io.ballerina.lang.function, io.ballerina.lang.regexp, io.ballerina.lang.value; + io.ballerina.lang.function, io.ballerina.lang.regexp, io.ballerina.lang.value, io.ballerina.lang.internal; exports io.ballerina.runtime.internal.configurable to io.ballerina.lang.internal; exports io.ballerina.runtime.internal.types to io.ballerina.lang.typedesc, io.ballerina.testerina.runtime, org.ballerinalang.debugadapter.runtime, io.ballerina.lang.function, io.ballerina.lang.regexp, io.ballerina.testerina.core; diff --git a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java index 6d25e56b495c..cf3003843da8 100644 --- a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java +++ b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/bir/codegen/methodgen/MethodGen.java @@ -161,7 +161,6 @@ public class MethodGen { protected static final String FUNCTION_INVOCATION = "functionInvocation"; private static final String INVOCATION_COUNT = "%invocationCount"; private static final String RESUME_INDEX = "resumeIndex"; - private static final String WD_CHANNEL_INDEX = "wdChannelIndex"; private final JvmPackageGen jvmPackageGen; private final SymbolTable symbolTable; private final Types types; @@ -438,10 +437,6 @@ private void setChannelDetailsToStrand(BIRFunction func, int localVarOffset, Met JvmCodeGenUtil.loadChannelDetails(mv, Arrays.asList(func.workerChannels), invocationVarIndex); mv.visitMethodInsn(INVOKEVIRTUAL, STRAND_CLASS, "updateChannelDetails", UPDATE_CHANNEL_DETAILS, false); - // Update wdChannelIndex of the strand. - mv.visitVarInsn(ALOAD, localVarOffset); - mv.visitVarInsn(ILOAD, invocationVarIndex); - mv.visitFieldInsn(PUTFIELD, STRAND_CLASS, WD_CHANNEL_INDEX, "I"); } private void checkStrandCancelled(MethodVisitor mv, int localVarOffset) { diff --git a/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java b/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java index e21196944c9b..4f53841fd416 100644 --- a/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java +++ b/langlib/lang.__internal/src/main/java/org/ballerinalang/langlib/internal/WorkerChannels.java @@ -21,6 +21,7 @@ import io.ballerina.runtime.internal.scheduling.Scheduler; import io.ballerina.runtime.internal.scheduling.Strand; import io.ballerina.runtime.internal.scheduling.WorkerDataChannel; +import io.ballerina.runtime.internal.values.ChannelDetails; import java.util.Objects; @@ -40,9 +41,20 @@ public static void autoClose(BString[] channelIds) { Strand currentStrand = Scheduler.getStrand(); Strand channelHoldingStrand = Objects.requireNonNullElse(currentStrand.parent, currentStrand); for (BString channelId : channelIds) { - String channelName = channelId.getValue() + ":" + currentStrand.wdChannelIndex; + String channelName = getMatchingChannelName(channelId.getValue(), currentStrand); WorkerDataChannel workerDataChannel = channelHoldingStrand.wdChannels.getWorkerDataChannel(channelName); workerDataChannel.autoClose(); } } + + private static String getMatchingChannelName(String channelId, Strand currentStrand) { + String channelName = null; + for (ChannelDetails channelDetail : currentStrand.channelDetails) { + if (channelDetail.name.contains(channelId)) { + channelName = channelDetail.name; + break; + } + } + return channelName; + } }