Skip to content

Commit

Permalink
add rpc e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Feb 1, 2025
1 parent 332ccf0 commit 772ba79
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 414 deletions.
12 changes: 9 additions & 3 deletions packages/rpc/src/RpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import * as FiberMap from "effect/FiberMap"
import * as FiberRef from "effect/FiberRef"
import { dual, identity } from "effect/Function"
import { globalValue } from "effect/GlobalValue"
Expand All @@ -32,8 +33,8 @@ import type { Span } from "effect/Tracer"
import type { Mutable } from "effect/Types"
import type * as Rpc from "./Rpc.js"
import type * as RpcGroup from "./RpcGroup.js"
import type { FromClient, FromClientEncoded, FromServer, FromServerEncoded, RequestId } from "./RpcMessage.js"
import { constPing } from "./RpcMessage.js"
import type { FromClient, FromClientEncoded, FromServer, FromServerEncoded } from "./RpcMessage.js"
import { constPing, RequestId } from "./RpcMessage.js"
import * as RpcSchema from "./RpcSchema.js"
import * as RpcSerialization from "./RpcSerialization.js"
import * as RpcWorker from "./RpcWorker.js"
Expand Down Expand Up @@ -470,9 +471,14 @@ export const makeProtocolHttp: <E, R>(
const responses = yield* Mailbox.make<FromServerEncoded>()
const serialization = yield* RpcSerialization.RpcSerialization
const context = yield* Effect.context<Exclude<R, Scope.Scope>>()
const fiberMap = yield* FiberMap.make<RequestId>()
const scope = yield* Effect.scope

const send = (request: FromClientEncoded): Effect.Effect<void> => {
if (request._tag === "Interrupt") {
return FiberMap.remove(fiberMap, RequestId(request.requestId))
}

const parser = serialization.unsafeMake()
if (!serialization.supportsBigInt) transformBigInt(request)

Expand All @@ -492,7 +498,7 @@ export const makeProtocolHttp: <E, R>(
Effect.orDie,
Effect.catchAllDefect((defect) => responses.offer({ _tag: "Defect", defect })),
Effect.scoped,
Effect.forkIn(scope)
request._tag === "Request" ? FiberMap.run(fiberMap, RequestId(request.id)) : Effect.forkIn(scope)
)
}

Expand Down
6 changes: 6 additions & 0 deletions packages/rpc/src/RpcGroup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ export type HandlersContext<Rpcs extends Rpc.Any, Handlers> = keyof Handlers ext
: never
: never

/**
* @since 1.0.0
* @category groups
*/
export type Rpcs<Group extends RpcGroup<any>> = Group extends RpcGroup<infer R> ? R : never

const RpcGroupProto = {
add(this: RpcGroup<any>, ...rpcs: Array<any>) {
return makeProto({
Expand Down
26 changes: 18 additions & 8 deletions packages/rpc/src/RpcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
}) :
identity,
Effect.locally(FiberRef.currentContext, entry.context),
Effect.forkIn(scope),
Effect.interruptible,
Effect.forkIn(scope),
Effect.flatMap((fiber) => {
client.fibers.set(request.id, fiber)
fiber.addObserver(() => {
Expand Down Expand Up @@ -757,11 +757,15 @@ export const makeProtocolWithHttpApp: Effect.Effect<
end: mailbox.end
})

const requestIds: Array<RequestId> = []

try {
const decoded = parser.decode(new Uint8Array(data)) as ReadonlyArray<FromClientEncoded>

for (const message of decoded) {
requests.unsafeOffer([id, message])
if (message._tag === "Request") {
requestIds.push(RequestId(message.id))
}
}
} catch (cause) {
yield* offer(parser.encode(ResponseDefectEncoded(cause)))
Expand All @@ -770,12 +774,18 @@ export const makeProtocolWithHttpApp: Effect.Effect<
requests.unsafeOffer([id, constEof])

return HttpServerResponse.stream(
Stream.ensuring(
Mailbox.toStream(mailbox),
Effect.sync(() => {
clients.delete(id)
disconnects.unsafeOffer(id)
})
Mailbox.toStream(mailbox).pipe(
Stream.ensuringWith((exit) =>
Effect.sync(() => {
clients.delete(id)
disconnects.unsafeOffer(id)
if (Exit.isInterrupted(exit)) {
for (const requestId of requestIds) {
requests.unsafeOffer([id, { _tag: "Interrupt", requestId }])
}
}
})
)
),
{ contentType: serialization.contentType }
)
Expand Down
Loading

0 comments on commit 772ba79

Please sign in to comment.