Skip to content

Commit

Permalink
add worker tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Feb 1, 2025
1 parent 5905eee commit 3bd36df
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 142 deletions.
2 changes: 1 addition & 1 deletion packages/platform/src/internal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ export const makePlatform = <W>() =>
port,
scope,
emit(data) {
FiberSet.unsafeAdd(runFork(handler(data)))
FiberSet.unsafeAdd(fiberSet, runFork(handler(data)))
},
deferred: fiberSet.deferred as any
})
Expand Down
3 changes: 2 additions & 1 deletion packages/rpc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
"devDependencies": {
"@effect/experimental": "workspace:^",
"@effect/platform": "workspace:^",
"effect": "workspace:^"
"effect": "workspace:^",
"happy-dom": "^14.12.3"
},
"peerDependencies": {
"@effect/experimental": "workspace:^",
Expand Down
145 changes: 5 additions & 140 deletions packages/rpc/test/RpcServer.test.ts
Original file line number Diff line number Diff line change
@@ -1,65 +1,13 @@
import * as NodeSocketServer from "@effect/experimental/SocketServer/Node"
import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "@effect/platform"
import { NodeHttpServer, NodeSocket } from "@effect/platform-node"
import { Rpc, RpcClient, RpcGroup, RpcMiddleware, RpcSchema, RpcSerialization, RpcServer } from "@effect/rpc"
import { assert, describe, it } from "@effect/vitest"
import { Context, Effect, Layer, Mailbox, Schema, Stream, TestClock } from "effect"

const liveSleep = (ms: number) =>
Effect.async<void>((resume) => {
setTimeout(() => resume(Effect.void), ms)
})
import { RpcClient, RpcSerialization, RpcServer } from "@effect/rpc"
import { describe } from "@effect/vitest"
import { Effect, Layer } from "effect"
import { e2eSuite } from "./e2e.js"
import { RpcLive, UsersClient } from "./fixtures/schemas.js"

describe("RpcServer", () => {
const e2eSuite = <E>(name: string, layer: Layer.Layer<UsersClient, E>) => {
describe.concurrent(name, () => {
it.effect("should get user", () =>
Effect.gen(function*() {
const client = yield* UsersClient
const user = yield* client.GetUser({ id: "1" })
assert.instanceOf(user, User)
assert.deepStrictEqual(user, new User({ id: "1", name: "Logged in user" }))
}).pipe(Effect.provide(layer)))

it.effect("headers", () =>
Effect.gen(function*() {
const client = yield* UsersClient
const user = yield* client.GetUser({ id: "1" })
assert.instanceOf(user, User)
assert.deepStrictEqual(user, new User({ id: "1", name: "John" }))
}).pipe(
RpcClient.withHeaders({ name: "John" }),
Effect.provide(layer)
))

it.effect("Stream", () =>
Effect.gen(function*() {
const client = yield* UsersClient
const users: Array<User> = []
yield* client.StreamUsers({ id: "1" }).pipe(
Stream.take(5),
Stream.runForEach((user) =>
Effect.sync(() => {
users.push(user)
})
),
Effect.fork
)

// wait for socket to connect
yield* liveSleep(100)
yield* TestClock.adjust(1000)
assert.deepStrictEqual(users, [new User({ id: "1", name: "John" })])
yield* TestClock.adjust(4000)
assert.lengthOf(users, 5)

yield* liveSleep(100)
const interrupts = yield* client.GetInterrupts({})
assert.equal(interrupts, 1)
}).pipe(Effect.provide(layer)))
})
}

// http ndjson
const HttpNdjsonServer = HttpRouter.Default.serve().pipe(
Layer.provide(RpcLive),
Expand Down Expand Up @@ -155,86 +103,3 @@ describe("RpcServer", () => {
)
)
})

class User extends Schema.Class<User>("User")({
id: Schema.String,
name: Schema.String
}) {}

class StreamUsers extends Schema.TaggedRequest<StreamUsers>()("StreamUsers", {
success: RpcSchema.Stream({
success: User,
failure: Schema.Never
}),
failure: Schema.Never,
payload: {
id: Schema.String
}
}) {}

class CurrentUser extends Context.Tag("CurrentUser")<CurrentUser, User>() {}

class AuthMiddleware extends RpcMiddleware.Tag<AuthMiddleware>()("TestMiddleware", {
provides: CurrentUser
}) {}

const UserRpcs = RpcGroup
.make(
Rpc.make("GetUser", {
success: User,
payload: { id: Schema.String }
}),
StreamUsers,
Rpc.make("GetInterrupts", {
success: Schema.Number
})
)
.middleware(AuthMiddleware)

class UsersClient extends Context.Tag("UsersClient")<
UsersClient,
RpcClient.RpcClient<RpcGroup.Rpcs<typeof UserRpcs>>
>() {
static layer = Layer.scoped(UsersClient, RpcClient.make(UserRpcs))
}

const AuthLive = Layer.succeed(
AuthMiddleware,
AuthMiddleware.of((options) =>
Effect.succeed(
new User({ id: "1", name: options.headers.name ?? "Logged in user" })
)
)
)

const UsersLive = UserRpcs.toLayer(Effect.gen(function*() {
let interrupts = 0
return {
GetUser: (_) => CurrentUser,
StreamUsers: Effect.fnUntraced(function*(req) {
const mailbox = yield* Mailbox.make<User>()

yield* Effect.addFinalizer(() =>
Effect.sync(() => {
interrupts++
})
)

yield* mailbox.offer(new User({ id: req.id, name: "John" })).pipe(
Effect.delay(1000),
Effect.forever,
Effect.forkScoped
)

return mailbox
}),
GetInterrupts: () => Effect.sync(() => interrupts)
}
}))

const RpcLive = RpcServer.layer(UserRpcs).pipe(
Layer.provide([
UsersLive,
AuthLive
])
)
17 changes: 17 additions & 0 deletions packages/rpc/test/RpcWorker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// @vitest-environment happy-dom
import "@vitest/web-worker"

import * as BrowserWorker from "@effect/platform-browser/BrowserWorker"
import * as RpcClient from "@effect/rpc/RpcClient"
import { describe } from "@effect/vitest"
import { Layer } from "effect"
import { e2eSuite } from "./e2e.js"
import { UsersClient } from "./fixtures/schemas.js"

describe("RpcWorker", () => {
const WorkerClient = UsersClient.layer.pipe(
Layer.provide(RpcClient.layerProtocolWorker({ size: 1 })),
Layer.provide(BrowserWorker.layerPlatform(() => new Worker(new URL("./fixtures/worker.ts", import.meta.url))))
)
e2eSuite("e2e worker", WorkerClient)
})
49 changes: 49 additions & 0 deletions packages/rpc/test/e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { RpcClient } from "@effect/rpc"
import { assert, describe, it } from "@effect/vitest"
import type { Layer } from "effect"
import { Effect, Stream } from "effect"
import { User, UsersClient } from "./fixtures/schemas.js"

export const e2eSuite = <E>(name: string, layer: Layer.Layer<UsersClient, E>) => {
describe(name, () => {
it.effect("should get user", () =>
Effect.gen(function*() {
const client = yield* UsersClient
const user = yield* client.GetUser({ id: "1" })
assert.instanceOf(user, User)
assert.deepStrictEqual(user, new User({ id: "1", name: "Logged in user" }))
}).pipe(Effect.provide(layer)))

it.effect("headers", () =>
Effect.gen(function*() {
const client = yield* UsersClient
const user = yield* client.GetUser({ id: "1" })
assert.instanceOf(user, User)
assert.deepStrictEqual(user, new User({ id: "1", name: "John" }))
}).pipe(
RpcClient.withHeaders({ name: "John" }),
Effect.provide(layer)
))

it.live("Stream", () =>
Effect.gen(function*() {
const client = yield* UsersClient
const users: Array<User> = []
yield* client.StreamUsers({ id: "1" }).pipe(
Stream.take(5),
Stream.runForEach((user) =>
Effect.sync(() => {
users.push(user)
})
),
Effect.fork
)

yield* Effect.sleep(1000)
assert.lengthOf(users, 5)

const interrupts = yield* client.GetInterrupts({})
assert.equal(interrupts, 1)
}).pipe(Effect.provide(layer)), { timeout: 20000 })
})
}
90 changes: 90 additions & 0 deletions packages/rpc/test/fixtures/schemas.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import * as Rpc from "@effect/rpc/Rpc"
import * as RpcClient from "@effect/rpc/RpcClient"
import * as RpcGroup from "@effect/rpc/RpcGroup"
import * as RpcMiddleware from "@effect/rpc/RpcMiddleware"
import * as RpcSchema from "@effect/rpc/RpcSchema"
import * as RpcServer from "@effect/rpc/RpcServer"
import { Context, Effect, Layer, Mailbox, Schema } from "effect"

export class User extends Schema.Class<User>("User")({
id: Schema.String,
name: Schema.String
}) {}

class StreamUsers extends Schema.TaggedRequest<StreamUsers>()("StreamUsers", {
success: RpcSchema.Stream({
success: User,
failure: Schema.Never
}),
failure: Schema.Never,
payload: {
id: Schema.String
}
}) {}

class CurrentUser extends Context.Tag("CurrentUser")<CurrentUser, User>() {}

class AuthMiddleware extends RpcMiddleware.Tag<AuthMiddleware>()("TestMiddleware", {
provides: CurrentUser
}) {}

export const UserRpcs = RpcGroup
.make(
Rpc.make("GetUser", {
success: User,
payload: { id: Schema.String }
}),
StreamUsers,
Rpc.make("GetInterrupts", {
success: Schema.Number
})
)
.middleware(AuthMiddleware)

export class UsersClient extends Context.Tag("UsersClient")<
UsersClient,
RpcClient.RpcClient<RpcGroup.Rpcs<typeof UserRpcs>>
>() {
static layer = Layer.scoped(UsersClient, RpcClient.make(UserRpcs))
}

const AuthLive = Layer.succeed(
AuthMiddleware,
AuthMiddleware.of((options) =>
Effect.succeed(
new User({ id: "1", name: options.headers.name ?? "Logged in user" })
)
)
)

const UsersLive = UserRpcs.toLayer(Effect.gen(function*() {
let interrupts = 0
return {
GetUser: (_) => CurrentUser,
StreamUsers: Effect.fnUntraced(function*(req) {
const mailbox = yield* Mailbox.make<User>()

yield* Effect.addFinalizer(() =>
Effect.sync(() => {
interrupts++
})
)

yield* mailbox.offer(new User({ id: req.id, name: "John" })).pipe(
Effect.delay(100),
Effect.forever,
Effect.forkScoped
)

return mailbox
}),
GetInterrupts: () => Effect.sync(() => interrupts)
}
}))

export const RpcLive = RpcServer.layer(UserRpcs).pipe(
Layer.provide([
UsersLive,
AuthLive
])
)
12 changes: 12 additions & 0 deletions packages/rpc/test/fixtures/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { BrowserWorkerRunner } from "@effect/platform-browser"
import * as RpcServer from "@effect/rpc/RpcServer"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import { RpcLive } from "./schemas.js"

const MainLive = RpcLive.pipe(
Layer.provide(RpcServer.layerProtocolWorkerRunner),
Layer.provide(BrowserWorkerRunner.layer)
)

Effect.runPromise(Layer.launch(MainLive))
1 change: 1 addition & 0 deletions packages/rpc/tsconfig.test.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"include": ["test"],
"references": [
{ "path": "tsconfig.src.json" },
{ "path": "../platform-browser/tsconfig.src.json" },
{ "path": "../platform-node/tsconfig.src.json" },
{ "path": "../vitest/tsconfig.src.json" }
],
Expand Down
4 changes: 4 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3bd36df

Please sign in to comment.