Skip to content

Commit

Permalink
Support KeyedEventHandler
Browse files Browse the repository at this point in the history
This commit adds the support for the Event API for the
keyed dynamic handlers. Since the dynamic handler API requires a
string key, this means that the registered gRPC methods are of the form

rpc Handle(StringKeyedEvent) returns (google.protobuf.Empty) {};
  • Loading branch information
igalshilman committed Sep 18, 2023
1 parent 08458c0 commit 0b19442
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 30 deletions.
4 changes: 2 additions & 2 deletions buf.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ deps:
- remote: buf.build
owner: restatedev
repository: proto
commit: 7c1d0063691147dab51a87fc7d2befa5
digest: shake256:41fb2128bd34a84f363d2b0cafbf0746b82d2b18c400cf85a1b3a94b28a2f4643fc17c9cf523cf98720b227b8d75b24fdc03fb87f8fa9a7ef75f788a5cbfe0c4
commit: 4c536701ef5348ecbf3cd1ef6cf825fc
digest: shake256:0fdebe27d9653dc31f9951623e0c8dc68a161d2b55146cc22c0501d2bccb22d49ab9b2b80c8f4de8827c4ba168119296f14d323eed5162ef63f128803cc64f47
56 changes: 56 additions & 0 deletions examples/handler_example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate SDK for Node.js/TypeScript,
* which is released under the MIT license.
*
* You can find a copy of the license in file LICENSE in the root
* directory of this repository or package, or at
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
*/

/* eslint-disable no-console */

/*
* A simple example program using the Restate's event handlers.
*/

import * as restate from "../src/public_api";

const registration = async (ctx: restate.RpcContext, event: restate.Event) => {
// store in state the user's information as coming from the registeration event
const { name } = event.json<{ name: string }>();
ctx.set("name", name);
};

const email = async (ctx: restate.RpcContext, event: restate.Event) => {
// store in state the user's information as coming from the email event
const { email } = event.json<{ email: string }>();
ctx.set("email", email);
};

type UserProfile = {
id: string;
name: string;
email: string;
};

const get = async (
ctx: restate.RpcContext,
id: string
): Promise<UserProfile> => {
return {
id,
name: (await ctx.get<string>("name")) ?? "",
email: (await ctx.get<string>("email")) ?? "",
};
};

const profile = restate.keyedRouter({
registration: restate.keyedEventHandler(registration),
email: restate.keyedEventHandler(email),
get,
});

// restate server
restate.createServer().bindKeyedRouter("profile", profile).listen(8080);
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"verify": "npm run format -- --check && npm run test && npm run lint && npm run build",
"release": "release-it",
"example": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/example.ts",
"grpcexample": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/grpc_example.ts"
"grpcexample": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/grpc_example.ts",
"handlerexample": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/handler_example.ts"
},
"files": [
"dist"
Expand Down
3 changes: 3 additions & 0 deletions proto/dynrpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
syntax = "proto3";

import "dev/restate/ext.proto";
import "dev/restate/events.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/empty.proto";

service RpcEndpoint {
option (dev.restate.ext.service_type) = KEYED;

rpc call(RpcRequest) returns (RpcResponse) {};

rpc handle(dev.restate.StringKeyedEvent) returns (google.protobuf.Empty) {};
}

service UnkeyedRpcEndpoint {
Expand Down
3 changes: 3 additions & 0 deletions src/public_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ export {
export {
router,
keyedRouter,
keyedEventHandler,
UnKeyedRouter,
KeyedRouter,
KeyedEventHandler,
Client,
SendClient,
} from "./types/router";
Expand All @@ -32,3 +34,4 @@ export {
} from "./server/restate_lambda_handler";
export * as RestateUtils from "./utils/public_utils";
export { ErrorCodes, RestateError, TerminalError } from "./types/errors";
export { Event } from "./types/types";
144 changes: 121 additions & 23 deletions src/server/base_restate_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import {
ProtocolMode,
ServiceDiscoveryResponse,
} from "../generated/proto/discovery";
import { Event } from "../types/types";
import { StringKeyedEvent } from "../generated/dev/restate/events";
import {
FileDescriptorProto,
UninterpretedOption,
} from "../generated/google/protobuf/descriptor";
import { Empty } from "../generated/google/protobuf/empty";
import {
FileDescriptorProto as FileDescriptorProto1,
ServiceDescriptorProto as ServiceDescriptorProto1,
Expand All @@ -45,6 +48,7 @@ import { RestateContext, useContext } from "../restate_context";
import { RpcContextImpl } from "../restate_context_impl";
import { verifyAssumptions } from "../utils/assumpsions";
import { TerminalError } from "../public_api";
import { isEventHandler } from "../types/router";

export interface ServiceOpts {
descriptor: ProtoMetadata;
Expand Down Expand Up @@ -148,6 +152,78 @@ export abstract class BaseRestateServer {
}
}

rpcHandler(
keyed: boolean,
route: string,
handler: Function
): {
descriptor: MethodDescriptorProto1;
method: GrpcServiceMethod<unknown, unknown>;
} {
const descriptor = createRpcMethodDescriptor(route);

const localMethod = (instance: unknown, input: RpcRequest) => {
const ctx = useContext(instance);
if (keyed) {
return dispatchKeyedRpcHandler(ctx, input, handler);
} else {
return dispatchUnkeyedRpcHandler(ctx, input, handler);
}
};

const decoder = RpcRequest.decode;
const encoder = (message: RpcResponse) =>
RpcResponse.encode(message).finish();

const method = new GrpcServiceMethod<RpcRequest, RpcResponse>(
route,
route,
localMethod,
decoder,
encoder
);

return {
descriptor: descriptor,
method: method as GrpcServiceMethod<unknown, unknown>,
};
}

stringKeyedEventHandler(
keyed: boolean,
route: string,
handler: Function
): {
descriptor: MethodDescriptorProto1;
method: GrpcServiceMethod<unknown, unknown>;
} {
if (!keyed) {
// TODO: support unkeyed rpc event handler
throw new TerminalError("Unkeyed Event handlers are not yet supported.");
}
const descriptor = createStringKeyedMethodDescriptor(route);
const localMethod = (instance: unknown, input: StringKeyedEvent) => {
const ctx = useContext(instance);
return dispatchKeyedEventHandler(ctx, input, handler);
};

const decoder = StringKeyedEvent.decode;
const encoder = (message: Empty) => Empty.encode(message).finish();

const method = new GrpcServiceMethod<StringKeyedEvent, Empty>(
route,
route,
localMethod,
decoder,
encoder
);

return {
descriptor,
method: method as GrpcServiceMethod<unknown, unknown>,
};
}

protected bindRpcService(name: string, router: RpcRouter, keyed: boolean) {
const lastDot = name.indexOf(".");
const serviceName = lastDot === -1 ? name : name.substring(lastDot + 1);
Expand All @@ -161,40 +237,33 @@ export abstract class BaseRestateServer {
? pushKeyedService(desc, name)
: pushUnKeyedService(desc, name);

const decoder = RpcRequest.decode;
const encoder = (message: RpcResponse) =>
RpcResponse.encode(message).finish();

for (const [route, handler] of Object.entries(router)) {
serviceGrpcSpec.method.push(createRpcMethodDescriptor(route));

const localFn = (instance: unknown, input: RpcRequest) => {
const ctx = useContext(instance);
if (keyed) {
return dispatchKeyedRpcHandler(ctx, input, handler);
} else {
return dispatchUnkeyedRpcHandler(ctx, input, handler);
}
let registration: {
descriptor: MethodDescriptorProto1;
method: GrpcServiceMethod<unknown, unknown>;
};

const method = new GrpcServiceMethod<RpcRequest, RpcResponse>(
route,
route,
localFn,
decoder,
encoder
);

if (isEventHandler(handler)) {
const theHandler = handler.handler;
registration = this.stringKeyedEventHandler(keyed, route, theHandler);
} else {
registration = this.rpcHandler(keyed, route, handler);
}
serviceGrpcSpec.method.push(registration.descriptor);
const url = `/invoke/${name}/${route}`;
this.methods[url] = new HostedGrpcServiceMethod(
{}, // we don't actually execute on any class instance
servicePackage,
serviceName,
method
registration.method
) as HostedGrpcServiceMethod<unknown, unknown>;

rlog.info(
`Registering: ${url} -> ${JSON.stringify(method, null, "\t")}`
`Registering: ${url} -> ${JSON.stringify(
registration.method,
null,
"\t"
)}`
);
}

Expand Down Expand Up @@ -376,6 +445,25 @@ async function dispatchUnkeyedRpcHandler(
return RpcResponse.create({ response: result });
}

async function dispatchKeyedEventHandler(
origCtx: RestateContext,
req: StringKeyedEvent,
handler: Function
): Promise<Empty> {
const ctx = new RpcContextImpl(origCtx);
const key = req.key;
if (typeof key !== "string" || key.length === 0) {
// we throw a terminal error here, because this cannot be patched by updating code:
// if the request is wrong (missing a key), the request can never make it
throw new TerminalError(
"Keyed handlers must recieve a non null or empty string key"
);
}
const jsEvent = new Event(key, req.payload, req.source, req.attributes);
await handler(ctx, jsEvent);
return Empty.create({});
}

function copyProtoMetadata(
original: RpcServiceProtoMetadata
): RpcServiceProtoMetadata {
Expand Down Expand Up @@ -458,4 +546,14 @@ function createRpcMethodDescriptor(methodName: string): MethodDescriptorProto1 {
return desc;
}

function createStringKeyedMethodDescriptor(
methodName: string
): MethodDescriptorProto1 {
const desc = {
...rpcServiceProtoMetadata.fileDescriptor.service[0].method[1],
} as MethodDescriptorProto1;
desc.name = methodName;
return desc;
}

const dynrpcDescriptor = copyProtoMetadata(rpcServiceProtoMetadata);
36 changes: 32 additions & 4 deletions src/types/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

import { RpcContext } from "../restate_context";
import { Event } from "../types/types";

// ----------- generics -------------------------------------------------------

Expand All @@ -25,11 +26,13 @@ type WithoutRpcContext<F> = F extends (
: never;

export type Client<M> = {
[K in keyof M]: M[K];
[K in keyof M as M[K] extends never ? never : K]: M[K];
};

export type SendClient<M> = {
[K in keyof M]: M[K] extends (...args: infer P) => any
[K in keyof M as M[K] extends never ? never : K]: M[K] extends (
...args: infer P
) => any
? (...args: P) => void
: never;
};
Expand Down Expand Up @@ -68,11 +71,15 @@ export type KeyedHandler<F> = F extends (ctx: RpcContext) => Promise<any>
: never;

export type KeyedRouterOpts<U> = {
[K in keyof U]: U[K] extends KeyedHandler<any> ? U[K] : never;
[K in keyof U]: U[K] extends KeyedHandler<any> | KeyedEventHandler<U[K]>
? U[K]
: never;
};

export type KeyedRouter<U> = {
[K in keyof U]: U[K] extends KeyedHandler<infer F>
[K in keyof U]: U[K] extends KeyedEventHandler<U[K]>
? never
: U[K] extends KeyedHandler<infer F>
? WithKeyArgument<WithoutRpcContext<F>>
: never;
};
Expand All @@ -83,3 +90,24 @@ export const keyedRouter = <M>(opts: KeyedRouterOpts<M>): KeyedRouter<M> => {
}
return opts as KeyedRouter<M>;
};

// ----------- event handlers ----------------------------------------------

export type KeyedEventHandler<U> = U extends (
ctx: RpcContext,
event: Event
) => Promise<void>
? U
: never;

export const keyedEventHandler = <H>(handler: KeyedEventHandler<H>): H => {
return { eventHandler: true, handler: handler } as H;
};

export const isEventHandler = (
handler: any
): handler is {
handler: (ctx: RpcContext, event: Event) => Promise<void>;
} => {
return typeof handler === "object" && handler["eventHandler"];
};
17 changes: 17 additions & 0 deletions src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,20 @@ export class Header {
return res;
}
}

export class Event {
constructor(
readonly key: string,
readonly payload: Buffer,
readonly source: string,
readonly attributes: Record<string, string>
) {}

public json<T>(): T {
return JSON.parse(this.payload.toString("utf-8")) as T;
}

public body(): Uint8Array {
return this.payload;
}
}

0 comments on commit 0b19442

Please sign in to comment.