diff --git a/src/RPCServer.ts b/src/RPCServer.ts index b91e47b..1bde438 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -138,7 +138,7 @@ class RPCServer extends EventTarget { if (manifestItem instanceof RawHandler) { this.registerRawStreamHandler( key, - manifestItem.handle.bind(manifestItem), + manifestItem.handle, manifestItem.timeout, ); continue; @@ -146,7 +146,7 @@ class RPCServer extends EventTarget { if (manifestItem instanceof DuplexHandler) { this.registerDuplexStreamHandler( key, - manifestItem.handle.bind(manifestItem), + manifestItem.handle, manifestItem.timeout, ); continue; @@ -154,7 +154,7 @@ class RPCServer extends EventTarget { if (manifestItem instanceof ServerHandler) { this.registerServerStreamHandler( key, - manifestItem.handle.bind(manifestItem), + manifestItem.handle, manifestItem.timeout, ); continue; @@ -162,7 +162,7 @@ class RPCServer extends EventTarget { if (manifestItem instanceof ClientHandler) { this.registerClientStreamHandler( key, - manifestItem.handle.bind(manifestItem), + manifestItem.handle, manifestItem.timeout, ); continue; @@ -170,7 +170,7 @@ class RPCServer extends EventTarget { if (manifestItem instanceof ClientHandler) { this.registerClientStreamHandler( key, - manifestItem.handle.bind(manifestItem), + manifestItem.handle, manifestItem.timeout, ); continue; @@ -178,7 +178,7 @@ class RPCServer extends EventTarget { if (manifestItem instanceof UnaryHandler) { this.registerUnaryHandler( key, - manifestItem.handle.bind(manifestItem), + manifestItem.handle, manifestItem.timeout, ); continue; diff --git a/src/errors/errors.ts b/src/errors/errors.ts index 90930e9..0934822 100644 --- a/src/errors/errors.ts +++ b/src/errors/errors.ts @@ -187,6 +187,16 @@ export function never(): never { throw new ErrorRPC('This function should never be called'); } +class ErrorRPCMethodNotImplemented extends ErrorRPC { + constructor(message?: string) { + super(message || 'This method must be overridden'); // Default message if none provided + this.name = 'ErrorRPCMethodNotImplemented'; + this.description = + 'This abstract method must be implemented in a derived class'; + this.exitCode = sysexits.USAGE; // Or another suitable exit code + } +} + export { ErrorRPC, ErrorRPCDestroyed, @@ -200,4 +210,5 @@ export { ErrorRPCStreamEnded, ErrorRPCTimedOut, ErrorUtilsUndefinedBehaviour, + ErrorRPCMethodNotImplemented, }; diff --git a/src/handlers/clientHandler.ts b/src/handlers/clientHandler.ts index 0e1a301..a12fc4c 100644 --- a/src/handlers/clientHandler.ts +++ b/src/handlers/clientHandler.ts @@ -1,18 +1,19 @@ import type { ContainerType, JSONValue } from '../types'; import type { ContextTimed } from '@matrixai/contexts'; import { Handler } from './handler'; +import { ErrorRPCMethodNotImplemented } from '../errors'; -abstract class ClientHandler< +export abstract class ClientHandler< Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue, > extends Handler { - abstract handle( + public handle = async ( input: AsyncIterableIterator, cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): Promise; + ): Promise => { + throw new ErrorRPCMethodNotImplemented(); + }; } - -export { ClientHandler }; diff --git a/src/handlers/duplexHandler.ts b/src/handlers/duplexHandler.ts index 6906118..bae99fc 100644 --- a/src/handlers/duplexHandler.ts +++ b/src/handlers/duplexHandler.ts @@ -1,8 +1,9 @@ import type { ContainerType, JSONValue } from '../types'; import type { ContextTimed } from '@matrixai/contexts'; import { Handler } from './handler'; +import { ErrorRPCMethodNotImplemented } from '../errors'; -abstract class DuplexHandler< +export abstract class DuplexHandler< Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue, @@ -12,12 +13,12 @@ abstract class DuplexHandler< * error. If you need to handle any clean up it should be handled in a * `finally` block and check the abort signal for potential errors. */ - abstract handle( + public handle = async function* ( input: AsyncIterableIterator, cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): AsyncIterableIterator; + ): AsyncIterableIterator { + throw new ErrorRPCMethodNotImplemented('This method must be overwrtitten.'); + }; } - -export { DuplexHandler }; diff --git a/src/handlers/handler.ts b/src/handlers/handler.ts index 6c1d387..2b1b5c8 100644 --- a/src/handlers/handler.ts +++ b/src/handlers/handler.ts @@ -1,6 +1,6 @@ import type { ContainerType, JSONValue } from '../types'; -abstract class Handler< +export abstract class Handler< Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue, @@ -17,5 +17,3 @@ abstract class Handler< constructor(protected container: Container) {} } - -export { Handler }; diff --git a/src/handlers/rawHandler.ts b/src/handlers/rawHandler.ts index ae66040..62c16f7 100644 --- a/src/handlers/rawHandler.ts +++ b/src/handlers/rawHandler.ts @@ -2,16 +2,17 @@ import type { ContextTimed } from '@matrixai/contexts'; import type { ReadableStream } from 'stream/web'; import type { ContainerType, JSONRPCRequest, JSONValue } from '../types'; import { Handler } from './handler'; +import { ErrorRPCMethodNotImplemented } from '../errors'; -abstract class RawHandler< +export abstract class RawHandler< Container extends ContainerType = ContainerType, > extends Handler { - abstract handle( + public handle = async ( input: [JSONRPCRequest, ReadableStream], cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): Promise<[JSONValue, ReadableStream]>; + ): Promise<[JSONValue, ReadableStream]> => { + throw new ErrorRPCMethodNotImplemented('This method must be overridden'); + }; } - -export { RawHandler }; diff --git a/src/handlers/serverHandler.ts b/src/handlers/serverHandler.ts index 9c38463..2167ec8 100644 --- a/src/handlers/serverHandler.ts +++ b/src/handlers/serverHandler.ts @@ -1,18 +1,19 @@ import type { ContextTimed } from '@matrixai/contexts'; import type { ContainerType, JSONValue } from '../types'; import { Handler } from '@/handlers/handler'; +import { ErrorRPCMethodNotImplemented } from '../errors'; -abstract class ServerHandler< +export abstract class ServerHandler< Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue, > extends Handler { - abstract handle( + public handle = async function* ( input: Input, cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): AsyncIterableIterator; + ): AsyncIterableIterator { + throw new ErrorRPCMethodNotImplemented('This method must be overridden'); + }; } - -export { ServerHandler }; diff --git a/src/handlers/unaryHandler.ts b/src/handlers/unaryHandler.ts index 6fc5bf5..8415717 100644 --- a/src/handlers/unaryHandler.ts +++ b/src/handlers/unaryHandler.ts @@ -1,18 +1,19 @@ import type { ContextTimed } from '@matrixai/contexts'; import type { ContainerType, JSONValue } from '../types'; import { Handler } from './handler'; +import { ErrorRPCMethodNotImplemented } from '../errors'; -abstract class UnaryHandler< +export abstract class UnaryHandler< Container extends ContainerType = ContainerType, Input extends JSONValue = JSONValue, Output extends JSONValue = JSONValue, > extends Handler { - abstract handle( + public handle = async ( input: Input, cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): Promise; + ): Promise => { + throw new ErrorRPCMethodNotImplemented('This method must be overridden'); + }; } - -export { UnaryHandler }; diff --git a/tests/rpc/RPC.test.ts b/tests/rpc/RPC.test.ts index d27156e..5c891c8 100644 --- a/tests/rpc/RPC.test.ts +++ b/tests/rpc/RPC.test.ts @@ -1,6 +1,7 @@ import type { ContainerType, JSONRPCRequest } from '../../src/types'; import type { ReadableStream } from 'stream/web'; import type { JSONValue } from '../../src/types'; +import type { ContextTimed } from '@matrixai/contexts'; import { TransformStream } from 'stream/web'; import { fc, testProp } from '@fast-check/jest'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; @@ -39,17 +40,17 @@ describe('RPC', () => { let header: JSONRPCRequest | undefined; class TestMethod extends RawHandler { - public async handle( + public handle = async ( input: [JSONRPCRequest, ReadableStream], _cancel: (reason?: any) => void, _meta: Record | undefined, - ): Promise<[JSONValue, ReadableStream]> { + ): Promise<[JSONValue, ReadableStream]> => { return new Promise((resolve) => { const [header_, stream] = input; header = header_; resolve(['some leading data', stream]); }); - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -139,12 +140,16 @@ describe('RPC', () => { >(); class TestMethod extends RawHandler { - public async handle(): Promise< - [JSONRPCRequest, ReadableStream] - > { - throw Error('some error'); - } + public handle = async ( + input: [JSONRPCRequest, ReadableStream], + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise<[JSONValue, ReadableStream]> => { + throw new Error('some error'); + }; } + const rpcServer = await RPCServer.createRPCServer({ manifest: { testMethod: new TestMethod({}), @@ -187,11 +192,14 @@ describe('RPC', () => { Uint8Array >(); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { yield* input; - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -242,11 +250,13 @@ describe('RPC', () => { >(); class TestMethod extends ServerHandler { - public async *handle(input: number): AsyncGenerator { + public handle = async function* ( + input: number, + ): AsyncGenerator { for (let i = 0; i < input; i++) { yield i; } - } + }; } const rpcServer = await RPCServer.createRPCServer({ @@ -294,14 +304,17 @@ describe('RPC', () => { >(); class TestMethod extends ClientHandler { - public async handle(input: AsyncIterable): Promise { + public handle = async ( + input: AsyncIterable, + ): Promise => { let acc = 0; for await (const number of input) { acc += number; } return acc; - } + }; } + const rpcServer = await RPCServer.createRPCServer({ manifest: { testMethod: new TestMethod({}), @@ -348,9 +361,9 @@ describe('RPC', () => { >(); class TestMethod extends UnaryHandler { - public async handle(input: JSONValue): Promise { + public handle = async (input: JSONValue): Promise => { return input; - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -395,9 +408,14 @@ describe('RPC', () => { >(); class TestMethod extends UnaryHandler { - public async handle(): Promise { + public handle = async ( + _input: JSONValue, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise => { throw error; - } + }; } const rpcServer = await RPCServer.createRPCServer({ @@ -447,9 +465,14 @@ describe('RPC', () => { >(); class TestMethod extends UnaryHandler { - public async handle(): Promise { + public handle = async ( + _input: JSONValue, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise => { throw error; - } + }; } const rpcServer = await RPCServer.createRPCServer({ @@ -488,12 +511,16 @@ describe('RPC', () => { Uint8Array >(); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncIterableIterator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncIterableIterator { yield* input; - } + }; } + const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { return { forward: new TransformStream({ diff --git a/tests/rpc/RPCServer.test.ts b/tests/rpc/RPCServer.test.ts index a29a04e..533cfc6 100644 --- a/tests/rpc/RPCServer.test.ts +++ b/tests/rpc/RPCServer.test.ts @@ -64,12 +64,12 @@ describe(`${RPCServer.name}`, () => { rpcTestUtils.binaryStreamToSnippedStream([4, 7, 13, 2, 6]), ); class TestHandler extends RawHandler { - public async handle( + public handle = async ( input: [JSONRPCRequest, ReadableStream], cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): Promise<[JSONValue, ReadableStream]> { + ): Promise<[JSONValue, ReadableStream]> => { for await (const _ of input[1]) { // No touch, only consume } @@ -80,8 +80,9 @@ describe(`${RPCServer.name}`, () => { }, }); return Promise.resolve([null, readableStream]); - } + }; } + const rpcServer = await RPCServer.createRPCServer({ manifest: { testMethod: new TestHandler({}), @@ -106,14 +107,17 @@ describe(`${RPCServer.name}`, () => { async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { for await (const val of input) { yield val; break; } - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -138,15 +142,18 @@ describe(`${RPCServer.name}`, () => { async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends ClientHandler { - public async handle( + public handle = async ( input: AsyncGenerator, - ): Promise { + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise => { let count = 0; for await (const _ of input) { count += 1; } return count; - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -171,11 +178,13 @@ describe(`${RPCServer.name}`, () => { async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends ServerHandler { - public async *handle(input: number): AsyncGenerator { + public handle = async function* ( + input: number, + ): AsyncGenerator { for (let i = 0; i < input; i++) { yield i; } - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -200,9 +209,9 @@ describe(`${RPCServer.name}`, () => { async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends UnaryHandler { - public async handle(input: JSONValue): Promise { + public handle = async (input: JSONValue): Promise => { return input; - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -232,14 +241,17 @@ describe(`${RPCServer.name}`, () => { C: Symbol('c'), }; class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { expect(this.container).toBe(container); for await (const val of input) { yield val; } - } + }; } const rpcServer = await RPCServer.createRPCServer({ @@ -273,16 +285,17 @@ describe(`${RPCServer.name}`, () => { }; let handledMeta; class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, - _cancel, - meta, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { handledMeta = meta; for await (const val of input) { yield val; } - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -306,17 +319,17 @@ describe(`${RPCServer.name}`, () => { testProp('handler can be aborted', [specificMessageArb], async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, - _cancel, - _meta, + cancel: (reason?: any) => void, + meta: Record | undefined, ctx: ContextTimed, ): AsyncGenerator { for await (const val of input) { if (ctx.signal.aborted) throw ctx.signal.reason; yield val; } - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -359,13 +372,16 @@ describe(`${RPCServer.name}`, () => { testProp('handler yields nothing', [specificMessageArb], async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { for await (const _ of input) { // Do nothing, just consume } - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -390,9 +406,9 @@ describe(`${RPCServer.name}`, () => { async (messages, error) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public async *handle(): AsyncGenerator { + public handle = async function* (): AsyncGenerator { throw error; - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -430,10 +446,11 @@ describe(`${RPCServer.name}`, () => { async (messages, error) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public async *handle(): AsyncGenerator { + public handle = async function* (): AsyncGenerator { throw error; - } + }; } + const rpcServer = await RPCServer.createRPCServer({ manifest: { testMethod: new TestMethod({}), @@ -472,7 +489,7 @@ describe(`${RPCServer.name}`, () => { async (messages) => { const handlerEndedProm = promise(); class TestMethod extends DuplexHandler { - public async *handle(input): AsyncGenerator { + public handle = async function* (input): AsyncGenerator { try { for await (const _ of input) { // Consume but don't yield anything @@ -480,7 +497,7 @@ describe(`${RPCServer.name}`, () => { } finally { handlerEndedProm.resolveP(); } - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -526,7 +543,7 @@ describe(`${RPCServer.name}`, () => { const handlerEndedProm = promise(); let ctx: ContextTimed | undefined; class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input, _cancel, _meta, @@ -539,7 +556,7 @@ describe(`${RPCServer.name}`, () => { } finally { handlerEndedProm.resolveP(); } - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -596,11 +613,14 @@ describe(`${RPCServer.name}`, () => { testProp('forward middlewares', [specificMessageArb], async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { yield* input; - } + }; } const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( () => { @@ -644,11 +664,14 @@ describe(`${RPCServer.name}`, () => { testProp('reverse middlewares', [specificMessageArb], async (messages) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { yield* input; - } + }; } const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { return { @@ -693,11 +716,14 @@ describe(`${RPCServer.name}`, () => { async (message) => { const stream = rpcTestUtils.messagesToReadableStream([message]); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { yield* input; - } + }; } const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( () => { @@ -772,12 +798,12 @@ describe(`${RPCServer.name}`, () => { // Diagnostic log to indicate the start of the test class TestHandler extends RawHandler { - public async handle( + public handle = async ( _input: [JSONRPCRequest, ReadableStream], _cancel: (reason?: any) => void, _meta: Record | undefined, ctx_: ContextTimed, - ): Promise<[JSONValue, ReadableStream]> { + ): Promise<[JSONValue, ReadableStream]> => { return new Promise((resolve, reject) => { ctxProm.resolveP(ctx_); @@ -795,7 +821,7 @@ describe(`${RPCServer.name}`, () => { // Return something to fulfill the Promise type expectation. resolve([null, stream]); }); - } + }; } const rpcServer = await RPCServer.createRPCServer({ @@ -872,30 +898,30 @@ describe(`${RPCServer.name}`, () => { const ctxShortProm = promise(); class TestMethodShortTimeout extends UnaryHandler { timeout = 25; - public async handle( + public handle = async ( input: JSONValue, _cancel, _meta, ctx_, - ): Promise { + ): Promise => { ctxShortProm.resolveP(ctx_); await waitProm.p; return input; - } + }; } const ctxLongProm = promise(); class TestMethodLongTimeout extends UnaryHandler { timeout = 100; - public async handle( + public handle = async ( input: JSONValue, _cancel, _meta, ctx_, - ): Promise { + ): Promise => { ctxLongProm.resolveP(ctx_); await waitProm.p; return input; - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -948,11 +974,11 @@ describe(`${RPCServer.name}`, () => { const stepProm2 = promise(); const passthroughStream = new TransformStream(); class TestHandler extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, - _cancel, - _meta, - ctx, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { contextProm.resolveP(ctx); for await (const _ of input) { @@ -962,7 +988,7 @@ describe(`${RPCServer.name}`, () => { yield 1; await stepProm2.p; yield 2; - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -1010,12 +1036,12 @@ describe(`${RPCServer.name}`, () => { test('stream ending cleans up timer and abortSignal', async () => { const ctxProm = promise(); class TestHandler extends RawHandler { - public async handle( + public handle = async ( input: [JSONRPCRequest, ReadableStream], _cancel: (reason?: any) => void, _meta: Record | undefined, ctx_: ContextTimed, - ): Promise<[JSONValue, ReadableStream]> { + ): Promise<[JSONValue, ReadableStream]> => { return new Promise((resolve) => { ctxProm.resolveP(ctx_); void (async () => { @@ -1030,7 +1056,7 @@ describe(`${RPCServer.name}`, () => { }); resolve([null, readableStream]); }); - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -1064,16 +1090,16 @@ describe(`${RPCServer.name}`, () => { test('Timeout has a grace period before forcing the streams closed', async () => { const ctxProm = promise(); class TestHandler extends RawHandler { - public async handle( + public handle = async ( input: [JSONRPCRequest, ReadableStream], cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): Promise<[JSONValue, ReadableStream]> { + ): Promise<[JSONValue, ReadableStream]> => { ctxProm.resolveP(ctx); return Promise.resolve([null, new ReadableStream()]); - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: { @@ -1120,15 +1146,15 @@ describe(`${RPCServer.name}`, () => { const stream = rpcTestUtils.messagesToReadableStream(messages); const ctxProm = promise(); class TestMethod extends DuplexHandler { - public async *handle( + public handle = async function* ( input: AsyncGenerator, - _cancel, - _meta, - ctx, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { ctxProm.resolveP(ctx); yield* input; - } + }; } const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper((ctx) => { @@ -1160,18 +1186,18 @@ describe(`${RPCServer.name}`, () => { test('destroying the `RPCServer` sends an abort signal and closes connection', async () => { const ctxProm = promise(); class TestHandler extends RawHandler { - public async handle( + public handle = async ( input: [JSONRPCRequest, ReadableStream], _cancel: (reason?: any) => void, _meta: Record | undefined, ctx_: ContextTimed, - ): Promise<[JSONValue, ReadableStream]> { + ): Promise<[JSONValue, ReadableStream]> => { return new Promise((resolve) => { ctxProm.resolveP(ctx_); // Echo messages return [null, input[1]]; }); - } + }; } const rpcServer = await RPCServer.createRPCServer({ manifest: {