diff --git a/.changeset/calm-jars-occur.md b/.changeset/calm-jars-occur.md new file mode 100644 index 00000000000..7310a5b6a89 --- /dev/null +++ b/.changeset/calm-jars-occur.md @@ -0,0 +1,5 @@ +--- +"@effect/platform": patch +--- + +support context propogation in platform workers diff --git a/packages/platform-browser/test/Worker.test.ts b/packages/platform-browser/test/Worker.test.ts index f23be0d0fe0..0a7c44b8fda 100644 --- a/packages/platform-browser/test/Worker.test.ts +++ b/packages/platform-browser/test/Worker.test.ts @@ -1,9 +1,9 @@ import * as EffectWorker from "@effect/platform-browser/Worker" import "@vitest/web-worker" -import { Cause, Chunk, Effect, Exit, Stream } from "effect" +import { Cause, Chunk, Effect, Exit, Option, Stream } from "effect" import { assert, describe, it } from "vitest" import type { WorkerMessage } from "./fixtures/schema.js" -import { GetPersonById, GetUserById, Person, SetName, User } from "./fixtures/schema.js" +import { GetPersonById, GetSpan, GetUserById, Person, SetName, User } from "./fixtures/schema.js" const runPromiseExit = (effect: Effect.Effect) => Effect.runPromiseExit(effect).then((exit) => { @@ -69,6 +69,28 @@ describe.sequential("Worker", () => { runPromiseExit )) + it("tracing", () => + Effect.gen(function*(_) { + const parentSpan = yield* _(Effect.currentSpan) + const pool = yield* _(EffectWorker.makePoolSerialized({ + spawn: () => new globalThis.Worker(new URL("./fixtures/serializedWorker.ts", import.meta.url)), + size: 1 + })) + const span = yield* _(pool.executeEffect(new GetSpan())) + assert.deepStrictEqual( + span.parent, + Option.some({ + traceId: parentSpan.traceId, + spanId: parentSpan.spanId + }) + ) + }).pipe( + Effect.withSpan("test"), + Effect.scoped, + Effect.provide(EffectWorker.layerManager), + runPromiseExit + )) + it("SharedWorker", () => Effect.gen(function*(_) { const pool = yield* _(EffectWorker.makePool({ diff --git a/packages/platform-browser/test/fixtures/schema.ts b/packages/platform-browser/test/fixtures/schema.ts index 3422ba37f8e..e38f7f21641 100644 --- a/packages/platform-browser/test/fixtures/schema.ts +++ b/packages/platform-browser/test/fixtures/schema.ts @@ -27,5 +27,20 @@ export class SetName extends Schema.TaggedRequest()("SetName", Schema.n } } -export type WorkerMessage = GetUserById | GetPersonById | SetName -export const WorkerMessage = Schema.union(GetUserById, GetPersonById, SetName) +export class GetSpan extends Schema.TaggedRequest()( + "GetSpan", + Schema.never, + Schema.struct({ + name: Schema.string, + traceId: Schema.string, + spanId: Schema.string, + parent: Schema.option(Schema.struct({ + traceId: Schema.string, + spanId: Schema.string + })) + }), + {} +) {} + +export const WorkerMessage = Schema.union(GetUserById, GetPersonById, SetName, GetSpan) +export type WorkerMessage = Schema.Schema.To diff --git a/packages/platform-browser/test/fixtures/serializedWorker.ts b/packages/platform-browser/test/fixtures/serializedWorker.ts index cb4313e274a..12186922b47 100644 --- a/packages/platform-browser/test/fixtures/serializedWorker.ts +++ b/packages/platform-browser/test/fixtures/serializedWorker.ts @@ -1,5 +1,5 @@ import * as Runner from "@effect/platform-browser/WorkerRunner" -import { Context, Effect, Layer, Stream } from "effect" +import { Context, Effect, Layer, Option, Stream } from "effect" import { Person, User, WorkerMessage } from "./schema.js" interface Name { @@ -14,7 +14,20 @@ const WorkerLive = Runner.layerSerialized(WorkerMessage, { new Person({ id: req.id, name: "ing" }) ), GetUserById: (req) => Effect.map(Name, (name) => new User({ id: req.id, name })), - SetName: (req) => Layer.succeed(Name, req.name) + SetName: (req) => Layer.succeed(Name, req.name), + GetSpan: (_) => + Effect.gen(function*(_) { + const span = yield* _(Effect.currentSpan, Effect.orDie) + return { + traceId: span.traceId, + spanId: span.spanId, + name: span.name, + parent: Option.map(span.parent, (span) => ({ + traceId: span.traceId, + spanId: span.spanId + })) + } + }).pipe(Effect.withSpan("GetSpan")) }) .pipe( Layer.provide(Runner.layerPlatform) diff --git a/packages/platform/src/Worker.ts b/packages/platform/src/Worker.ts index 8d772c44b59..4bced7a0e83 100644 --- a/packages/platform/src/Worker.ts +++ b/packages/platform/src/Worker.ts @@ -10,10 +10,12 @@ import type * as Effect from "effect/Effect" import type * as Fiber from "effect/Fiber" import type { LazyArg } from "effect/Function" import type * as Layer from "effect/Layer" +import type * as Option from "effect/Option" import type * as Pool from "effect/Pool" import type * as Queue from "effect/Queue" import type * as Scope from "effect/Scope" import type * as Stream from "effect/Stream" +import type * as Tracer from "effect/Tracer" import * as internal from "./internal/worker.js" import type { WorkerError } from "./WorkerError.js" @@ -98,7 +100,15 @@ export declare namespace Worker { * @since 1.0.0 * @category models */ - export type Request = readonly [id: number, data: 0, I] | readonly [id: number, interrupt: 1] + export type Request = + | readonly [id: number, data: 0, I, trace?: Span] + | readonly [id: number, interrupt: 1] + + /** + * @since 1.0.0 + * @category models + */ + export type Span = readonly [traceId: string, spanId: string, sampled: boolean] /** * @since 1.0.0 @@ -150,8 +160,8 @@ export declare namespace WorkerPool { * @since 1.0.0 */ export interface WorkerQueue { - readonly offer: (id: number, item: I) => Effect.Effect - readonly take: Effect.Effect + readonly offer: (id: number, item: I, span: Option.Option) => Effect.Effect + readonly take: Effect.Effect]> readonly shutdown: Effect.Effect } diff --git a/packages/platform/src/internal/worker.ts b/packages/platform/src/internal/worker.ts index eb2a01d8d1b..66d13f2ddbc 100644 --- a/packages/platform/src/internal/worker.ts +++ b/packages/platform/src/internal/worker.ts @@ -1,5 +1,6 @@ import * as Schema from "@effect/schema/Schema" import * as Serializable from "@effect/schema/Serializable" +import { Tracer } from "effect" import * as Cause from "effect/Cause" import * as Channel from "effect/Channel" import * as Chunk from "effect/Chunk" @@ -10,6 +11,7 @@ import * as Exit from "effect/Exit" import * as Fiber from "effect/Fiber" import { identity, pipe } from "effect/Function" import * as Layer from "effect/Layer" +import * as Option from "effect/Option" import * as Pool from "effect/Pool" import * as Queue from "effect/Queue" import * as ReadonlyArray from "effect/ReadonlyArray" @@ -23,9 +25,9 @@ import { WorkerError } from "../WorkerError.js" /** @internal */ export const defaultQueue = () => Effect.map( - Queue.unbounded(), + Queue.unbounded]>(), (queue): Worker.WorkerQueue => ({ - offer: (id, item) => Queue.offer(queue, [id, item]), + offer: (id, item, span) => Queue.offer(queue, [id, item, span]), take: Queue.take(queue), shutdown: Queue.shutdown(queue) }) @@ -170,12 +172,16 @@ export const makeManager = Effect.gen(function*(_) { Effect.all([ Effect.sync(() => requestIdCounter++), Queue.unbounded>>(), - Deferred.make() + Deferred.make(), + Effect.map( + Effect.serviceOption(Tracer.ParentSpan), + Option.filter((span): span is Tracer.Span => span._tag === "Span") + ) ]), - ([id, queue, deferred]) => + ([id, queue, deferred, span]) => Effect.suspend(() => { requestMap.set(id, [queue, deferred]) - return outbound.offer(id, request) + return outbound.offer(id, request, span) }) ) @@ -183,7 +189,8 @@ export const makeManager = Effect.gen(function*(_) { [id, , deferred]: [ number, Queue.Queue>>, - Deferred.Deferred + Deferred.Deferred, + Option.Option ], exit: Exit.Exit ) => { @@ -225,16 +232,19 @@ export const makeManager = Effect.gen(function*(_) { yield* _( semaphore.take(1), Effect.zipRight(outbound.take), - Effect.flatMap(([id, request]) => + Effect.flatMap(([id, request, span]) => pipe( Effect.suspend(() => { const result = requestMap.get(id) if (!result) return Effect.unit const transferables = transfers(request) + const spanTuple = Option.getOrUndefined( + Option.map(span, (span) => [span.traceId, span.spanId, span.sampled] as const) + ) return pipe( Effect.flatMap( encode ? encode(request) : Effect.succeed(request), - (payload) => sendQueue.offer([[id, 0, payload], transferables]) + (payload) => sendQueue.offer([[id, 0, payload, spanTuple], transferables]) ), Effect.catchAllCause((cause) => Queue.offer(result[0], Exit.failCause(cause))), Effect.zipRight(Deferred.await(result[1])) diff --git a/packages/platform/src/internal/workerRunner.ts b/packages/platform/src/internal/workerRunner.ts index d52f38bf934..0c8152c5fb9 100644 --- a/packages/platform/src/internal/workerRunner.ts +++ b/packages/platform/src/internal/workerRunner.ts @@ -44,7 +44,7 @@ export const make = ( return Effect.succeed(req) } - return Effect.map(options.decode!(req[2]), (data) => [req[0], req[1], data]) + return Effect.map(options.decode!(req[2]), (data) => [req[0], req[1], data, req[3]]) }) : identity, Effect.tap((req) => { @@ -57,7 +57,7 @@ export const make = ( const stream = process(req[2]) - const effect = Effect.isEffect(stream) ? + let effect = Effect.isEffect(stream) ? Effect.matchCauseEffect(stream, { onFailure: (cause) => Either.match(Cause.failureOrCause(cause), { @@ -119,6 +119,17 @@ export const make = ( }) ) + if (req[3]) { + const [traceId, spanId, sampled] = req[3] + effect = Effect.withParentSpan(effect, { + _tag: "ExternalSpan", + traceId, + spanId, + sampled, + context: Context.empty() + }) + } + return pipe( effect, Effect.ensuring(Effect.sync(() => fiberMap.delete(id))),