diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala index 739bf26a5e..3050d70db5 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -34,7 +34,7 @@ object VirtualThreadPoolDispatcherSpec { override def receive = { case "ping" => - sender() ! "All fine" + sender() ! Thread.currentThread().getName } } @@ -43,14 +43,16 @@ object VirtualThreadPoolDispatcherSpec { class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender { import VirtualThreadPoolDispatcherSpec._ - val Iterations = 1000 - "VirtualThreadPool support" must { "handle simple dispatch" in { val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher")) - innocentActor ! "ping" - expectMsg("All fine") + for (_ <- 1 to 1000) { + innocentActor ! "ping" + expectMsgPF() { case name: String => + name should include("VirtualThreadPoolDispatcherSpec-virtual-thread-dispatcher-virtual-thread-") + } + } } } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index 40c7b4a772..bb9b301eb3 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -417,7 +417,7 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis val tf: ThreadFactory = threadFactory match { case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => new ThreadFactory { - private val vtFactory = newVirtualThreadFactory(name) + private val vtFactory = newVirtualThreadFactory(name + "-" + id) override def newThread(r: Runnable): Thread = { val vt = vtFactory.newThread(r) @@ -426,7 +426,7 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis vt } } - case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name); + case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name + "-" + id); } new ExecutorServiceFactory { import VirtualThreadSupport._ diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala index 3a77789155..484e600fc3 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -68,6 +68,7 @@ private[dispatch] object VirtualThreadSupport { newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService] } catch { case NonFatal(e) => + // --add-opens java.base/java.lang=ALL-UNNAMED throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e) } } diff --git a/docs/src/main/paradox/dispatchers.md b/docs/src/main/paradox/dispatchers.md index 5e80480396..cac3d0f8f3 100644 --- a/docs/src/main/paradox/dispatchers.md +++ b/docs/src/main/paradox/dispatchers.md @@ -42,6 +42,8 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu threads the pool keep running in order to reduce the latency of handling a new incoming task. You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html). +When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. + @@@ Another example that uses the "thread-pool-executor": diff --git a/docs/src/main/paradox/typed/dispatchers.md b/docs/src/main/paradox/typed/dispatchers.md index 2347dc7300..b3546a4416 100644 --- a/docs/src/main/paradox/typed/dispatchers.md +++ b/docs/src/main/paradox/typed/dispatchers.md @@ -127,6 +127,8 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu threads the pool will keep running in order to reduce the latency of handling a new incoming task. You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html). +When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. + @@@ @@@ note diff --git a/project/PekkoBuild.scala b/project/PekkoBuild.scala index 03052f9c6f..b18682d6df 100644 --- a/project/PekkoBuild.scala +++ b/project/PekkoBuild.scala @@ -290,6 +290,7 @@ object PekkoBuild { UsefulTask("testQuick", "Runs all the tests. When run multiple times will only run previously failing tests (shell mode only)"), UsefulTask("testOnly *.AnySpec", "Only run a selected test"), + UsefulTask("TestJdk9 / testOnly *.AnySpec", "Only run a Jdk9+ selected test"), UsefulTask("testQuick *.AnySpec", "Only run a selected test. When run multiple times will only run previously failing tests (shell mode only)"), UsefulTask("testQuickUntilPassed", "Runs all tests in a continuous loop until all tests pass"),