Skip to content

Commit

Permalink
support custom errors in RpcClient
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Feb 6, 2025
1 parent a2ca8c2 commit 9868f3a
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions packages/rpc/src/RpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import * as RpcWorker from "./RpcWorker.js"
* @since 1.0.0
* @category client
*/
export type RpcClient<Rpcs extends Rpc.Any> = {
export type RpcClient<Rpcs extends Rpc.Any, E = never> = {
readonly [Current in Rpcs as Current["_tag"]]: <const AsMailbox extends boolean = false>(
input: Rpc.PayloadConstructor<Current>,
options?: {
Expand All @@ -55,14 +55,14 @@ export type RpcClient<Rpcs extends Rpc.Any> = {
}
) => Rpc.Success<Current> extends Stream.Stream<infer _A, infer _E, infer _R>
? AsMailbox extends true ? Effect.Effect<
Mailbox.ReadonlyMailbox<_A, _E | Rpc.Error<Current>>,
Mailbox.ReadonlyMailbox<_A, _E | Rpc.Error<Current> | E>,
never,
Scope.Scope
>
: Stream.Stream<_A, _E | Rpc.Error<Current>>
: Stream.Stream<_A, _E | Rpc.Error<Current> | E>
: Effect.Effect<
Rpc.Success<Current>,
Rpc.Error<Current>
Rpc.Error<Current> | E
>
}

Expand All @@ -76,32 +76,32 @@ export type FromGroup<Group> = RpcClient<RpcGroup.Rpcs<Group>>
* @since 1.0.0
* @category client
*/
export const makeNoSerialization: <Rpcs extends Rpc.Any>(
export const makeNoSerialization: <Rpcs extends Rpc.Any, E>(
group: RpcGroup.RpcGroup<Rpcs>,
options: {
readonly onFromClient: (
message: FromClient<Rpcs>,
context: Context.Context<never>
) => Effect.Effect<void>
) => Effect.Effect<void, E>
readonly supportsAck?: boolean | undefined
readonly spanPrefix?: string | undefined
readonly generateRequestId?: (() => RequestId) | undefined
readonly disableTracing?: boolean | undefined
}
) => Effect.Effect<
{
readonly client: RpcClient<Rpcs>
readonly client: RpcClient<Rpcs, E>
readonly write: (message: FromServer<Rpcs>) => Effect.Effect<void>
},
never,
Scope.Scope | Rpc.MiddlewareClient<Rpcs>
> = Effect.fnUntraced(function*<Rpcs extends Rpc.Any>(
> = Effect.fnUntraced(function*<Rpcs extends Rpc.Any, E>(
group: RpcGroup.RpcGroup<Rpcs>,
options: {
readonly onFromClient: (
message: FromClient<Rpcs>,
context: Context.Context<never>
) => Effect.Effect<void>
) => Effect.Effect<void, E>
readonly supportsAck?: boolean | undefined
readonly spanPrefix?: string | undefined
readonly generateRequestId?: (() => RequestId) | undefined
Expand Down Expand Up @@ -228,7 +228,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
),
Effect.onInterrupt(() => {
entries.delete(id)
return options.onFromClient({ _tag: "Interrupt", requestId: id }, context)
return Effect.ignore(options.onFromClient({ _tag: "Interrupt", requestId: id }, context))
})
)
})
Expand Down Expand Up @@ -256,7 +256,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
Effect.suspend(() => {
if (!entries.has(id)) return Effect.void
entries.delete(id)
return options.onFromClient({ _tag: "Interrupt", requestId: id }, context)
return Effect.ignore(options.onFromClient({ _tag: "Interrupt", requestId: id }, context))
})
)

Expand All @@ -269,18 +269,20 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
context
})

yield* Effect.flatMap(
middleware({
_tag: "Request",
id,
tag: rpc._tag as Rpc.Tag<Rpcs>,
traceId: span.traceId,
payload,
spanId: span.spanId,
sampled: span.sampled,
headers: Headers.merge(fiber.getFiberRef(currentHeaders), headers)
}),
(request) => options.onFromClient(request, context)
yield* middleware({
_tag: "Request",
id,
tag: rpc._tag as Rpc.Tag<Rpcs>,
traceId: span.traceId,
payload,
spanId: span.spanId,
sampled: span.sampled,
headers: Headers.merge(fiber.getFiberRef(currentHeaders), headers)
}).pipe(
Effect.flatMap(
(request) => options.onFromClient(request, context)
),
Effect.catchAll((error) => mailbox.fail(error))
)

return mailbox
Expand Down

0 comments on commit 9868f3a

Please sign in to comment.