Skip to content

Commit

Permalink
Make it possible to get extra args provided to functional handlers (#409
Browse files Browse the repository at this point in the history
)

* Make it possible to get extra args provided to functional handlers

* Use unknown instead of object

* Add comment re determinism
  • Loading branch information
jackkleeman authored Aug 3, 2024
1 parent 451e796 commit 1750275
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 6 deletions.
17 changes: 14 additions & 3 deletions packages/restate-sdk/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ export interface Request {
* Raw unparsed request body
*/
readonly body: Uint8Array;

/**
* Extra arguments provided to the request handler:
* Lambda: [Context]
* Cloudflare workers: [Env, ExecutionContext]
* Deno: [ConnInfo]
* Bun: [Server]
* These arguments can contain request-specific values that could change after a suspension.
* Care should be taken to use them deterministically.
*/
readonly extraArgs: unknown[];
}

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down Expand Up @@ -228,15 +239,15 @@ export interface Context extends RestateContext {
* // this action will not be retried anymore
* throw new TerminalError("Payment failed");
* } else if (result.paymentGatewayBusy) {
* // restate will retry automatically
* // restate will retry automatically
* throw new Exception("Payment gateway busy");
* } else {
* // success!
* }
* });
*
*
* ```
*
*
* @param action The function to run.
*/
run<T>(action: RunAction<T>): Promise<T>;
Expand Down
2 changes: 2 additions & 0 deletions packages/restate-sdk/src/context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
invocationValue: Uint8Array,
invocationHeaders: ReadonlyMap<string, string>,
attemptHeaders: ReadonlyMap<string, string | string[] | undefined>,
extraArgs: unknown[],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
readonly stateMachine: StateMachine
) {
Expand All @@ -120,6 +121,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
headers: invocationHeaders,
attemptHeaders,
body: invocationValue,
extraArgs,
};
this.rand = new RandImpl(id, this.checkState.bind(this));
}
Expand Down
6 changes: 5 additions & 1 deletion packages/restate-sdk/src/endpoint/handlers/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import type { GenericHandler, RestateRequest } from "./generic.js";

export function fetcher(handler: GenericHandler) {
return {
fetch: async (event: Request): Promise<Response> => {
fetch: async (
event: Request,
...extraArgs: unknown[]
): Promise<Response> => {
const url = event.url;
const headers = Object.fromEntries(event.headers.entries());

const request: RestateRequest = {
url,
headers,
body: event.body,
extraArgs,
};

const resp = await handler.handle(request);
Expand Down
6 changes: 5 additions & 1 deletion packages/restate-sdk/src/endpoint/handlers/generic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export interface RestateRequest {
readonly url: string;
readonly headers: Headers;
readonly body: ReadableStream<Uint8Array> | null;
readonly extraArgs: unknown[];
}

export interface RestateResponse {
Expand Down Expand Up @@ -165,6 +166,7 @@ export class GenericHandler implements RestateHandler {
request.body,
request.headers,
serviceProtocolVersion,
request.extraArgs,
context ?? {}
);
}
Expand Down Expand Up @@ -211,6 +213,7 @@ export class GenericHandler implements RestateHandler {
body: ReadableStream<Uint8Array>,
headers: Record<string, string | string[] | undefined>,
serviceProtocolVersion: ServiceProtocolVersion,
extraArgs: unknown[],
context: AdditionalContext
): Promise<RestateResponse> {
let responseController: TransformStreamDefaultController<Uint8Array>;
Expand Down Expand Up @@ -242,7 +245,8 @@ export class GenericHandler implements RestateHandler {
invocation,
handler.kind(),
this.endpoint.logger,
invocation.inferLoggerContext(context)
invocation.inferLoggerContext(context),
extraArgs
);
connection.pipeToConsumer(stateMachine);

Expand Down
1 change: 1 addition & 0 deletions packages/restate-sdk/src/endpoint/handlers/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class LambdaHandler {
body,
headers: event.headers,
url: path,
extraArgs: [context],
};

const resp = await this.handler.handle(request, {
Expand Down
1 change: 1 addition & 0 deletions packages/restate-sdk/src/endpoint/node_endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class NodeEndpoint implements RestateEndpoint {
url,
headers: request.headers,
body: Readable.toWeb(request),
extraArgs: [],
});

response.writeHead(resp.statusCode, resp.headers);
Expand Down
2 changes: 2 additions & 0 deletions packages/restate-sdk/src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export class StateMachine implements RestateStreamConsumer {
handlerKind: HandlerKind,
logger: Logger,
loggerContext: LoggerContext,
extraArgs: unknown[],
private readonly suspensionMillis: number = 30_000
) {
this.localStateStore = invocation.localStateStore;
Expand All @@ -110,6 +111,7 @@ export class StateMachine implements RestateStreamConsumer {
invocation.invocationValue,
invocation.invocationHeaders,
connection.headers(),
extraArgs,
this
);
this.promiseCombinatorTracker = new PromiseCombinatorTracker(
Expand Down
3 changes: 2 additions & 1 deletion packages/restate-sdk/test/testdriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ export class UUT<N extends string, T> {
method.kind(),
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(restateServer as unknown as { builder: EndpointBuilder }).builder.logger,
invocation.inferLoggerContext()
invocation.inferLoggerContext(),
[]
);

const completed = stateMachine.invoke();
Expand Down

0 comments on commit 1750275

Please sign in to comment.