Skip to content

Commit

Permalink
Merge branch 'main' into github-templates
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil authored Jan 21, 2025
2 parents bed9e88 + de308a3 commit 1d73722
Show file tree
Hide file tree
Showing 20 changed files with 43 additions and 61 deletions.
10 changes: 1 addition & 9 deletions .github/workflows/build-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,4 @@ jobs:
run: sbt '+kyoJS/test'

- name: Build Native
run: sbt '+kyoNative/test'

- name: Save build state
uses: actions/cache@v3
with:
path: |
*/target
**/target
key: ${{ runner.os }}-build-state-main
run: sbt '+kyoNative/test'
12 changes: 1 addition & 11 deletions .github/workflows/build-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:

steps:
- uses: actions/[email protected]
- uses: coursier/cache-action@v6
- uses: coursier/cache-action@v6
- name: Install Dependencies
run: |
sudo apt-get update
Expand All @@ -23,16 +23,6 @@ jobs:
with:
java-version: [email protected]=tgz+https://download.java.net/java/GA/jdk21.0.2/f2283984656d49d69e91c558476027ac/13/GPL/openjdk-21.0.2_linux-x64_bin.tar.gz

- name: Restore build state
uses: actions/cache@v3
with:
path: |
*/target
**/target
key: ${{ runner.os }}-build-state-main
restore-keys: |
${{ runner.os }}-build-state-main
- name: Build JVM
run: sbt '+kyoJVM/testQuick'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class CountdownLatchBench extends Bench.ForkOnly(0):
import kyo.*

def iterate(l: Latch, n: Int): Unit < IO =
if n <= 0 then IO.unit
if n <= 0 then Kyo.unit
else l.release.flatMap(_ => iterate(l, n - 1))

for
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/DeepBindBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class DeepBindBench extends Bench.SyncAndFork(()):
import kyo.*

def loop(i: Int): Unit < IO =
IO.unit.flatMap { _ =>
Kyo.unit.flatMap { _ =>
if i > depth then
()
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class EnqueueDequeueBench extends Bench.ForkOnly(()):

def loop(c: Channel[Unit], i: Int): Unit < (Async & Abort[Closed]) =
if i >= depth then
IO.unit
Kyo.unit
else
c.put(()).flatMap(_ => c.take.flatMap(_ => loop(c, i + 1)))

Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/ForkChainedBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class ForkChainedBench extends Bench.ForkOnly(0):

def iterate(p: Promise[Nothing, Unit], n: Int): Unit < IO =
if n <= 0 then p.complete(Result.unit).unit
else IO.unit.flatMap(_ => Async.run(iterate(p, n - 1)).unit)
else Kyo.unit.flatMap(_ => Async.run(iterate(p, n - 1)).unit)

for
p <- Promise.init[Nothing, Unit]
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class PingPongBench extends Bench.ForkOnly(()):
_ <- Async.run(chan.put(()))
_ <- chan.take
n <- ref.decrementAndGet
_ <- if n == 0 then promise.complete(Result.unit).unit else IO.unit
_ <- if n == 0 then promise.complete(Result.unit).unit else Kyo.unit
yield ()
_ <- repeat(depth)(Async.run[Closed, Unit, Any](effect))
yield ()
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/RendezvousBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class RendezvousBench extends Bench.ForkOnly(10000 * (10000 + 1) / 2):
}
}
else
IO.unit
Kyo.unit

def consume(waiting: AtomicRef[Any], n: Int = 0, acc: Int = 0): Int < Async =
if n <= depth then
Expand Down
8 changes: 4 additions & 4 deletions kyo-bench/src/main/scala/kyo/bench/SchedulingBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ class SchedulingBench extends Bench.ForkOnly(1001000):
import kyo.*

def fiber(i: Int): Int < IO =
IO.unit.flatMap { _ =>
Kyo.unit.flatMap { _ =>
IO(i).flatMap { j =>
IO.unit.flatMap { _ =>
Kyo.unit.flatMap { _ =>
if j > depth then
IO.unit.flatMap(_ => IO(j))
Kyo.unit.flatMap(_ => IO(j))
else
IO.unit.flatMap(_ => fiber(j + 1))
Kyo.unit.flatMap(_ => fiber(j + 1))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/SemaphoreBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SemaphoreBench extends Bench.ForkOnly(()):

def loop(s: Meter, i: Int): Unit < (Async & Abort[Closed]) =
if i >= depth then
IO.unit
Kyo.unit
else
s.run(()).flatMap(_ => loop(s, i + 1))

Expand Down
2 changes: 1 addition & 1 deletion kyo-core/jvm/src/main/scala/kyo/Path.scala
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class Path private (val path: List[String]) derives CanEqual:
(parentExists, createFolders) match
case (true, _) => IO(JFiles.copy(toJava, to.toJava, opts*)).unit
case (false, true) => Path(toJava.getParent().toString).mkDir.andThen(IO(JFiles.copy(toJava, to.toJava, opts*)).unit)
case _ => IO.unit
case _ => ()
}
end copy

Expand Down
2 changes: 1 addition & 1 deletion kyo-core/jvm/src/main/scala/kyo/Process.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ object Process:
for
_ <- Async.run(Resource.run(resources))
yield ()
case _ => IO.unit
case _ => Kyo.unit
yield process

override def cwd(newCwd: Path) = self.copy(cwd = Some(newCwd))
Expand Down
7 changes: 0 additions & 7 deletions kyo-core/shared/src/main/scala/kyo/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ opaque type IO <: Abort[Nothing] = Abort[Nothing]

object IO:

/** Creates a unit IO effect, representing a no-op side effect.
*
* @return
* A unit value wrapped in an IO effect.
*/
inline def unit: Unit < IO = ()

/** Suspends a potentially side-effecting computation in an IO effect.
*
* This method allows you to lift any computation (including those with side effects) into the IO context, deferring its execution
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/test/scala/kyo/ResourceTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ResourceTest extends Test:
case object TestException extends NoStackTrace

"acquire fails" taggedAs jvmOnly in run {
val io = Resource.acquireRelease(IO[Int, Any](throw TestException))(_ => IO.unit)
val io = Resource.acquireRelease(IO[Int, Any](throw TestException))(_ => Kyo.unit)
Resource.run(io)
.pipe(Async.runAndBlock(timeout))
.pipe(Abort.run(_))
Expand Down
16 changes: 15 additions & 1 deletion kyo-kernel/shared/src/main/scala/kyo/Kyo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,31 @@ object Kyo:
* While pure values are automatically lifted into Kyo computations in most cases, this method can be useful in specific scenarios,
* such as in if/else expressions, to help with type inference.
*
* Note: This is a zero-cost operation that simply wraps the value in a Kyo computation type without introducing any effect suspension.
*
* @tparam A
* The type of the value
* @tparam S
* The effect context (can be Any)
* @param v
* The value to lift into the effect context
* @return
* A computation that produces the given value
* A computation that directly produces the given value without suspension
*/
inline def pure[A, S](inline v: A): A < S = v

/** Returns a pure effect that produces Unit.
*
* This is exactly equivalent to `pure(())`, as both simply lift the Unit value into the effect context without introducing any effect
* suspension.
*
* @tparam S
* The effect context (can be Any)
* @return
* A computation that directly produces Unit without suspension
*/
inline def unit[S]: Unit < S = ()

/** Zips two effects into a tuple.
*
* @param v1
Expand Down
5 changes: 2 additions & 3 deletions kyo-prelude/shared/src/main/scala/kyo/Emit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ object Emit:
* @param acc
* The initial accumulator value
* @param f
* The folding function that takes the current accumulator and emitted value, and returns a tuple of the new accumulator and an
* Ack to control further emissions
* The folding function that takes the current accumulator and emitted value, and returns an updated accumulator
* @param v
* The computation with Emit effect
* @return
Expand Down Expand Up @@ -147,7 +146,7 @@ object Emit:
* @return
* A tuple containing:
* - Maybe[V]: The first emitted value if any (None if no values were emitted)
* - A continuation function that takes an Ack and returns the remaining computation
* - A continuation function that returns the remaining computation
*/
def apply[A: Flat, S](v: A < (Emit[V] & S))(using tag: Tag[Emit[V]], frame: Frame): (Maybe[V], () => A < (Emit[V] & S)) < S =
ArrowEffect.handleFirst(tag, v)(
Expand Down
18 changes: 6 additions & 12 deletions kyo-prelude/shared/src/main/scala/kyo/Poll.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@ import kyo.kernel.ArrowEffect

/** Represents polling values from a data source with backpressure control.
*
* Poll is used to consume values while maintaining flow control through acknowledgements. Each poll operation takes an Ack that determines
* how many values can be consumed, and returns Maybe[V] indicating whether a value was available.
*
* Key behaviors:
* - Each poll operation requires an Ack value that signals the consumer's readiness to receive more data
* * Key behaviors:
* - Poll returns Maybe[V], where:
* - Present(v) indicates a successful poll with value v
* - Absent indicates the end of the stream (no more values will be available)
* - Once Absent is received, the consumer should stop polling as the stream has terminated
* - Backpressure is maintained through the Ack responses:
* - Continue signals readiness to receive more values
* - Stop indicates the consumer wants to pause receiving values
* Poll is used to consume values. Each poll operation signals readiness to receive data, and returns Maybe[V] indicating whether a value
* was available.
*
* The effect enables building streaming data pipelines with controlled consumption rates. Handlers can process values at their own pace by
* returning appropriate Ack responses, while respecting stream termination signals.
* polling only as needed.
*
* @tparam V
* The type of values being polled from the data source.
Expand Down Expand Up @@ -152,9 +147,8 @@ object Poll:

/** Runs a Poll effect with a single input value, stopping after the first poll operation.
*
* This method provides a single input value to the Poll effect and stops after the first poll. It returns a tuple containing:
* - An Ack value indicating whether to continue or stop
* - A continuation function that can process the Maybe[V] result of the poll
* This method provides a single input value to the Poll effect and stops after the first poll. It returns a continuation function
* that can process the Maybe[V] result of the poll
*
* @param v
* The computation requiring Poll values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ final private[kyo] class StreamSubscriber[V](
end if
case other =>
if state.compareAndSet(curState, other) then
IO.unit
Kyo.unit
else
handleInterupt()
end match
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ final private[kyo] class StreamSubscription[V, Ctx](
fiber.onComplete {
case Result.Success(StreamComplete) => IO(subscriber.onComplete())
case Result.Panic(e) => IO(subscriber.onError(e))
case Result.Failure(StreamCanceled) => IO.unit
case Result.Failure(StreamCanceled) => Kyo.unit
}.andThen(fiber)
}
end consume
Expand Down
4 changes: 2 additions & 2 deletions kyo-stm/shared/src/main/scala/kyo/TTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ object TTable:
removeFromIndexes(id, prev.get)
.andThen(updateIndexes(id, record))
else
Kyo.pure(())
Kyo.unit
yield prev

def upsert(id: Id, record: Record[Fields])(using Frame) =
Expand All @@ -197,7 +197,7 @@ object TTable:
for
record <- store.get(id)
deleted <- store.remove(id)
_ <- if deleted.nonEmpty then removeFromIndexes(id, record.get) else Kyo.pure(())
_ <- if deleted.nonEmpty then removeFromIndexes(id, record.get) else Kyo.unit
yield deleted

def size(using Frame) = store.size
Expand Down

0 comments on commit 1d73722

Please sign in to comment.