Skip to content

Commit

Permalink
[scheduler] pekko integration (#1032)
Browse files Browse the repository at this point in the history
Fixes #1028

---------

Co-authored-by: Ondra Pelech <[email protected]>
  • Loading branch information
fwbrasil and sideeffffect authored Jan 20, 2025
1 parent 2f07f80 commit af67df8
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 0 deletions.
19 changes: 19 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ lazy val kyoJVM = project
`kyo-scheduler-zio`.jvm,
`kyo-scheduler-cats`.jvm,
`kyo-scheduler-finagle`.jvm,
`kyo-scheduler-pekko`.jvm,
`kyo-data`.jvm,
`kyo-kernel`.jvm,
`kyo-prelude`.jvm,
Expand Down Expand Up @@ -200,6 +201,7 @@ lazy val `kyo-scheduler-zio` = sbtcrossproject.CrossProject("kyo-scheduler-zio",
scalacOptions ++= scalacOptionToken(ScalacOptions.source3).value,
crossScalaVersions := List(scala3Version, scala212Version, scala213Version)
)

lazy val `kyo-scheduler-cats` =
crossProject(JVMPlatform)
.withoutSuffixFor(JVMPlatform)
Expand All @@ -216,6 +218,23 @@ lazy val `kyo-scheduler-cats` =
crossScalaVersions := List(scala3Version, scala212Version, scala213Version)
)

lazy val `kyo-scheduler-pekko` =
crossProject(JVMPlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(CrossType.Full)
.dependsOn(`kyo-scheduler`)
.in(file("kyo-scheduler-pekko"))
.settings(
`kyo-settings`,
libraryDependencies += "org.apache.pekko" %%% "pekko-actor" % "1.1.3",
libraryDependencies += "org.apache.pekko" %%% "pekko-testkit" % "1.1.3" % Test
)
.jvmSettings(mimaCheck(false))
.settings(
scalacOptions ++= scalacOptionToken(ScalacOptions.source3).value,
crossScalaVersions := List(scala3Version, scala212Version, scala213Version)
)

lazy val `kyo-scheduler-finagle` =
crossProject(JVMPlatform)
.withoutSuffixFor(JVMPlatform)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package kyo.scheduler

import com.typesafe.config.Config
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadFactory
import org.apache.pekko.dispatch.DispatcherPrerequisites
import org.apache.pekko.dispatch.ExecutorServiceConfigurator
import org.apache.pekko.dispatch.ExecutorServiceFactory

/** A Pekko ExecutorServiceConfigurator that integrates Kyo's adaptive scheduling capabilities with Pekko's dispatcher system. The
* configurator enables Kyo's scheduler to handle all actor executions within your Pekko application, allowing it to make optimal thread
* utilization decisions by having full visibility of the workload.
*
* To use Kyo's scheduler in your Pekko application, configure it as the default dispatcher:
*
* {{{
* pekko.actor.default-dispatcher {
* type = "Dispatcher"
* executor = "kyo.scheduler.KyoExecutorServiceConfigurator"
* }
* }}}
*
* The configurator uses Kyo's scheduler singleton instance, allowing it to share resources and optimization decisions across the entire
* application. By handling all actor executions, it can efficiently adapt to varying workloads and system conditions, optimizing thread
* utilization across your entire application.
*
* For effective load management, use Kyo's admission control through Scheduler.get.reject() methods at the boundaries of your application
* where external work enters the system. See the Admission class documentation for details on admission control behavior and
* configuration.
*
* @param config
* The dispatcher configuration from Pekko
* @param prerequisites
* Core Pekko prerequisites for dispatcher creation
*
* @see
* [[kyo.scheduler.Scheduler]] for details on the underlying scheduling capabilities, admisison `reject` methods, and available
* configurations.
* @see
* [[kyo.scheduler.regulator.Admission]] for details on admission control behavior
* @see
* [[org.apache.pekko.dispatch.ExecutorServiceConfigurator]] for the Pekko dispatcher interface
*/
class KyoExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ExecutorServiceConfigurator(config, prerequisites) {

override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory) = {
val exec = Scheduler.get.asExecutorService
new ExecutorServiceFactory {
def createExecutorService = exec
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package kyo.scheduler

import com.typesafe.config.ConfigFactory
import java.util.concurrent.CountDownLatch
import org.apache.pekko.actor.Actor
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.actor.Props
import org.apache.pekko.pattern.ask
import org.apache.pekko.testkit.TestKit
import org.apache.pekko.testkit.TestProbe
import org.apache.pekko.util.Timeout
import org.scalatest.BeforeAndAfterAll
import org.scalatest.NonImplicitAssertions
import org.scalatest.freespec.AnyFreeSpecLike
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.*

class KyoExecutorServiceConfiguratorTest
extends TestKit(ActorSystem(
"KyoTest",
ConfigFactory.parseString("""
pekko.actor.default-dispatcher {
type = "Dispatcher"
executor = "kyo.scheduler.KyoExecutorServiceConfigurator"
}
""")
))
with AnyFreeSpecLike
with NonImplicitAssertions
with BeforeAndAfterAll {

implicit def timeout: Timeout = Timeout(5.seconds)
implicit def execCtx: ExecutionContext = Scheduler.get.asExecutionContext

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

"executes tasks on kyo threads" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case msg => sender() ! Thread.currentThread().getName
}
}))

val threadName = Await.result((actor ? "test").mapTo[String], 5.seconds)
assert(threadName.contains("kyo"))
}

"handles concurrent messages" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case msg => sender() ! Thread.currentThread().getName
}
}))

val futures = (1 to 1000).map(_ => (actor ? "test").mapTo[String])
val threadNames = Await.result(Future.sequence(futures), 5.seconds)

assert(threadNames.forall(_.contains("kyo")))
assert(threadNames.toSet.size > 1)
}

"handles multiple actors" in {
val actors =
(1 to 10).map { i =>
system.actorOf(Props(new Actor {
def receive = {
case msg => sender() ! Thread.currentThread().getName
}
}))
}

val futures =
for {
actor <- actors
_ <- 1 to 10
} yield (actor ? "test").mapTo[String]

val threadNames = Await.result(Future.sequence(futures), 5.seconds)
assert(threadNames.forall(_.contains("kyo")))
assert(threadNames.toSet.size > 1)
}
}

0 comments on commit af67df8

Please sign in to comment.