From e83a3dd965334cf17c9eb995bcf6827f0915847a Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Tue, 26 Nov 2024 14:04:42 -0500 Subject: [PATCH] Preserve scopes in stream operations (#3989) --- .changeset/dull-ants-sip.md | 7 + .changeset/funny-zebras-happen.md | 21 + packages/effect/src/Channel.ts | 70 +- packages/effect/src/Effect.ts | 11 + packages/effect/src/Sink.ts | 11 + packages/effect/src/Stream.ts | 49 +- packages/effect/src/internal/channel.ts | 944 +++++++++--------- .../src/internal/channel/channelExecutor.ts | 70 +- packages/effect/src/internal/fiberRuntime.ts | 12 +- packages/effect/src/internal/groupBy.ts | 35 +- packages/effect/src/internal/layer.ts | 13 +- packages/effect/src/internal/mailbox.ts | 10 +- packages/effect/src/internal/sink.ts | 90 +- packages/effect/src/internal/stream.ts | 598 +++++------ packages/effect/test/Stream/scoping.test.ts | 176 ++-- 15 files changed, 1143 insertions(+), 974 deletions(-) create mode 100644 .changeset/dull-ants-sip.md create mode 100644 .changeset/funny-zebras-happen.md diff --git a/.changeset/dull-ants-sip.md b/.changeset/dull-ants-sip.md new file mode 100644 index 00000000000..bdbdf84ade8 --- /dev/null +++ b/.changeset/dull-ants-sip.md @@ -0,0 +1,7 @@ +--- +"effect": minor +--- + +Ensure scopes are preserved by stream / sink / channel operations + +**NOTE**: This change does modify the public signature of several `Stream` / `Sink` / `Channel` methods. Namely, certain run methods that previously removed a `Scope` from the environment will no longer do so. This was a bug with the previous implementation of how scopes were propagated, and is why this change is being made in a minor release. diff --git a/.changeset/funny-zebras-happen.md b/.changeset/funny-zebras-happen.md new file mode 100644 index 00000000000..b786d7b6b15 --- /dev/null +++ b/.changeset/funny-zebras-happen.md @@ -0,0 +1,21 @@ +--- +"effect": minor +--- + +Add `Effect.scopedWith` to run an effect that depends on a `Scope`, and then closes the `Scope` after the effect has completed + +```ts +import { Effect, Scope } from "effect" + +const program: Effect.Effect = Effect.scopedWith((scope) => + Effect.acquireRelease( + Effect.log("Acquiring..."), + () => Effect.log("Releasing...") + ).pipe(Scope.extend(scope)) +) + +Effect.runPromise(program) +// Output: +// timestamp=2024-11-26T16:44:54.158Z level=INFO fiber=#0 message=Acquiring... +// timestamp=2024-11-26T16:44:54.165Z level=INFO fiber=#0 message=Releasing... +``` diff --git a/packages/effect/src/Channel.ts b/packages/effect/src/Channel.ts index 7bdddd90ec7..5ce1f493430 100644 --- a/packages/effect/src/Channel.ts +++ b/packages/effect/src/Channel.ts @@ -1930,7 +1930,7 @@ export const repeated: ( */ export const run: ( self: Channel -) => Effect.Effect> = channel.run +) => Effect.Effect = channel.run /** * Run the channel until it finishes with a done value or fails with an error @@ -1943,7 +1943,7 @@ export const run: ( */ export const runCollect: ( self: Channel -) => Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Exclude> = channel.runCollect +) => Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Env> = channel.runCollect /** * Runs a channel until the end is received. @@ -1953,7 +1953,21 @@ export const runCollect: ( */ export const runDrain: ( self: Channel -) => Effect.Effect> = channel.runDrain +) => Effect.Effect = channel.runDrain + +/** + * Run the channel until it finishes with a done value or fails with an error. + * The channel must not read any input or write any output. + * + * Closing the channel, which includes execution of all the finalizers + * attached to the channel will be added to the current scope as a finalizer. + * + * @since 3.11.0 + * @category destructors + */ +export const runScoped: ( + self: Channel +) => Effect.Effect = channel.runScoped /** * Use a scoped effect to emit an output element. @@ -1965,6 +1979,18 @@ export const scoped: ( effect: Effect.Effect ) => Channel> = channel.scoped +/** + * Use a function that receives a scope and returns an effect to emit an output + * element. The output element will be the result of the returned effect, if + * successful. + * + * @since 3.11.0 + * @category constructors + */ +export const scopedWith: ( + f: (scope: Scope.Scope) => Effect.Effect +) => Channel = channel.scopedWith + /** * Splits strings on newlines. Handles both Windows newlines (`\r\n`) and UNIX * newlines (`\n`). @@ -2034,6 +2060,27 @@ export const toPull: ( ) => Effect.Effect, OutErr, Env>, never, Scope.Scope | Env> = channel.toPull +/** + * Returns an `Effect` that can be used to repeatedly pull elements from the + * constructed `Channel` within the provided `Scope`. The pull effect fails + * with the channel's failure in case the channel fails, or returns either the + * channel's done value or an emitted element. + * + * @since 3.11.0 + * @category destructors + */ +export const toPullIn: { + ( + scope: Scope.Scope + ): ( + self: Channel + ) => Effect.Effect, OutErr, Env>, never, Env> + ( + self: Channel, + scope: Scope.Scope + ): Effect.Effect, OutErr, Env>, never, Env> +} = channel.toPullIn + /** * Converts a `Channel` to a `Queue`. * @@ -2073,7 +2120,8 @@ export { } /** - * Makes a channel from an effect that returns a channel in case of success. + * Constructs a `Channel` from an effect that will result in a `Channel` if + * successful. * * @since 2.0.0 * @category constructors @@ -2083,7 +2131,8 @@ export const unwrap: ) => Channel = channel.unwrap /** - * Makes a channel from a managed that returns a channel in case of success. + * Constructs a `Channel` from a scoped effect that will result in a + * `Channel` if successful. * * @since 2.0.0 * @category constructors @@ -2092,6 +2141,17 @@ export const unwrapScoped: , E, R> ) => Channel> = channel.unwrapScoped +/** + * Constructs a `Channel` from a function which receives a `Scope` and returns + * an effect that will result in a `Channel` if successful. + * + * @since 3.11.0 + * @category constructors + */ +export const unwrapScopedWith: ( + f: (scope: Scope.Scope) => Effect.Effect, E, R> +) => Channel = channel.unwrapScopedWith + /** * Updates a service in the context of this channel. * diff --git a/packages/effect/src/Effect.ts b/packages/effect/src/Effect.ts index fd81437cd12..0f658e3f540 100644 --- a/packages/effect/src/Effect.ts +++ b/packages/effect/src/Effect.ts @@ -4297,6 +4297,17 @@ export const scope: Effect = fiberRuntime.scope export const scopeWith: (f: (scope: Scope.Scope) => Effect) => Effect = fiberRuntime.scopeWith +/** + * Creates a `Scope`, passes it to the specified effectful function, and then + * closes the scope as soon as the effect is complete (whether through success, + * failure, or interruption). + * + * @since 3.11.0 + * @category scoping, resources & finalization + */ +export const scopedWith: (f: (scope: Scope.Scope) => Effect) => Effect = + fiberRuntime.scopedWith + /** * Scopes all resources used in this workflow to the lifetime of the workflow, * ensuring that their finalizers are run as soon as this workflow completes diff --git a/packages/effect/src/Sink.ts b/packages/effect/src/Sink.ts index bea27ed7a25..a024ad87fe1 100644 --- a/packages/effect/src/Sink.ts +++ b/packages/effect/src/Sink.ts @@ -1357,6 +1357,17 @@ export const unwrapScoped: ( effect: Effect.Effect, E, R> ) => Sink> = internal.unwrapScoped +/** + * Constructs a `Sink` from a function which receives a `Scope` and returns + * an effect that will result in a `Sink` if successful. + * + * @since 3.11.0 + * @category constructors + */ +export const unwrapScopedWith: ( + f: (scope: Scope.Scope) => Effect.Effect, E, R> +) => Sink = internal.unwrapScopedWith + /** * Returns the sink that executes this one and times its execution. * diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index f9684caf9ea..60bb737f300 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -4076,8 +4076,7 @@ export const run: { * @since 2.0.0 * @category destructors */ -export const runCollect: (self: Stream) => Effect.Effect, E, Exclude> = - internal.runCollect +export const runCollect: (self: Stream) => Effect.Effect, E, R> = internal.runCollect /** * Runs the stream and emits the number of elements processed @@ -4085,8 +4084,7 @@ export const runCollect: (self: Stream) => Effect.Effect(self: Stream) => Effect.Effect> = - internal.runCount +export const runCount: (self: Stream) => Effect.Effect = internal.runCount /** * Runs the stream only for its effects. The emitted elements are discarded. @@ -4094,8 +4092,7 @@ export const runCount: (self: Stream) => Effect.Effect(self: Stream) => Effect.Effect> = - internal.runDrain +export const runDrain: (self: Stream) => Effect.Effect = internal.runDrain /** * Executes a pure fold over the stream of values - reduces all elements in @@ -4105,8 +4102,8 @@ export const runDrain: (self: Stream) => Effect.Effect(s: S, f: (s: S, a: A) => S): (self: Stream) => Effect.Effect> - (self: Stream, s: S, f: (s: S, a: A) => S): Effect.Effect> + (s: S, f: (s: S, a: A) => S): (self: Stream) => Effect.Effect + (self: Stream, s: S, f: (s: S, a: A) => S): Effect.Effect } = internal.runFold /** @@ -4166,17 +4163,8 @@ export const runFoldScopedEffect: { * @category destructors */ export const runFoldWhile: { - ( - s: S, - cont: Predicate, - f: (s: S, a: A) => S - ): (self: Stream) => Effect.Effect> - ( - self: Stream, - s: S, - cont: Predicate, - f: (s: S, a: A) => S - ): Effect.Effect> + (s: S, cont: Predicate, f: (s: S, a: A) => S): (self: Stream) => Effect.Effect + (self: Stream, s: S, cont: Predicate, f: (s: S, a: A) => S): Effect.Effect } = internal.runFoldWhile /** @@ -4591,6 +4579,17 @@ export const scheduleWith: { export const scoped: (effect: Effect.Effect) => Stream> = internal.scoped +/** + * Use a function that receives a scope and returns an effect to emit an output + * element. The output element will be the result of the returned effect, if + * successful. + * + * @since 3.11.0 + * @category constructors + */ +export const scopedWith: (f: (scope: Scope.Scope) => Effect.Effect) => Stream = + internal.scopedWith + /** * Emits a sliding window of `n` elements. * @@ -5436,6 +5435,18 @@ export const unwrapScoped: ( effect: Effect.Effect, E, R> ) => Stream> = internal.unwrapScoped +/** + * Creates a stream produced from a function which receives a `Scope` and + * returns an `Effect`. The resulting stream will emit a single element, which + * will be the result of the returned effect, if successful. + * + * @since 3.11.0 + * @category constructors + */ +export const unwrapScopedWith: ( + f: (scope: Scope.Scope) => Effect.Effect, E, R> +) => Stream = internal.unwrapScopedWith + /** * Updates the specified service within the context of the `Stream`. * diff --git a/packages/effect/src/internal/channel.ts b/packages/effect/src/internal/channel.ts index 4000de66e88..b23bf6aed64 100644 --- a/packages/effect/src/internal/channel.ts +++ b/packages/effect/src/internal/channel.ts @@ -910,87 +910,66 @@ export const mapOutEffectPar = dual< f: (o: OutElem) => Effect.Effect, n: number ): Channel.Channel => - pipe( - Effect.gen(function*($) { - const queue = yield* $( - Effect.acquireRelease( - Queue.bounded, OutErr | OutErr1, Env1>>(n), - (queue) => Queue.shutdown(queue) - ) - ) - const errorSignal = yield* $(Deferred.make()) - const withPermits = n === Number.POSITIVE_INFINITY ? - ((_: number) => identity) : - (yield* $(Effect.makeSemaphore(n))).withPermits - const pull = yield* $(toPull(self)) - yield* $( - Effect.matchCauseEffect(pull, { - onFailure: (cause) => Queue.offer(queue, Effect.failCause(cause)), - onSuccess: (either) => - Either.match( - either, - { - onLeft: (outDone) => { - const lock = withPermits(n) - return Effect.zipRight( - Effect.interruptible(lock(Effect.void)), - Effect.asVoid(Queue.offer( - queue, - Effect.succeed(Either.left(outDone)) - )) - ) - }, - onRight: (outElem) => - Effect.gen(function*($) { - const deferred = yield* $(Deferred.make()) - const latch = yield* $(Deferred.make()) - yield* $(Effect.asVoid(Queue.offer( - queue, - Effect.map(Deferred.await(deferred), Either.right) - ))) - yield* $( - Deferred.succeed(latch, void 0), - Effect.zipRight( - pipe( - Effect.uninterruptibleMask((restore) => - pipe( - Effect.exit(restore(Deferred.await(errorSignal))), - Effect.raceFirst(Effect.exit(restore(f(outElem)))), - // TODO: remove - Effect.flatMap((exit) => Effect.suspend(() => exit)) - ) - ), - Effect.tapErrorCause((cause) => Deferred.failCause(errorSignal, cause)), - Effect.intoDeferred(deferred) + unwrapScopedWith( + (scope) => + Effect.gen(function*() { + const input = yield* singleProducerAsyncInput.make() + const queueReader = fromInput(input) + const queue = yield* Queue.bounded, OutErr | OutErr1, Env1>>(n) + yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) + const errorSignal = yield* Deferred.make() + const withPermits = n === Number.POSITIVE_INFINITY ? + ((_: number) => identity) : + (yield* Effect.makeSemaphore(n)).withPermits + const pull = yield* queueReader.pipe(core.pipeTo(self), toPullIn(scope)) + yield* pull.pipe( + Effect.matchCauseEffect({ + onFailure: (cause) => Queue.offer(queue, Effect.failCause(cause)), + onSuccess: Either.match({ + onLeft: (outDone) => + Effect.zipRight( + Effect.interruptible(withPermits(n)(Effect.void)), + Effect.asVoid(Queue.offer(queue, Effect.succeed(Either.left(outDone)))) + ), + onRight: (outElem) => + Effect.gen(function*() { + const deferred = yield* Deferred.make() + const latch = yield* Deferred.make() + yield* Queue.offer(queue, Effect.map(Deferred.await(deferred), Either.right)) + yield* Deferred.succeed(latch, void 0).pipe( + Effect.zipRight( + Effect.uninterruptibleMask((restore) => + Effect.exit(restore(Deferred.await(errorSignal))).pipe( + Effect.raceFirst(Effect.exit(restore(f(outElem)))), + Effect.flatMap(identity) ) - ), - withPermits(1), - Effect.forkScoped - ) - yield* $(Deferred.await(latch)) - }) - } - ) - }), - Effect.forever, - Effect.interruptible, - Effect.forkScoped - ) - return queue - }), - Effect.map((queue) => { - const consumer: Channel.Channel = unwrap( - Effect.matchCause(Effect.flatten(Queue.take(queue)), { - onFailure: core.failCause, - onSuccess: Either.match({ - onLeft: core.succeedNow, - onRight: (outElem) => core.flatMap(core.write(outElem), () => consumer) + ).pipe( + Effect.tapErrorCause((cause) => Deferred.failCause(errorSignal, cause)), + Effect.intoDeferred(deferred) + ) + ), + withPermits(1), + Effect.forkIn(scope) + ) + yield* Deferred.await(latch) + }) + }) + }), + Effect.forever, + Effect.interruptible, + Effect.forkIn(scope) + ) + const consumer: Channel.Channel = unwrap( + Effect.matchCause(Effect.flatten(Queue.take(queue)), { + onFailure: core.failCause, + onSuccess: Either.match({ + onLeft: core.succeedNow, + onRight: (outElem) => core.flatMap(core.write(outElem), () => consumer) + }) }) - }) - ) - return consumer - }), - unwrapScoped + ) + return core.embedInput(consumer, input) + }) )) /** @internal */ @@ -1134,180 +1113,168 @@ export const mergeAllWith = ( InDone & InDone1, Env | Env1 > => - pipe( - Effect.gen(function*($) { - const concurrencyN = concurrency === "unbounded" ? Number.MAX_SAFE_INTEGER : concurrency - const input = yield* $(singleProducerAsyncInput.make< - InErr & InErr1, - InElem & InElem1, - InDone & InDone1 - >()) - const queueReader = fromInput(input) - const queue = yield* $( - Effect.acquireRelease( - Queue.bounded, OutErr | OutErr1, Env>>(bufferSize), - (queue) => Queue.shutdown(queue) - ) - ) - const cancelers = yield* $( - Effect.acquireRelease( - Queue.unbounded>(), - (queue) => Queue.shutdown(queue) + unwrapScopedWith( + (scope) => + Effect.gen(function*() { + const concurrencyN = concurrency === "unbounded" ? Number.MAX_SAFE_INTEGER : concurrency + const input = yield* singleProducerAsyncInput.make< + InErr & InErr1, + InElem & InElem1, + InDone & InDone1 + >() + const queueReader = fromInput(input) + const queue = yield* Queue.bounded, OutErr | OutErr1, Env>>( + bufferSize ) - ) - const lastDone = yield* $(Ref.make>(Option.none())) - const errorSignal = yield* $(Deferred.make()) - const withPermits = (yield* $(Effect.makeSemaphore(concurrencyN))) - .withPermits - const pull = yield* $(toPull(channels)) - const evaluatePull = ( - pull: Effect.Effect, OutErr | OutErr1, Env | Env1> - ) => - pipe( - Effect.flatMap( - pull, - Either.match({ + yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) + const cancelers = yield* Queue.unbounded>() + yield* Scope.addFinalizer(scope, Queue.shutdown(cancelers)) + const lastDone = yield* Ref.make>(Option.none()) + const errorSignal = yield* Deferred.make() + const withPermits = (yield* Effect.makeSemaphore(concurrencyN)).withPermits + const pull = yield* toPullIn(core.pipeTo(queueReader, channels), scope) + + function evaluatePull( + pull: Effect.Effect< + Either.Either, + OutErr | OutErr1, + Env | Env1 + > + ) { + return pull.pipe( + Effect.flatMap(Either.match({ onLeft: (done) => Effect.succeed(Option.some(done)), onRight: (outElem) => Effect.as( Queue.offer(queue, Effect.succeed(Either.right(outElem))), Option.none() ) - }) - ), - Effect.repeat({ until: (_): _ is Option.Some => Option.isSome(_) }), - Effect.flatMap((outDone) => - Ref.update( - lastDone, - Option.match({ - onNone: () => Option.some(outDone.value), - onSome: (lastDone) => Option.some(f(lastDone, outDone.value)) - }) - ) - ), - Effect.catchAllCause((cause) => - Cause.isInterrupted(cause) ? - Effect.failCause(cause) : - pipe( - Queue.offer(queue, Effect.failCause(cause)), - Effect.zipRight(Deferred.succeed(errorSignal, void 0)), - Effect.asVoid + })), + Effect.repeat({ until: (_): _ is Option.Some => Option.isSome(_) }), + Effect.flatMap((outDone) => + Ref.update( + lastDone, + Option.match({ + onNone: () => Option.some(outDone.value), + onSome: (lastDone) => Option.some(f(lastDone, outDone.value)) + }) ) - ) - ) - yield* $( - Effect.matchCauseEffect(pull, { - onFailure: (cause) => - pipe( - Queue.offer(queue, Effect.failCause(cause)), - Effect.zipRight(Effect.succeed(false)) ), - onSuccess: Either.match({ - onLeft: (outDone) => - Effect.raceWith( - Effect.interruptible(Deferred.await(errorSignal)), - Effect.interruptible(withPermits(concurrencyN)(Effect.void)), - { - onSelfDone: (_, permitAcquisition) => Effect.as(Fiber.interrupt(permitAcquisition), false), - onOtherDone: (_, failureAwait) => - Effect.zipRight( - Fiber.interrupt(failureAwait), - pipe( - Ref.get(lastDone), - Effect.flatMap(Option.match({ - onNone: () => Queue.offer(queue, Effect.succeed(Either.left(outDone))), - onSome: (lastDone) => Queue.offer(queue, Effect.succeed(Either.left(f(lastDone, outDone)))) - })), - Effect.as(false) - ) - ) - } + Effect.catchAllCause((cause) => + Cause.isInterrupted(cause) + ? Effect.failCause(cause) + : Queue.offer(queue, Effect.failCause(cause)).pipe( + Effect.zipRight(Deferred.succeed(errorSignal, void 0)), + Effect.asVoid + ) + ) + ) + } + + yield* pull.pipe( + Effect.matchCauseEffect({ + onFailure: (cause) => + Queue.offer(queue, Effect.failCause(cause)).pipe( + Effect.zipRight(Effect.succeed(false)) ), - onRight: (channel) => - _mergeStrategy.match(mergeStrategy, { - onBackPressure: () => - Effect.gen(function*($) { - const latch = yield* $(Deferred.make()) - const raceEffects: Effect.Effect = pipe( - queueReader, - core.pipeTo(channel), - toPull, - Effect.flatMap((pull) => - Effect.race( - evaluatePull(pull), - Effect.interruptible(Deferred.await(errorSignal)) + onSuccess: Either.match({ + onLeft: (outDone) => + Effect.raceWith( + Effect.interruptible(Deferred.await(errorSignal)), + Effect.interruptible(withPermits(concurrencyN)(Effect.void)), + { + onSelfDone: (_, permitAcquisition) => Effect.as(Fiber.interrupt(permitAcquisition), false), + onOtherDone: (_, failureAwait) => + Effect.zipRight( + Fiber.interrupt(failureAwait), + Ref.get(lastDone).pipe( + Effect.flatMap(Option.match({ + onNone: () => Queue.offer(queue, Effect.succeed(Either.left(outDone))), + onSome: (lastDone) => Queue.offer(queue, Effect.succeed(Either.left(f(lastDone, outDone)))) + })), + Effect.as(false) ) - ), - Effect.scoped - ) - yield* $( - Deferred.succeed(latch, void 0), - Effect.zipRight(raceEffects), - withPermits(1), - Effect.forkScoped - ) - yield* $(Deferred.await(latch)) - const errored = yield* $(Deferred.isDone(errorSignal)) - return !errored - }), - onBufferSliding: () => - Effect.gen(function*($) { - const canceler = yield* $(Deferred.make()) - const latch = yield* $(Deferred.make()) - const size = yield* $(Queue.size(cancelers)) - yield* $( - Queue.take(cancelers), - Effect.flatMap((_) => Deferred.succeed(_, void 0)), - Effect.when(() => size >= concurrencyN) - ) - yield* $(Queue.offer(cancelers, canceler)) - const raceEffects: Effect.Effect = pipe( - queueReader, - core.pipeTo(channel), - toPull, - Effect.flatMap((pull) => - pipe( - evaluatePull(pull), - Effect.race(Effect.interruptible(Deferred.await(errorSignal))), - Effect.race(Effect.interruptible(Deferred.await(canceler))) + ) + } + ), + onRight: (channel) => + _mergeStrategy.match(mergeStrategy, { + onBackPressure: () => + Effect.gen(function*() { + const latch = yield* Deferred.make() + const raceEffects = Effect.scopedWith((scope) => + toPullIn(core.pipeTo(queueReader, channel), scope).pipe( + Effect.flatMap((pull) => + Effect.race( + Effect.exit(evaluatePull(pull)), + Effect.exit(Effect.interruptible(Deferred.await(errorSignal))) + ) + ), + Effect.flatMap(identity) ) - ), - Effect.scoped - ) - yield* $( - Deferred.succeed(latch, void 0), - Effect.zipRight(raceEffects), - withPermits(1), - Effect.forkScoped - ) - yield* $(Deferred.await(latch)) - const errored = yield* $(Deferred.isDone(errorSignal)) - return !errored - }) + ) + yield* Deferred.succeed(latch, void 0).pipe( + Effect.zipRight(raceEffects), + withPermits(1), + Effect.forkIn(scope) + ) + yield* Deferred.await(latch) + const errored = yield* Deferred.isDone(errorSignal) + return !errored + }), + onBufferSliding: () => + Effect.gen(function*() { + const canceler = yield* Deferred.make() + const latch = yield* Deferred.make() + const size = yield* Queue.size(cancelers) + yield* Queue.take(cancelers).pipe( + Effect.flatMap((canceler) => Deferred.succeed(canceler, void 0)), + Effect.when(() => size >= concurrencyN) + ) + yield* Queue.offer(cancelers, canceler) + const raceEffects = Effect.scopedWith((scope) => + toPullIn(core.pipeTo(queueReader, channel), scope).pipe( + Effect.flatMap((pull) => + Effect.exit(evaluatePull(pull)).pipe( + Effect.race(Effect.exit(Effect.interruptible(Deferred.await(errorSignal)))), + Effect.race(Effect.exit(Effect.interruptible(Deferred.await(canceler)))) + ) + ), + Effect.flatMap(identity) + ) + ) + yield* Deferred.succeed(latch, void 0).pipe( + Effect.zipRight(raceEffects), + withPermits(1), + Effect.forkIn(scope) + ) + yield* Deferred.await(latch) + const errored = yield* Deferred.isDone(errorSignal) + return !errored + }) + }) + }) + }), + Effect.repeat({ while: (_) => _ }), + Effect.forkIn(scope) + ) + + const consumer: Channel.Channel = + pipe( + Queue.take(queue), + Effect.flatten, + Effect.matchCause({ + onFailure: core.failCause, + onSuccess: Either.match({ + onLeft: core.succeedNow, + onRight: (outElem) => core.flatMap(core.write(outElem), () => consumer) }) - }) - }), - Effect.repeat({ while: (_) => _ }), - Effect.forkScoped - ) - return [queue, input] as const - }), - Effect.map(([queue, input]) => { - const consumer: Channel.Channel = pipe( - Queue.take(queue), - Effect.flatten, - Effect.matchCause({ - onFailure: core.failCause, - onSuccess: Either.match({ - onLeft: core.succeedNow, - onRight: (outElem) => core.flatMap(core.write(outElem), () => consumer) - }) - }), - unwrap - ) - return core.embedInput(consumer, input) - }), - unwrapScoped + }), + unwrap + ) + + return core.embedInput(consumer, input) + }) ) /** @internal */ @@ -1596,132 +1563,76 @@ export const mergeWith = dual< OutDone2 | OutDone3, InDone & InDone1, Env1 | Env -> => - unwrapScoped( - Effect.flatMap( - singleProducerAsyncInput.make< +> => { + function merge(scope: Scope.Scope) { + return Effect.gen(function*() { + type State = MergeState.MergeState< + Env | Env1, + OutErr, + OutErr1, + OutErr2 | OutErr3, + OutElem | OutElem1, + OutDone, + OutDone1, + OutDone2 | OutDone3 + > + + const input = yield* singleProducerAsyncInput.make< InErr & InErr1, InElem & InElem1, InDone & InDone1 - >(), - (input) => { - const queueReader = fromInput(input) - return Effect.map( - Effect.all([ - toPull(core.pipeTo(queueReader, self)), - toPull(core.pipeTo(queueReader, options.other)), - Effect.scope - ]), - ([pullL, pullR, scope]) => { - type State = MergeState.MergeState< + >() + const queueReader = fromInput(input) + const pullL = yield* toPullIn(core.pipeTo(queueReader, self), scope) + const pullR = yield* toPullIn(core.pipeTo(queueReader, options.other), scope) + + function handleSide( + exit: Exit.Exit, Err>, + fiber: Fiber.Fiber, Err2>, + pull: Effect.Effect, Err, Env | Env1> + ) { + return ( + done: ( + ex: Exit.Exit + ) => MergeDecision.MergeDecision< + Env | Env1, + Err2, + Done2, + OutErr2 | OutErr3, + OutDone2 | OutDone3 + >, + both: ( + f1: Fiber.Fiber, Err>, + f2: Fiber.Fiber, Err2> + ) => State, + single: ( + f: ( + ex: Exit.Exit + ) => Effect.Effect + ) => State + ): Effect.Effect< + Channel.Channel< + OutElem | OutElem1, + unknown, + OutErr2 | OutErr3, + unknown, + OutDone2 | OutDone3, + unknown, + Env | Env1 + >, + never, + Env | Env1 + > => { + function onDecision( + decision: MergeDecision.MergeDecision< Env | Env1, - OutErr, - OutErr1, + Err2, + Done2, OutErr2 | OutErr3, - OutElem | OutElem1, - OutDone, - OutDone1, OutDone2 | OutDone3 > - - const handleSide = ( - exit: Exit.Exit, Err>, - fiber: Fiber.Fiber, Err2>, - pull: Effect.Effect, Err, Env | Env1> - ) => - ( - done: ( - ex: Exit.Exit - ) => MergeDecision.MergeDecision< - Env | Env1, - Err2, - Done2, - OutErr2 | OutErr3, - OutDone2 | OutDone3 - >, - both: ( - f1: Fiber.Fiber, Err>, - f2: Fiber.Fiber, Err2> - ) => State, - single: ( - f: ( - ex: Exit.Exit - ) => Effect.Effect - ) => State - ): Effect.Effect< - Channel.Channel< - OutElem | OutElem1, - unknown, - OutErr2 | OutErr3, - unknown, - OutDone2 | OutDone3, - unknown, - Env | Env1 - >, - never, - Env | Env1 - > => { - const onDecision = ( - decision: MergeDecision.MergeDecision< - Env | Env1, - Err2, - Done2, - OutErr2 | OutErr3, - OutDone2 | OutDone3 - > - ): Effect.Effect< - Channel.Channel< - OutElem | OutElem1, - unknown, - OutErr2 | OutErr3, - unknown, - OutDone2 | OutDone3, - unknown, - Env | Env1 - > - > => { - const op = decision as mergeDecision.Primitive - if (op._tag === MergeDecisionOpCodes.OP_DONE) { - return Effect.succeed( - core.fromEffect( - Effect.zipRight( - Fiber.interrupt(fiber), - op.effect - ) - ) - ) - } - return Effect.map( - Fiber.await(fiber), - Exit.match({ - onFailure: (cause) => core.fromEffect(op.f(Exit.failCause(cause))), - onSuccess: Either.match({ - onLeft: (done) => core.fromEffect(op.f(Exit.succeed(done))), - onRight: (elem) => zipRight(core.write(elem), go(single(op.f))) - }) - }) - ) - } - - return Exit.match(exit, { - onFailure: (cause) => onDecision(done(Exit.failCause(cause))), - onSuccess: Either.match({ - onLeft: (z) => onDecision(done(Exit.succeed(z))), - onRight: (elem) => - Effect.succeed( - core.flatMap(core.write(elem), () => - core.flatMap( - core.fromEffect(Effect.forkIn(Effect.interruptible(pull), scope)), - (leftFiber) => go(both(leftFiber, fiber)) - )) - ) - }) - }) - } - - const go = ( - state: State - ): Channel.Channel< + ): Effect.Effect< + Channel.Channel< OutElem | OutElem1, unknown, OutErr2 | OutErr3, @@ -1729,107 +1640,157 @@ export const mergeWith = dual< OutDone2 | OutDone3, unknown, Env | Env1 - > => { - switch (state._tag) { - case MergeStateOpCodes.OP_BOTH_RUNNING: { - const leftJoin = Effect.interruptible(Fiber.join(state.left)) - const rightJoin = Effect.interruptible(Fiber.join(state.right)) - return unwrap( - Effect.raceWith(leftJoin, rightJoin, { - onSelfDone: (leftExit, rf) => - Effect.zipRight( - Fiber.interrupt(rf), - handleSide(leftExit, state.right, pullL)( - options.onSelfDone, - mergeState.BothRunning, - (f) => mergeState.LeftDone(f) - ) - ), - onOtherDone: (rightExit, lf) => - Effect.zipRight( - Fiber.interrupt(lf), - handleSide(rightExit, state.left, pullR)( - options.onOtherDone as ( - ex: Exit.Exit - ) => MergeDecision.MergeDecision< - Env1 | Env, - OutErr, - OutDone, - OutErr2 | OutErr3, - OutDone2 | OutDone3 - >, - (left, right) => mergeState.BothRunning(right, left), - (f) => mergeState.RightDone(f) - ) - ) - }) - ) - } - case MergeStateOpCodes.OP_LEFT_DONE: { - return unwrap( - Effect.map( - Effect.exit(pullR), - Exit.match({ - onFailure: (cause) => core.fromEffect(state.f(Exit.failCause(cause))), - onSuccess: Either.match({ - onLeft: (done) => core.fromEffect(state.f(Exit.succeed(done))), - onRight: (elem) => - core.flatMap( - core.write(elem), - () => go(mergeState.LeftDone(state.f)) - ) - }) - }) - ) - ) - } - case MergeStateOpCodes.OP_RIGHT_DONE: { - return unwrap( - Effect.map( - Effect.exit(pullL), - Exit.match({ - onFailure: (cause) => core.fromEffect(state.f(Exit.failCause(cause))), - onSuccess: Either.match({ - onLeft: (done) => core.fromEffect(state.f(Exit.succeed(done))), - onRight: (elem) => - core.flatMap( - core.write(elem), - () => go(mergeState.RightDone(state.f)) - ) - }) - }) - ) + > + > { + const op = decision as mergeDecision.Primitive + if (op._tag === MergeDecisionOpCodes.OP_DONE) { + return Effect.succeed( + core.fromEffect( + Effect.zipRight( + Fiber.interrupt(fiber), + op.effect ) - } - } + ) + ) } + return Effect.map( + Fiber.await(fiber), + Exit.match({ + onFailure: (cause) => core.fromEffect(op.f(Exit.failCause(cause))), + onSuccess: Either.match({ + onLeft: (done) => core.fromEffect(op.f(Exit.succeed(done))), + onRight: (elem) => zipRight(core.write(elem), go(single(op.f))) + }) + }) + ) + } - return pipe( - core.fromEffect( - Effect.zipWith( - Effect.forkIn(Effect.interruptible(pullL), scope), - Effect.forkIn(Effect.interruptible(pullR), scope), - (left, right): State => - mergeState.BothRunning< - Env | Env1, - OutErr, - OutErr1, - OutErr2 | OutErr3, - OutElem | OutElem1, - OutDone, - OutDone1, - OutDone2 | OutDone3 - >(left, right) + return Exit.match(exit, { + onFailure: (cause) => onDecision(done(Exit.failCause(cause))), + onSuccess: Either.match({ + onLeft: (z) => onDecision(done(Exit.succeed(z))), + onRight: (elem) => + Effect.succeed( + core.flatMap(core.write(elem), () => + core.flatMap( + core.fromEffect(Effect.forkIn(Effect.interruptible(pull), scope)), + (leftFiber) => go(both(leftFiber, fiber)) + )) ) - ), - core.flatMap(go), - core.embedInput(input) + }) + }) + } + } + + function go( + state: State + ): Channel.Channel< + OutElem | OutElem1, + unknown, + OutErr2 | OutErr3, + unknown, + OutDone2 | OutDone3, + unknown, + Env | Env1 + > { + switch (state._tag) { + case MergeStateOpCodes.OP_BOTH_RUNNING: { + const leftJoin = Effect.interruptible(Fiber.join(state.left)) + const rightJoin = Effect.interruptible(Fiber.join(state.right)) + return unwrap( + Effect.raceWith(leftJoin, rightJoin, { + onSelfDone: (leftExit, rf) => + Effect.zipRight( + Fiber.interrupt(rf), + handleSide(leftExit, state.right, pullL)( + options.onSelfDone, + mergeState.BothRunning, + (f) => mergeState.LeftDone(f) + ) + ), + onOtherDone: (rightExit, lf) => + Effect.zipRight( + Fiber.interrupt(lf), + handleSide(rightExit, state.left, pullR)( + options.onOtherDone as ( + ex: Exit.Exit + ) => MergeDecision.MergeDecision< + Env1 | Env, + OutErr, + OutDone, + OutErr2 | OutErr3, + OutDone2 | OutDone3 + >, + (left, right) => mergeState.BothRunning(right, left), + (f) => mergeState.RightDone(f) + ) + ) + }) ) } - ) + case MergeStateOpCodes.OP_LEFT_DONE: { + return unwrap( + Effect.map( + Effect.exit(pullR), + Exit.match({ + onFailure: (cause) => core.fromEffect(state.f(Exit.failCause(cause))), + onSuccess: Either.match({ + onLeft: (done) => core.fromEffect(state.f(Exit.succeed(done))), + onRight: (elem) => + core.flatMap( + core.write(elem), + () => go(mergeState.LeftDone(state.f)) + ) + }) + }) + ) + ) + } + case MergeStateOpCodes.OP_RIGHT_DONE: { + return unwrap( + Effect.map( + Effect.exit(pullL), + Exit.match({ + onFailure: (cause) => core.fromEffect(state.f(Exit.failCause(cause))), + onSuccess: Either.match({ + onLeft: (done) => core.fromEffect(state.f(Exit.succeed(done))), + onRight: (elem) => + core.flatMap( + core.write(elem), + () => go(mergeState.RightDone(state.f)) + ) + }) + }) + ) + ) + } + } } - ) - )) + + return core.fromEffect( + Effect.zipWith( + Effect.forkIn(Effect.interruptible(pullL), scope), + Effect.forkIn(Effect.interruptible(pullR), scope), + (left, right): State => + mergeState.BothRunning< + Env | Env1, + OutErr, + OutErr1, + OutErr2 | OutErr3, + OutElem | OutElem1, + OutDone, + OutDone1, + OutDone2 | OutDone3 + >(left, right) + ) + ).pipe( + core.flatMap(go), + core.embedInput(input) + ) + }) + } + return unwrapScopedWith(merge) +}) /** @internal */ export const never: Channel.Channel = core.fromEffect( @@ -2003,7 +1964,9 @@ export const provideLayer = dual< self: Channel.Channel, layer: Layer.Layer ): Channel.Channel => - unwrapScoped(Effect.map(Layer.build(layer), (env) => core.provideContext(self, env)))) + unwrapScopedWith((scope) => + Effect.map(Layer.buildWithScope(layer, scope), (context) => core.provideContext(self, context)) + )) /** @internal */ export const mapInputContext = dual< @@ -2052,18 +2015,22 @@ export const repeated = ( /** @internal */ export const run = ( self: Channel.Channel -): Effect.Effect> => Effect.scoped(executor.runScoped(self)) +): Effect.Effect => Effect.scopedWith((scope) => executor.runIn(self, scope)) /** @internal */ export const runCollect = ( self: Channel.Channel -): Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Exclude> => - executor.run(core.collectElements(self)) +): Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Env> => run(core.collectElements(self)) /** @internal */ export const runDrain = ( self: Channel.Channel -): Effect.Effect> => executor.run(drain(self)) +): Effect.Effect => run(drain(self)) + +/** @internal */ +export const runScoped = ( + self: Channel.Channel +): Effect.Effect => Effect.scopeWith((scope) => executor.runIn(self, scope)) /** @internal */ export const scoped = ( @@ -2082,6 +2049,12 @@ export const scoped = ( ) ) +/** @internal */ +export const scopedWith = ( + f: (scope: Scope.Scope) => Effect.Effect +): Channel.Channel => + unwrapScoped(Effect.map(Effect.scope, (scope) => core.flatMap(core.fromEffect(f(scope)), core.write))) + /** @internal */ export const service = >( tag: T @@ -2210,16 +2183,43 @@ export const toPubSub = ( export const toPull = ( self: Channel.Channel ): Effect.Effect, OutErr, Env>, never, Env | Scope.Scope> => - Effect.map( - Effect.acquireRelease( - Effect.sync(() => new executor.ChannelExecutor(self, void 0, identity)), - (exec, exit) => { - const finalize = exec.close(exit) - return finalize === undefined ? Effect.void : finalize - } + Effect.flatMap(Effect.scope, (scope) => toPullIn(self, scope)) + +/** @internal */ +export const toPullIn = dual< + (scope: Scope.Scope) => ( + self: Channel.Channel + ) => Effect.Effect, OutErr, Env>, never, Env>, + ( + self: Channel.Channel, + scope: Scope.Scope + ) => Effect.Effect, OutErr, Env>, never, Env> +>(2, ( + self: Channel.Channel, + scope: Scope.Scope +) => + Effect.zip( + Effect.sync(() => new executor.ChannelExecutor(self, void 0, identity)), + Effect.runtime() + ).pipe( + Effect.tap(([executor, runtime]) => + Scope.addFinalizerExit(scope, (exit) => { + const finalizer = executor.close(exit) + return finalizer !== undefined + ? Effect.provide(finalizer, runtime) + : Effect.void + }) ), - (exec) => Effect.suspend(() => interpretToPull(exec.run() as ChannelState.ChannelState, exec)) - ) + Effect.uninterruptible, + Effect.map(([executor]) => + Effect.suspend(() => + interpretToPull( + executor.run() as ChannelState.ChannelState, + executor + ) + ) + ) + )) /** @internal */ const interpretToPull = ( @@ -2289,6 +2289,16 @@ export const unwrapScoped = d ) +/** @internal */ +export const unwrapScopedWith = ( + f: (scope: Scope.Scope) => Effect.Effect, E, R> +): Channel.Channel => + core.concatAllWith( + scopedWith(f), + (d, _) => d, + (d, _) => d + ) + /** @internal */ export const updateService = dual< >( diff --git a/packages/effect/src/internal/channel/channelExecutor.ts b/packages/effect/src/internal/channel/channelExecutor.ts index 8e568807350..3c18384483d 100644 --- a/packages/effect/src/internal/channel/channelExecutor.ts +++ b/packages/effect/src/internal/channel/channelExecutor.ts @@ -7,7 +7,7 @@ import * as Effect from "../../Effect.js" import * as ExecutionStrategy from "../../ExecutionStrategy.js" import * as Exit from "../../Exit.js" import * as Fiber from "../../Fiber.js" -import { identity, pipe } from "../../Function.js" +import { dual, identity, pipe } from "../../Function.js" import * as Option from "../../Option.js" import * as Scope from "../../Scope.js" import type * as UpstreamPullStrategy from "../../UpstreamPullStrategy.js" @@ -1092,14 +1092,18 @@ export const readUpstream = ( } /** @internal */ -export const run = ( - self: Channel.Channel -): Effect.Effect> => pipe(runScoped(self), Effect.scoped) - -/** @internal */ -export const runScoped = ( - self: Channel.Channel -): Effect.Effect => { +export const runIn = dual< + (scope: Scope.Scope) => ( + self: Channel.Channel + ) => Effect.Effect, + ( + self: Channel.Channel, + scope: Scope.Scope + ) => Effect.Effect +>(2, ( + self: Channel.Channel, + scope: Scope.Scope +) => { const run = ( channelDeferred: Deferred.Deferred, scopeDeferred: Deferred.Deferred, @@ -1109,8 +1113,7 @@ export const runScoped = ( Effect.sync(() => new ChannelExecutor(self, void 0, identity)), (exec) => Effect.suspend(() => - pipe( - runScopedInterpret(exec.run() as ChannelState.ChannelState, exec), + runScopedInterpret(exec.run() as ChannelState.ChannelState, exec).pipe( Effect.intoDeferred(channelDeferred), Effect.zipRight(Deferred.await(channelDeferred)), Effect.zipLeft(Deferred.await(scopeDeferred)) @@ -1128,33 +1131,34 @@ export const runScoped = ( } ) return Effect.uninterruptibleMask((restore) => - Effect.flatMap(Effect.scope, (parent) => - pipe( - Effect.all([ - Scope.fork(parent, ExecutionStrategy.sequential), - Deferred.make(), - Deferred.make() - ]), - Effect.flatMap(([child, channelDeferred, scopeDeferred]) => - pipe( - Effect.forkScoped(restore(run(channelDeferred, scopeDeferred, child))), - Effect.flatMap((fiber) => - pipe( - Scope.addFinalizer( - parent, - Deferred.succeed(scopeDeferred, void 0).pipe( - Effect.zipRight(Effect.yieldNow()) + Effect.all([ + Scope.fork(scope, ExecutionStrategy.sequential), + Deferred.make(), + Deferred.make() + ]).pipe(Effect.flatMap(([child, channelDeferred, scopeDeferred]) => + restore(run(channelDeferred, scopeDeferred, child)).pipe( + Effect.forkIn(scope), + Effect.flatMap((fiber) => + scope.addFinalizer(() => + Deferred.isDone(channelDeferred).pipe( + Effect.flatMap((isDone) => + isDone + ? Deferred.succeed(scopeDeferred, void 0).pipe( + Effect.zipRight(Fiber.await(fiber)), + Effect.zipRight(Fiber.inheritAll(fiber)) + ) + : Deferred.succeed(scopeDeferred, void 0).pipe( + Effect.zipRight(Fiber.interrupt(fiber)), + Effect.zipRight(Fiber.inheritAll(fiber)) ) - ), - Effect.zipRight(restore(Deferred.await(channelDeferred))), - Effect.zipLeft(Fiber.inheritAll(fiber)) ) ) - ) + ).pipe(Effect.zipRight(restore(Deferred.await(channelDeferred)))) ) - )) + ) + )) ) -} +}) /** @internal */ const runScopedInterpret = ( diff --git a/packages/effect/src/internal/fiberRuntime.ts b/packages/effect/src/internal/fiberRuntime.ts index 6538571f4b5..8b884b125b3 100644 --- a/packages/effect/src/internal/fiberRuntime.ts +++ b/packages/effect/src/internal/fiberRuntime.ts @@ -2833,6 +2833,11 @@ export const scopeWith = ( f: (scope: Scope.Scope) => Effect.Effect ): Effect.Effect => core.flatMap(scopeTag, f) +/** @internal */ +export const scopedWith = ( + f: (scope: Scope.Scope) => Effect.Effect +): Effect.Effect => core.flatMap(scopeMake(), (scope) => core.onExit(f(scope), (exit) => scope.close(exit))) + /* @internal */ export const scopedEffect = (effect: Effect.Effect): Effect.Effect> => core.flatMap(scopeMake(), (scope) => scopeUse(effect, scope)) @@ -2876,12 +2881,7 @@ export const using = dual< self: Effect.Effect, use: (a: A) => Effect.Effect ) => Effect.Effect | R2> ->(2, (self, use) => - core.acquireUseRelease( - scopeMake(), - (scope) => core.flatMap(scopeExtend(self, scope), use), - (scope, exit) => core.scopeClose(scope, exit) - )) +>(2, (self, use) => scopedWith((scope) => core.flatMap(scopeExtend(self, scope), use))) /** @internal */ export const validate = dual< diff --git a/packages/effect/src/internal/groupBy.ts b/packages/effect/src/internal/groupBy.ts index d5af7b9c43a..01abb7f923d 100644 --- a/packages/effect/src/internal/groupBy.ts +++ b/packages/effect/src/internal/groupBy.ts @@ -12,6 +12,7 @@ import { pipeArguments } from "../Pipeable.js" import { hasProperty, type Predicate } from "../Predicate.js" import * as Queue from "../Queue.js" import * as Ref from "../Ref.js" +import * as Scope from "../Scope.js" import type * as Stream from "../Stream.js" import type * as Take from "../Take.js" import type { NoInfer } from "../Types.js" @@ -487,29 +488,19 @@ export const groupByKey = dual< ) ) }) - return make(stream.unwrapScoped( - pipe( - Effect.sync(() => new Map>>()), - Effect.flatMap((map) => - pipe( - Effect.acquireRelease( - Queue.unbounded>], E>>(), - (queue) => Queue.shutdown(queue) - ), - Effect.flatMap((queue) => - pipe( - self, - stream.toChannel, - core.pipeTo(loop(map, queue)), - channel.drain, - channelExecutor.runScoped, - Effect.forkScoped, - Effect.as(stream.flattenTake(stream.fromQueue(queue, { shutdown: true }))) - ) - ) - ) + return make(stream.unwrapScopedWith((scope) => + Effect.gen(function*() { + const map = new Map>>() + const queue = yield* Queue.unbounded>], E>>() + yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) + return yield* stream.toChannel(self).pipe( + core.pipeTo(loop(map, queue)), + channel.drain, + channelExecutor.runIn(scope), + Effect.forkIn(scope), + Effect.as(stream.flattenTake(stream.fromQueue(queue, { shutdown: true }))) ) - ) + }) )) } ) diff --git a/packages/effect/src/internal/layer.ts b/packages/effect/src/internal/layer.ts index cd04afc9734..5af1737d2d3 100644 --- a/packages/effect/src/internal/layer.ts +++ b/packages/effect/src/internal/layer.ts @@ -1274,14 +1274,11 @@ const provideSomeLayer = dual< layer: Layer.Layer ) => Effect.Effect> >(2, (self, layer) => - core.acquireUseRelease( - fiberRuntime.scopeMake(), - (scope) => - core.flatMap( - buildWithScope(layer, scope), - (context) => core.provideSomeContext(self, context) - ), - (scope, exit) => core.scopeClose(scope, exit) + fiberRuntime.scopedWith((scope) => + core.flatMap( + buildWithScope(layer, scope), + (context) => core.provideSomeContext(self, context) + ) )) const provideSomeRuntime = dual< diff --git a/packages/effect/src/internal/mailbox.ts b/packages/effect/src/internal/mailbox.ts index 8b16a1bf4ed..17b5cdefc79 100644 --- a/packages/effect/src/internal/mailbox.ts +++ b/packages/effect/src/internal/mailbox.ts @@ -547,10 +547,12 @@ export const fromStream: { onFailure: (cause: Cause) => mailbox.failCause(cause), onDone: () => mailbox.end }) - return stream.toChannel(self).pipe( - coreChannel.pipeTo(writer), - channelExecutor.runScoped, - circular.forkScoped + return channel.unwrapScopedWith((scope) => + stream.toChannel(self).pipe( + coreChannel.pipeTo(writer), + channelExecutor.runIn(scope), + circular.forkIn(scope) + ) ) } )) diff --git a/packages/effect/src/internal/sink.ts b/packages/effect/src/internal/sink.ts index 75995fbe6b2..7f65a19b82e 100644 --- a/packages/effect/src/internal/sink.ts +++ b/packages/effect/src/internal/sink.ts @@ -19,7 +19,7 @@ import { hasProperty, type Predicate, type Refinement } from "../Predicate.js" import * as PubSub from "../PubSub.js" import * as Queue from "../Queue.js" import * as Ref from "../Ref.js" -import type * as Scope from "../Scope.js" +import * as Scope from "../Scope.js" import type * as Sink from "../Sink.js" import * as channel from "./channel.js" import * as mergeDecision from "./channel/mergeDecision.js" @@ -1656,38 +1656,43 @@ export const raceWith = dual< readonly capacity?: number | undefined } ): Sink.Sink => { - const scoped = Effect.gen(function*($) { - const pubsub = yield* $( - PubSub.bounded, Exit.Exit>>(options?.capacity ?? 16) - ) - const channel1 = yield* $(channel.fromPubSubScoped(pubsub)) - const channel2 = yield* $(channel.fromPubSubScoped(pubsub)) - const reader = channel.toPubSub(pubsub) - const writer = pipe( - channel1, - core.pipeTo(toChannel(self)), - channel.mergeWith({ - other: pipe(channel2, core.pipeTo(toChannel(options.other))), - onSelfDone: options.onSelfDone, - onOtherDone: options.onOtherDone - }) - ) - const racedChannel: Channel.Channel< - Chunk.Chunk, - Chunk.Chunk, - E | E2, - never, - A3 | A4, - unknown, - R | R2 - > = channel.mergeWith(reader, { - other: writer, - onSelfDone: (_) => mergeDecision.Await((exit) => Effect.suspend(() => exit)), - onOtherDone: (done) => mergeDecision.Done(Effect.suspend(() => done)) + function race(scope: Scope.Scope) { + return Effect.gen(function*() { + const pubsub = yield* PubSub.bounded< + Either.Either, Exit.Exit> + >(options?.capacity ?? 16) + const subscription1 = yield* Scope.extend(PubSub.subscribe(pubsub), scope) + const subscription2 = yield* Scope.extend(PubSub.subscribe(pubsub), scope) + const reader = channel.toPubSub(pubsub) + const writer = channel.fromQueue(subscription1).pipe( + core.pipeTo(toChannel(self)), + channel.zipLeft(core.fromEffect(Queue.shutdown(subscription1))), + channel.mergeWith({ + other: channel.fromQueue(subscription2).pipe( + core.pipeTo(toChannel(options.other)), + channel.zipLeft(core.fromEffect(Queue.shutdown(subscription2))) + ), + onSelfDone: options.onSelfDone, + onOtherDone: options.onOtherDone + }) + ) + const racedChannel = channel.mergeWith(reader, { + other: writer, + onSelfDone: () => mergeDecision.Await(identity), + onOtherDone: (exit) => mergeDecision.Done(exit) + }) as Channel.Channel< + Chunk.Chunk, + Chunk.Chunk, + E | E2, + never, + A3 | A4, + unknown, + R | R2 + > + return new SinkImpl(racedChannel) }) - return new SinkImpl(racedChannel) - }) - return unwrapScoped(scoped) + } + return unwrapScopedWith(race) } ) @@ -1923,9 +1928,24 @@ export const unwrap = ( /** @internal */ export const unwrapScoped = ( effect: Effect.Effect, E, R> -): Sink.Sink> => { - return new SinkImpl(channel.unwrapScoped(pipe(effect, Effect.map((sink) => toChannel(sink))))) -} +): Sink.Sink> => + new SinkImpl( + channel.unwrapScoped(effect.pipe( + Effect.map((sink) => toChannel(sink)) + )) + ) + +/** @internal */ +export const unwrapScopedWith = ( + f: (scope: Scope.Scope) => Effect.Effect, E, R> +): Sink.Sink => + new SinkImpl( + channel.unwrapScopedWith((scope) => + f(scope).pipe( + Effect.map((sink) => toChannel(sink)) + ) + ) + ) /** @internal */ export const withDuration = ( diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index db42b2bf2f8..295b68a351a 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -39,7 +39,6 @@ import type { NoInfer, TupleOf } from "../Types.js" import * as channel from "./channel.js" import * as channelExecutor from "./channel/channelExecutor.js" import * as MergeStrategy from "./channel/mergeStrategy.js" -import * as singleProducerAsyncInput from "./channel/singleProducerAsyncInput.js" import * as core from "./core-stream.js" import * as doNotation from "./doNotation.js" import { RingBuffer } from "./ringBuffer.js" @@ -191,8 +190,7 @@ export const aggregateWithinEither = dual< Ref.make(false), Ref.make(false) ]) - return pipe( - fromEffect(layer), + return fromEffect(layer).pipe( flatMap(([handoff, sinkEndReason, sinkLeftovers, scheduleDriver, consumed, endAfterEmit]) => { const handoffProducer: Channel.Channel, never, E | E2, unknown, unknown> = core .readWithCause({ @@ -299,7 +297,7 @@ export const aggregateWithinEither = dual< handoffConsumer, channel.pipeToOrFail(_sink.toChannel(sink)), core.collectElements, - channelExecutor.run, + channel.run, Effect.forkIn(scope) ) ) @@ -421,31 +419,21 @@ export const aggregateWithinEither = dual< }) ) } - return unwrapScoped( - pipe( - self, - toChannel, - core.pipeTo(handoffProducer), - channelExecutor.run, - Effect.forkScoped, + return unwrapScopedWith((scope) => + core.pipeTo(toChannel(self), handoffProducer).pipe( + channel.run, + Effect.forkIn(scope), Effect.zipRight( - pipe( - handoffConsumer, - channel.pipeToOrFail(_sink.toChannel(sink)), + channel.pipeToOrFail(handoffConsumer, _sink.toChannel(sink)).pipe( core.collectElements, - channelExecutor.run, - Effect.forkScoped, + channel.run, + Effect.forkIn(scope), Effect.flatMap((sinkFiber) => - pipe( - Effect.forkScoped(timeout(Option.none())), - Effect.flatMap((scheduleFiber) => - pipe( - Effect.scope, - Effect.map((scope) => - new StreamImpl( - scheduledAggregator(sinkFiber, scheduleFiber, scope) - ) - ) + timeout(Option.none()).pipe( + Effect.forkIn(scope), + Effect.map((scheduleFiber) => + new StreamImpl( + scheduledAggregator(sinkFiber, scheduleFiber, scope) ) ) ) @@ -1209,7 +1197,7 @@ const bufferSignal = ( pipe( bufferChannel, core.pipeTo(producer(queue, ref)), - channelExecutor.runScoped, + channel.runScoped, Effect.forkScoped ) ), @@ -1530,19 +1518,20 @@ export const combine = dual< pullRight: Effect.Effect, R4> ) => Effect.Effect>, never, R5> ): Stream.Stream => { - const producer = ( + function producer( handoff: Handoff.Handoff>>, latch: Handoff.Handoff - ): Channel.Channel => - pipe( - core.fromEffect(Handoff.take(latch)), + ): Channel.Channel { + return core.fromEffect(Handoff.take(latch)).pipe( channel.zipRight(core.readWithCause({ onInput: (input) => core.flatMap( - core.fromEffect(pipe( - handoff, - Handoff.offer>>(Exit.succeed(input)) - )), + core.fromEffect( + Handoff.offer>>( + handoff, + Exit.succeed(input) + ) + ), () => producer(handoff, latch) ), onFailure: (cause) => @@ -1564,41 +1553,41 @@ export const combine = dual< ) })) ) + } return new StreamImpl( - channel.unwrapScoped( - Effect.gen(function*($) { - const left = yield* $(Handoff.make>>()) - const right = yield* $(Handoff.make>>()) - const latchL = yield* $(Handoff.make()) - const latchR = yield* $(Handoff.make()) - yield* $( - toChannel(self), - channel.concatMap(channel.writeChunk), - core.pipeTo(producer(left, latchL)), - channelExecutor.runScoped, - Effect.forkScoped - ) - yield* $( - toChannel(that), - channel.concatMap(channel.writeChunk), - core.pipeTo(producer(right, latchR)), - channelExecutor.runScoped, - Effect.forkScoped - ) - const pullLeft = pipe( - latchL, - Handoff.offer(void 0), - // TODO: remove - Effect.zipRight(pipe(Handoff.take(left), Effect.flatMap((exit) => Effect.suspend(() => exit)))) - ) - const pullRight = pipe( - latchR, - Handoff.offer(void 0), - // TODO: remove - Effect.zipRight(pipe(Handoff.take(right), Effect.flatMap((exit) => Effect.suspend(() => exit)))) - ) - return toChannel(unfoldEffect(s, (s) => Effect.flatMap(f(s, pullLeft, pullRight), unsome))) - }) + channel.unwrapScopedWith((scope) => + Effect.all([ + Handoff.make>>(), + Handoff.make>>(), + Handoff.make(), + Handoff.make() + ]).pipe( + Effect.tap(([left, _, latchL]) => + toChannel(self).pipe( + channel.concatMap(channel.writeChunk), + core.pipeTo(producer(left, latchL)), + channelExecutor.runIn(scope), + Effect.forkIn(scope) + ) + ), + Effect.tap(([, right, _, rightL]) => + toChannel(that).pipe( + channel.concatMap(channel.writeChunk), + core.pipeTo(producer(right, rightL)), + channelExecutor.runIn(scope), + Effect.forkIn(scope) + ) + ), + Effect.map(([left, right, latchL, latchR]) => { + const pullLeft = Handoff.offer(latchL, void 0).pipe( + Effect.zipRight(Handoff.take(left).pipe(Effect.flatMap(identity))) + ) + const pullRight = Handoff.offer(latchR, void 0).pipe( + Effect.zipRight(Handoff.take(right).pipe(Effect.flatMap(identity))) + ) + return toChannel(unfoldEffect(s, (s) => Effect.flatMap(f(s, pullLeft, pullRight), unsome))) + }) + ) ) ) }) @@ -1661,53 +1650,35 @@ export const combineChunks = dual< }) ) return new StreamImpl( - pipe( + channel.unwrapScopedWith((scope) => Effect.all([ Handoff.make>(), Handoff.make>(), Handoff.make(), Handoff.make() - ]), - Effect.tap(([left, _, latchL]) => - pipe( - toChannel(self), - core.pipeTo(producer(left, latchL)), - channelExecutor.runScoped, - Effect.forkScoped - ) - ), - Effect.tap(([_, right, __, latchR]) => - pipe( - toChannel(that), - core.pipeTo(producer(right, latchR)), - channelExecutor.runScoped, - Effect.forkScoped - ) - ), - Effect.map(([left, right, latchL, latchR]) => { - const pullLeft = pipe( - latchL, - Handoff.offer(void 0), - Effect.zipRight( - pipe( - Handoff.take(left), - Effect.flatMap(InternalTake.done) - ) + ]).pipe( + Effect.tap(([left, _, latchL]) => + core.pipeTo(toChannel(self), producer(left, latchL)).pipe( + channelExecutor.runIn(scope), + Effect.forkIn(scope) ) - ) - const pullRight = pipe( - latchR, - Handoff.offer(void 0), - Effect.zipRight( - pipe( - Handoff.take(right), - Effect.flatMap(InternalTake.done) - ) + ), + Effect.tap(([_, right, __, latchR]) => + core.pipeTo(toChannel(that), producer(right, latchR)).pipe( + channelExecutor.runIn(scope), + Effect.forkIn(scope) ) - ) - return toChannel(unfoldChunkEffect(s, (s) => Effect.flatMap(f(s, pullLeft, pullRight), unsome))) - }), - channel.unwrapScoped + ), + Effect.map(([left, right, latchL, latchR]) => { + const pullLeft = Handoff.offer(latchL, void 0).pipe( + Effect.zipRight(Handoff.take(left).pipe(Effect.flatMap(InternalTake.done))) + ) + const pullRight = Handoff.offer(latchR, void 0).pipe( + Effect.zipRight(Handoff.take(right).pipe(Effect.flatMap(InternalTake.done))) + ) + return toChannel(unfoldChunkEffect(s, (s) => Effect.flatMap(f(s, pullLeft, pullRight), unsome))) + }) + ) ) ) }) @@ -1809,101 +1780,114 @@ export const crossWith: { export const debounce = dual< (duration: Duration.DurationInput) => (self: Stream.Stream) => Stream.Stream, (self: Stream.Stream, duration: Duration.DurationInput) => Stream.Stream ->(2, (self: Stream.Stream, duration: Duration.DurationInput): Stream.Stream => - pipe( - singleProducerAsyncInput.make, unknown>(), - Effect.flatMap((input) => - Effect.transplant((grafter) => - pipe( - Handoff.make>(), - Effect.map((handoff) => { - const enqueue = (last: Chunk.Chunk): Effect.Effect< - Channel.Channel, unknown, E, unknown, unknown, unknown> - > => - pipe( - Clock.sleep(duration), - Effect.as(last), - Effect.fork, - grafter, - Effect.map((fiber) => consumer(DebounceState.previous(fiber))) +>( + 2, + (self: Stream.Stream, duration: Duration.DurationInput): Stream.Stream => + unwrapScopedWith((scope) => + Effect.gen(function*() { + const handoff = yield* Handoff.make>() + + function enqueue(last: Chunk.Chunk): Effect.Effect< + Channel.Channel, unknown, E, unknown, unknown, unknown> + > { + return Clock.sleep(duration).pipe( + Effect.as(last), + Effect.forkIn(scope), + Effect.map((fiber) => consumer(DebounceState.previous(fiber))) + ) + } + + const producer: Channel.Channel, E, E, unknown, unknown> = core.readWithCause({ + onInput: (input: Chunk.Chunk) => + Option.match(Chunk.last(input), { + onNone: () => producer, + onSome: (elem) => + core.fromEffect(Handoff.offer(handoff, HandoffSignal.emit(Chunk.of(elem)))).pipe( + core.flatMap(() => producer) + ) + }), + onFailure: (cause) => + core.fromEffect( + Handoff.offer>(handoff, HandoffSignal.halt(cause)) + ), + onDone: () => + core.fromEffect( + Handoff.offer>( + handoff, + HandoffSignal.end(SinkEndReason.UpstreamEnd) ) - const producer: Channel.Channel, E, E, unknown, unknown> = core - .readWithCause({ - onInput: (input: Chunk.Chunk) => - Option.match(Chunk.last(input), { - onNone: () => producer, - onSome: (last) => - core.flatMap( - core.fromEffect( - Handoff.offer>( - handoff, - HandoffSignal.emit(Chunk.of(last)) - ) - ), - () => producer - ) - }), - onFailure: (cause) => - core.fromEffect( - Handoff.offer>(handoff, HandoffSignal.halt(cause)) - ), - onDone: () => - core.fromEffect( - Handoff.offer>( - handoff, - HandoffSignal.end(SinkEndReason.UpstreamEnd) - ) - ) - }) - const consumer = ( - state: DebounceState.DebounceState - ): Channel.Channel, unknown, E, unknown, unknown, unknown> => { - switch (state._tag) { - case DebounceState.OP_NOT_STARTED: { - return pipe( - Handoff.take(handoff), - Effect.map((signal) => { - switch (signal._tag) { - case HandoffSignal.OP_EMIT: { - return channel.unwrap(enqueue(signal.elements)) - } - case HandoffSignal.OP_HALT: { - return core.failCause(signal.cause) - } - case HandoffSignal.OP_END: { - return core.void - } + ) + }) + + function consumer( + state: DebounceState.DebounceState + ): Channel.Channel, unknown, E, unknown, unknown, unknown> { + switch (state._tag) { + case DebounceState.OP_NOT_STARTED: { + return channel.unwrap( + Handoff.take(handoff).pipe( + Effect.map((signal) => { + switch (signal._tag) { + case HandoffSignal.OP_EMIT: { + return channel.unwrap(enqueue(signal.elements)) } - }), - channel.unwrap - ) - } - case DebounceState.OP_PREVIOUS: { - return channel.unwrap( - Effect.raceWith(Fiber.join(state.fiber), Handoff.take(handoff), { + case HandoffSignal.OP_HALT: { + return core.failCause(signal.cause) + } + case HandoffSignal.OP_END: { + return core.void + } + } + }) + ) + ) + } + case DebounceState.OP_PREVIOUS: { + return channel.unwrap( + Handoff.take(handoff).pipe( + Effect.forkIn(scope), + Effect.flatMap((handoffFiber) => + Effect.raceWith(Fiber.join(state.fiber), Fiber.join(handoffFiber), { onSelfDone: (leftExit, current) => Exit.match(leftExit, { - onFailure: (cause) => pipe(Fiber.interrupt(current), Effect.as(core.failCause(cause))), + onFailure: (cause) => + Fiber.interrupt(current).pipe( + Effect.as(core.failCause(cause)) + ), onSuccess: (chunk) => - Effect.succeed( - pipe(core.write(chunk), core.flatMap(() => consumer(DebounceState.current(current)))) + Fiber.interrupt(current).pipe( + Effect.zipRight(Effect.succeed( + core.write(chunk).pipe( + core.flatMap(() => consumer(DebounceState.current(handoffFiber))) + ) + )) ) }), onOtherDone: (rightExit, previous) => Exit.match(rightExit, { - onFailure: (cause) => pipe(Fiber.interrupt(previous), Effect.as(core.failCause(cause))), + onFailure: (cause) => + Fiber.interrupt(previous).pipe( + Effect.as(core.failCause(cause)) + ), onSuccess: (signal) => { switch (signal._tag) { case HandoffSignal.OP_EMIT: { - return pipe(Fiber.interrupt(previous), Effect.zipRight(enqueue(signal.elements))) + return Fiber.interrupt(previous).pipe( + Effect.zipRight(enqueue(signal.elements)) + ) } case HandoffSignal.OP_HALT: { - return pipe(Fiber.interrupt(previous), Effect.as(core.failCause(signal.cause))) + return Fiber.interrupt(previous).pipe( + Effect.as(core.failCause(signal.cause)) + ) } case HandoffSignal.OP_END: { - return pipe( - Fiber.join(previous), - Effect.map((chunk) => pipe(core.write(chunk), channel.zipRight(core.void))) + return Fiber.join(previous).pipe( + Effect.map((chunk) => + core.write(chunk).pipe( + channel.zipRight(core.void) + ) + ) ) } } @@ -1911,46 +1895,40 @@ export const debounce = dual< }) }) ) - } - case DebounceState.OP_CURRENT: { - return pipe( - Fiber.join(state.fiber), - Effect.map((signal) => { - switch (signal._tag) { - case HandoffSignal.OP_EMIT: { - return channel.unwrap(enqueue(signal.elements)) - } - case HandoffSignal.OP_HALT: { - return core.failCause(signal.cause) - } - case HandoffSignal.OP_END: { - return core.void - } + ) + ) + } + case DebounceState.OP_CURRENT: { + return channel.unwrap( + Fiber.join(state.fiber).pipe( + Effect.map((signal) => { + switch (signal._tag) { + case HandoffSignal.OP_EMIT: { + return channel.unwrap(enqueue(signal.elements)) } - }), - channel.unwrap - ) - } - } + case HandoffSignal.OP_HALT: { + return core.failCause(signal.cause) + } + case HandoffSignal.OP_END: { + return core.void + } + } + }) + ) + ) } - const debounceChannel: Channel.Channel, Chunk.Chunk, E, E, unknown, unknown> = pipe( - channel.fromInput(input), - core.pipeTo(producer), - channelExecutor.run, - Effect.forkScoped, - Effect.as(pipe( - consumer(DebounceState.notStarted), - core.embedInput, unknown>(input as any) - )), - channel.unwrapScoped - ) - return new StreamImpl(pipe(toChannel(self), core.pipeTo(debounceChannel))) - }) - ) - ) - ), - unwrap - )) + } + } + + return scopedWith((scope) => + core.pipeTo(toChannel(self), producer).pipe( + channelExecutor.runIn(scope), + Effect.forkIn(scope) + ) + ).pipe(crossRight(new StreamImpl(consumer(DebounceState.notStarted)))) + }) + ) +) /** @internal */ export const die = (defect: unknown): Stream.Stream => fromEffect(Effect.die(defect)) @@ -2270,22 +2248,16 @@ export const drainFork = dual< self: Stream.Stream, that: Stream.Stream ): Stream.Stream => - pipe( - fromEffect(Deferred.make()), - flatMap((backgroundDied) => - pipe( - scoped( - pipe( - that, - runForEachScoped(() => Effect.void), - Effect.catchAllCause((cause) => Deferred.failCause(backgroundDied, cause)), - Effect.forkScoped - ) - ), - crossRight(pipe(self, interruptWhenDeferred(backgroundDied))) + fromEffect(Deferred.make()).pipe(flatMap((backgroundDied) => + scopedWith((scope) => + toChannel(that).pipe( + channel.drain, + channelExecutor.runIn(scope), + Effect.catchAllCause((cause) => Deferred.failCause(backgroundDied, cause)), + Effect.forkIn(scope) ) - ) - ) + ).pipe(crossRight(interruptWhenDeferred(self, backgroundDied))) + )) ) /** @internal */ @@ -3544,10 +3516,11 @@ export const haltWhen = dual< channel.unwrap ) return new StreamImpl( - pipe( - Effect.forkScoped(effect), - Effect.map((fiber) => pipe(toChannel(self), core.pipeTo(writer(fiber)))), - channel.unwrapScoped + channel.unwrapScopedWith((scope) => + effect.pipe( + Effect.forkIn(scope), + Effect.map((fiber) => toChannel(self).pipe(core.pipeTo(writer(fiber)))) + ) ) ) } @@ -3656,26 +3629,24 @@ export const interleaveWith = dual< ) }) return new StreamImpl( - channel.unwrapScoped( + channel.unwrapScopedWith((scope) => pipe( Handoff.make>(), Effect.zip(Handoff.make>()), Effect.tap(([left]) => - pipe( - toChannel(self), + toChannel(self).pipe( channel.concatMap(channel.writeChunk), core.pipeTo(producer(left)), - channelExecutor.runScoped, - Effect.forkScoped + channelExecutor.runIn(scope), + Effect.forkIn(scope) ) ), Effect.tap(([_, right]) => - pipe( - toChannel(that), + toChannel(that).pipe( channel.concatMap(channel.writeChunk), core.pipeTo(producer(right)), - channelExecutor.runScoped, - Effect.forkScoped + channelExecutor.runIn(scope), + Effect.forkIn(scope) ) ), Effect.map(([left, right]) => { @@ -4895,10 +4866,11 @@ export const provideLayer = dual< layer: Layer.Layer ): Stream.Stream => new StreamImpl( - channel.unwrapScoped(pipe( - Layer.build(layer), - Effect.map((env) => pipe(toChannel(self), core.provideContext(env))) - )) + channel.unwrapScopedWith((scope) => + Layer.buildWithScope(layer, scope).pipe( + Effect.map((env) => pipe(toChannel(self), core.provideContext(env))) + ) + ) ) ) @@ -5508,44 +5480,50 @@ export const retry = dual< export const run = dual< ( sink: Sink.Sink - ) => (self: Stream.Stream) => Effect.Effect>, + ) => ( + self: Stream.Stream + ) => Effect.Effect, ( self: Stream.Stream, sink: Sink.Sink - ) => Effect.Effect> + ) => Effect.Effect >(2, ( self: Stream.Stream, sink: Sink.Sink -): Effect.Effect> => - pipe(toChannel(self), channel.pipeToOrFail(_sink.toChannel(sink)), channel.runDrain)) +): Effect.Effect => + toChannel(self).pipe( + channel.pipeToOrFail(_sink.toChannel(sink)), + channel.runDrain + )) /** @internal */ export const runCollect = ( self: Stream.Stream -): Effect.Effect, E, Exclude> => pipe(self, run(_sink.collectAll())) +): Effect.Effect, E, R> => run(self, _sink.collectAll()) /** @internal */ -export const runCount = (self: Stream.Stream): Effect.Effect> => - pipe(self, run(_sink.count)) +export const runCount = (self: Stream.Stream): Effect.Effect => run(self, _sink.count) /** @internal */ -export const runDrain = (self: Stream.Stream): Effect.Effect> => - pipe(self, run(_sink.drain)) +export const runDrain = (self: Stream.Stream): Effect.Effect => run(self, _sink.drain) /** @internal */ export const runFold = dual< ( s: S, f: (s: S, a: A) => S - ) => (self: Stream.Stream) => Effect.Effect>, - (self: Stream.Stream, s: S, f: (s: S, a: A) => S) => Effect.Effect> ->( - 3, + ) => ( + self: Stream.Stream + ) => Effect.Effect, ( self: Stream.Stream, s: S, f: (s: S, a: A) => S - ): Effect.Effect> => pipe(self, runFoldWhileScoped(s, constTrue, f), Effect.scoped) + ) => Effect.Effect +>( + 3, + (self: Stream.Stream, s: S, f: (s: S, a: A) => S): Effect.Effect => + runFoldWhile(self, s, constTrue, f) ) /** @internal */ @@ -5553,18 +5531,17 @@ export const runFoldEffect = dual< ( s: S, f: (s: S, a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect>, + ) => (self: Stream.Stream) => Effect.Effect, ( self: Stream.Stream, s: S, f: (s: S, a: A) => Effect.Effect - ) => Effect.Effect> + ) => Effect.Effect >(3, ( self: Stream.Stream, s: S, f: (s: S, a: A) => Effect.Effect -): Effect.Effect> => - pipe(self, runFoldWhileScopedEffect(s, constTrue, f), Effect.scoped)) +): Effect.Effect => runFoldWhileEffect(self, s, constTrue, f)) /** @internal */ export const runFoldScoped = dual< @@ -5599,19 +5576,21 @@ export const runFoldWhile = dual< s: S, cont: Predicate, f: (s: S, a: A) => S - ) => (self: Stream.Stream) => Effect.Effect>, + ) => ( + self: Stream.Stream + ) => Effect.Effect, ( self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => S - ) => Effect.Effect> + ) => Effect.Effect >(4, ( self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => S -): Effect.Effect> => pipe(self, runFoldWhileScoped(s, cont, f), Effect.scoped)) +): Effect.Effect => run(self, _sink.fold(s, cont, f))) /** @internal */ export const runFoldWhileEffect = dual< @@ -5619,20 +5598,21 @@ export const runFoldWhileEffect = dual< s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect>, + ) => ( + self: Stream.Stream + ) => Effect.Effect, ( self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect - ) => Effect.Effect> + ) => Effect.Effect >(4, ( self: Stream.Stream, s: S, cont: Predicate, f: (s: S, a: A) => Effect.Effect -): Effect.Effect> => - pipe(self, runFoldWhileScopedEffect(s, cont, f), Effect.scoped)) +): Effect.Effect => run(self, _sink.foldEffect(s, cont, f))) /** @internal */ export const runFoldWhileScoped = dual< @@ -5678,29 +5658,33 @@ export const runFoldWhileScopedEffect = dual< export const runForEach = dual< ( f: (a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect>, + ) => ( + self: Stream.Stream + ) => Effect.Effect, ( self: Stream.Stream, f: (a: A) => Effect.Effect - ) => Effect.Effect> + ) => Effect.Effect >(2, ( self: Stream.Stream, f: (a: A) => Effect.Effect -): Effect.Effect> => pipe(self, run(_sink.forEach(f)))) +): Effect.Effect => run(self, _sink.forEach(f))) /** @internal */ export const runForEachChunk = dual< ( f: (a: Chunk.Chunk) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect>, + ) => ( + self: Stream.Stream + ) => Effect.Effect, ( self: Stream.Stream, f: (a: Chunk.Chunk) => Effect.Effect - ) => Effect.Effect> + ) => Effect.Effect >(2, ( self: Stream.Stream, f: (a: Chunk.Chunk) => Effect.Effect -): Effect.Effect> => pipe(self, run(_sink.forEachChunk(f)))) +): Effect.Effect => run(self, _sink.forEachChunk(f))) /** @internal */ export const runForEachChunkScoped = dual< @@ -5734,15 +5718,17 @@ export const runForEachScoped = dual< export const runForEachWhile = dual< ( f: (a: A) => Effect.Effect - ) => (self: Stream.Stream) => Effect.Effect>, + ) => ( + self: Stream.Stream + ) => Effect.Effect, ( self: Stream.Stream, f: (a: A) => Effect.Effect - ) => Effect.Effect> + ) => Effect.Effect >(2, ( self: Stream.Stream, f: (a: A) => Effect.Effect -): Effect.Effect> => pipe(self, run(_sink.forEachWhile(f)))) +): Effect.Effect => run(self, _sink.forEachWhile(f))) /** @internal */ export const runForEachWhileScoped = dual< @@ -5761,7 +5747,7 @@ export const runForEachWhileScoped = dual< /** @internal */ export const runHead = ( self: Stream.Stream -): Effect.Effect, E, Exclude> => pipe(self, run(_sink.head())) +): Effect.Effect, E, R> => run(self, _sink.head()) /** @internal */ export const runIntoPubSub = dual< @@ -5837,7 +5823,7 @@ export const runIntoQueueElementsScoped = dual< return pipe( core.pipeTo(toChannel(self), writer), channel.drain, - channelExecutor.runScoped, + channel.runScoped, Effect.asVoid ) }) @@ -5865,7 +5851,7 @@ export const runIntoQueueScoped = dual< core.pipeTo(toChannel(self), writer), channel.mapOutEffect((take) => Queue.offer(queue, take)), channel.drain, - channelExecutor.runScoped, + channel.runScoped, Effect.asVoid ) }) @@ -5873,7 +5859,7 @@ export const runIntoQueueScoped = dual< /** @internal */ export const runLast = ( self: Stream.Stream -): Effect.Effect, E, Exclude> => pipe(self, run(_sink.last())) +): Effect.Effect, E, R> => run(self, _sink.last()) /** @internal */ export const runScoped = dual< @@ -5892,12 +5878,11 @@ export const runScoped = dual< toChannel(self), channel.pipeToOrFail(_sink.toChannel(sink)), channel.drain, - channelExecutor.runScoped + channel.runScoped )) /** @internal */ -export const runSum = (self: Stream.Stream): Effect.Effect> => - pipe(self, run(_sink.sum)) +export const runSum = (self: Stream.Stream): Effect.Effect => run(self, _sink.sum) /** @internal */ export const scan = dual< @@ -6085,6 +6070,16 @@ export const scoped = ( ): Stream.Stream> => new StreamImpl(channel.ensuring(channel.scoped(pipe(effect, Effect.map(Chunk.of))), Effect.void)) +/** @internal */ +export const scopedWith = ( + f: (scope: Scope.Scope) => Effect.Effect +): Stream.Stream => + new StreamImpl(channel.scopedWith((scope) => + f(scope).pipe( + Effect.map(Chunk.of) + ) + )) + /** @internal */ export const some = (self: Stream.Stream, E, R>): Stream.Stream, R> => pipe(self, mapError(Option.some), someOrFail(() => Option.none())) @@ -7279,6 +7274,11 @@ export const unwrapScoped = ( effect: Effect.Effect, E, R> ): Stream.Stream | R2> => flatten(scoped(effect)) +/** @internal */ +export const unwrapScopedWith = ( + f: (scope: Scope.Scope) => Effect.Effect, E, R> +): Stream.Stream => flatten(scopedWith((scope) => f(scope))) + /** @internal */ export const updateService = dual< >( diff --git a/packages/effect/test/Stream/scoping.test.ts b/packages/effect/test/Stream/scoping.test.ts index 082158d6385..d7c8ea813d3 100644 --- a/packages/effect/test/Stream/scoping.test.ts +++ b/packages/effect/test/Stream/scoping.test.ts @@ -1,3 +1,4 @@ +import * as Array from "effect/Array" import * as Cause from "effect/Cause" import * as Chunk from "effect/Chunk" import * as Deferred from "effect/Deferred" @@ -6,87 +7,92 @@ import * as Either from "effect/Either" import * as Exit from "effect/Exit" import * as Fiber from "effect/Fiber" import * as FiberId from "effect/FiberId" -import { pipe } from "effect/Function" import * as Option from "effect/Option" import * as Ref from "effect/Ref" +import * as Scope from "effect/Scope" import * as Stream from "effect/Stream" import * as it from "effect/test/utils/extend" import { assert, describe } from "vitest" describe("Stream", () => { it.effect("acquireRelease - simple example", () => - Effect.gen(function*($) { - const ref = yield* $(Ref.make(false)) - const stream = pipe( - Stream.acquireRelease( - Effect.succeed(Chunk.range(0, 2)), - () => Ref.set(ref, true) - ), - Stream.flatMap(Stream.fromIterable) - ) - const result = yield* $(Stream.runCollect(stream)) - const released = yield* $(Ref.get(ref)) + Effect.gen(function*() { + const ref = yield* Ref.make(false) + const stream = Stream.acquireRelease( + Effect.succeed(Chunk.range(0, 2)), + () => Ref.set(ref, true) + ).pipe(Stream.flatMap(Stream.fromIterable)) + const result = yield* Stream.runCollect(stream) + const released = yield* Ref.get(ref) assert.isTrue(released) - assert.deepStrictEqual(Array.from(result), [0, 1, 2]) + assert.deepStrictEqual(Chunk.toArray(result), [0, 1, 2]) })) it.effect("acquireRelease - short circuits", () => - Effect.gen(function*($) { - const ref = yield* $(Ref.make(false)) - const stream = pipe( - Stream.acquireRelease( - Effect.succeed(Chunk.range(0, 2)), - () => Ref.set(ref, true) - ), + Effect.gen(function*() { + const ref = yield* Ref.make(false) + const stream = Stream.acquireRelease( + Effect.succeed(Chunk.range(0, 2)), + () => Ref.set(ref, true) + ).pipe( Stream.flatMap(Stream.fromIterable), Stream.take(2) ) - const result = yield* $(Stream.runCollect(stream)) - const released = yield* $(Ref.get(ref)) + const result = yield* Stream.runCollect(stream) + const released = yield* Ref.get(ref) assert.isTrue(released) - assert.deepStrictEqual(Array.from(result), [0, 1]) + assert.deepStrictEqual(Chunk.toArray(result), [0, 1]) })) it.effect("acquireRelease - no acquisition when short circuiting", () => - Effect.gen(function*($) { - const ref = yield* $(Ref.make(false)) - const stream = pipe( - Stream.make(1), - Stream.concat(Stream.acquireRelease(Ref.set(ref, true), () => Effect.void)), + Effect.gen(function*() { + const ref = yield* Ref.make(false) + const stream = Stream.make(1).pipe( + Stream.concat( + Stream.acquireRelease( + Ref.set(ref, true), + () => Effect.void + ) + ), Stream.take(0) ) - yield* $(Stream.runDrain(stream)) - const result = yield* $(Ref.get(ref)) + yield* Stream.runDrain(stream) + const result = yield* Ref.get(ref) assert.isFalse(result) })) it.effect("acquireRelease - releases when there are defects", () => - Effect.gen(function*($) { - const ref = yield* $(Ref.make(false)) - yield* $( - Stream.acquireRelease(Effect.void, () => Ref.set(ref, true)), + Effect.gen(function*() { + const ref = yield* Ref.make(false) + yield* Stream.acquireRelease( + Effect.void, + () => Ref.set(ref, true) + ).pipe( Stream.flatMap(() => Stream.fromEffect(Effect.dieMessage("boom"))), Stream.runDrain, Effect.exit ) - const result = yield* $(Ref.get(ref)) + const result = yield* Ref.get(ref) assert.isTrue(result) })) it.effect("acquireRelease - flatMap associativity does not effect lifetime", () => - Effect.gen(function*($) { - const leftAssoc = yield* $( - Stream.acquireRelease(Ref.make(true), (ref) => Ref.set(ref, false)), + Effect.gen(function*() { + const leftAssoc = yield* Stream.acquireRelease( + Ref.make(true), + (ref) => Ref.set(ref, false) + ).pipe( Stream.flatMap(Stream.succeed), Stream.flatMap((ref) => Stream.fromEffect(Ref.get(ref))), Stream.runCollect, Effect.map(Chunk.head) ) - const rightAssoc = yield* $( - Stream.acquireRelease(Ref.make(true), (ref) => Ref.set(ref, false)), + const rightAssoc = yield* Stream.acquireRelease( + Ref.make(true), + (ref) => Ref.set(ref, false) + ).pipe( Stream.flatMap((ref) => - pipe( - Stream.succeed(ref), + Stream.succeed(ref).pipe( Stream.flatMap((ref) => Stream.fromEffect(Ref.get(ref))) ) ), @@ -98,9 +104,11 @@ describe("Stream", () => { })) it.effect("acquireRelease - propagates errors", () => - Effect.gen(function*($) { - const result = yield* $( - Stream.acquireRelease(Effect.void, () => Effect.dieMessage("die")), + Effect.gen(function*() { + const result = yield* Stream.acquireRelease( + Effect.void, + () => Effect.dieMessage("die") + ).pipe( Stream.runCollect, Effect.exit ) @@ -108,25 +116,23 @@ describe("Stream", () => { })) it.effect("ensuring", () => - Effect.gen(function*($) { - const ref = yield* $(Ref.make(Chunk.empty())) - yield* $( - Stream.acquireRelease( - Ref.update(ref, Chunk.append("Acquire")), - () => Ref.update(ref, Chunk.append("Release")) - ), + Effect.gen(function*() { + const ref = yield* Ref.make(Chunk.empty()) + yield* Stream.acquireRelease( + Ref.update(ref, Chunk.append("Acquire")), + () => Ref.update(ref, Chunk.append("Release")) + ).pipe( Stream.crossRight(Stream.fromEffect(Ref.update(ref, Chunk.append("Use")))), Stream.ensuring(Ref.update(ref, Chunk.append("Ensuring"))), Stream.runDrain ) - const result = yield* $(Ref.get(ref)) - assert.deepStrictEqual(Array.from(result), ["Acquire", "Use", "Release", "Ensuring"]) + const result = yield* Ref.get(ref) + assert.deepStrictEqual(Chunk.toArray(result), ["Acquire", "Use", "Release", "Ensuring"]) })) it.effect("scoped - preserves the failure of an effect", () => - Effect.gen(function*($) { - const result = yield* $( - Stream.scoped(Effect.fail("fail")), + Effect.gen(function*() { + const result = yield* Stream.scoped(Effect.fail("fail")).pipe( Stream.runCollect, Effect.either ) @@ -134,42 +140,60 @@ describe("Stream", () => { })) it.effect("scoped - preserves the interruptibility of an effect", () => - Effect.gen(function*($) { - const isInterruptible1 = yield* $( - Stream.scoped(Effect.checkInterruptible(Effect.succeed)), - Stream.runHead - ) - const isInterruptible2 = yield* $( - Stream.scoped(Effect.uninterruptible(Effect.checkInterruptible(Effect.succeed))), + Effect.gen(function*() { + const isInterruptible1 = yield* Effect.checkInterruptible(Effect.succeed).pipe( + Stream.scoped, Stream.runHead ) + const isInterruptible2 = yield* Effect.uninterruptible( + Effect.checkInterruptible(Effect.succeed) + ).pipe(Stream.scoped, Stream.runHead) assert.deepStrictEqual(isInterruptible1, Option.some(true)) assert.deepStrictEqual(isInterruptible2, Option.some(false)) })) it.it("unwrapScoped", async () => { const awaiter = Deferred.unsafeMake(FiberId.none) - const program = Effect.gen(function*($) { + const program = Effect.gen(function*() { const stream = (deferred: Deferred.Deferred, ref: Ref.Ref>) => - pipe( - Effect.acquireRelease( - Ref.update(ref, (array) => [...array, "acquire outer"]), - () => Ref.update(ref, (array) => [...array, "release outer"]) - ), + Effect.acquireRelease( + Ref.update(ref, (array) => [...array, "acquire outer"]), + () => Ref.update(ref, (array) => [...array, "release outer"]) + ).pipe( Effect.zipRight(Deferred.succeed(deferred, void 0)), Effect.zipRight(Deferred.await(awaiter)), Effect.zipRight(Effect.succeed(Stream.make(1, 2, 3))), Stream.unwrapScoped ) - const ref = yield* $(Ref.make>([])) - const deferred = yield* $(Deferred.make()) - const fiber = yield* $(stream(deferred, ref), Stream.runDrain, Effect.fork) - yield* $(Deferred.await(deferred)) - yield* $(Fiber.interrupt(fiber)) - return yield* $(Ref.get(ref)) + const ref = yield* Ref.make>([]) + const deferred = yield* Deferred.make() + const fiber = yield* stream(deferred, ref).pipe(Stream.runDrain, Effect.fork) + yield* Deferred.await(deferred) + yield* Fiber.interrupt(fiber) + return yield* Ref.get(ref) }) const result = await Effect.runPromise(program) await Effect.runPromise(Deferred.succeed(awaiter, void 0)) assert.deepStrictEqual(result, ["acquire outer", "release outer"]) }) + + it.effect("preserves the scope", () => + Effect.gen(function*() { + const ref = yield* Ref.make(Array.empty()) + const scope = yield* Scope.make() + yield* Stream.make(1, 2).pipe( + Stream.flatMap((i) => + Stream.fromEffect(Effect.acquireRelease( + Ref.update(ref, Array.append(`Acquire: ${i}`)), + () => Ref.update(ref, Array.append(`Release: ${i}`)) + )), { bufferSize: 1, concurrency: "unbounded" }), + Stream.runDrain, + Scope.extend(scope) + ) + const before = yield* Ref.getAndSet(ref, Array.empty()) + yield* Scope.close(scope, Exit.void) + const after = yield* Ref.get(ref) + assert.deepStrictEqual(before, ["Acquire: 1", "Acquire: 2"]) + assert.deepStrictEqual(after, ["Release: 2", "Release: 1"]) + })) })