Skip to content

Commit

Permalink
Adapt to the new start message and awakeable identifier format
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Aug 21, 2023
1 parent 553d31c commit 62ab95c
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 208 deletions.
23 changes: 9 additions & 14 deletions src/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {
CompletablePromise,
makeFqServiceName,
printMessageAsJson,
uuidV7FromBuffer,
} from "./utils/utils";
import {
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
Expand All @@ -45,8 +44,8 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {

private runtimeReplayIndex = 0;
private replayEntries = new Map<number, Message>();
private instanceKey?: Buffer = undefined;
private invocationId?: Buffer = undefined;
private id?: Buffer = undefined;
private debugId?: string = undefined;
private invocationValue?: Buffer = undefined;
private nbEntriesToReplay?: number = undefined;
private localStateStore?: LocalStateStore;
Expand Down Expand Up @@ -120,8 +119,8 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
partialState: boolean
): InvocationBuilder<I, O> {
this.nbEntriesToReplay = m.knownEntries;
this.instanceKey = m.instanceKey;
this.invocationId = m.invocationId;
this.id = m.id;
this.debugId = m.debugId;
this.localStateStore = new LocalStateStore(partialState, m.stateMap);
return this;
}
Expand Down Expand Up @@ -153,8 +152,8 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
}
return new Invocation(
this.method!,
this.instanceKey!,
this.invocationId!,
this.id!,
this.debugId!,
this.nbEntriesToReplay!,
this.replayEntries!,
this.invocationValue!,
Expand All @@ -164,24 +163,20 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
}

export class Invocation<I, O> {
public readonly invocationIdString;
public readonly logPrefix;
constructor(
public readonly method: HostedGrpcServiceMethod<I, O>,
public readonly instanceKey: Buffer,
public readonly invocationId: Buffer,
public readonly id: Buffer,
public readonly debugId: string,
public readonly nbEntriesToReplay: number,
public readonly replayEntries: Map<number, Message>,
public readonly invocationValue: Buffer,
public readonly localStateStore: LocalStateStore
) {
this.invocationIdString = uuidV7FromBuffer(this.invocationId);
this.logPrefix = `[${makeFqServiceName(
this.method.packge,
this.method.service
)}-${this.instanceKey.toString("base64")}-${this.invocationIdString}] [${
this.method.method.name
}]`;
)}/${this.method.method.name}] [${this.debugId}]`;
}
}

Expand Down
17 changes: 3 additions & 14 deletions src/restate_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,16 @@ import { Client, SendClient } from "./types/router";
*/
export interface RestateBaseContext {
/**
* The key associated with the current function invocation.
*
* For keyed services, this is the key extracted from the input argument, as annotated in the
* protobuf service definition.
*
* For unkeyed services, this is the internal key under which restate stores the journal and
* transient state of the function execution.
* The unique id that identifies the current function invocation. This id is guaranteed to be
* unique across invocations, but constant across reties and suspensions.
*/
instanceKey: Buffer;
id: Buffer;

/**
* Name of the service.
*/
serviceName: string;

/**
* The unique id that identifies the current function invocation. This id is guaranteed to be
* unique across invocations, but constant across reties and suspensions.
*/
invocationId: Buffer;

/**
* Get/retrieve state from the Restate runtime.
* Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))`
Expand Down
33 changes: 9 additions & 24 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {
} from "./generated/proto/protocol";
import {
AWAKEABLE_ENTRY_MESSAGE_TYPE,
AwakeableIdentifier,
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -75,8 +74,7 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
private static callContext = new AsyncLocalStorage<CallContext>();

constructor(
public readonly instanceKey: Buffer,
public readonly invocationId: Buffer,
public readonly id: Buffer,
public readonly serviceName: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private readonly stateMachine: StateMachine<any, any>
Expand Down Expand Up @@ -326,17 +324,13 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {

// This needs to be done after handling the message in the state machine
// otherwise the index is not yet incremented.
const id = AwakeableIdentifier.create({
serviceName: this.stateMachine.getFullServiceName(),
instanceKey: this.instanceKey,
invocationId: this.invocationId,
entryIndex: this.stateMachine.getUserCodeJournalIndex(),
});
const encodedEntryIndex = Buffer.alloc(4 /* Size of u32 */);
encodedEntryIndex.writeUInt32BE(
this.stateMachine.getUserCodeJournalIndex()
);

return {
id: Buffer.from(AwakeableIdentifier.encode(id).finish()).toString(
"base64url"
),
id: Buffer.concat([this.id, encodedEntryIndex]).toString("base64url"),
promise: promise,
};
}
Expand All @@ -359,15 +353,7 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
id: string,
base: DeepPartial<CompleteAwakeableEntryMessage>
): void {
const awakeableIdentifier = AwakeableIdentifier.decode(
Buffer.from(id, "base64url")
);

base.serviceName = awakeableIdentifier.serviceName;
base.instanceKey = awakeableIdentifier.instanceKey;
base.invocationId = awakeableIdentifier.invocationId;
base.entryIndex = awakeableIdentifier.entryIndex;

base.id = id;
this.stateMachine.handleUserCodeMessage(
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
CompleteAwakeableEntryMessage.create(base)
Expand Down Expand Up @@ -487,9 +473,8 @@ async function executeWithRetries<T>(
export class RpcContextImpl implements RpcContext {
constructor(
private readonly ctx: RestateGrpcContext,
public readonly instanceKey: Buffer = ctx.instanceKey,
public readonly serviceName: string = ctx.serviceName,
public readonly invocationId: Buffer = ctx.invocationId
public readonly id: Buffer = ctx.id,
public readonly serviceName: string = ctx.serviceName
) {}

public rpc<M>({ path }: ServiceApi): Client<M> {
Expand Down
3 changes: 1 addition & 2 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
this.localStateStore = invocation.localStateStore;

this.restateContext = new RestateGrpcContextImpl(
this.invocation.instanceKey,
this.invocation.invocationId,
this.invocation.id,
this.invocation.method.service,
this
);
Expand Down
1 change: 0 additions & 1 deletion src/types/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ export {
SleepEntryMessage,
StartMessage,
SuspensionMessage,
AwakeableIdentifier,
} from "../generated/proto/protocol";

// Export the protocol message types as defined by the restate protocol.
Expand Down
20 changes: 1 addition & 19 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,6 @@ export function printMessageAsJson(obj: any): string {
);
}

// Only used for logging the invocation ID in debug logging mode
export function uuidV7FromBuffer(buffer: Buffer): string {
// if (buffer.length !== 16) {
// throw new Error('Invalid UUIDv7 buffer length');
// }
const bytes = new Uint8Array(buffer);
return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(
""
);
}

export function makeFqServiceName(pckg: string, name: string): string {
return pckg ? `${pckg}.${name}` : name;
}
Expand Down Expand Up @@ -142,14 +131,7 @@ const completeAwakeableMsgEquality = (
msg1: CompleteAwakeableEntryMessage,
msg2: CompleteAwakeableEntryMessage
) => {
if (
!(
msg1.serviceName === msg2.serviceName &&
msg1.instanceKey.equals(msg2.instanceKey) &&
msg1.invocationId.equals(msg2.invocationId) &&
msg1.entryIndex === msg2.entryIndex
)
) {
if (!(msg1.id === msg2.id)) {
return false;
}

Expand Down
8 changes: 1 addition & 7 deletions test/awakeable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,7 @@ describe("AwakeableGreeter", () => {
const result = await new TestDriver(new AwakeableGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
resolveAwakeableMessage(
"TestGreeter",
Buffer.from("123"),
Buffer.from("abcd"),
1,
"hello"
), // should have been an awakeableMessage
resolveAwakeableMessage("awakeable-1", "hello"), // should have been an awakeableMessage
]).run();

expect(result.length).toStrictEqual(1);
Expand Down
98 changes: 7 additions & 91 deletions test/complete_awakeable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ describe("ResolveAwakeableGreeter", () => {
]).run();

expect(result).toStrictEqual([
resolveAwakeableMessage(
"test.TestGreeter",
Buffer.from("123"),
Buffer.from("abcd"),
1,
"hello"
),
resolveAwakeableMessage(getAwakeableId(1), "hello"),
outputMessage(greetResponse("Hello")),
]);
});
Expand All @@ -65,13 +59,7 @@ describe("ResolveAwakeableGreeter", () => {
]).run();

expect(result).toStrictEqual([
resolveAwakeableMessage(
"test.TestGreeter",
Buffer.from("123"),
Buffer.from("abcd"),
1,
""
),
resolveAwakeableMessage(getAwakeableId(1), ""),
outputMessage(greetResponse("Hello")),
]);
});
Expand All @@ -80,13 +68,7 @@ describe("ResolveAwakeableGreeter", () => {
const result = await new TestDriver(new ResolveAwakeableGreeter("hello"), [
startMessage(),
inputMessage(greetRequest("Till")),
resolveAwakeableMessage(
"test.TestGreeter",
Buffer.from("123"),
Buffer.from("abcd"),
1,
"hello"
),
resolveAwakeableMessage(getAwakeableId(1), "hello"),
]).run();

expect(result).toStrictEqual([outputMessage(greetResponse("Hello"))]);
Expand All @@ -96,13 +78,7 @@ describe("ResolveAwakeableGreeter", () => {
const result = await new TestDriver(new ResolveAwakeableGreeter(""), [
startMessage(),
inputMessage(greetRequest("Till")),
resolveAwakeableMessage(
"test.TestGreeter",
Buffer.from("123"),
Buffer.from("abcd"),
1,
""
),
resolveAwakeableMessage(getAwakeableId(1), ""),
]).run();

expect(result).toStrictEqual([outputMessage(greetResponse("Hello"))]);
Expand All @@ -124,66 +100,12 @@ describe("ResolveAwakeableGreeter", () => {
checkJournalMismatchError(result[0]);
});

it("fails on journal mismatch. Completed with wrong service name", async () => {
const result = await new TestDriver(new ResolveAwakeableGreeter("hello"), [
startMessage(2),
inputMessage(greetRequest("Till")),
resolveAwakeableMessage(
"TestGreeterzzz", // this should have been TestGreeter
Buffer.from("123"),
Buffer.from("abcd"),
1,
"hello"
),
]).run();

expect(result.length).toStrictEqual(1);
checkJournalMismatchError(result[0]);
});

it("fails on journal mismatch. Completed with wrong instance key.", async () => {
it("fails on journal mismatch. Completed with wrong id.", async () => {
const result = await new TestDriver(new ResolveAwakeableGreeter("hello"), [
startMessage(2),
inputMessage(greetRequest("Till")),
resolveAwakeableMessage(
"TestGreeter",
Buffer.from("1234"), // this should have been a Buffer.from("123")
Buffer.from("abcd"),
1,
"hello"
),
]).run();

expect(result.length).toStrictEqual(1);
checkJournalMismatchError(result[0]);
});

it("fails on journal mismatch. Completed with wrong invocation id.", async () => {
const result = await new TestDriver(new ResolveAwakeableGreeter("hello"), [
startMessage(2),
inputMessage(greetRequest("Till")),
resolveAwakeableMessage(
"TestGreeter",
Buffer.from("123"),
Buffer.from("abcde"), // this should have been a Buffer.from("abcd")
1,
"hello"
),
]).run();

expect(result.length).toStrictEqual(1);
checkJournalMismatchError(result[0]);
});

it("fails on journal mismatch. Completed with wrong entry index.", async () => {
const result = await new TestDriver(new ResolveAwakeableGreeter("hello"), [
startMessage(2),
inputMessage(greetRequest("Till")),
resolveAwakeableMessage(
"TestGreeter",
Buffer.from("123"),
Buffer.from("abcd"),
2, // this should have been 1
"1234", // this should have been getAwakeableId(1)
"hello"
),
]).run();
Expand Down Expand Up @@ -214,13 +136,7 @@ describe("RejectAwakeableGreeter", () => {
).run();

expect(result).toStrictEqual([
rejectAwakeableMessage(
"test.TestGreeter",
Buffer.from("123"),
Buffer.from("abcd"),
1,
"my bad error"
),
rejectAwakeableMessage(getAwakeableId(1), "my bad error"),
outputMessage(greetResponse("Hello")),
]);
});
Expand Down
3 changes: 2 additions & 1 deletion test/protocol_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ describe("Restate Streaming Connection", () => {
new Message(
START_MESSAGE_TYPE,
StartMessage.create({
invocationId: Buffer.from("abcd"),
id: Buffer.from("abcd"),
debugId: "abcd",
knownEntries: 1337,
})
)
Expand Down
Loading

0 comments on commit 62ab95c

Please sign in to comment.