Skip to content

Commit

Permalink
Remove ack from Emit and Poll (#977)
Browse files Browse the repository at this point in the history
Closes #966
  • Loading branch information
johnhungerford authored Jan 16, 2025
1 parent 0130053 commit bc2a698
Show file tree
Hide file tree
Showing 23 changed files with 467 additions and 721 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,7 @@ The `Emit` effect is designed to accumulate values throughout a computation, sim
import kyo.*

// Add a value
val a: Ack < Emit[Int] =
val a: Unit < Emit[Int] =
Emit.value(42)

// Add multiple values
Expand Down
22 changes: 11 additions & 11 deletions kyo-combinators/shared/src/main/scala/kyo/Combinators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
yield (a, a1)
end extension

extension [A, S](effect: Ack < (Emit[Chunk[A]] & S))
extension [A, S](effect: Unit < (Emit[Chunk[A]] & S))
/** Convert streaming Emit effect to a [[Stream[A, S]]]
*
* @return
Expand Down Expand Up @@ -949,7 +949,7 @@ extension [A, B, S](effect: B < (Emit[A] & S))
ArrowEffect.handle(tag, effect):
[C] =>
(a, cont) =>
fn(a).map(_ => cont(Ack.Continue()))
fn(a).map(_ => cont(()))

/** Handle Emit[A] by passing emitted values to [[channel]]. Fails with Abort[Closed] on channel closure
*
Expand Down Expand Up @@ -982,15 +982,15 @@ extension [A, B, S](effect: B < (Emit[A] & S))
(v, buffer, cont) =>
val b2 = buffer.append(v)
if b2.size >= chunkSize then
Emit.valueWith(b2): ack =>
(Chunk.empty, cont(ack))
Emit.valueWith(b2):
(Chunk.empty, cont(()))
else
(b2, cont(Ack.Continue()))
(b2, cont(()))
end if
,
(buffer, v) =>
if buffer.isEmpty then v
else Emit.valueWith(buffer)(_ => v)
else Emit.valueWith(buffer)(v)
)

/** Convert emitting effect to stream, chunking Emitted values in [[chunkSize]], and discarding result.
Expand All @@ -1004,7 +1004,7 @@ extension [A, B, S](effect: B < (Emit[A] & S))
chunkSize: Int
)(
using
NotGiven[B <:< Ack],
NotGiven[B =:= Unit],
Tag[A],
Flat[B],
Frame
Expand Down Expand Up @@ -1037,10 +1037,10 @@ extension [A, B, S](effect: B < (Emit[Chunk[A]] & S))
*/
def emitToStreamDiscarding(
using
NotGiven[B <:< Ack],
NotGiven[B =:= Unit],
Frame
): Stream[A, S] =
Stream(effect.map(_ => Ack.Continue()))
Stream(effect.unit)

/** Convert an effect that emits chunks of type [[A]] while computing a result of type [[B]] to an asynchronous stream of the emission
* type [[A]] and a separate asynchronous effect that yields the result of the original effect after the stream has been handled.
Expand All @@ -1056,11 +1056,11 @@ extension [A, B, S](effect: B < (Emit[Chunk[A]] & S))
for
p <- Promise.init[Nothing, B]
streamEmit = effect.map: b =>
p.complete(Result.success(b)).map(_ => Ack.Continue())
p.complete(Result.success(b)).unit
yield (Stream(streamEmit), p.join)
end extension

extension [A, B, S](effect: Ack < (Emit[A] & S))
extension [A, B, S](effect: Unit < (Emit[A] & S))
/** Convert emitting effect to stream, chunking Emitted values in [[chunkSize]].
*
* @param chunkSize
Expand Down
4 changes: 2 additions & 2 deletions kyo-combinators/shared/src/main/scala/kyo/Constructors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ extension (kyoObject: Kyo.type)
* @return
* An effect that emits a value
*/
def emit[A](value: A)(using Tag[A], Frame): Ack < Emit[A] =

def emit[A](value: A)(using Tag[A], Frame): Unit < Emit[A] =
Emit.value(value)

/** Creates an effect that fails with Abort[E].
Expand Down Expand Up @@ -471,5 +472,4 @@ extension (kyoObject: Kyo.type)
sequence: => Seq[A < Async]
)(using Flat[A], Frame): Unit < Async =
foreachPar(sequence)(identity).unit

end extension
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ class EmitCombinatorTest extends Test:

"emit" - {
"handleEmit" in run {
val emit = Loop(1)(i => if i == 4 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1))).map(_ => "done")
val emit = Loop(1)(i => if i == 4 then Loop.done else Emit.valueWith(i)(Loop.continue(i + 1))).map(_ => "done")
emit.handleEmit.map:
case (chunk, res) => assert(chunk == Chunk(1, 2, 3) && res == "done")
}

"handleEmitDiscarding" in run {
val emit = Loop(1)(i => if i == 4 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1))).map(_ => "done")
val emit = Loop(1)(i => if i == 4 then Loop.done else Emit.valueWith(i)(Loop.continue(i + 1))).map(_ => "done")
emit.handleEmitDiscarding.map:
case chunk => assert(chunk == Chunk(1, 2, 3))
}

"foreachEmit" in run {
val emit = Loop(1)(i => if i == 4 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1))).map(_ => "done")
val emit = Loop(1)(i => if i == 4 then Loop.done else Emit.valueWith(i)(Loop.continue(i + 1))).map(_ => "done")
val effect = emit.foreachEmit(i => Var.update[Int](v => v + i).unit).map: result =>
Var.get[Int].map(v => (result, v))
Var.run(0)(effect).map:
case (res, state) => assert(res == "done" && state == 6)
}

"emitToChannel" in run {
val emit = Loop(1)(i => if i == 4 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1)))
val emit = Loop(1)(i => if i == 4 then Loop.done else Emit.valueWith(i)(Loop.continue(i + 1)))
for
channel <- Channel.init[Int](10)
_ <- emit.emitToChannel(channel)
Expand All @@ -39,35 +39,35 @@ class EmitCombinatorTest extends Test:
}

"emitChunked with number of emitted values divisible by chunk size" in run {
val emit = Loop(1)(i => if i == 5 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1)))
val emit = Loop(1)(i => if i == 5 then Loop.done else Emit.valueWith(i)(Loop.continue(i + 1)))
val chunkedEmit = emit.emitChunked(2)
chunkedEmit.handleEmitDiscarding.map: result =>
assert(result == Chunk(Chunk(1, 2), Chunk(3, 4)))
}

"emitChunked with number of emitted values not divisible by chunk size" in run {
val emit = Loop(1)(i => if i == 6 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1)))
val emit = Loop(1)(i => if i == 6 then Loop.done else Emit.valueWith(i)(Loop.continue(i + 1)))
val chunkedEmit = emit.emitChunked(2)
chunkedEmit.handleEmitDiscarding.map: result =>
assert(result == Chunk(Chunk(1, 2), Chunk(3, 4), Chunk(5)))
}

"emitChunkedToStream" in run {
val emit = Loop(0)(i => if i == 9 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1))).map(_ => Ack.Continue())
val emit = Loop(0)(i => if i == 9 then Loop.done else Emit.valueWith(i)(Loop.continue(i + 1))).unit
val stream = emit.emitChunkedToStream(2)
stream.run.map: chunk =>
assert(chunk == Chunk.from(0 until 9))
}

"emitChunkedToStreamDiscarding" in run {
val emit = Loop(0)(i => if i == 9 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1)))
val emit = Loop(0)(i => if i == 9 then Loop.done("done") else Emit.valueWith(i)(Loop.continue(i + 1)))
val stream = emit.emitChunkedToStreamDiscarding(2)
stream.run.map: chunk =>
assert(chunk == Chunk.from(0 until 9))
}

"emitChunkedToStreamAndResult" in run {
val emit = Loop(0)(i => if i == 9 then Loop.done(()) else Emit.valueWith(i)(_ => Loop.continue(i + 1))).map(_ => "done")
val emit = Loop(0)(i => if i == 9 then Loop.done("done") else Emit.valueWith(i)(Loop.continue(i + 1)))
for
(stream, handled) <- emit.emitChunkedToStreamAndResult(2)
streamRes <- stream.run
Expand Down
7 changes: 2 additions & 5 deletions kyo-core/jvm/src/main/scala/kyo/Path.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,9 @@ class Path private (val path: List[String]) derives CanEqual:
Resource.acquireRelease(acquire)(release).map { res =>
readOnce(res).map { state =>
Loop(state) {
case Absent => Loop.done(Ack.Stop)
case Absent => Loop.done
case Present(content) =>
Emit.valueWith(writeOnce(content)) {
case Ack.Stop => Loop.done(Ack.Stop)
case _ => readOnce(res).map(Loop.continue(_))
}
Emit.valueWith(writeOnce(content))(readOnce(res).map(Loop.continue(_)))
}
}
}
Expand Down
Loading

0 comments on commit bc2a698

Please sign in to comment.