Skip to content

Commit

Permalink
more batch methods
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Jan 21, 2025
1 parent 93d4ed6 commit 29a6f82
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
33 changes: 33 additions & 0 deletions kyo-core/shared/src/main/scala/kyo/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,31 @@ final class Hub[A] private[kyo] (
IO {
discard(listeners.remove(listener))
}

/** Puts multiple elements into the Hub as a batch.
*
* @param values
* The sequence of elements to put
*/
def putBatch(values: Seq[A])(using Frame): Unit < (Abort[Closed] & Async) = ch.putBatch(values)

/** Takes exactly n elements from the Hub's buffer, blocking until enough elements are available.
*
* @param n
* Number of elements to take
* @return
* Chunk containing exactly n elements
*/
def takeExactly(n: Int)(using Frame): Chunk[A] < (Abort[Closed] & Async) = ch.takeExactly(n)

/** Takes up to max elements from the Hub's buffer without blocking.
*
* @param max
* Maximum number of elements to take
* @return
* Chunk containing up to max elements
*/
def drainUpTo(max: Int)(using Frame): Chunk[A] < (IO & Abort[Closed]) = ch.drainUpTo(max)
end Hub

object Hub:
Expand Down Expand Up @@ -352,5 +377,13 @@ object Hub:
def streamFailing(maxChunkSize: Int = Int.MaxValue)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Abort[Closed] & Async] =
child.stream(maxChunkSize)

/** Takes up to max elements from the Listener's buffer without blocking.
*
* @param max
* Maximum number of elements to take
* @return
* Chunk containing up to max elements
*/
def drainUpTo(max: Int)(using Frame): Chunk[A] < (IO & Abort[Closed]) = child.drainUpTo(max)
end Listener
end Hub
103 changes: 103 additions & 0 deletions kyo-core/shared/src/test/scala/kyo/HubTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -408,4 +408,107 @@ class HubTest extends Test:
yield assert(v == 1)
}
}

"batch operations" - {
"putBatch" - {
"delivers to all listeners" in run {
for
h <- Hub.init[Int](4)
l1 <- h.listen
l2 <- h.listen
_ <- h.putBatch(1 to 4)
r1 <- l1.takeExactly(4)
r2 <- l2.takeExactly(4)
yield assert(r1 == r2 && r1 == Chunk.from(1 to 4))
}

"respects listener filters" in run {
for
h <- Hub.init[Int](4)
l1 <- h.listen(_ % 2 == 0)
l2 <- h.listen(_ % 2 == 1)
_ <- h.putBatch(1 to 4)
r1 <- l1.takeExactly(2)
r2 <- l2.takeExactly(2)
yield assert(r1 == Chunk(2, 4) && r2 == Chunk(1, 3))
}

"handles backpressure" in run {
for
h <- Hub.init[Int](2)
l <- h.listen(2)
_ <- h.putBatch(1 to 2)
fiber <- Async.run(h.putBatch(3 to 6))
_ <- Async.sleep(10.millis)
done <- fiber.done
size <- l.size
yield assert(!done && size == 2)
}

"fails after hub is closed" in run {
for
h <- Hub.init[Int](4)
_ <- h.close
result <- Abort.run(h.putBatch(1 to 4))
yield assert(result.isFailure)
}
}

"takeExactly" - {
"blocks until enough elements" in run {
for
h <- Hub.init[Int](4)
l <- h.listen
fiber <- Async.run(l.takeExactly(4))
_ <- Async.sleep(10.millis)
done1 <- fiber.done
_ <- h.putBatch(1 to 4)
res <- fiber.get
done2 <- fiber.done
yield assert(!done1 && done2 && res == Chunk.from(1 to 4))
}

"respects filters" in run {
for
h <- Hub.init[Int](4)
l <- h.listen(_ % 2 == 0)
_ <- h.putBatch(1 to 6)
r <- l.takeExactly(2)
yield assert(r == Chunk(2, 4))
}

"fails if hub closes before enough elements" in run {
for
h <- Hub.init[Int](4)
l <- h.listen
fiber <- Async.run(l.takeExactly(4))
_ <- h.putBatch(1 to 2)
_ <- h.close
res <- Abort.run(fiber.get)
yield assert(res.isFailure)
}
}

"drainUpTo" - {
"takes available elements up to max" in run {
for
h <- Hub.init[Int](4)
l <- h.listen
_ <- h.putBatch(1 to 4)
_ <- Async.sleep(10.millis)
r1 <- l.drainUpTo(2)
r2 <- l.drainUpTo(4)
yield assert(r1.size == 2 && r2.size == 2 &&
r1.concat(r2) == Chunk.from(1 to 4))
}

"returns empty chunk when no elements available" in run {
for
h <- Hub.init[Int](4)
l <- h.listen
r <- l.drainUpTo(4)
yield assert(r.isEmpty)
}
}
}
end HubTest

0 comments on commit 29a6f82

Please sign in to comment.