From 2c4288b17b4bcc80126af65bfd8cd29720f03121 Mon Sep 17 00:00:00 2001 From: johnhungerford Date: Fri, 31 Jan 2025 13:36:14 -0500 Subject: [PATCH 1/5] Overloaded Stream methods for pure/effectful transformations (#1045) Fixes #1026 ### Problem Stream transformation methods like `map`, `filter`, etc. currently use Kyo's effectful traverse utilities to support effectful transformations across chunks. This approach is not optimal for pure transformations, however, where each chunk could be more performantly transformed using Chunk's own methods. ### Solution This PR overloads a number of Stream methods to provide separate implementations for pure and effectful transformations. The performance improvement is reflected in `StreamBench`. ### Notes - In one case, `runFold`, a separate `runFoldKyo` method was added because overloading `runFold` interfered with type inference on the folding function. - In other cases, overloading required adding implicit evidence. In several of these cases (all except `map`), this evidence is (as far as I can tell) trivial. ### Checklist - [x] Unit test all changes - [x] Update scaladocs if needed - [x] Update the README if needed --------- Co-authored-by: Flavio Brasil --- README.md | 2 + .../scala/kyo/bench/arena/StreamBench.scala | 1 - .../scala/kyo/bench/arena/StreamIOBench.scala | 32 +++++ .../kyo/bench/arena/WarmupJITProfile.scala | 3 +- .../shared/src/main/scala/kyo/Stream.scala | 122 +++++++++++++++++- .../src/test/scala/kyo/StreamTest.scala | 10 +- .../src/test/scala/kyo/debug/DebugTest.scala | 4 +- 7 files changed, 163 insertions(+), 11 deletions(-) create mode 100644 kyo-bench/src/main/scala/kyo/bench/arena/StreamIOBench.scala diff --git a/README.md b/README.md index d7a232785..c10f62a31 100644 --- a/README.md +++ b/README.md @@ -1340,6 +1340,8 @@ val result: Chunk[String] < (Env[Config] & Async) = The `Stream` effect is useful for processing large amounts of data in a memory-efficient manner, as it allows for lazy evaluation and only keeps a small portion of the data in memory at any given time. It's also composable, allowing you to build complex data processing pipelines by chaining stream operations. +Note that a number of `Stream` methods (e.g., `map`, `filter`, `mapChunk`) are overloaded to provide different implementations for pure vs effectful transformations. This can make a big difference for performance, so take care that the functions you pass to these methods are typed to return pure values if they do not include effects. Unncecessarily lifting them to return `A < Any` will result in perfomance loss. + ### Var: Stateful Computations The `Var` effect allows for stateful computations, similar to the `State` monad. It enables the management of state within a computation in a purely functional manner. diff --git a/kyo-bench/src/main/scala/kyo/bench/arena/StreamBench.scala b/kyo-bench/src/main/scala/kyo/bench/arena/StreamBench.scala index 7cb506f4d..7dc1fa0ac 100644 --- a/kyo-bench/src/main/scala/kyo/bench/arena/StreamBench.scala +++ b/kyo-bench/src/main/scala/kyo/bench/arena/StreamBench.scala @@ -21,7 +21,6 @@ class StreamBench extends ArenaBench.SyncAndFork(25000000): .filter(_ % 2 == 0) .map(_ + 1) .runFold(0)(_ + _) - .eval end kyoBench def zioBench() = diff --git a/kyo-bench/src/main/scala/kyo/bench/arena/StreamIOBench.scala b/kyo-bench/src/main/scala/kyo/bench/arena/StreamIOBench.scala new file mode 100644 index 000000000..9fe2c5d22 --- /dev/null +++ b/kyo-bench/src/main/scala/kyo/bench/arena/StreamIOBench.scala @@ -0,0 +1,32 @@ +package kyo.bench.arena + +class StreamIOBench extends ArenaBench.SyncAndFork(25000000): + val seq = (0 until 10000).toVector + + def catsBench() = + import cats.effect.* + import fs2.* + Stream.emits(seq) + .evalFilter(v => IO(v % 2 == 0)) + .evalMap(v => IO(v + 1)) + .compile + .fold(0)(_ + _) + end catsBench + + def kyoBench() = + import kyo.* + Stream.init(seq) + .filter(v => IO(v % 2 == 0)) + .map(v => IO(v + 1)) + .runFold(0)(_ + _) + end kyoBench + + def zioBench() = + import zio.* + import zio.stream.* + ZStream.fromIterable(seq) + .filterZIO(v => ZIO.succeed(v % 2 == 0)) + .mapZIO(v => ZIO.succeed(v + 1)) + .runSum + end zioBench +end StreamIOBench diff --git a/kyo-bench/src/main/scala/kyo/bench/arena/WarmupJITProfile.scala b/kyo-bench/src/main/scala/kyo/bench/arena/WarmupJITProfile.scala index ced2bf235..d339ff30d 100644 --- a/kyo-bench/src/main/scala/kyo/bench/arena/WarmupJITProfile.scala +++ b/kyo-bench/src/main/scala/kyo/bench/arena/WarmupJITProfile.scala @@ -103,6 +103,7 @@ object WarmupJITProfile: new RandomBench, new SemaphoreBench, new StateMapBench, - new StreamBench + new StreamBench, + new StreamIOBench ) end WarmupJITProfile diff --git a/kyo-prelude/shared/src/main/scala/kyo/Stream.scala b/kyo-prelude/shared/src/main/scala/kyo/Stream.scala index 273b63606..f64e9a7f2 100644 --- a/kyo-prelude/shared/src/main/scala/kyo/Stream.scala +++ b/kyo-prelude/shared/src/main/scala/kyo/Stream.scala @@ -4,6 +4,7 @@ import kyo.Tag import kyo.kernel.ArrowEffect import scala.annotation.nowarn import scala.annotation.targetName +import scala.util.NotGiven /** Represents a stream of values of type `V` with effects of type `S`. * @@ -33,7 +34,28 @@ sealed abstract class Stream[V, -S]: def concat[S2](other: Stream[V, S2])(using Frame): Stream[V, S & S2] = Stream(emit.map(_ => other.emit)) - /** Transforms each value in the stream using the given function. + /** Transforms each value in the stream using the given pure function. + * + * @param f + * The function to apply to each value + * @return + * A new stream with transformed values + */ + def map[V2](f: V => V2)(using + t1: Tag[Emit[Chunk[V]]], + t2: Tag[Emit[Chunk[V2]]], + ev: NotGiven[V2 <:< (Any < Nothing)], + fr: Frame + ): Stream[V2, S] = + Stream[V2, S](ArrowEffect.handleState(t1, (), emit)( + [C] => + (input, _, cont) => + val c = input.map(f) + if c.isEmpty then ((), cont(())) + else Emit.valueWith(c)(((), cont(()))) + )) + + /** Transforms each value in the stream using the given effectful function. * * @param f * The function to apply to each value @@ -43,7 +65,33 @@ sealed abstract class Stream[V, -S]: def map[V2, S2](f: V => V2 < S2)(using Tag[Emit[Chunk[V]]], Tag[Emit[Chunk[V2]]], Frame): Stream[V2, S & S2] = mapChunk(c => Kyo.foreach(c)(f)) - /** Transforms each chunk in the stream using the given function. + /** Transforms each chunk in the stream using the given pure function. + * + * @param f + * The function to apply to each chunk + * @return + * A new stream with transformed chunks + */ + def mapChunk[V2](f: Chunk[V] => Seq[V2])( + using + tagV: Tag[Emit[Chunk[V]]], + tagV2: Tag[Emit[Chunk[V2]]], + discr: Stream.Dummy, + frame: Frame + ): Stream[V2, S] = + Stream[V2, S](ArrowEffect.handleState(tagV, (), emit)( + [C] => + (input, _, cont) => + if input.isEmpty then ((), cont(())) + else + val s = f(input) + if s.isEmpty then ((), cont(())) + else + Emit.valueWith(Chunk.from(s))(((), cont(()))) + end if + )) + + /** Transforms each chunk in the stream using the given effectful function. * * @param f * The function to apply to each chunk @@ -194,10 +242,33 @@ sealed abstract class Stream[V, -S]: else Emit.valueWith(c)((0, cont(()))) )) - /** Takes elements from the stream while the predicate is true. + /** Takes elements from the stream while the pure predicate is true. * * @param f - * The predicate function + * The pure predicate function + * @return + * A new stream containing elements that satisfy the predicate + */ + def takeWhile(f: V => Boolean)(using + tag: Tag[Emit[Chunk[V]]], + discr: Stream.Dummy, + frame: Frame + ): Stream[V, S] = + Stream[V, S](ArrowEffect.handleState(tag, true, emit)( + [C] => + (input, state, cont) => + if !state then (false, Kyo.pure[Unit, Emit[Chunk[V]] & S](())) + else if input.isEmpty then (state, cont(())) + else + val c = input.takeWhile(f) + Emit.valueWith(c)((c.size == input.size, cont(()))) + )) + end takeWhile + + /** Takes elements from the stream while the effectful predicate is true. + * + * @param f + * The effectful predicate function * @return * A new stream containing elements that satisfy the predicate */ @@ -250,6 +321,19 @@ sealed abstract class Stream[V, -S]: } )) + def filter(f: V => Boolean)(using + tag: Tag[Emit[Chunk[V]]], + discr: Flat[Boolean], + frame: Frame + ): Stream[V, S] = + Stream[V, S](ArrowEffect.handleState(tag, (), emit)( + [C] => + (input, _, cont) => + val c = input.filter(f) + if c.isEmpty then ((), cont(())) + else Emit.valueWith(c)(((), cont(()))) + )) + /** Emits only elements that are different from their predecessor. * * @return @@ -362,7 +446,27 @@ sealed abstract class Stream[V, -S]: cont(()) ) - /** Runs the stream and folds over its values using the given function and initial accumulator. + /** Runs the stream and folds over its values using the given pure function and initial accumulator. + * + * @param acc + * The initial accumulator value + * @param f + * The folding function + * @return + * The final accumulated value + */ + def runFold[A](acc: A)(f: (A, V) => A)(using + tag: Tag[Emit[Chunk[V]]], + frame: Frame + ): A < S = + ArrowEffect.handleState(tag, acc, emit)( + handle = [C] => + (input, state, cont) => + (input.foldLeft(state)(f), cont(())), + done = (state, _) => state + ) + + /** Runs the stream and folds over its values using the given effectful function and initial accumulator. * * @param acc * The initial accumulator value @@ -371,7 +475,7 @@ sealed abstract class Stream[V, -S]: * @return * The final accumulated value */ - def runFold[A, S2](acc: A)(f: (A, V) => A < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): A < (S & S2) = + def runFoldKyo[A, S2](acc: A)(f: (A, V) => A < S2)(using tag: Tag[Emit[Chunk[V]]], frame: Frame): A < (S & S2) = ArrowEffect.handleState(tag, acc, emit)( handle = [C] => (input, state, cont) => @@ -468,4 +572,10 @@ object Stream: } } + /** A dummy type that can be used as implicit evidence to help the compiler discriminate between overloaded methods. + */ + sealed class Dummy + object Dummy: + given Dummy = new Dummy {} + end Stream diff --git a/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala b/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala index cd7371f46..fc08ada45 100644 --- a/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala +++ b/kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala @@ -332,6 +332,14 @@ class StreamTest extends Test: n / 2 ) } + + "with effects" in { + def predicate(i: Int) = Var.get[Boolean].map(b => Var.set(!b).andThen(b && !(i % 3 == 0))) + val result = Var.run(false)(Stream.init(1 to n).filter(predicate).run).eval + assert( + result.size > 0 && result.forall(_ % 2 == 0) && result.forall(i => !(i % 3 == 0)) + ) + } } "changes" - { @@ -781,7 +789,7 @@ class StreamTest extends Test: "early termination" in { val stream = Stream.init(Seq(1, 2, 3, 4, 5)) - val folded = stream.runFold(0) { (acc, v) => + val folded = stream.runFoldKyo(0) { (acc, v) => if acc < 6 then acc + v else Abort.fail(()) } assert(Abort.run[Unit](folded).eval.foldError(_ => true)(_ => false)) diff --git a/kyo-prelude/shared/src/test/scala/kyo/debug/DebugTest.scala b/kyo-prelude/shared/src/test/scala/kyo/debug/DebugTest.scala index b4ccc5c45..573287f02 100644 --- a/kyo-prelude/shared/src/test/scala/kyo/debug/DebugTest.scala +++ b/kyo-prelude/shared/src/test/scala/kyo/debug/DebugTest.scala @@ -170,10 +170,10 @@ class DebugTest extends Test: "with Stream JS" taggedAs jsOnly in testOutput( - "DebugTest.scala:55:36", - "true", "DebugTest.scala:54:28", "undefined", + "DebugTest.scala:55:36", + "Seq(Seq(6))", "DebugTest.scala:56:21", "Seq(6)" ) { From 46633d579ebf1cbd0dfd5b6ef5339797fb341b4a Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Fri, 31 Jan 2025 11:54:35 -0800 Subject: [PATCH 2/5] [build] disable Scala Native (#1063) ### Problem We've been having consistent issues with the Scala Native build timing out. I've been debugging this issue for some time and trying several changes but I haven't had success so far. It seems related to concurrency issues in Scala Native's test runner. ### Solution Since this is significantly affecting Kyo's development, I think we we could disable the native build for now. I'll dust off an old linux box I have to try to reproduce it locally. --- .github/workflows/build-main.yml | 4 ++-- .github/workflows/build-pr.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-main.yml b/.github/workflows/build-main.yml index 0be46c277..898d5f6ae 100644 --- a/.github/workflows/build-main.yml +++ b/.github/workflows/build-main.yml @@ -29,5 +29,5 @@ jobs: - name: Build JS run: sbt '+kyoJS/test' - - name: Build Native - run: sbt '+kyoNative/test' \ No newline at end of file + # - name: Build Native + # run: sbt '+kyoNative/test' \ No newline at end of file diff --git a/.github/workflows/build-pr.yml b/.github/workflows/build-pr.yml index bb3dfe4d6..5d7f2fef6 100644 --- a/.github/workflows/build-pr.yml +++ b/.github/workflows/build-pr.yml @@ -29,5 +29,5 @@ jobs: - name: Build JS run: sbt '+kyoJS/testQuick' - - name: Build Native - run: sbt '+kyoNative/testQuick' \ No newline at end of file + # - name: Build Native + # run: sbt '+kyoNative/testQuick' \ No newline at end of file From 617ca5d8424d8dde66baf8476bc3b95e1439dbbe Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Fri, 31 Jan 2025 12:13:22 -0800 Subject: [PATCH 3/5] fix flaky tests --- kyo-core/shared/src/test/scala/kyo/HubTest.scala | 4 +++- .../shared/src/test/scala/kyo/internal/KyoSttpMonadTest.scala | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/kyo-core/shared/src/test/scala/kyo/HubTest.scala b/kyo-core/shared/src/test/scala/kyo/HubTest.scala index 36cd0b90e..54bc9e311 100644 --- a/kyo-core/shared/src/test/scala/kyo/HubTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/HubTest.scala @@ -171,7 +171,9 @@ class HubTest extends Test: f <- Async.run(l.stream(2).mapChunk(Chunk(_)).take(2).run) _ <- Kyo.foreachDiscard(1 to 4)(h.put) r <- f.get - yield assert(r == Chunk(Chunk(1, 2), Chunk(3, 4))) + yield + assert(r.forall(_.size <= 2)) + assert(r.flattenChunk == Chunk(1, 2, 3, 4)) } "stream handles rapid publish-consume cycles" in run { diff --git a/kyo-sttp/shared/src/test/scala/kyo/internal/KyoSttpMonadTest.scala b/kyo-sttp/shared/src/test/scala/kyo/internal/KyoSttpMonadTest.scala index 311092703..b83c9b5bf 100644 --- a/kyo-sttp/shared/src/test/scala/kyo/internal/KyoSttpMonadTest.scala +++ b/kyo-sttp/shared/src/test/scala/kyo/internal/KyoSttpMonadTest.scala @@ -98,6 +98,7 @@ class KyoSttpMonadTest extends Test: } } _ <- started.await + _ <- Async.sleep(10.millis) interrupted <- fiber.interrupt _ <- cancelled.await result <- fiber.getResult From 724e9fe63516520cd973a0420f5f722d2b1700e8 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Fri, 31 Jan 2025 13:05:44 -0800 Subject: [PATCH 4/5] [kernel] Loop optimizations (#1061) ### Problem I'm taking a look at profiling sessions for Kyo's benchmarks and noticed that `Loop` is getting the `Safepoint` thread local on each iteration because of the `run` call being in a default parameter. The compiler generates a method without the `Safepoint` evidence. ### Solution Restructure methods to avoid the default parameter. --- .../src/main/scala/kyo/kernel/Loop.scala | 140 +++++++++++------- .../src/test/scala/kyo/kernel/LoopTest.scala | 55 +++++++ 2 files changed, 142 insertions(+), 53 deletions(-) diff --git a/kyo-kernel/shared/src/main/scala/kyo/kernel/Loop.scala b/kyo-kernel/shared/src/main/scala/kyo/kernel/Loop.scala index 02ef15c88..a276cfcb9 100644 --- a/kyo-kernel/shared/src/main/scala/kyo/kernel/Loop.scala +++ b/kyo-kernel/shared/src/main/scala/kyo/kernel/Loop.scala @@ -253,18 +253,18 @@ object Loop: safepoint: Safepoint ): O < S = @nowarn("msg=anonymous") - @tailrec def loop(i1: A)(v: Outcome[A, O] < S = run(i1))(using Safepoint): O < S = + @tailrec def loop(v: Outcome[A, O] < S)(using Safepoint): O < S = v match case next: Continue[A] @unchecked => - loop(next._1)() + loop(run(next._1)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[A, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(i1)(kyo(v, context)) + loop(kyo(v, context)) case res => res.asInstanceOf[O] - loop(input)() + loop(Loop.continue(input)) end apply /** Executes a loop with two state values. @@ -287,18 +287,18 @@ object Loop: safepoint: Safepoint ): O < S = @nowarn("msg=anonymous") - @tailrec def loop(i1: A, i2: B)(v: Outcome2[A, B, O] < S = run(i1, i2))(using Safepoint): O < S = + @tailrec def loop(v: Outcome2[A, B, O] < S)(using Safepoint): O < S = v match case next: Continue2[A, B] @unchecked => - loop(next._1, next._2)() + loop(run(next._1, next._2)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome2[A, B, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(i1, i2)(kyo(v, context)) + loop(kyo(v, context)) case res => res.asInstanceOf[O] - loop(input1, input2)() + loop(Loop.continue(input1, input2)) end apply /** Executes a loop with three state values. @@ -321,18 +321,18 @@ object Loop: inline run: Safepoint ?=> (A, B, C) => Outcome3[A, B, C, O] < S )(using inline _frame: Frame, safepoint: Safepoint): O < S = @nowarn("msg=anonymous") - @tailrec def loop(i1: A, i2: B, i3: C)(v: Outcome3[A, B, C, O] < S = run(i1, i2, i3))(using Safepoint): O < S = + @tailrec def loop(v: Outcome3[A, B, C, O] < S)(using Safepoint): O < S = v match case next: Continue3[A, B, C] @unchecked => - loop(next._1, next._2, next._3)() + loop(run(next._1, next._2, next._3)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome3[A, B, C, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(i1, i2, i3)(kyo(v, context)) + loop(kyo(v, context)) case res => res.asInstanceOf[O] - loop(input1, input2, input3)() + loop(Loop.continue(input1, input2, input3)) end apply /** Executes a loop with four state values. @@ -357,18 +357,18 @@ object Loop: inline run: Safepoint ?=> (A, B, C, D) => Outcome4[A, B, C, D, O] < S )(using inline _frame: Frame, safepoint: Safepoint): O < S = @nowarn("msg=anonymous") - @tailrec def loop(i1: A, i2: B, i3: C, i4: D)(v: Outcome4[A, B, C, D, O] < S = run(i1, i2, i3, i4))(using Safepoint): O < S = + @tailrec def loop(v: Outcome4[A, B, C, D, O] < S)(using Safepoint): O < S = v match case next: Continue4[A, B, C, D] @unchecked => - loop(next._1, next._2, next._3, next._4)() + loop(run(next._1, next._2, next._3, next._4)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome4[A, B, C, D, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(i1, i2, i3, i4)(kyo(v, context)) + loop(kyo(v, context)) case res => res.asInstanceOf[O] - loop(input1, input2, input3, input4)() + loop(Loop.continue(input1, input2, input3, input4)) end apply /** Executes an indexed loop without state values. @@ -387,10 +387,10 @@ object Loop: safepoint: Safepoint ): O < S = @nowarn("msg=anonymous") - @tailrec def loop(idx: Int)(v: Outcome[Unit, O] < S = run(idx))(using Safepoint): O < S = + @tailrec def loop(idx: Int)(v: Outcome[Unit, O] < S)(using Safepoint): O < S = v match case next: Continue[Unit] @unchecked => - loop(idx + 1)() + loop(idx + 1)(run(idx)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[Unit, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame @@ -398,7 +398,7 @@ object Loop: loop(idx)(kyo(v, context)) case res => res.asInstanceOf[O] - loop(0)() + loop(0)(Loop.continue) end indexed /** Executes an indexed loop with a single state value. @@ -418,18 +418,18 @@ object Loop: safepoint: Safepoint ): O < S = @nowarn("msg=anonymous") - @tailrec def loop(idx: Int, i1: A)(v: Outcome[A, O] < S = run(idx, i1))(using Safepoint): O < S = + @tailrec def loop(idx: Int)(v: Outcome[A, O] < S)(using Safepoint): O < S = v match case next: Continue[A] @unchecked => - loop(idx + 1, next._1)() + loop(idx + 1)(run(idx, next._1)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[A, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(idx, i1)(kyo(v, context)) + loop(idx)(kyo(v, context)) case res => res.asInstanceOf[O] - loop(0, input)() + loop(0)(Loop.continue(input)) end indexed /** Executes an indexed loop with two state values. @@ -450,18 +450,18 @@ object Loop: inline run: Safepoint ?=> (Int, A, B) => Outcome2[A, B, O] < S )(using inline _frame: Frame, safepoint: Safepoint): O < S = @nowarn("msg=anonymous") - @tailrec def loop(idx: Int, i1: A, i2: B)(v: Outcome2[A, B, O] < S = run(idx, i1, i2))(using Safepoint): O < S = + @tailrec def loop(idx: Int)(v: Outcome2[A, B, O] < S)(using Safepoint): O < S = v match case next: Continue2[A, B] @unchecked => - loop(idx + 1, next._1, next._2)() + loop(idx + 1)(run(idx, next._1, next._2)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome2[A, B, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(idx, i1, i2)(kyo(v, context)) + loop(idx)(kyo(v, context)) case res => res.asInstanceOf[O] - loop(0, input1, input2)() + loop(0)(Loop.continue(input1, input2)) end indexed /** Executes an indexed loop with three state values. @@ -484,18 +484,18 @@ object Loop: inline run: Safepoint ?=> (Int, A, B, C) => Outcome3[A, B, C, O] < S )(using inline _frame: Frame, safepoint: Safepoint): O < S = @nowarn("msg=anonymous") - @tailrec def loop(idx: Int, i1: A, i2: B, i3: C)(v: Outcome3[A, B, C, O] < S = run(idx, i1, i2, i3))(using Safepoint): O < S = + @tailrec def loop(idx: Int)(v: Outcome3[A, B, C, O] < S)(using Safepoint): O < S = v match case next: Continue3[A, B, C] @unchecked => - loop(idx + 1, next._1, next._2, next._3)() + loop(idx + 1)(run(idx, next._1, next._2, next._3)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome3[A, B, C, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(idx, i1, i2, i3)(kyo(v, context)) + loop(idx)(kyo(v, context)) case res => res.asInstanceOf[O] - loop(0, input1, input2, input3)() + loop(0)(Loop.continue(input1, input2, input3)) end indexed /** Executes an indexed loop with four state values. @@ -520,19 +520,18 @@ object Loop: inline run: Safepoint ?=> (Int, A, B, C, D) => Outcome4[A, B, C, D, O] < S )(using inline _frame: Frame, safepoint: Safepoint): O < S = @nowarn("msg=anonymous") - @tailrec def loop(idx: Int, i1: A, i2: B, i3: C, i4: D)(v: Outcome4[A, B, C, D, O] < S = - run(idx, i1, i2, i3, i4))(using Safepoint): O < S = + @tailrec def loop(idx: Int)(v: Outcome4[A, B, C, D, O] < S)(using Safepoint): O < S = v match case next: Continue4[A, B, C, D] @unchecked => - loop(idx + 1, next._1, next._2, next._3, next._4)() + loop(idx + 1)(run(idx, next._1, next._2, next._3, next._4)) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome4[A, B, C, D, O], S] @unchecked => new KyoContinue[IX, OX, EX, Any, O, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = - loop(idx, i1, i2, i3, i4)(kyo(v, context)) + loop(idx)(kyo(v, context)) case res => res.asInstanceOf[O] - loop(0, input1, input2, input3, input4)() + loop(0)(Loop.continue(input1, input2, input3, input4)) end indexed /** Executes a loop that continues until explicitly completed. @@ -547,10 +546,10 @@ object Loop: */ inline def foreach[S](inline run: Safepoint ?=> Outcome[Unit, Unit] < S)(using inline _frame: Frame, safepoint: Safepoint): Unit < S = @nowarn("msg=anonymous") - @tailrec def loop(v: Outcome[Unit, Unit] < S = run)(using Safepoint): Unit < S = + @tailrec def loop(v: Outcome[Unit, Unit] < S)(using Safepoint): Unit < S = v match case next: Continue[Unit] @unchecked => - loop() + loop(run) case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[Unit, Unit], S] @unchecked => new KyoContinue[IX, OX, EX, Any, Unit, S](kyo): def frame = _frame @@ -558,7 +557,7 @@ object Loop: loop(kyo(v, context)) case res => () - loop() + loop(Loop.continue) end foreach /** Repeats an operation a specified number of times. @@ -575,21 +574,20 @@ object Loop: */ inline def repeat[S](n: Int)(inline run: Safepoint ?=> Unit < S)(using inline _frame: Frame, safepoint: Safepoint): Unit < S = @nowarn("msg=anonymous") - @tailrec def loop(i: Int)(v: Outcome[Unit, Unit] < S = run)(using Safepoint): Unit < S = - if i == n then () + @tailrec def loop(i: Int)(v: Unit < S)(using Safepoint): Unit < S = + if i > n then () else v match - case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[Unit, Unit], S] @unchecked => + case kyo: KyoSuspend[IX, OX, EX, Any, Unit, S] @unchecked => new KyoContinue[IX, OX, EX, Any, Unit, S](kyo): def frame = _frame def apply(v: OX[Any], context: Context)(using Safepoint) = loop(i)(kyo(v, context)) - end new case _ => - loop(i + 1)() + loop(i + 1)(run) end if end loop - loop(0)() + loop(0)(()) end repeat /** Executes a loop indefinitely until explicitly terminated. @@ -602,14 +600,50 @@ object Loop: * @return * Nothing, as this loop runs forever unless interrupted */ - inline def forever[S](inline run: Safepoint ?=> Unit < S)(using Frame, Safepoint): Unit < S = - def _loop(): Unit < S = loop() - @tailrec def loop(): Unit < S = - run match - case kyo: Kyo[Unit, S] @unchecked => - kyo.andThen(_loop()) + inline def forever[S](inline run: Safepoint ?=> Unit < S)(using inline _frame: Frame, safepoint: Safepoint): Unit < S = + @nowarn("msg=anonymous") + @tailrec def loop(v: Unit < S)(using Safepoint): Unit < S = + v match + case kyo: KyoSuspend[IX, OX, EX, Any, Unit, S] @unchecked => + new KyoContinue[IX, OX, EX, Any, Unit, S](kyo): + def frame = _frame + def apply(v: OX[Any], context: Context)(using Safepoint) = + loop(kyo(v, context)) case _ => - loop() - loop() + loop(run) + end loop + loop(()) end forever + + /** Executes an operation repeatedly while a condition remains true. + * + * @param condition + * The condition to check before each iteration. The loop continues while this is true. + * @param run + * The operation to execute in each iteration + * @return + * Unit after the loop completes + */ + inline def whileTrue[S](inline condition: Safepoint ?=> Boolean < S)(inline run: Unit < S)( + using + inline _frame: Frame, + safepoint: Safepoint + ): Unit < S = + @nowarn("msg=anonymous") + def loop(v: Unit < S)(using Safepoint): Unit < S = + condition.map { + case true => + v match + case kyo: KyoSuspend[IX, OX, EX, Any, Unit, S] @unchecked => + new KyoContinue[IX, OX, EX, Any, Unit, S](kyo): + def frame = _frame + def apply(v: OX[Any], context: Context)(using Safepoint) = + loop(kyo(v, context)) + case _ => + loop(run) + case false => () + } + end loop + loop(()) + end whileTrue end Loop diff --git a/kyo-kernel/shared/src/test/scala/kyo/kernel/LoopTest.scala b/kyo-kernel/shared/src/test/scala/kyo/kernel/LoopTest.scala index 753874652..631bf00c6 100644 --- a/kyo-kernel/shared/src/test/scala/kyo/kernel/LoopTest.scala +++ b/kyo-kernel/shared/src/test/scala/kyo/kernel/LoopTest.scala @@ -674,4 +674,59 @@ class LoopTest extends Test: typeCheckFailure("implicitly[Flat[Loop.Outcome4[Int, String, Boolean, Double, Int < Any]]]")(error) } } + + "whileTrue" - { + "continues while condition is true" in { + var counter = 0 + Loop.whileTrue(counter < 5) { + counter += 1 + }.eval + assert(counter == 5) + } + + "does not execute when condition is initially false" in { + var executed = false + Loop.whileTrue(false) { + executed = true + }.eval + assert(!executed) + } + + "with suspended condition" in { + var counter = 0 + val result = Loop.whileTrue(Effect.defer(counter < 3)) { + counter += 1 + } + result.eval + assert(counter == 3) + } + + "with suspended body" in { + var counter = 0 + val result = Loop.whileTrue(counter < 3) { + Effect.defer(counter += 1) + } + result.eval + assert(counter == 3) + } + + "stack safety" in { + var counter = 0 + val largeNumber = 100000 + Loop.whileTrue(counter < largeNumber) { + counter += 1 + }.eval + assert(counter == largeNumber) + } + + "stack safety with suspended operations" in { + var counter = 0 + val largeNumber = 10000 + val result = Loop.whileTrue(Effect.defer(counter < largeNumber)) { + Effect.defer(counter += 1) + } + result.eval + assert(counter == largeNumber) + } + } end LoopTest From 5f1d70a990555d41886ea00a59f7034b9d866dbb Mon Sep 17 00:00:00 2001 From: Ondra Pelech Date: Sat, 1 Feb 2025 02:18:20 +0100 Subject: [PATCH 5/5] Enable Scala Native compilation (tests are still off) (#1064) Relates to issue * https://github.com/getkyo/kyo/pull/1063 Hello @fwbrasil , I see that Scala Native got disabled. It sucks that SN tests were causing issues, but oh well, such is life with bleeding edge tech, like SN. But if it were only tests which had problems, does that mean that we could still keep SN compilation? If we keep SN compilation in CI, we could prevent potential incompatibilities from getting merged. --- .github/workflows/build-main.yml | 4 ++-- .github/workflows/build-pr.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-main.yml b/.github/workflows/build-main.yml index 898d5f6ae..77c14f8e0 100644 --- a/.github/workflows/build-main.yml +++ b/.github/workflows/build-main.yml @@ -29,5 +29,5 @@ jobs: - name: Build JS run: sbt '+kyoJS/test' - # - name: Build Native - # run: sbt '+kyoNative/test' \ No newline at end of file + - name: Build Native + run: sbt '+kyoNative/Test/compile' # test diff --git a/.github/workflows/build-pr.yml b/.github/workflows/build-pr.yml index 5d7f2fef6..f319bc536 100644 --- a/.github/workflows/build-pr.yml +++ b/.github/workflows/build-pr.yml @@ -29,5 +29,5 @@ jobs: - name: Build JS run: sbt '+kyoJS/testQuick' - # - name: Build Native - # run: sbt '+kyoNative/testQuick' \ No newline at end of file + - name: Build Native + run: sbt '+kyoNative/Test/compile' # testQuick