Skip to content

Commit

Permalink
Merge branch 'main' into rename-stream-method
Browse files Browse the repository at this point in the history
  • Loading branch information
c0d33ngr authored Feb 1, 2025
2 parents 21fbcb6 + 5f1d70a commit 1b1ff70
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ jobs:
run: sbt '+kyoJS/test'

- name: Build Native
run: sbt '+kyoNative/test'
run: sbt '+kyoNative/Test/compile' # test
2 changes: 1 addition & 1 deletion .github/workflows/build-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ jobs:
run: sbt '+kyoJS/testQuick'

- name: Build Native
run: sbt '+kyoNative/testQuick'
run: sbt '+kyoNative/Test/compile' # testQuick
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,8 @@ val result: Chunk[String] < (Env[Config] & Async) =

The `Stream` effect is useful for processing large amounts of data in a memory-efficient manner, as it allows for lazy evaluation and only keeps a small portion of the data in memory at any given time. It's also composable, allowing you to build complex data processing pipelines by chaining stream operations.

Note that a number of `Stream` methods (e.g., `map`, `filter`, `mapChunk`) are overloaded to provide different implementations for pure vs effectful transformations. This can make a big difference for performance, so take care that the functions you pass to these methods are typed to return pure values if they do not include effects. Unncecessarily lifting them to return `A < Any` will result in perfomance loss.

### Var: Stateful Computations

The `Var` effect allows for stateful computations, similar to the `State` monad. It enables the management of state within a computation in a purely functional manner.
Expand Down
1 change: 0 additions & 1 deletion kyo-bench/src/main/scala/kyo/bench/arena/StreamBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class StreamBench extends ArenaBench.SyncAndFork(25000000):
.filter(_ % 2 == 0)
.map(_ + 1)
.runFold(0)(_ + _)
.eval
end kyoBench

def zioBench() =
Expand Down
32 changes: 32 additions & 0 deletions kyo-bench/src/main/scala/kyo/bench/arena/StreamIOBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kyo.bench.arena

class StreamIOBench extends ArenaBench.SyncAndFork(25000000):
val seq = (0 until 10000).toVector

def catsBench() =
import cats.effect.*
import fs2.*
Stream.emits(seq)
.evalFilter(v => IO(v % 2 == 0))
.evalMap(v => IO(v + 1))
.compile
.fold(0)(_ + _)
end catsBench

def kyoBench() =
import kyo.*
Stream.init(seq)
.filter(v => IO(v % 2 == 0))
.map(v => IO(v + 1))
.runFold(0)(_ + _)
end kyoBench

def zioBench() =
import zio.*
import zio.stream.*
ZStream.fromIterable(seq)
.filterZIO(v => ZIO.succeed(v % 2 == 0))
.mapZIO(v => ZIO.succeed(v + 1))
.runSum
end zioBench
end StreamIOBench
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ object WarmupJITProfile:
new RandomBench,
new SemaphoreBench,
new StateMapBench,
new StreamBench
new StreamBench,
new StreamIOBench
)
end WarmupJITProfile
4 changes: 3 additions & 1 deletion kyo-core/shared/src/test/scala/kyo/HubTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ class HubTest extends Test:
f <- Async.run(l.stream(2).mapChunk(Chunk(_)).take(2).run)
_ <- Kyo.foreachDiscard(1 to 4)(h.put)
r <- f.get
yield assert(r == Chunk(Chunk(1, 2), Chunk(3, 4)))
yield
assert(r.forall(_.size <= 2))
assert(r.flattenChunk == Chunk(1, 2, 3, 4))
}

"stream handles rapid publish-consume cycles" in run {
Expand Down
140 changes: 87 additions & 53 deletions kyo-kernel/shared/src/main/scala/kyo/kernel/Loop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,18 +253,18 @@ object Loop:
safepoint: Safepoint
): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(i1: A)(v: Outcome[A, O] < S = run(i1))(using Safepoint): O < S =
@tailrec def loop(v: Outcome[A, O] < S)(using Safepoint): O < S =
v match
case next: Continue[A] @unchecked =>
loop(next._1)()
loop(run(next._1))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[A, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(i1)(kyo(v, context))
loop(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(input)()
loop(Loop.continue(input))
end apply

/** Executes a loop with two state values.
Expand All @@ -287,18 +287,18 @@ object Loop:
safepoint: Safepoint
): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(i1: A, i2: B)(v: Outcome2[A, B, O] < S = run(i1, i2))(using Safepoint): O < S =
@tailrec def loop(v: Outcome2[A, B, O] < S)(using Safepoint): O < S =
v match
case next: Continue2[A, B] @unchecked =>
loop(next._1, next._2)()
loop(run(next._1, next._2))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome2[A, B, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(i1, i2)(kyo(v, context))
loop(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(input1, input2)()
loop(Loop.continue(input1, input2))
end apply

/** Executes a loop with three state values.
Expand All @@ -321,18 +321,18 @@ object Loop:
inline run: Safepoint ?=> (A, B, C) => Outcome3[A, B, C, O] < S
)(using inline _frame: Frame, safepoint: Safepoint): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(i1: A, i2: B, i3: C)(v: Outcome3[A, B, C, O] < S = run(i1, i2, i3))(using Safepoint): O < S =
@tailrec def loop(v: Outcome3[A, B, C, O] < S)(using Safepoint): O < S =
v match
case next: Continue3[A, B, C] @unchecked =>
loop(next._1, next._2, next._3)()
loop(run(next._1, next._2, next._3))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome3[A, B, C, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(i1, i2, i3)(kyo(v, context))
loop(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(input1, input2, input3)()
loop(Loop.continue(input1, input2, input3))
end apply

/** Executes a loop with four state values.
Expand All @@ -357,18 +357,18 @@ object Loop:
inline run: Safepoint ?=> (A, B, C, D) => Outcome4[A, B, C, D, O] < S
)(using inline _frame: Frame, safepoint: Safepoint): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(i1: A, i2: B, i3: C, i4: D)(v: Outcome4[A, B, C, D, O] < S = run(i1, i2, i3, i4))(using Safepoint): O < S =
@tailrec def loop(v: Outcome4[A, B, C, D, O] < S)(using Safepoint): O < S =
v match
case next: Continue4[A, B, C, D] @unchecked =>
loop(next._1, next._2, next._3, next._4)()
loop(run(next._1, next._2, next._3, next._4))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome4[A, B, C, D, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(i1, i2, i3, i4)(kyo(v, context))
loop(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(input1, input2, input3, input4)()
loop(Loop.continue(input1, input2, input3, input4))
end apply

/** Executes an indexed loop without state values.
Expand All @@ -387,18 +387,18 @@ object Loop:
safepoint: Safepoint
): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(idx: Int)(v: Outcome[Unit, O] < S = run(idx))(using Safepoint): O < S =
@tailrec def loop(idx: Int)(v: Outcome[Unit, O] < S)(using Safepoint): O < S =
v match
case next: Continue[Unit] @unchecked =>
loop(idx + 1)()
loop(idx + 1)(run(idx))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[Unit, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(idx)(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(0)()
loop(0)(Loop.continue)
end indexed

/** Executes an indexed loop with a single state value.
Expand All @@ -418,18 +418,18 @@ object Loop:
safepoint: Safepoint
): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(idx: Int, i1: A)(v: Outcome[A, O] < S = run(idx, i1))(using Safepoint): O < S =
@tailrec def loop(idx: Int)(v: Outcome[A, O] < S)(using Safepoint): O < S =
v match
case next: Continue[A] @unchecked =>
loop(idx + 1, next._1)()
loop(idx + 1)(run(idx, next._1))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[A, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(idx, i1)(kyo(v, context))
loop(idx)(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(0, input)()
loop(0)(Loop.continue(input))
end indexed

/** Executes an indexed loop with two state values.
Expand All @@ -450,18 +450,18 @@ object Loop:
inline run: Safepoint ?=> (Int, A, B) => Outcome2[A, B, O] < S
)(using inline _frame: Frame, safepoint: Safepoint): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(idx: Int, i1: A, i2: B)(v: Outcome2[A, B, O] < S = run(idx, i1, i2))(using Safepoint): O < S =
@tailrec def loop(idx: Int)(v: Outcome2[A, B, O] < S)(using Safepoint): O < S =
v match
case next: Continue2[A, B] @unchecked =>
loop(idx + 1, next._1, next._2)()
loop(idx + 1)(run(idx, next._1, next._2))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome2[A, B, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(idx, i1, i2)(kyo(v, context))
loop(idx)(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(0, input1, input2)()
loop(0)(Loop.continue(input1, input2))
end indexed

/** Executes an indexed loop with three state values.
Expand All @@ -484,18 +484,18 @@ object Loop:
inline run: Safepoint ?=> (Int, A, B, C) => Outcome3[A, B, C, O] < S
)(using inline _frame: Frame, safepoint: Safepoint): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(idx: Int, i1: A, i2: B, i3: C)(v: Outcome3[A, B, C, O] < S = run(idx, i1, i2, i3))(using Safepoint): O < S =
@tailrec def loop(idx: Int)(v: Outcome3[A, B, C, O] < S)(using Safepoint): O < S =
v match
case next: Continue3[A, B, C] @unchecked =>
loop(idx + 1, next._1, next._2, next._3)()
loop(idx + 1)(run(idx, next._1, next._2, next._3))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome3[A, B, C, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(idx, i1, i2, i3)(kyo(v, context))
loop(idx)(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(0, input1, input2, input3)()
loop(0)(Loop.continue(input1, input2, input3))
end indexed

/** Executes an indexed loop with four state values.
Expand All @@ -520,19 +520,18 @@ object Loop:
inline run: Safepoint ?=> (Int, A, B, C, D) => Outcome4[A, B, C, D, O] < S
)(using inline _frame: Frame, safepoint: Safepoint): O < S =
@nowarn("msg=anonymous")
@tailrec def loop(idx: Int, i1: A, i2: B, i3: C, i4: D)(v: Outcome4[A, B, C, D, O] < S =
run(idx, i1, i2, i3, i4))(using Safepoint): O < S =
@tailrec def loop(idx: Int)(v: Outcome4[A, B, C, D, O] < S)(using Safepoint): O < S =
v match
case next: Continue4[A, B, C, D] @unchecked =>
loop(idx + 1, next._1, next._2, next._3, next._4)()
loop(idx + 1)(run(idx, next._1, next._2, next._3, next._4))
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome4[A, B, C, D, O], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, O, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(idx, i1, i2, i3, i4)(kyo(v, context))
loop(idx)(kyo(v, context))
case res =>
res.asInstanceOf[O]
loop(0, input1, input2, input3, input4)()
loop(0)(Loop.continue(input1, input2, input3, input4))
end indexed

/** Executes a loop that continues until explicitly completed.
Expand All @@ -547,18 +546,18 @@ object Loop:
*/
inline def foreach[S](inline run: Safepoint ?=> Outcome[Unit, Unit] < S)(using inline _frame: Frame, safepoint: Safepoint): Unit < S =
@nowarn("msg=anonymous")
@tailrec def loop(v: Outcome[Unit, Unit] < S = run)(using Safepoint): Unit < S =
@tailrec def loop(v: Outcome[Unit, Unit] < S)(using Safepoint): Unit < S =
v match
case next: Continue[Unit] @unchecked =>
loop()
loop(run)
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[Unit, Unit], S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, Unit, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(kyo(v, context))
case res =>
()
loop()
loop(Loop.continue)
end foreach

/** Repeats an operation a specified number of times.
Expand All @@ -575,21 +574,20 @@ object Loop:
*/
inline def repeat[S](n: Int)(inline run: Safepoint ?=> Unit < S)(using inline _frame: Frame, safepoint: Safepoint): Unit < S =
@nowarn("msg=anonymous")
@tailrec def loop(i: Int)(v: Outcome[Unit, Unit] < S = run)(using Safepoint): Unit < S =
if i == n then ()
@tailrec def loop(i: Int)(v: Unit < S)(using Safepoint): Unit < S =
if i > n then ()
else
v match
case kyo: KyoSuspend[IX, OX, EX, Any, Outcome[Unit, Unit], S] @unchecked =>
case kyo: KyoSuspend[IX, OX, EX, Any, Unit, S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, Unit, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(i)(kyo(v, context))
end new
case _ =>
loop(i + 1)()
loop(i + 1)(run)
end if
end loop
loop(0)()
loop(0)(())
end repeat

/** Executes a loop indefinitely until explicitly terminated.
Expand All @@ -602,14 +600,50 @@ object Loop:
* @return
* Nothing, as this loop runs forever unless interrupted
*/
inline def forever[S](inline run: Safepoint ?=> Unit < S)(using Frame, Safepoint): Unit < S =
def _loop(): Unit < S = loop()
@tailrec def loop(): Unit < S =
run match
case kyo: Kyo[Unit, S] @unchecked =>
kyo.andThen(_loop())
inline def forever[S](inline run: Safepoint ?=> Unit < S)(using inline _frame: Frame, safepoint: Safepoint): Unit < S =
@nowarn("msg=anonymous")
@tailrec def loop(v: Unit < S)(using Safepoint): Unit < S =
v match
case kyo: KyoSuspend[IX, OX, EX, Any, Unit, S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, Unit, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(kyo(v, context))
case _ =>
loop()
loop()
loop(run)
end loop
loop(())
end forever

/** Executes an operation repeatedly while a condition remains true.
*
* @param condition
* The condition to check before each iteration. The loop continues while this is true.
* @param run
* The operation to execute in each iteration
* @return
* Unit after the loop completes
*/
inline def whileTrue[S](inline condition: Safepoint ?=> Boolean < S)(inline run: Unit < S)(
using
inline _frame: Frame,
safepoint: Safepoint
): Unit < S =
@nowarn("msg=anonymous")
def loop(v: Unit < S)(using Safepoint): Unit < S =
condition.map {
case true =>
v match
case kyo: KyoSuspend[IX, OX, EX, Any, Unit, S] @unchecked =>
new KyoContinue[IX, OX, EX, Any, Unit, S](kyo):
def frame = _frame
def apply(v: OX[Any], context: Context)(using Safepoint) =
loop(kyo(v, context))
case _ =>
loop(run)
case false => ()
}
end loop
loop(())
end whileTrue
end Loop
Loading

0 comments on commit 1b1ff70

Please sign in to comment.