diff --git a/packages/rpc/src/Rpc.ts b/packages/rpc/src/Rpc.ts index 0e7fc9c9b2c..f6f1cee6950 100644 --- a/packages/rpc/src/Rpc.ts +++ b/packages/rpc/src/Rpc.ts @@ -1,11 +1,14 @@ /** * @since 1.0.0 */ +import type { Headers } from "@effect/platform/Headers" import * as Context_ from "effect/Context" +import type { Effect } from "effect/Effect" import { type Pipeable, pipeArguments } from "effect/Pipeable" import * as Predicate from "effect/Predicate" import * as Schema from "effect/Schema" import type * as AST from "effect/SchemaAST" +import type { Stream } from "effect/Stream" import type * as RpcMiddleware from "./RpcMiddleware.js" /** @@ -42,6 +45,7 @@ export interface Rpc< > extends Pipeable { readonly [TypeId]: TypeId readonly _tag: Tag + readonly key: string readonly payloadSchema: Payload readonly successSchema: Success readonly errorSchema: Error @@ -119,6 +123,8 @@ export interface Rpc< export interface Handler { readonly _: unique symbol readonly tag: Tag + readonly handler: (request: any, headers: Headers) => Effect | Stream + readonly context: Context } /** @@ -128,6 +134,7 @@ export interface Handler { export interface Any extends Pipeable { readonly [TypeId]: TypeId readonly _tag: string + readonly key: string } /** @@ -137,6 +144,7 @@ export interface Any extends Pipeable { export interface AnyWithProps { readonly [TypeId]: TypeId readonly _tag: string + readonly key: string readonly payloadSchema: AnyStructSchema readonly successSchema: Schema.Schema.Any readonly errorSchema: Schema.Schema.All @@ -385,7 +393,11 @@ const makeProto = < readonly errorSchema: Error readonly annotations: Context_.Context readonly middlewares: ReadonlySet -}): Rpc => Object.assign(Object.create(Proto), options) +}): Rpc => { + const self = Object.assign(Object.create(Proto), options) + self.key = `@effect/rpc/Rpc/${options._tag}` + return self +} const constEmptyStruct = Schema.Struct({}) diff --git a/packages/rpc/src/RpcGroup.ts b/packages/rpc/src/RpcGroup.ts index 1569ac06df0..f5cf79da51c 100644 --- a/packages/rpc/src/RpcGroup.ts +++ b/packages/rpc/src/RpcGroup.ts @@ -72,9 +72,8 @@ export interface RpcGroup extends Pipeable { RX > ): Layer.Layer< - Rpc.ToHandler | RpcRegistry, + Rpc.ToHandler, EX, - | Rpc.Middleware | Exclude | ( keyof Handlers extends infer K ? K extends keyof Handlers & string ? Handlers[K] extends (...args: any) => @@ -155,19 +154,19 @@ const RpcGroupProto = { }) }, toLayer(this: RpcGroup, build: Effect.Effect any>>) { - return Layer.scopedDiscard(Effect.gen(this, function*() { + return Layer.scopedContext(Effect.gen(this, function*() { const context = yield* Effect.context() - const registry = yield* RpcRegistry const handlers = Effect.isEffect(build) ? yield* build : build + const contextMap = new Map() for (const [tag, handler] of Object.entries(handlers)) { const rpc = this.requests.get(tag)! - registry.set(tag, { - rpc, + contextMap.set(rpc.key, { handler, context }) } - })).pipe(Layer.provideMerge(RpcRegistry.layer)) + return Context.unsafeMake(contextMap) + })) }, annotate(this: RpcGroup, tag: Context.Tag, value: any) { return makeProto({ @@ -213,27 +212,3 @@ export const make = ->() { - /** - * @since 1.0.0 - */ - static readonly layer = Layer.sync(RpcRegistry, () => new Map()) -} - -/** - * @since 1.0.0 - * @category registry - */ -export interface RpcRegistryEntry { - readonly rpc: Rpc.AnyWithProps - readonly handler: (request: any, headers: Headers) => Effect.Effect | Stream.Stream - readonly context: Context.Context -} diff --git a/packages/rpc/src/RpcServer.ts b/packages/rpc/src/RpcServer.ts index 6ae7d6d957c..4bdadc60440 100644 --- a/packages/rpc/src/RpcServer.ts +++ b/packages/rpc/src/RpcServer.ts @@ -25,7 +25,7 @@ import * as Schema from "effect/Schema" import * as Scope from "effect/Scope" import * as Stream from "effect/Stream" import type * as Rpc from "./Rpc.js" -import * as RpcGroup from "./RpcGroup.js" +import type * as RpcGroup from "./RpcGroup.js" import { constEof, type FromClient, @@ -53,7 +53,7 @@ export interface RpcServer { * @category server */ export const make: ( - _group: RpcGroup.RpcGroup, + group: RpcGroup.RpcGroup, options?: { readonly disableSpanPropagation?: boolean | undefined readonly spanPrefix?: string | undefined @@ -62,10 +62,10 @@ export const make: ( ) => Effect.Effect< RpcServer, never, - Rpc.ToHandler | RpcGroup.RpcRegistry | Scope.Scope + Rpc.ToHandler | Rpc.Middleware | Scope.Scope > = Effect.fnUntraced( function*( - _group: RpcGroup.RpcGroup, + group: RpcGroup.RpcGroup, options?: { readonly disableSpanPropagation?: boolean | undefined readonly spanPrefix?: string | undefined @@ -75,8 +75,8 @@ export const make: ( const tracingEnabled = options?.disableSpanPropagation !== true const supportsAck = options?.disableClientAcks !== true const spanPrefix = options?.spanPrefix ?? "RpcServer" - const registry = yield* RpcGroup.RpcRegistry - const scope = yield* Effect.scope + const context = yield* Effect.context | Scope.Scope>() + const scope = Context.get(context, Scope.Scope) const runSemaphore = yield* Effect.makeSemaphore(1) let writeResponse: (response: FromServer) => Effect.Effect = () => Effect.void @@ -145,8 +145,8 @@ export const make: ( } const handleRequest = (client: Client, request: Request): Effect.Effect => { - const entry = registry.get(request.tag) - if (!entry) { + const rpc = group.requests.get(request.tag) as any as Rpc.AnyWithProps + if (!rpc) { const write = writeResponse({ _tag: "Exit", clientId: client.id, @@ -156,10 +156,12 @@ export const make: ( if (!client.ended || client.fibers.size > 0) return write return Effect.zipRight(write, endClient(client)) } - const isStream = RpcSchema.isStreamSchema(entry.rpc.successSchema) + const entry = context.unsafeMap.get(rpc.key) as Rpc.Handler + const isStream = RpcSchema.isStreamSchema(rpc.successSchema) const result = entry.handler(request.payload, request.headers) return Effect.exit(applyMiddleware( - entry, + rpc, + context, request.payload, request.headers, isStream @@ -282,23 +284,24 @@ export const make: ( ) const applyMiddleware = ( - entry: RpcGroup.RpcRegistryEntry, + rpc: Rpc.AnyWithProps, + context: Context.Context, payload: A, headers: Headers.Headers, handler: Effect.Effect ) => { - if (entry.rpc.middlewares.size === 0) { + if (rpc.middlewares.size === 0) { return handler } const options = { - rpc: entry.rpc, + rpc, payload, headers } - for (const tag of entry.rpc.middlewares) { - const middleware = Context.unsafeGet(entry.context, tag) + for (const tag of rpc.middlewares) { + const middleware = Context.unsafeGet(context, tag) if (tag.optional) { const previous = handler handler = Effect.matchEffect(middleware(options), { @@ -324,16 +327,15 @@ const applyMiddleware = ( export const makeEncoded: ( group: RpcGroup.RpcGroup, options?: - | { readonly disableSpanPropagation?: boolean | undefined; readonly spanPrefix?: string | undefined } + | { + readonly disableSpanPropagation?: boolean | undefined + readonly spanPrefix?: string | undefined + } | undefined ) => Effect.Effect< never, never, - | Rpc.ToHandler - | RpcGroup.RpcRegistry - | Scope.Scope - | Protocol - | RpcSerialization.RpcSerialization + Scope.Scope | Protocol | RpcSerialization.RpcSerialization | Rpc.ToHandler | Rpc.Middleware > = Effect.fnUntraced(function*( group: RpcGroup.RpcGroup, options?: { @@ -343,7 +345,7 @@ export const makeEncoded: ( ) { const { disconnects, end, requests, send, supportsAck } = yield* Protocol const serialization = yield* RpcSerialization.RpcSerialization - const registry = yield* RpcGroup.RpcRegistry + const context = yield* Effect.context | Rpc.Middleware>() const server = yield* make(group, { ...options, @@ -364,36 +366,37 @@ export const makeEncoded: ( readonly decode: (u: unknown) => Effect.Effect, ParseError> readonly encodeChunk: (u: ReadonlyArray) => Effect.Effect, ParseError> readonly encodeExit: (u: unknown) => Effect.Effect, ParseError> + readonly context: Context.Context } const schemasCache = new WeakMap() - const getSchemas = (entry: { - readonly rpc: Rpc.AnyWithProps - }) => { - let schemas = schemasCache.get(entry) + const getSchemas = (rpc: Rpc.AnyWithProps) => { + let schemas = schemasCache.get(rpc) if (!schemas) { - const streamSchemas = RpcSchema.getStreamSchemas(entry.rpc.successSchema.ast) - const failures = new Set([entry.rpc.errorSchema]) + const entry = context.unsafeMap.get(rpc.key) as Rpc.Handler + const streamSchemas = RpcSchema.getStreamSchemas(rpc.successSchema.ast) + const failures = new Set([rpc.errorSchema]) if (Option.isSome(streamSchemas)) { failures.add(streamSchemas.value.failure) } - for (const middleware of entry.rpc.middlewares) { + for (const middleware of rpc.middlewares) { failures.add(middleware.failure) } schemas = { - decode: Schema.decodeUnknown(entry.rpc.payloadSchema as any), + decode: Schema.decodeUnknown(rpc.payloadSchema as any), encodeChunk: Schema.encodeUnknown( Schema.Array(Option.isSome(streamSchemas) ? streamSchemas.value.success : Schema.Any) ) as any, encodeExit: Schema.encodeUnknown( Schema.Exit({ - success: Option.isSome(streamSchemas) ? Schema.Void : entry.rpc.successSchema, + success: Option.isSome(streamSchemas) ? Schema.Void : rpc.successSchema, failure: Schema.Union(...failures), defect: Schema.Defect }) - ) as any + ) as any, + context: entry.context } - schemasCache.set(entry, schemas) + schemasCache.set(rpc, schemas) } return schemas } @@ -430,21 +433,31 @@ export const makeEncoded: ( case "Chunk": { const schemas = client.schemas.get(response.requestId) if (!schemas) return Effect.void - return handleEncode(client, response.requestId, schemas.encodeChunk(response.values), (values) => ({ - _tag: "Chunk", - requestId: response.requestId, - values - })) + return handleEncode( + client, + response.requestId, + Effect.locally(schemas.encodeChunk(response.values), FiberRef.currentContext, schemas.context), + (values) => ({ + _tag: "Chunk", + requestId: response.requestId, + values + }) + ) } case "Exit": { const schemas = client.schemas.get(response.requestId) if (!schemas) return Effect.void client.schemas.delete(response.requestId) - return handleEncode(client, response.requestId, schemas.encodeExit(response.exit), (exit) => ({ - _tag: "Exit", - requestId: response.requestId, - exit - })) + return handleEncode( + client, + response.requestId, + Effect.locally(schemas.encodeExit(response.exit), FiberRef.currentContext, schemas.context), + (exit) => ({ + _tag: "Exit", + requestId: response.requestId, + exit + }) + ) } case "Defect": { return sendDefect(client, response.defect) @@ -518,8 +531,8 @@ export const makeEncoded: ( switch (request._tag) { case "Request": { const tag = Predicate.hasProperty(request, "tag") ? request.tag as string : "" - const entry = registry.get(tag) - if (!entry) { + const rpc = group.requests.get(tag) + if (!rpc) { yield* sendDefect(client, `Unknown request tag: ${tag}`) break } @@ -527,18 +540,21 @@ export const makeEncoded: ( yield* sendDefect(client, `Invalid request id: ${request.id}`) break } - const schemas = getSchemas(entry) - yield* Effect.matchEffect(schemas.decode(request.payload), { - onFailure: (error) => sendRequestDefect(client, request.id, ArrayFormatter.formatErrorSync(error)), - onSuccess: (payload) => { - client.schemas.set(request.id, schemas) - return server.write(clientId, { - ...request, - payload, - headers: Headers.fromInput(request.headers) - } as any) + const schemas = getSchemas(rpc as any) + yield* Effect.matchEffect( + Effect.locally(schemas.decode(request.payload), FiberRef.currentContext, schemas.context), + { + onFailure: (error) => sendRequestDefect(client, request.id, ArrayFormatter.formatErrorSync(error)), + onSuccess: (payload) => { + client.schemas.set(request.id, schemas) + return server.write(clientId, { + ...request, + payload, + headers: Headers.fromInput(request.headers) + } as any) + } } - }) + ) break } default: { @@ -553,7 +569,12 @@ export const makeEncoded: ( } return yield* Effect.never - }).pipe(Effect.tapErrorCause(Effect.logError)) + }).pipe( + Effect.tapErrorCause(Effect.logError), + Effect.annotateLogs({ + module: "RpcServer" + }) + ) }) /** @@ -566,11 +587,14 @@ export const layer = ( readonly disableSpanPropagation?: boolean | undefined readonly spanPrefix?: string | undefined } -): Layer.Layer | Protocol | RpcSerialization.RpcSerialization> => - Layer.provide( - Layer.scopedDiscard(Effect.forkScoped(makeEncoded(group, options))), - RpcGroup.RpcRegistry.layer - ) as any +): Layer.Layer< + never, + never, + | Protocol + | RpcSerialization.RpcSerialization + | Rpc.ToHandler + | Rpc.Middleware +> => Layer.scopedDiscard(Effect.forkScoped(makeEncoded(group, options))) /** * @since 1.0.0 @@ -858,9 +882,9 @@ export const toHttpApp: ( ) => Effect.Effect< HttpApp.Default, never, - | RpcGroup.RpcRegistry | Scope.Scope | Rpc.ToHandler + | Rpc.Middleware > = Effect.fnUntraced(function*( group: RpcGroup.RpcGroup, options?: { @@ -893,9 +917,9 @@ export const toHttpAppWebsocket: ( ) => Effect.Effect< HttpApp.Default, never, - | RpcGroup.RpcRegistry | Scope.Scope | Rpc.ToHandler + | Rpc.Middleware > = Effect.fnUntraced(function*( group: RpcGroup.RpcGroup, options?: { @@ -925,7 +949,7 @@ export const toWebHandler = ( group: RpcGroup.RpcGroup, options: { readonly layer: Layer.Layer< - Rpc.ToHandler | HttpRouter.HttpRouter.DefaultServices, + Rpc.ToHandler | Rpc.Middleware | HttpRouter.HttpRouter.DefaultServices, LE > readonly disableSpanPropagation?: boolean | undefined @@ -942,10 +966,7 @@ export const toWebHandler = ( readonly handler: (request: globalThis.Request, context?: Context.Context | undefined) => Promise readonly dispose: () => Promise } => { - const runtime = ManagedRuntime.make( - Layer.mergeAll(options.layer, RpcGroup.RpcRegistry.layer, Layer.scope), - options?.memoMap - ) + const runtime = ManagedRuntime.make(Layer.mergeAll(options.layer, Layer.scope), options?.memoMap) let handlerCached: | ((request: globalThis.Request, context?: Context.Context | undefined) => Promise) | undefined