Skip to content

Commit

Permalink
Merge branch 'main' into stream-sep-pure-effect
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil authored Jan 22, 2025
2 parents b54fb6a + 05b51f6 commit d4a46d5
Show file tree
Hide file tree
Showing 3 changed files with 659 additions and 284 deletions.
254 changes: 177 additions & 77 deletions kyo-core/shared/src/main/scala/kyo/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,36 @@ package kyo
import Hub.*
import java.util.concurrent.CopyOnWriteArraySet

/** A Hub is a multi-producer, multi-consumer channel that broadcasts messages to all listeners.
/** A multi-producer, multi-consumer primitive that broadcasts messages to multiple listeners.
*
* Hub provides fan-out functionality by delivering each published message to all active listeners. Unlike a regular Channel which delivers
* messages to a single consumer, Hub enables multiple consumers to receive and process the same messages.
*
* Message flow and buffering:
* - Publishers send messages to the Hub's main buffer
* - A dedicated fiber distributes messages from the Hub buffer to each listener's individual buffer
* - Each listener consumes messages from its own buffer at its own pace
* - When any listener's buffer becomes full and the Hub's buffer is also full, backpressure is applied to publishers, affecting message
* delivery to all listeners
*
* @tparam A
* the type of elements in the Hub
* The type of messages that can be published through this Hub
*/
class Hub[A] private[kyo] (
final class Hub[A] private[kyo] (
ch: Channel[A],
fiber: Fiber[Closed, Unit],
listeners: CopyOnWriteArraySet[Channel[A]]
listeners: CopyOnWriteArraySet[Listener[A]]
)(using initFrame: Frame):

/** Returns the current size of the Hub.
*
* @return
* the number of elements currently in the Hub
*/
def size(using Frame): Int < (IO & Abort[Closed]) = ch.size

/** Attempts to offer an element to the Hub without blocking.
*
* If the Hub's buffer is full, this operation will return false immediately rather than waiting. The element will be delivered to all
* active listeners if accepted.
*
* @param v
* the element to offer
* @return
* true if the element was added, false otherwise
* true if the element was added successfully, false if the Hub's buffer was full
*/
def offer(v: A)(using Frame): Boolean < (IO & Abort[Closed]) = ch.offer(v)

Expand Down Expand Up @@ -60,88 +66,147 @@ class Hub[A] private[kyo] (
*/
def putFiber(v: A)(using Frame): Fiber[Closed, Unit] < IO = ch.putFiber(v)

/** Puts an element into the Hub, potentially blocking if the Hub is full.
/** Puts an element into the Hub, blocking if necessary until space is available.
*
* This operation will block when both a listener's buffer is full and preventing the Hub from processing, and the Hub's buffer is
* full. The element will be delivered to all active listeners once accepted.
*
* @param v
* the element to put
*/
def put(v: A)(using Frame): Unit < (Async & Abort[Closed]) = ch.put(v)

/** Puts multiple elements into the Hub as a batch.
*
* @param values
* The sequence of elements to put
*/
def putBatch(values: Seq[A])(using Frame): Unit < (Abort[Closed] & Async) = ch.putBatch(values)

/** Checks if the Hub is closed.
*
* @return
* true if the Hub is closed, false otherwise
*/
def closed(using Frame): Boolean < IO = ch.closed

/** Closes the Hub and all its listeners.
/** Closes the Hub and returns any remaining messages.
*
* When closed:
* - The Hub stops accepting new messages
* - All listeners are automatically closed
* - Any blocked publishers are unblocked with a Closed failure
* - All subsequent operations will fail with Closed
*
* @return
* a Maybe containing any remaining elements in the Hub
* a Maybe containing any messages that were in the Hub's buffer at the time of closing. Returns Absent if the close operation fails
* (e.g., if the Hub was already closed).
*/
def close(using frame: Frame): Maybe[Seq[A]] < IO =
fiber.interruptDiscard(Result.Panic(Closed("Hub", initFrame))).andThen {
fiber.interruptDiscard(Result.Failure(Closed("Hub", initFrame))).andThen {
ch.close.map { r =>
IO {
val array = listeners.toArray()
discard(listeners.removeIf(_ => true))
Loop.indexed { idx =>
if idx == array.length then Loop.done
else
array(idx).asInstanceOf[Channel[A]].close
.map(_ => Loop.continue)
}.andThen(r)
val l = Chunk.fromNoCopy(listeners.toArray()).asInstanceOf[Chunk[Listener[A]]]
discard(listeners.removeIf(_ => true)) // clear is not available in Scala Native
Kyo.foreachDiscard(l)(_.child.close.unit).andThen(r)
}
}
}

/** Creates a new listener for this Hub with default buffer size.
/** Creates a new listener for this Hub with the default buffer size.
*
* @return
* a new Listener
* a new Listener that will receive messages from the Hub
* @see
* [[Hub.DefaultBufferSize]] for the default buffer size used by this method
*/
def listen(using Frame): Listener[A] < (IO & Abort[Closed] & Resource) =
listen(0)
listen(DefaultBufferSize)

/** Creates a new listener for this Hub with specified buffer size.
/** Creates a new listener for this Hub with the specified buffer size.
*
* @param bufferSize
* the size of the buffer for the new listener
* the size of the buffer for the new listener. When full, it may cause hub backpressure affecting all listeners
* @return
* a new Listener
* a new Listener with the specified buffer size
*/
def listen(bufferSize: Int)(using frame: Frame): Listener[A] < (IO & Abort[Closed] & Resource) =
listen(bufferSize, _ => true)

/** Creates a new listener for this Hub with a custom filter predicate.
*
* @param filter
* a predicate function that determines which messages this listener receives
* @return
* a new Listener that will only receive messages matching the filter
* @see
* [[Hub.DefaultBufferSize]] for the default buffer size used by this method
*/
def listen(filter: A => Boolean)(using frame: Frame): Listener[A] < (IO & Abort[Closed] & Resource) =
listen(DefaultBufferSize, filter)

/** Creates a new listener for this Hub with specified buffer size and filter.
*
* The listener will only receive messages that pass its filter predicate. Messages that don't match the filter are discarded without
* consuming space in the listener's buffer. A new listener will only receive messages published after its creation - any messages
* published before the listener was created are not received.
*
* If the Hub is closed when attempting to create a listener, this operation will fail. Listeners created with this method should be
* properly closed when no longer needed, though they will be automatically closed if the Hub is closed.
*
* @param bufferSize
* the size of the buffer for the new listener. When full, it will cause backpressure
* @param filter
* a predicate function that determines which messages this listener receives
* @return
* a new Listener that will receive filtered messages from the Hub
*/
def listen(bufferSize: Int, filter: A => Boolean)(using frame: Frame): Listener[A] < (IO & Abort[Closed] & Resource) =
def fail = Abort.fail(Closed("Hub", initFrame))
closed.map {
case true => fail
case false =>
Channel.initWith[A](bufferSize) { child =>
IO {
discard(listeners.add(child))
closed.map {
case true =>
// race condition
IO {
discard(listeners.remove(child))
fail
}
case false =>
Resource.acquireRelease(new Listener[A](this, child)): listener =>
listener.close.unit
}
IO.Unsafe {
val child = Channel.Unsafe.init[A](bufferSize, Access.SingleProducerMultiConsumer).safe
val listener = new Listener[A](this, child, filter)
discard(listeners.add(listener))
closed.map {
case true =>
// race condition
IO {
discard(listeners.remove(listener))
fail
}
case false =>
Resource.acquireRelease(listener)(_.close.unit)
}
}
}
end listen

private[kyo] def remove(child: Channel[A])(using Frame): Unit < IO =
private[kyo] def remove(listener: Listener[A])(using Frame): Unit < IO =
IO {
discard(listeners.remove(child))
discard(listeners.remove(listener))
}
end Hub

object Hub:

/** Default buffer size used for Hub listeners when no explicit size is specified. */
inline def DefaultBufferSize: Int = 4096

/** Initializes a new Hub with the default capacity.
*
* @tparam A
* the type of elements in the Hub
* @return
* a new Hub instance with default buffer size
* @see
* [[Hub.DefaultBufferSize]] for the default capacity value used by this method
*/
def init[A](using Frame): Hub[A] < IO =
init(DefaultBufferSize)

/** Initializes a new Hub with the specified capacity.
*
* @param capacity
Expand All @@ -155,6 +220,7 @@ object Hub:
initWith[A](capacity)(identity)

/** Uses a new Hub with the given type and capacity.
*
* @param capacity
* the maximum number of elements the Hub can hold
* @param f
Expand All @@ -165,33 +231,45 @@ object Hub:
* The result of applying the function
*/
def initWith[A](capacity: Int)[B, S](f: Hub[A] => B < S)(using Frame): B < (S & IO) =
Channel.initWith[A](capacity) { ch =>
IO {
val listeners = new CopyOnWriteArraySet[Channel[A]]
Async.run {
Loop.foreach {
ch.take.map { v =>
IO {
val puts =
listeners.toArray()
.toList.asInstanceOf[List[Channel[A]]]
.map(child => Abort.run[Throwable](child.put(v)))
Async.parallelUnbounded(puts).map(_ => Loop.continue)
}
IO.Unsafe {
val channel = Channel.Unsafe.init[A](capacity, Access.MultiProducerSingleConsumer).safe
val listeners = new CopyOnWriteArraySet[Listener[A]]
def currentListeners = Chunk.fromNoCopy(listeners.toArray()).asInstanceOf[Chunk[Listener[A]]]
Async.run {
Loop.foreach {
channel.take.map { value =>
Abort.recover { error =>
bug(s"Hub publishing for value '$value' failed: $error")
Loop.continue
} {
Kyo.foreachDiscard(currentListeners) { listener =>
Abort.recover[Throwable](e => bug(s"Hub fiber failed to publish to listener: $e"))(
listener.put(value)
)
}.andThen(Loop.continue)
}
}
}.map { fiber =>
f(new Hub(ch, fiber, listeners))
}
}.map { fiber =>
f(new Hub(channel, fiber, listeners))
}
}

/** A Listener represents a subscriber to a Hub.
/** A subscriber to a Hub that receives and processes a filtered stream of messages.
*
* Each Listener maintains its own buffer and can process messages independently of other listeners. Messages published to the Hub are
* evaluated against the Listener's filter predicate - only matching messages are delivered to the Listener's buffer. A Listener only
* receives messages that were published after its creation.
*
* Message processing:
* - Messages matching the filter are added to the Listener's buffer
* - When the buffer is full, backpressure is applied to the Hub
* - The Listener can be closed independently of the Hub
*
* @tparam A
* the type of elements the Listener receives
* the type of messages this Listener receives
*/
class Listener[A] private[kyo] (hub: Hub[A], child: Channel[A]):
final class Listener[A] private[kyo] (hub: Hub[A], private[kyo] val child: Channel[A], filter: A => Boolean):

/** Returns the current size of the Listener's buffer.
*
Expand All @@ -214,6 +292,10 @@ object Hub:
*/
def full(using Frame): Boolean < (IO & Abort[Closed]) = child.full

private[Hub] def put(value: A)(using Frame): Unit < (Async & Abort[Closed]) =
if !filter(value) then ()
else child.put(value)

/** Attempts to retrieve and remove the head of the Listener's buffer without blocking.
*
* @return
Expand All @@ -229,14 +311,20 @@ object Hub:
def takeFiber(using Frame): Fiber[Closed, A] < IO = child.takeFiber

/** Takes an element from the Listener's buffer, potentially blocking if the buffer is empty.
*
* This operation will block until:
* - A message matching the listener's filter becomes available
* - The Hub or this Listener is closed (resulting in a Closed failure)
*
* @return
* the next element from the Listener's buffer
* the next element from the Listener's buffer that matches this listener's filter
* @throws Closed
* if either the Hub or this Listener has been closed
*/
def take(using Frame): A < (Async & Abort[Closed]) = child.take

/** Takes [[n]] elements from the Listener's buffer, semantically blocking until enough elements are present. Note that if enough
* elements are not added to the buffer it can block indefinitely.
/** Takes [[n]] elements from the Listener's buffer, blocking until enough elements are present. Note that if enough elements are
* not added to the buffer it can block indefinitely.
*
* @return
* Chunk of [[n]] elements
Expand All @@ -257,31 +345,43 @@ object Hub:
* a Maybe containing any remaining elements in the Listener's buffer
*/
def close(using Frame): Maybe[Seq[A]] < IO =
hub.remove(child).andThen(child.close)
hub.remove(this).andThen(child.close)

/** Stream elements from listener, optionally specifying a maximum chunk size. In the absence of [[maxChunkSize]], chunk sizes will
* be limited only by buffer capacity or the number of buffered elements at a given time. (Chunks can still be larger than buffer
* capacity.) Stops streaming when listener is closed.
/** Stream elements from listener, optionally specifying a maximum chunk size.
*
* @param maxChunkSize
* Maximum number of elements to take for each chunk
* This streaming operation:
* - Continues until the Hub or Listener is closed
* - Completes normally when closed (use streamFailing for failure behavior)
* - Only emits messages that match this listener's filter
* - Respects backpressure from downstream consumers
*
* @param maxChunkSize
* Maximum number of elements to include in each chunk. If not specified, chunks will be limited only by buffer capacity or
* available elements
* @return
* Asynchronous stream of elements from listener
* An asynchronous stream of elements from the listener that completes when closed
*/
def stream(maxChunkSize: Int = Int.MaxValue)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Async] =
def stream(maxChunkSize: Int = Stream.DefaultChunkSize)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Async] =
child.streamUntilClosed(maxChunkSize)

/** Like stream, but fails when listener is closed.
/** Like stream, but fails when listener is already closed.
*
* @param maxChunkSize
* Maximum number of elements to take for each chunk
*
* @return
* Asynchronous stream of elements from listener
*/
def streamFailing(maxChunkSize: Int = Int.MaxValue)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Abort[Closed] & Async] =
def streamFailing(maxChunkSize: Int = Stream.DefaultChunkSize)(using Tag[Emit[Chunk[A]]], Frame): Stream[A, Abort[Closed] & Async] =
child.stream(maxChunkSize)

/** Takes up to max elements from the Listener's buffer without blocking.
*
* @param max
* Maximum number of elements to take
* @return
* Chunk containing up to max elements
*/
def drainUpTo(max: Int)(using Frame): Chunk[A] < (IO & Abort[Closed]) = child.drainUpTo(max)
end Listener
end Hub
Loading

0 comments on commit d4a46d5

Please sign in to comment.