Skip to content

Commit

Permalink
feat: Add support for switching scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 17, 2025
1 parent a75bc7a commit 09fb4fd
Show file tree
Hide file tree
Showing 15 changed files with 429 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import com.typesafe.config.ConfigFactory

import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }
import pekko.util.JavaVersion

object ForkJoinPoolVirtualThreadSpec {
val config = ConfigFactory.parseString("""
|virtual {
| task-dispatcher {
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
| throughput = 5
| fork-join-executor {
| parallelism-factor = 2
| parallelism-max = 2
| parallelism-min = 2
| virtualize = on
| }
| }
|}
""".stripMargin)

class ThreadNameActor extends Actor {

override def receive = {
case "ping" =>
sender() ! Thread.currentThread().getName
}
}

}

class ForkJoinPoolVirtualThreadSpec extends PekkoSpec(ForkJoinPoolVirtualThreadSpec.config) with ImplicitSender {
import ForkJoinPoolVirtualThreadSpec._

"PekkoForkJoinPool" must {

"support virtualization with Virtual Thread" in {
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
for (_ <- 1 to 1000) {
actor ! "ping"
expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") }
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }

object ThreadPoolVirtualThreadSpec {
val config = ConfigFactory.parseString("""
|pekko.actor.default-dispatcher.executor = "thread-pool-executor"
|virtual {
| task-dispatcher {
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
| throughput = 1
| thread-pool-executor {
| fixed-pool-size = 2
| virtualize = on
| }
| }
|}
""".stripMargin)

class ThreadNameActor extends Actor {

override def receive = {
case "ping" =>
sender() ! Thread.currentThread().getName
}
}

}

class ThreadPoolVirtualThreadSpec extends PekkoSpec(ThreadPoolVirtualThreadSpec.config) with ImplicitSender {
import ThreadPoolVirtualThreadSpec._

"PekkoThreadPoolExecutor" must {

"support virtualization with Virtual Thread" in {
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
for (_ <- 1 to 1000) {
actor ! "ping"
expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") }
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object VirtualThreadPoolDispatcherSpec {

override def receive = {
case "ping" =>
sender() ! "All fine"
sender() ! Thread.currentThread().getName
}
}

Expand All @@ -43,14 +43,14 @@ object VirtualThreadPoolDispatcherSpec {
class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
import VirtualThreadPoolDispatcherSpec._

val Iterations = 1000

"VirtualThreadPool support" must {

"handle simple dispatch" in {
val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher"))
innocentActor ! "ping"
expectMsg("All fine")
for (_ <- 1 to 1000) {
innocentActor ! "ping"
expectMsgPF() { case name: String => name should include("virtual-thread") }
}
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# `unapply` method is actually never used.
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.dispatch.ThreadPoolConfig.unapply")
12 changes: 12 additions & 0 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,12 @@ pekko {
# This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above.
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
maximum-pool-size = 32767

# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw.
# Virtualize this dispatcher as a virtual-thread-executor
# Valid values are: `on`, `off`
virtualize = off
}

# This will be used if you have set "executor = "thread-pool-executor""
Expand Down Expand Up @@ -538,6 +544,12 @@ pekko {

# Allow core threads to time out
allow-core-timeout = on

# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw.
# Virtualize this dispatcher as a virtual-thread-executor
# Valid values are: `on`, `off`
virtualize = off
}

# This will be used if you have set "executor = "virtual-thread-executor"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
@unused prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
import org.apache.pekko.util.Helpers.ConfigOps
val builder =
ThreadPoolConfigBuilder(ThreadPoolConfig())
ThreadPoolConfigBuilder(ThreadPoolConfig(virtualize = false))
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
.setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout"))
.configure(Some(config.getInt("task-queue-size")).flatMap {
Expand Down Expand Up @@ -475,6 +475,10 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
config.getInt("max-pool-size-max"))
else
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))

if (config.getBoolean("virtualize")) {
builder.setVirtualize(true)
} else builder
}

def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
this.getClass,
"PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format(
config.getString("id"))))
ThreadPoolConfig()
ThreadPoolConfig(virtualize = false)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,28 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
}

class ForkJoinExecutorServiceFactory(
val id: String,
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean,
val maxPoolSize: Int)
val maxPoolSize: Int,
val virtualize: Boolean)
extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean,
maxPoolSize: Int,
virtualize: Boolean) =
this(null, threadFactory, parallelism, asyncMode, maxPoolSize, virtualize)

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false)

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)
asyncMode: Boolean,
maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false)

private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption
Expand All @@ -116,12 +129,19 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
this(threadFactory, parallelism, asyncMode = true)

def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
case Some(handle) =>
handle.invoke(parallelism, threadFactory, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
case _ =>
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
def createExecutorService: ExecutorService = {
val forkJoinPool = pekkoJdk9ForkJoinPoolHandleOpt match {
case Some(handle) =>
handle.invoke(parallelism, threadFactory, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
case _ =>
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
}
if (virtualize) {
new VirtualizedExecutorService(id, forkJoinPool)
} else {
forkJoinPool
}
}
}

Expand All @@ -143,12 +163,14 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
}

new ForkJoinExecutorServiceFactory(
id,
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode,
config.getInt("maximum-pool-size"))
config.getInt("maximum-pool-size"),
config.getBoolean("virtualize"))
}
}
Loading

0 comments on commit 09fb4fd

Please sign in to comment.