Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ctx.stateKeys() #256

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion proto/protocol.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
Expand Down Expand Up @@ -157,6 +157,20 @@ message ClearStateEntryMessage {
message ClearAllStateEntryMessage {
}

// Completable: Yes
// Fallible: No
// Type: 0x0800 + 4
message GetStateKeysEntryMessage {
message StateKeys {
repeated bytes keys = 1;
}

oneof result {
StateKeys value = 14;
Failure failure = 15;
};
}

// ------ Syscalls ------

// Completable: Yes
Expand Down
39 changes: 36 additions & 3 deletions src/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
*/

import * as p from "./types/protocol";
import { Failure } from "./generated/proto/protocol";
import {
Failure,
GetStateKeysEntryMessage_StateKeys,
} from "./generated/proto/protocol";
import {
AWAKEABLE_ENTRY_MESSAGE_TYPE,
AwakeableEntryMessage,
Expand All @@ -22,7 +25,9 @@ import {
CompletionMessage,
EntryAckMessage,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
GetStateEntryMessage,
GetStateKeysEntryMessage,
INVOKE_ENTRY_MESSAGE_TYPE,
InvokeEntryMessage,
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -135,6 +140,16 @@ export class Journal<I, O> {
return this.appendJournalEntry(messageType, message);
}
}
case p.GET_STATE_KEYS_ENTRY_MESSAGE_TYPE: {
const getStateMsg = message as GetStateKeysEntryMessage;
if (getStateMsg.value !== undefined) {
// State was eagerly filled by the local state store
return Promise.resolve(getStateMsg.value);
} else {
// Need to retrieve state by going to the runtime.
return this.appendJournalEntry(messageType, message);
}
}
default: {
return this.appendJournalEntry(messageType, message);
}
Expand Down Expand Up @@ -171,8 +186,16 @@ export class Journal<I, O> {
}

if (m.value !== undefined) {
journalEntry.completablePromise.resolve(m.value);
this.pendingJournalEntries.delete(m.entryIndex);
if (journalEntry.messageType === GET_STATE_KEYS_ENTRY_MESSAGE_TYPE) {
// In case of get state keys we expect the parsed message
journalEntry.completablePromise.resolve(
GetStateKeysEntryMessage_StateKeys.decode(m.value)
);
this.pendingJournalEntries.delete(m.entryIndex);
} else {
journalEntry.completablePromise.resolve(m.value);
this.pendingJournalEntries.delete(m.entryIndex);
}
} else if (m.failure !== undefined) {
// we do all completions with Terminal Errors, because failures triggered by those exceptions
// when the bubble up would otherwise lead to re-tries, deterministic replay, re-throwing, and
Expand Down Expand Up @@ -258,6 +281,16 @@ export class Journal<I, O> {
);
break;
}
case GET_STATE_KEYS_ENTRY_MESSAGE_TYPE: {
const getStateMsg = replayMessage.message as GetStateKeysEntryMessage;
this.resolveResult(
journalIndex,
journalEntry,
getStateMsg.value,
getStateMsg.failure
);
break;
}
case INVOKE_ENTRY_MESSAGE_TYPE: {
const invokeMsg = replayMessage.message as InvokeEntryMessage;
this.resolveResult(
Expand Down
49 changes: 29 additions & 20 deletions src/local_state_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
ClearAllStateEntryMessage,
ClearStateEntryMessage,
GetStateEntryMessage,
GetStateKeysEntryMessage,
GetStateKeysEntryMessage_StateKeys,
SetStateEntryMessage,
StartMessage_StateEntry,
} from "./generated/proto/protocol";
Expand All @@ -28,32 +30,39 @@ export class LocalStateStore {
);
}

public get(key: string): GetStateEntryMessage {
const present = this.state.has(key.toString());
if (!present && this.isPartial) {
// Partial eager state, so retrieve state from the runtime
return GetStateEntryMessage.create({ key: Buffer.from(key) });
} else if (!present) {
// Complete eager state, so state entry is null
return GetStateEntryMessage.create({
key: Buffer.from(key),
empty: Empty.create({}),
});
// Returns true if completed
public tryCompleteGet(key: string, msg: GetStateEntryMessage): boolean {
const stateEntry = this.state.get(key);
if (stateEntry === undefined) {
if (this.isPartial) {
// Partial eager state, so retrieve state from the runtime
return false;
} else {
// Complete eager state, so state entry is null
msg.empty = Empty.create({});
return true;
}
}

const stateEntry = this.state.get(key.toString());
if (stateEntry instanceof Buffer) {
return GetStateEntryMessage.create({
key: Buffer.from(key),
value: stateEntry,
});
msg.value = stateEntry;
} else {
// stateEntry is Empty
return GetStateEntryMessage.create({
key: Buffer.from(key),
empty: stateEntry,
});
msg.empty = stateEntry;
}
return true;
}

// Returns true if completed
public tryCompletedGetStateKeys(msg: GetStateKeysEntryMessage): boolean {
if (this.isPartial) {
return false;
}

msg.value = GetStateKeysEntryMessage_StateKeys.create({
keys: Array.from(this.state.keys()).map((b) => Buffer.from(b)),
});
return true;
}

public set<T>(key: string, value: T): SetStateEntryMessage {
Expand Down
2 changes: 2 additions & 0 deletions src/restate_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ export interface RestateBaseContext {
*/
get<T>(name: string): Promise<T | null>;

stateKeys(): Promise<Array<string>>;

/**
* Set/store state in the Restate runtime.
* Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))`
Expand Down
39 changes: 36 additions & 3 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import {
BackgroundInvokeEntryMessage,
CompleteAwakeableEntryMessage,
DeepPartial,
GetStateEntryMessage,
GetStateKeysEntryMessage,
GetStateKeysEntryMessage_StateKeys,
InvokeEntryMessage,
SleepEntryMessage,
} from "./generated/proto/protocol";
Expand All @@ -35,6 +38,7 @@ import {
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
INVOKE_ENTRY_MESSAGE_TYPE,
SET_STATE_ENTRY_MESSAGE_TYPE,
SIDE_EFFECT_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -111,19 +115,24 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
this.checkState("get state");

// Create the message and let the state machine process it
const msg = this.stateMachine.localStateStore.get(name);
const msg = GetStateEntryMessage.create({ key: Buffer.from(name) });
const completed = this.stateMachine.localStateStore.tryCompleteGet(
name,
msg
);

const getState = async (): Promise<T | null> => {
const result = await this.stateMachine.handleUserCodeMessage(
GET_STATE_ENTRY_MESSAGE_TYPE,
msg
msg,
completed
);

// If the GetState message did not have a value or empty,
// then we went to the runtime to get the value.
// When we get the response, we set it in the localStateStore,
// to answer subsequent requests
if (msg.value === undefined && msg.empty === undefined) {
if (!completed) {
this.stateMachine.localStateStore.add(name, result as Buffer | Empty);
}

Expand All @@ -136,6 +145,30 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
return getState();
}

// DON'T make this function async!!! see sideEffect comment for details.
public stateKeys(): Promise<Array<string>> {
// Check if this is a valid action
this.checkState("state keys");

// Create the message and let the state machine process it
const msg = GetStateKeysEntryMessage.create({});
const completed =
this.stateMachine.localStateStore.tryCompletedGetStateKeys(msg);

const getStateKeys = async (): Promise<Array<string>> => {
const result = await this.stateMachine.handleUserCodeMessage(
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
msg,
completed
);

return (result as GetStateKeysEntryMessage_StateKeys).keys.map((b) =>
b.toString()
);
};
return getStateKeys();
}

public set<T>(name: string, value: T): void {
this.checkState("set state");
const msg = this.stateMachine.localStateStore.set(name, value);
Expand Down
8 changes: 8 additions & 0 deletions src/types/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
ErrorMessage,
EndMessage,
GetStateEntryMessage,
GetStateKeysEntryMessage,
InvokeEntryMessage,
OutputStreamEntryMessage,
PollInputStreamEntryMessage,
Expand All @@ -44,6 +45,7 @@ export {
ErrorMessage,
EndMessage,
GetStateEntryMessage,
GetStateKeysEntryMessage,
InvokeEntryMessage,
OutputStreamEntryMessage,
PollInputStreamEntryMessage,
Expand All @@ -67,6 +69,7 @@ export const GET_STATE_ENTRY_MESSAGE_TYPE = 0x0800n;
export const SET_STATE_ENTRY_MESSAGE_TYPE = 0x0801n;
export const CLEAR_STATE_ENTRY_MESSAGE_TYPE = 0x0802n;
export const CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE = 0x0803n;
export const GET_STATE_KEYS_ENTRY_MESSAGE_TYPE = 0x0804n;
export const SLEEP_ENTRY_MESSAGE_TYPE = 0x0c00n;
export const INVOKE_ENTRY_MESSAGE_TYPE = 0x0c01n;
export const BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE = 0x0c02n;
Expand Down Expand Up @@ -94,6 +97,7 @@ export const KNOWN_MESSAGE_TYPES = new Set([
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
SET_STATE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE,
Expand All @@ -116,6 +120,7 @@ const PROTOBUF_MESSAGE_NAME_BY_TYPE = new Map<bigint, string>([
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, "PollInputStreamEntryMessage"],
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, "OutputStreamEntryMessage"],
[GET_STATE_ENTRY_MESSAGE_TYPE, "GetStateEntryMessage"],
[GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, "GetStateKeysEntryMessage"],
[SET_STATE_ENTRY_MESSAGE_TYPE, "SetStateEntryMessage"],
[CLEAR_STATE_ENTRY_MESSAGE_TYPE, "ClearStateEntryMessage"],
[CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE, "ClearAllStateEntryMessage"],
Expand Down Expand Up @@ -145,6 +150,7 @@ const PROTOBUF_MESSAGES: Array<[bigint, any]> = [
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, PollInputStreamEntryMessage],
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, OutputStreamEntryMessage],
[GET_STATE_ENTRY_MESSAGE_TYPE, GetStateEntryMessage],
[GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, GetStateKeysEntryMessage],
[SET_STATE_ENTRY_MESSAGE_TYPE, SetStateEntryMessage],
[CLEAR_STATE_ENTRY_MESSAGE_TYPE, ClearStateEntryMessage],
[CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE, ClearAllStateEntryMessage],
Expand All @@ -169,6 +175,7 @@ export type ProtocolMessage =
| PollInputStreamEntryMessage
| OutputStreamEntryMessage
| GetStateEntryMessage
| GetStateKeysEntryMessage
| SetStateEntryMessage
| ClearStateEntryMessage
| ClearAllStateEntryMessage
Expand All @@ -185,6 +192,7 @@ export type ProtocolMessage =
export const SUSPENSION_TRIGGERS: bigint[] = [
INVOKE_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
AWAKEABLE_ENTRY_MESSAGE_TYPE,
SLEEP_ENTRY_MESSAGE_TYPE,
COMBINATOR_ENTRY_MESSAGE,
Expand Down
2 changes: 2 additions & 0 deletions src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
ERROR_MESSAGE_TYPE,
formatMessageType,
GET_STATE_ENTRY_MESSAGE_TYPE,
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
KNOWN_MESSAGE_TYPES,
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PROTOBUF_MESSAGE_BY_TYPE,
Expand Down Expand Up @@ -62,6 +63,7 @@ class MessageType {
return (
messageType === POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE ||
messageType === GET_STATE_ENTRY_MESSAGE_TYPE ||
messageType === GET_STATE_KEYS_ENTRY_MESSAGE_TYPE ||
messageType === SLEEP_ENTRY_MESSAGE_TYPE ||
messageType === AWAKEABLE_ENTRY_MESSAGE_TYPE
);
Expand Down
Loading
Loading