diff --git a/README.md b/README.md index 4fe636d6d..1b4b0aaaa 100644 --- a/README.md +++ b/README.md @@ -2440,13 +2440,13 @@ import kyo.* import kyo.Hub.Listener // Initialize a Hub with a buffer -val a: Hub[Int] < IO = +val a: Hub[Int] < (IO & Resource) = Hub.init[Int](3) // Hub provide APIs similar to // channels: size, offer, isEmpty, // isFull, putFiber, put -val b: Boolean < (IO & Abort[Closed]) = +val b: Boolean < (IO & Abort[Closed] & Resource) = a.map(_.offer(1)) // But reading from hubs can only @@ -2489,7 +2489,7 @@ val g: Int < (Async & Abort[Closed]) = // only include items pending in // the hub's buffer. The listener // buffers are discarded -val h: Maybe[Seq[Int]] < IO = +val h: Maybe[Seq[Int]] < (IO & Resource) = a.map(_.close) ``` diff --git a/kyo-core/shared/src/main/scala/kyo/Hub.scala b/kyo-core/shared/src/main/scala/kyo/Hub.scala index b69114fe5..8993c109b 100644 --- a/kyo-core/shared/src/main/scala/kyo/Hub.scala +++ b/kyo-core/shared/src/main/scala/kyo/Hub.scala @@ -204,7 +204,7 @@ object Hub: * @see * [[Hub.DefaultBufferSize]] for the default capacity value used by this method */ - def init[A](using Frame): Hub[A] < IO = + def init[A](using Frame): Hub[A] < (IO & Resource) = init(DefaultBufferSize) /** Initializes a new Hub with the specified capacity. @@ -216,7 +216,7 @@ object Hub: * @return * a new Hub instance */ - def init[A](capacity: Int)(using Frame): Hub[A] < IO = + def init[A](capacity: Int)(using Frame): Hub[A] < (IO & Resource) = initWith[A](capacity)(identity) /** Uses a new Hub with the given type and capacity. @@ -230,7 +230,7 @@ object Hub: * @return * The result of applying the function */ - def initWith[A](capacity: Int)[B, S](f: Hub[A] => B < S)(using Frame): B < (S & IO) = + def initWith[A](capacity: Int)[B, S](f: Hub[A] => B < S)(using Frame): B < (S & IO & Resource) = IO.Unsafe { val channel = Channel.Unsafe.init[A](capacity, Access.MultiProducerSingleConsumer).safe val listeners = new CopyOnWriteArraySet[Listener[A]] @@ -251,7 +251,9 @@ object Hub: } } }.map { fiber => - f(new Hub(channel, fiber, listeners)) + Resource + .acquireRelease(new Hub(channel, fiber, listeners))(_.close.unit) + .map(f) } } diff --git a/kyo-core/shared/src/test/scala/kyo/HubTest.scala b/kyo-core/shared/src/test/scala/kyo/HubTest.scala index 54bc9e311..b1fcd3d8d 100644 --- a/kyo-core/shared/src/test/scala/kyo/HubTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/HubTest.scala @@ -3,13 +3,29 @@ package kyo class HubTest extends Test: val repeats = 100 - "initWith" in run { - Hub.initWith[Int](10) { h => - for - l <- h.listen - _ <- h.offer(1) - v <- l.take - yield assert(v == 1) + "initWith" - { + "listen, offer, take" in run { + Hub.initWith[Int](10) { h => + for + l <- h.listen + _ <- h.offer(1) + v <- l.take + yield assert(v == 1) + } + } + + "resource" in run { + val effect: (Int, Hub[Int]) < (Abort[Closed] & Async) = Resource.run: + Hub.initWith[Int](10) { h => + for + l <- h.listen + _ <- h.offer(1) + v <- l.take + yield (v, h) + } + effect.map: (v, h) => + h.closed.map: isClosed => + assert(v == 1 && isClosed) } }