Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to get extra args provided to functional handlers #409

Merged
merged 3 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions packages/restate-sdk/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ export interface Request {
* Raw unparsed request body
*/
readonly body: Uint8Array;

/**
* Extra arguments provided to the request handler:
* Lambda: [Context]
* Cloudflare workers: [Env, ExecutionContext]
* Deno: [ConnInfo]
* Bun: [Server]
*/
readonly extraArgs: unknown[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's probably something to say here about determinism. People should be extra careful about those arguments, and really consume them only within ctx.run.

Perhaps we should rather think about exposing them only within the run closure (e.g. as optional argument of the run closure itself.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, although it applies to many fields in this struct (eg body, attemptHeaders)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

body is deterministic, headers too. attemptHeaders though is indeed not deterministic...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not possible to consume via a run.
It actually carries things that aren’t seirlazable, like clients to an object store etc’

I think that it needs to be clear, what that argument is, and a warning about determinism.

also the name should probably include that.

p.s, can it not be an array?
Can it be just unknown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@igalshilman why just unknown given it is always an array?

}

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down Expand Up @@ -228,15 +237,15 @@ export interface Context extends RestateContext {
* // this action will not be retried anymore
* throw new TerminalError("Payment failed");
* } else if (result.paymentGatewayBusy) {
* // restate will retry automatically
* // restate will retry automatically
* throw new Exception("Payment gateway busy");
* } else {
* // success!
* }
* });
*
*
* ```
*
*
* @param action The function to run.
*/
run<T>(action: RunAction<T>): Promise<T>;
Expand Down
2 changes: 2 additions & 0 deletions packages/restate-sdk/src/context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
invocationValue: Uint8Array,
invocationHeaders: ReadonlyMap<string, string>,
attemptHeaders: ReadonlyMap<string, string | string[] | undefined>,
extraArgs: unknown[],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
readonly stateMachine: StateMachine
) {
Expand All @@ -120,6 +121,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
headers: invocationHeaders,
attemptHeaders,
body: invocationValue,
extraArgs,
};
this.rand = new RandImpl(id, this.checkState.bind(this));
}
Expand Down
6 changes: 5 additions & 1 deletion packages/restate-sdk/src/endpoint/handlers/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import type { GenericHandler, RestateRequest } from "./generic.js";

export function fetcher(handler: GenericHandler) {
return {
fetch: async (event: Request): Promise<Response> => {
fetch: async (
event: Request,
...extraArgs: unknown[]
): Promise<Response> => {
const url = event.url;
const headers = Object.fromEntries(event.headers.entries());

const request: RestateRequest = {
url,
headers,
body: event.body,
extraArgs,
};

const resp = await handler.handle(request);
Expand Down
6 changes: 5 additions & 1 deletion packages/restate-sdk/src/endpoint/handlers/generic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export interface RestateRequest {
readonly url: string;
readonly headers: Headers;
readonly body: ReadableStream<Uint8Array> | null;
readonly extraArgs: unknown[];
}

export interface RestateResponse {
Expand Down Expand Up @@ -165,6 +166,7 @@ export class GenericHandler implements RestateHandler {
request.body,
request.headers,
serviceProtocolVersion,
request.extraArgs,
context ?? {}
);
}
Expand Down Expand Up @@ -211,6 +213,7 @@ export class GenericHandler implements RestateHandler {
body: ReadableStream<Uint8Array>,
headers: Record<string, string | string[] | undefined>,
serviceProtocolVersion: ServiceProtocolVersion,
extraArgs: unknown[],
context: AdditionalContext
): Promise<RestateResponse> {
let responseController: TransformStreamDefaultController<Uint8Array>;
Expand Down Expand Up @@ -242,7 +245,8 @@ export class GenericHandler implements RestateHandler {
invocation,
handler.kind(),
this.endpoint.logger,
invocation.inferLoggerContext(context)
invocation.inferLoggerContext(context),
extraArgs
);
connection.pipeToConsumer(stateMachine);

Expand Down
1 change: 1 addition & 0 deletions packages/restate-sdk/src/endpoint/handlers/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class LambdaHandler {
body,
headers: event.headers,
url: path,
extraArgs: [context],
};

const resp = await this.handler.handle(request, {
Expand Down
1 change: 1 addition & 0 deletions packages/restate-sdk/src/endpoint/node_endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class NodeEndpoint implements RestateEndpoint {
url,
headers: request.headers,
body: Readable.toWeb(request),
extraArgs: [],
});

response.writeHead(resp.statusCode, resp.headers);
Expand Down
2 changes: 2 additions & 0 deletions packages/restate-sdk/src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export class StateMachine implements RestateStreamConsumer {
handlerKind: HandlerKind,
logger: Logger,
loggerContext: LoggerContext,
extraArgs: unknown[],
private readonly suspensionMillis: number = 30_000
) {
this.localStateStore = invocation.localStateStore;
Expand All @@ -110,6 +111,7 @@ export class StateMachine implements RestateStreamConsumer {
invocation.invocationValue,
invocation.invocationHeaders,
connection.headers(),
extraArgs,
this
);
this.promiseCombinatorTracker = new PromiseCombinatorTracker(
Expand Down
3 changes: 2 additions & 1 deletion packages/restate-sdk/test/testdriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ export class UUT<N extends string, T> {
method.kind(),
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(restateServer as unknown as { builder: EndpointBuilder }).builder.logger,
invocation.inferLoggerContext()
invocation.inferLoggerContext(),
[]
);

const completed = stateMachine.invoke();
Expand Down