Skip to content

Commit

Permalink
add concurrency option to RpcServer
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Feb 4, 2025
1 parent f671119 commit a99b8e9
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions packages/rpc/src/RpcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
readonly disableSpanPropagation?: boolean | undefined
readonly spanPrefix?: string | undefined
readonly disableClientAcks?: boolean | undefined
readonly concurrency?: number | "unbounded" | undefined
} | undefined
) => Effect.Effect<
RpcServer<Rpcs>,
Expand All @@ -78,14 +79,19 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
readonly disableSpanPropagation?: boolean | undefined
readonly spanPrefix?: string | undefined
readonly disableClientAcks?: boolean | undefined
readonly concurrency?: number | "unbounded" | undefined
}
) {
const tracingEnabled = options?.disableSpanPropagation !== true
const supportsAck = options?.disableClientAcks !== true
const spanPrefix = options?.spanPrefix ?? "RpcServer"
const concurrency = options?.concurrency ?? "unbounded"
const context = yield* Effect.context<Rpc.ToHandler<Rpcs> | Scope.Scope>()
const scope = Context.get(context, Scope.Scope)
const runSemaphore = yield* Effect.makeSemaphore(1)
const concurrencySemaphore = concurrency === "unbounded"
? undefined
: yield* Effect.makeSemaphore(concurrency)
let writeResponse: (response: FromServer<Rpcs>) => Effect.Effect<void> = () => Effect.void

type Client = {
Expand Down Expand Up @@ -197,6 +203,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
}) :
identity,
Effect.locally(FiberRef.currentContext, entry.context),
concurrencySemaphore ? concurrencySemaphore.withPermits(1) : identity,
Effect.forkIn(scope),
Effect.flatMap((fiber) => {
client.fibers.set(request.id, fiber)
Expand Down Expand Up @@ -339,6 +346,7 @@ export const make: <Rpcs extends Rpc.Any>(
| {
readonly disableSpanPropagation?: boolean | undefined
readonly spanPrefix?: string | undefined
readonly concurrency?: number | "unbounded" | undefined
}
| undefined
) => Effect.Effect<
Expand All @@ -350,6 +358,7 @@ export const make: <Rpcs extends Rpc.Any>(
options?: {
readonly disableSpanPropagation?: boolean | undefined
readonly spanPrefix?: string | undefined
readonly concurrency?: number | "unbounded" | undefined
}
) {
const { disconnects, end, requests, send, supportsAck, supportsTransferables } = yield* Protocol
Expand Down Expand Up @@ -593,6 +602,7 @@ export const layer = <Rpcs extends Rpc.Any>(
options?: {
readonly disableSpanPropagation?: boolean | undefined
readonly spanPrefix?: string | undefined
readonly concurrency?: number | "unbounded" | undefined
}
): Layer.Layer<
never,
Expand Down

0 comments on commit a99b8e9

Please sign in to comment.