Skip to content

Commit

Permalink
Stream: tap and tapChunk (#988)
Browse files Browse the repository at this point in the history
Adds methods for performing side effects on streams without altering
them.
  • Loading branch information
johnhungerford authored Jan 9, 2025
1 parent c4dc59c commit 7a775bf
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ class EmitCombinatorTest extends Test:
}
}

"stream.tap should not conflict with effect tap combinator" in {
val stream = Stream.init(Seq(1, 2, 3)).tap(i => Var.update[Int](_ + i).unit)
assert(Var.runTuple(0)(stream.run).eval == (6, Seq(1, 2, 3)))
}

end EmitCombinatorTest
38 changes: 38 additions & 0 deletions kyo-prelude/shared/src/main/scala/kyo/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,44 @@ sealed abstract class Stream[V, -S]:
f(input).map(_.emit).map(ack => ((), cont(ack)))
))

/** Applies a side-effecting function to each element in the stream without altering them.
*
* @param f
* The function to apply to each value
* @return
* A new stream runs f while emitting values
*/
def tap[S1](f: V => Unit < S1)(
using
tag: Tag[Emit[Chunk[V]]],
frame: Frame
): Stream[V, S & S1] =
Stream:
ArrowEffect.handleState(tag, (), emit: Ack < (Emit[Chunk[V]] & S & S1)):
[C] =>
(input, _, cont) =>
Kyo.foreachDiscard(input)(f).andThen:
Emit.andMap(input)(ack => ((), cont(ack)))

/** Applies a side-effecting function to each chunk in the stream without altering them.
*
* @param f
* The function to apply to each chunk
* @return
* A new stream runs f while emitting chunks
*/
def tapChunk[S1](f: Chunk[V] => Unit < S1)(
using
tag: Tag[Emit[Chunk[V]]],
frame: Frame
): Stream[V, S & S1] =
Stream:
ArrowEffect.handleState(tag, (), emit: Ack < (Emit[Chunk[V]] & S & S1)):
[C] =>
(input, _, cont) =>
f(input).andThen:
Emit.andMap(input)(ack => ((), cont(ack)))

private def discard(using tag: Tag[Emit[Chunk[V]]], frame: Frame): Stream[V, S] =
Stream(ArrowEffect.handle(tag, emit)(
[C] => (input, cont) => cont(Stop)
Expand Down
30 changes: 30 additions & 0 deletions kyo-prelude/shared/src/test/scala/kyo/StreamTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,36 @@ class StreamTest extends Test:
}
}

"tap" - {
"non-empty stream" in {
val stream = Stream
.init(Seq(1, 2, 3))
.tap(i => Var.update[Int](_ + i).unit)
assert(Var.runTuple(0)(stream.run).eval == (6, Seq(1, 2, 3)))
}
"empty stream" in {
val stream = Stream
.empty[Int]
.tap(i => Var.update[Int](_ + i).unit)
assert(Var.runTuple(0)(stream.run).eval == (0, Seq()))
}
}

"tapChunk" - {
"non-empty stream" in {
val stream = Stream
.apply(Emit.andMap(Chunk(1, 2, 3))(_ => Emit(Chunk(4, 5, 6))))
.tapChunk(c => Var.update[Int](_ + c.sum).unit)
assert(Var.runTuple(0)(stream.run).eval == (21, Seq(1, 2, 3, 4, 5, 6)))
}
"empty stream" in {
val stream = Stream
.empty[Int]
.tapChunk(c => Var.update[Int](_ + c.sum).unit)
assert(Var.runTuple(0)(stream.run).eval == (0, Seq()))
}
}

"concat" - {
"non-empty streams" in {
assert(
Expand Down

0 comments on commit 7a775bf

Please sign in to comment.