Skip to content

Commit

Permalink
support context propogation in platform workers (#1870)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jan 8, 2024
1 parent d5a1949 commit 4c90c54
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 19 deletions.
5 changes: 5 additions & 0 deletions .changeset/calm-jars-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": patch
---

support context propogation in platform workers
26 changes: 24 additions & 2 deletions packages/platform-browser/test/Worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import * as EffectWorker from "@effect/platform-browser/Worker"
import "@vitest/web-worker"
import { Cause, Chunk, Effect, Exit, Stream } from "effect"
import { Cause, Chunk, Effect, Exit, Option, Stream } from "effect"
import { assert, describe, it } from "vitest"
import type { WorkerMessage } from "./fixtures/schema.js"
import { GetPersonById, GetUserById, Person, SetName, User } from "./fixtures/schema.js"
import { GetPersonById, GetSpan, GetUserById, Person, SetName, User } from "./fixtures/schema.js"

const runPromiseExit = <E, A>(effect: Effect.Effect<never, E, A>) =>
Effect.runPromiseExit(effect).then((exit) => {
Expand Down Expand Up @@ -69,6 +69,28 @@ describe.sequential("Worker", () => {
runPromiseExit
))

it("tracing", () =>
Effect.gen(function*(_) {
const parentSpan = yield* _(Effect.currentSpan)
const pool = yield* _(EffectWorker.makePoolSerialized({
spawn: () => new globalThis.Worker(new URL("./fixtures/serializedWorker.ts", import.meta.url)),
size: 1
}))
const span = yield* _(pool.executeEffect(new GetSpan()))
assert.deepStrictEqual(
span.parent,
Option.some({
traceId: parentSpan.traceId,
spanId: parentSpan.spanId
})
)
}).pipe(
Effect.withSpan("test"),
Effect.scoped,
Effect.provide(EffectWorker.layerManager),
runPromiseExit
))

it("SharedWorker", () =>
Effect.gen(function*(_) {
const pool = yield* _(EffectWorker.makePool<number, never, number>({
Expand Down
19 changes: 17 additions & 2 deletions packages/platform-browser/test/fixtures/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,20 @@ export class SetName extends Schema.TaggedRequest<SetName>()("SetName", Schema.n
}
}

export type WorkerMessage = GetUserById | GetPersonById | SetName
export const WorkerMessage = Schema.union(GetUserById, GetPersonById, SetName)
export class GetSpan extends Schema.TaggedRequest<GetSpan>()(
"GetSpan",
Schema.never,
Schema.struct({
name: Schema.string,
traceId: Schema.string,
spanId: Schema.string,
parent: Schema.option(Schema.struct({
traceId: Schema.string,
spanId: Schema.string
}))
}),
{}
) {}

export const WorkerMessage = Schema.union(GetUserById, GetPersonById, SetName, GetSpan)
export type WorkerMessage = Schema.Schema.To<typeof WorkerMessage>
17 changes: 15 additions & 2 deletions packages/platform-browser/test/fixtures/serializedWorker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Runner from "@effect/platform-browser/WorkerRunner"
import { Context, Effect, Layer, Stream } from "effect"
import { Context, Effect, Layer, Option, Stream } from "effect"
import { Person, User, WorkerMessage } from "./schema.js"

interface Name {
Expand All @@ -14,7 +14,20 @@ const WorkerLive = Runner.layerSerialized(WorkerMessage, {
new Person({ id: req.id, name: "ing" })
),
GetUserById: (req) => Effect.map(Name, (name) => new User({ id: req.id, name })),
SetName: (req) => Layer.succeed(Name, req.name)
SetName: (req) => Layer.succeed(Name, req.name),
GetSpan: (_) =>
Effect.gen(function*(_) {
const span = yield* _(Effect.currentSpan, Effect.orDie)
return {
traceId: span.traceId,
spanId: span.spanId,
name: span.name,
parent: Option.map(span.parent, (span) => ({
traceId: span.traceId,
spanId: span.spanId
}))
}
}).pipe(Effect.withSpan("GetSpan"))
})
.pipe(
Layer.provide(Runner.layerPlatform)
Expand Down
16 changes: 13 additions & 3 deletions packages/platform/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import type * as Effect from "effect/Effect"
import type * as Fiber from "effect/Fiber"
import type { LazyArg } from "effect/Function"
import type * as Layer from "effect/Layer"
import type * as Option from "effect/Option"
import type * as Pool from "effect/Pool"
import type * as Queue from "effect/Queue"
import type * as Scope from "effect/Scope"
import type * as Stream from "effect/Stream"
import type * as Tracer from "effect/Tracer"
import * as internal from "./internal/worker.js"
import type { WorkerError } from "./WorkerError.js"

Expand Down Expand Up @@ -98,7 +100,15 @@ export declare namespace Worker {
* @since 1.0.0
* @category models
*/
export type Request<I = unknown> = readonly [id: number, data: 0, I] | readonly [id: number, interrupt: 1]
export type Request<I = unknown> =
| readonly [id: number, data: 0, I, trace?: Span]
| readonly [id: number, interrupt: 1]

/**
* @since 1.0.0
* @category models
*/
export type Span = readonly [traceId: string, spanId: string, sampled: boolean]

/**
* @since 1.0.0
Expand Down Expand Up @@ -150,8 +160,8 @@ export declare namespace WorkerPool {
* @since 1.0.0
*/
export interface WorkerQueue<I> {
readonly offer: (id: number, item: I) => Effect.Effect<never, never, void>
readonly take: Effect.Effect<never, never, readonly [id: number, item: I]>
readonly offer: (id: number, item: I, span: Option.Option<Tracer.Span>) => Effect.Effect<never, never, void>
readonly take: Effect.Effect<never, never, readonly [id: number, item: I, span: Option.Option<Tracer.Span>]>
readonly shutdown: Effect.Effect<never, never, void>
}

Expand Down
26 changes: 18 additions & 8 deletions packages/platform/src/internal/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as Schema from "@effect/schema/Schema"
import * as Serializable from "@effect/schema/Serializable"
import { Tracer } from "effect"
import * as Cause from "effect/Cause"
import * as Channel from "effect/Channel"
import * as Chunk from "effect/Chunk"
Expand All @@ -10,6 +11,7 @@ import * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import { identity, pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Pool from "effect/Pool"
import * as Queue from "effect/Queue"
import * as ReadonlyArray from "effect/ReadonlyArray"
Expand All @@ -23,9 +25,9 @@ import { WorkerError } from "../WorkerError.js"
/** @internal */
export const defaultQueue = <I>() =>
Effect.map(
Queue.unbounded<readonly [id: number, item: I]>(),
Queue.unbounded<readonly [id: number, item: I, span: Option.Option<Tracer.Span>]>(),
(queue): Worker.WorkerQueue<I> => ({
offer: (id, item) => Queue.offer(queue, [id, item]),
offer: (id, item, span) => Queue.offer(queue, [id, item, span]),
take: Queue.take(queue),
shutdown: Queue.shutdown(queue)
})
Expand Down Expand Up @@ -170,20 +172,25 @@ export const makeManager = Effect.gen(function*(_) {
Effect.all([
Effect.sync(() => requestIdCounter++),
Queue.unbounded<Exit.Exit<E | WorkerError, ReadonlyArray<O>>>(),
Deferred.make<never, void>()
Deferred.make<never, void>(),
Effect.map(
Effect.serviceOption(Tracer.ParentSpan),
Option.filter((span): span is Tracer.Span => span._tag === "Span")
)
]),
([id, queue, deferred]) =>
([id, queue, deferred, span]) =>
Effect.suspend(() => {
requestMap.set(id, [queue, deferred])
return outbound.offer(id, request)
return outbound.offer(id, request, span)
})
)

const executeRelease = (
[id, , deferred]: [
number,
Queue.Queue<Exit.Exit<E | WorkerError, ReadonlyArray<O>>>,
Deferred.Deferred<never, void>
Deferred.Deferred<never, void>,
Option.Option<Tracer.Span>
],
exit: Exit.Exit<unknown, unknown>
) => {
Expand Down Expand Up @@ -225,16 +232,19 @@ export const makeManager = Effect.gen(function*(_) {
yield* _(
semaphore.take(1),
Effect.zipRight(outbound.take),
Effect.flatMap(([id, request]) =>
Effect.flatMap(([id, request, span]) =>
pipe(
Effect.suspend(() => {
const result = requestMap.get(id)
if (!result) return Effect.unit
const transferables = transfers(request)
const spanTuple = Option.getOrUndefined(
Option.map(span, (span) => [span.traceId, span.spanId, span.sampled] as const)
)
return pipe(
Effect.flatMap(
encode ? encode(request) : Effect.succeed(request),
(payload) => sendQueue.offer([[id, 0, payload], transferables])
(payload) => sendQueue.offer([[id, 0, payload, spanTuple], transferables])
),
Effect.catchAllCause((cause) => Queue.offer(result[0], Exit.failCause(cause))),
Effect.zipRight(Deferred.await(result[1]))
Expand Down
15 changes: 13 additions & 2 deletions packages/platform/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export const make = <I, R, E, O>(
return Effect.succeed(req)
}

return Effect.map(options.decode!(req[2]), (data) => [req[0], req[1], data])
return Effect.map(options.decode!(req[2]), (data) => [req[0], req[1], data, req[3]])
}) :
identity,
Effect.tap((req) => {
Expand All @@ -57,7 +57,7 @@ export const make = <I, R, E, O>(

const stream = process(req[2])

const effect = Effect.isEffect(stream) ?
let effect = Effect.isEffect(stream) ?
Effect.matchCauseEffect(stream, {
onFailure: (cause) =>
Either.match(Cause.failureOrCause(cause), {
Expand Down Expand Up @@ -119,6 +119,17 @@ export const make = <I, R, E, O>(
})
)

if (req[3]) {
const [traceId, spanId, sampled] = req[3]
effect = Effect.withParentSpan(effect, {
_tag: "ExternalSpan",
traceId,
spanId,
sampled,
context: Context.empty()
})
}

return pipe(
effect,
Effect.ensuring(Effect.sync(() => fiberMap.delete(id))),
Expand Down

0 comments on commit 4c90c54

Please sign in to comment.