diff --git a/build.sbt b/build.sbt index 403800938..cdd4109b5 100644 --- a/build.sbt +++ b/build.sbt @@ -99,6 +99,7 @@ lazy val kyoJVM = project `kyo-scheduler-zio`.jvm, `kyo-scheduler-cats`.jvm, `kyo-scheduler-finagle`.jvm, + `kyo-scheduler-pekko`.jvm, `kyo-data`.jvm, `kyo-kernel`.jvm, `kyo-prelude`.jvm, @@ -200,6 +201,7 @@ lazy val `kyo-scheduler-zio` = sbtcrossproject.CrossProject("kyo-scheduler-zio", scalacOptions ++= scalacOptionToken(ScalacOptions.source3).value, crossScalaVersions := List(scala3Version, scala212Version, scala213Version) ) + lazy val `kyo-scheduler-cats` = crossProject(JVMPlatform) .withoutSuffixFor(JVMPlatform) @@ -216,6 +218,23 @@ lazy val `kyo-scheduler-cats` = crossScalaVersions := List(scala3Version, scala212Version, scala213Version) ) +lazy val `kyo-scheduler-pekko` = + crossProject(JVMPlatform) + .withoutSuffixFor(JVMPlatform) + .crossType(CrossType.Full) + .dependsOn(`kyo-scheduler`) + .in(file("kyo-scheduler-pekko")) + .settings( + `kyo-settings`, + libraryDependencies += "org.apache.pekko" %%% "pekko-actor" % "1.1.3", + libraryDependencies += "org.apache.pekko" %%% "pekko-testkit" % "1.1.3" % Test + ) + .jvmSettings(mimaCheck(false)) + .settings( + scalacOptions ++= scalacOptionToken(ScalacOptions.source3).value, + crossScalaVersions := List(scala3Version, scala212Version, scala213Version) + ) + lazy val `kyo-scheduler-finagle` = crossProject(JVMPlatform) .withoutSuffixFor(JVMPlatform) diff --git a/kyo-scheduler-pekko/jvm/src/main/scala/kyo/scheduler/KyoExecutorServiceConfigurator.scala b/kyo-scheduler-pekko/jvm/src/main/scala/kyo/scheduler/KyoExecutorServiceConfigurator.scala new file mode 100644 index 000000000..11f8bc3c5 --- /dev/null +++ b/kyo-scheduler-pekko/jvm/src/main/scala/kyo/scheduler/KyoExecutorServiceConfigurator.scala @@ -0,0 +1,53 @@ +package kyo.scheduler + +import com.typesafe.config.Config +import java.util.concurrent.ExecutorService +import java.util.concurrent.ThreadFactory +import org.apache.pekko.dispatch.DispatcherPrerequisites +import org.apache.pekko.dispatch.ExecutorServiceConfigurator +import org.apache.pekko.dispatch.ExecutorServiceFactory + +/** A Pekko ExecutorServiceConfigurator that integrates Kyo's adaptive scheduling capabilities with Pekko's dispatcher system. The + * configurator enables Kyo's scheduler to handle all actor executions within your Pekko application, allowing it to make optimal thread + * utilization decisions by having full visibility of the workload. + * + * To use Kyo's scheduler in your Pekko application, configure it as the default dispatcher: + * + * {{{ + * pekko.actor.default-dispatcher { + * type = "Dispatcher" + * executor = "kyo.scheduler.KyoExecutorServiceConfigurator" + * } + * }}} + * + * The configurator uses Kyo's scheduler singleton instance, allowing it to share resources and optimization decisions across the entire + * application. By handling all actor executions, it can efficiently adapt to varying workloads and system conditions, optimizing thread + * utilization across your entire application. + * + * For effective load management, use Kyo's admission control through Scheduler.get.reject() methods at the boundaries of your application + * where external work enters the system. See the Admission class documentation for details on admission control behavior and + * configuration. + * + * @param config + * The dispatcher configuration from Pekko + * @param prerequisites + * Core Pekko prerequisites for dispatcher creation + * + * @see + * [[kyo.scheduler.Scheduler]] for details on the underlying scheduling capabilities, admisison `reject` methods, and available + * configurations. + * @see + * [[kyo.scheduler.regulator.Admission]] for details on admission control behavior + * @see + * [[org.apache.pekko.dispatch.ExecutorServiceConfigurator]] for the Pekko dispatcher interface + */ +class KyoExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ExecutorServiceConfigurator(config, prerequisites) { + + override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory) = { + val exec = Scheduler.get.asExecutorService + new ExecutorServiceFactory { + def createExecutorService = exec + } + } +} diff --git a/kyo-scheduler-pekko/jvm/src/test/scala/kyo/scheduler/KyoExecutorServiceConfiguratorTest.scala b/kyo-scheduler-pekko/jvm/src/test/scala/kyo/scheduler/KyoExecutorServiceConfiguratorTest.scala new file mode 100644 index 000000000..09961e92d --- /dev/null +++ b/kyo-scheduler-pekko/jvm/src/test/scala/kyo/scheduler/KyoExecutorServiceConfiguratorTest.scala @@ -0,0 +1,87 @@ +package kyo.scheduler + +import com.typesafe.config.ConfigFactory +import java.util.concurrent.CountDownLatch +import org.apache.pekko.actor.Actor +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.actor.Props +import org.apache.pekko.pattern.ask +import org.apache.pekko.testkit.TestKit +import org.apache.pekko.testkit.TestProbe +import org.apache.pekko.util.Timeout +import org.scalatest.BeforeAndAfterAll +import org.scalatest.NonImplicitAssertions +import org.scalatest.freespec.AnyFreeSpecLike +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.concurrent.duration.* + +class KyoExecutorServiceConfiguratorTest + extends TestKit(ActorSystem( + "KyoTest", + ConfigFactory.parseString(""" + pekko.actor.default-dispatcher { + type = "Dispatcher" + executor = "kyo.scheduler.KyoExecutorServiceConfigurator" + } + """) + )) + with AnyFreeSpecLike + with NonImplicitAssertions + with BeforeAndAfterAll { + + implicit def timeout: Timeout = Timeout(5.seconds) + implicit def execCtx: ExecutionContext = Scheduler.get.asExecutionContext + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + "executes tasks on kyo threads" in { + val actor = system.actorOf(Props(new Actor { + def receive = { + case msg => sender() ! Thread.currentThread().getName + } + })) + + val threadName = Await.result((actor ? "test").mapTo[String], 5.seconds) + assert(threadName.contains("kyo")) + } + + "handles concurrent messages" in { + val actor = system.actorOf(Props(new Actor { + def receive = { + case msg => sender() ! Thread.currentThread().getName + } + })) + + val futures = (1 to 1000).map(_ => (actor ? "test").mapTo[String]) + val threadNames = Await.result(Future.sequence(futures), 5.seconds) + + assert(threadNames.forall(_.contains("kyo"))) + assert(threadNames.toSet.size > 1) + } + + "handles multiple actors" in { + val actors = + (1 to 10).map { i => + system.actorOf(Props(new Actor { + def receive = { + case msg => sender() ! Thread.currentThread().getName + } + })) + } + + val futures = + for { + actor <- actors + _ <- 1 to 10 + } yield (actor ? "test").mapTo[String] + + val threadNames = Await.result(Future.sequence(futures), 5.seconds) + assert(threadNames.forall(_.contains("kyo"))) + assert(threadNames.toSet.size > 1) + } +}