Skip to content

Commit

Permalink
test backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Feb 2, 2025
1 parent 9af800d commit 9832d0d
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 17 deletions.
5 changes: 4 additions & 1 deletion packages/rpc/src/Rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ export type PayloadConstructor<R> = R extends Rpc<
infer _Success,
infer _Error,
infer _Middleware
> ? Schema.Struct.Constructor<_Payload["fields"]>
> ?
Schema.Struct.Constructor<_Payload["fields"]> extends infer T ?
[keyof T] extends [never] ? void | {} : Schema.Simplify<T>
: never
: never

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/rpc/src/RpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
readonly headers?: Headers.Input | undefined
}) => {
const isStream = RpcSchema.isStreamSchema(rpc.successSchema)
const payloadDecoded = rpc.payloadSchema.make(payload)
const payloadDecoded = payload ? rpc.payloadSchema.make(payload) : {}
const headers = options?.headers ? Headers.fromInput(options.headers) : Headers.empty
if (!isStream) {
return Effect.useSpan(`${spanPrefix}.${rpc._tag}`, (span) => onEffectRequest(rpc, span, payloadDecoded, headers))
Expand Down
20 changes: 10 additions & 10 deletions packages/rpc/test/RpcServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe("RpcServer", () => {
// http ndjson
const HttpNdjsonServer = HttpRouter.Default.serve().pipe(
Layer.provide(RpcLive),
Layer.provide(RpcServer.layerProtocolHttp({ path: "/rpc" }))
Layer.provideMerge(RpcServer.layerProtocolHttp({ path: "/rpc" }))
)
const HttpNdjsonClient = UsersClient.layer.pipe(
Layer.provide(
Expand All @@ -24,22 +24,22 @@ describe("RpcServer", () => {
e2eSuite(
"e2e http ndjson",
HttpNdjsonClient.pipe(
Layer.provide(HttpNdjsonServer),
Layer.provideMerge(HttpNdjsonServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerNdjson])
)
)
e2eSuite(
"e2e http msgpack",
HttpNdjsonClient.pipe(
Layer.provide(HttpNdjsonServer),
Layer.provideMerge(HttpNdjsonServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
)
)

// websocket
const HttpWsServer = HttpRouter.Default.serve().pipe(
Layer.provide(RpcLive),
Layer.provide(RpcServer.layerProtocolWebsocket({ path: "/rpc" }))
Layer.provideMerge(RpcServer.layerProtocolWebsocket({ path: "/rpc" }))
)
const HttpWsClient = UsersClient.layer.pipe(
Layer.provide(RpcClient.layerProtocolSocket),
Expand All @@ -54,28 +54,28 @@ describe("RpcServer", () => {
e2eSuite(
"e2e ws ndjson",
HttpWsClient.pipe(
Layer.provide(HttpWsServer),
Layer.provideMerge(HttpWsServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerNdjson])
)
)
e2eSuite(
"e2e ws json",
HttpWsClient.pipe(
Layer.provide(HttpWsServer),
Layer.provideMerge(HttpWsServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerJson])
)
)
e2eSuite(
"e2e ws msgpack",
HttpWsClient.pipe(
Layer.provide(HttpWsServer),
Layer.provideMerge(HttpWsServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
)
)

// tcp
const TcpServer = RpcLive.pipe(
Layer.provide(RpcServer.layerProtocolSocketServer),
Layer.provideMerge(RpcServer.layerProtocolSocketServer),
Layer.provideMerge(NodeSocketServer.layer({ port: 0 }))
)
const TcpClient = UsersClient.layer.pipe(
Expand All @@ -91,14 +91,14 @@ describe("RpcServer", () => {
e2eSuite(
"e2e tcp ndjson",
TcpClient.pipe(
Layer.provide(TcpServer),
Layer.provideMerge(TcpServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerNdjson])
)
)
e2eSuite(
"e2e tcp msgpack",
TcpClient.pipe(
Layer.provide(TcpServer),
Layer.provideMerge(TcpServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
)
)
Expand Down
14 changes: 11 additions & 3 deletions packages/rpc/test/e2e.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { RpcClient } from "@effect/rpc"
import { RpcClient, RpcServer } from "@effect/rpc"
import { assert, describe, it } from "@effect/vitest"
import type { Layer } from "effect"
import { Effect, Stream } from "effect"
import { User, UsersClient } from "./fixtures/schemas.js"

export const e2eSuite = <E>(name: string, layer: Layer.Layer<UsersClient, E>) => {
export const e2eSuite = <E>(name: string, layer: Layer.Layer<UsersClient | RpcServer.Protocol, E>) => {
describe(name, () => {
it.effect("should get user", () =>
Effect.gen(function*() {
Expand Down Expand Up @@ -42,8 +42,16 @@ export const e2eSuite = <E>(name: string, layer: Layer.Layer<UsersClient, E>) =>
yield* Effect.sleep(1000)
assert.lengthOf(users, 5)

const interrupts = yield* client.GetInterrupts({})
const interrupts = yield* client.GetInterrupts()
assert.equal(interrupts, 1)

const { supportsAck } = yield* RpcServer.Protocol

// test backpressure
if (supportsAck) {
const emits = yield* client.GetEmits()
assert.equal(emits, 5)
}
}).pipe(Effect.provide(layer)), { timeout: 20000 })
})
}
12 changes: 10 additions & 2 deletions packages/rpc/test/fixtures/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ export const UserRpcs = RpcGroup
StreamUsers,
Rpc.make("GetInterrupts", {
success: Schema.Number
}),
Rpc.make("GetEmits", {
success: Schema.Number
})
)
.middleware(AuthMiddleware)
Expand All @@ -55,10 +58,11 @@ const AuthLive = Layer.succeed(

const UsersLive = UserRpcs.toLayer(Effect.gen(function*() {
let interrupts = 0
let emits = 0
return {
GetUser: (_) => CurrentUser,
StreamUsers: Effect.fnUntraced(function*(req) {
const mailbox = yield* Mailbox.make<User>()
const mailbox = yield* Mailbox.make<User>(0)

yield* Effect.addFinalizer(() =>
Effect.sync(() => {
Expand All @@ -67,14 +71,18 @@ const UsersLive = UserRpcs.toLayer(Effect.gen(function*() {
)

yield* mailbox.offer(new User({ id: req.id, name: "John" })).pipe(
Effect.tap(() => {
emits++
}),
Effect.delay(100),
Effect.forever,
Effect.forkScoped
)

return mailbox
}),
GetInterrupts: () => Effect.sync(() => interrupts)
GetInterrupts: () => Effect.sync(() => interrupts),
GetEmits: () => Effect.sync(() => emits)
}
}))

Expand Down

0 comments on commit 9832d0d

Please sign in to comment.