Skip to content

Commit

Permalink
Make sure we correctly set the completed flag for GetState and GetS…
Browse files Browse the repository at this point in the history
…tateKeys
  • Loading branch information
slinkydeveloper committed Feb 8, 2024
1 parent d7357af commit bb1dad8
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 39 deletions.
48 changes: 22 additions & 26 deletions src/local_state_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ClearStateEntryMessage,
GetStateEntryMessage,
GetStateKeysEntryMessage,
GetStateKeysEntryMessage_StateKeys,
SetStateEntryMessage,
StartMessage_StateEntry,
} from "./generated/proto/protocol";
Expand All @@ -28,44 +29,39 @@ export class LocalStateStore {
);
}

public get(key: string): GetStateEntryMessage {
const present = this.state.has(key.toString());
if (!present && this.isPartial) {
// Partial eager state, so retrieve state from the runtime
return GetStateEntryMessage.create({ key: Buffer.from(key) });
} else if (!present) {
// Complete eager state, so state entry is null
return GetStateEntryMessage.create({
key: Buffer.from(key),
empty: Empty.create({}),
});
// Returns true if completed
public tryCompleteGet(key: string, msg: GetStateEntryMessage): boolean {
const stateEntry = this.state.get(key);
if (stateEntry === undefined) {
if (this.isPartial) {
// Partial eager state, so retrieve state from the runtime
return false;
} else {
// Complete eager state, so state entry is null
msg.empty = Empty.create({});
return true;
}
}

const stateEntry = this.state.get(key.toString());
if (stateEntry instanceof Buffer) {
return GetStateEntryMessage.create({
key: Buffer.from(key),
value: stateEntry,
});
msg.value = stateEntry;
} else {
// stateEntry is Empty
return GetStateEntryMessage.create({
key: Buffer.from(key),
empty: stateEntry,
});
msg.empty = stateEntry;
}
return true;
}

public getStateKeys(): GetStateKeysEntryMessage {
// Returns true if completed
public tryCompletedGetStateKeys(msg: GetStateKeysEntryMessage): boolean {
if (this.isPartial) {
return {};
return false;
}

return GetStateKeysEntryMessage.create({
value: {
keys: Array.from(this.state.keys()).map((b) => Buffer.from(b)),
},
msg.value = GetStateKeysEntryMessage_StateKeys.create({
keys: Array.from(this.state.keys()).map((b) => Buffer.from(b)),
});
return true;
}

public set<T>(key: string, value: T): SetStateEntryMessage {
Expand Down
22 changes: 15 additions & 7 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
BackgroundInvokeEntryMessage,
CompleteAwakeableEntryMessage,
DeepPartial,
GetStateEntryMessage,
GetStateKeysEntryMessage,
GetStateKeysEntryMessage_StateKeys,
InvokeEntryMessage,
SleepEntryMessage,
Expand Down Expand Up @@ -112,21 +114,24 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
this.checkState("get state");

// Create the message and let the state machine process it
const msg = this.stateMachine.localStateStore.get(name);
const msg = GetStateEntryMessage.create({ key: Buffer.from(name) });
const completed = this.stateMachine.localStateStore.tryCompleteGet(
name,
msg
);

const getState = async (): Promise<T | null> => {
const result = await this.stateMachine.handleUserCodeMessage(
GET_STATE_ENTRY_MESSAGE_TYPE,
msg
msg,
completed
);

// TODO WHERE's the completed flag?

// If the GetState message did not have a value or empty,
// then we went to the runtime to get the value.
// When we get the response, we set it in the localStateStore,
// to answer subsequent requests
if (msg.value === undefined && msg.empty === undefined) {
if (!completed) {
this.stateMachine.localStateStore.add(name, result as Buffer | Empty);
}

Expand All @@ -145,12 +150,15 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
this.checkState("state keys");

// Create the message and let the state machine process it
const msg = this.stateMachine.localStateStore.getStateKeys();
const msg = GetStateKeysEntryMessage.create({});
const completed =
this.stateMachine.localStateStore.tryCompletedGetStateKeys(msg);

const getStateKeys = async (): Promise<Array<string>> => {
const result = await this.stateMachine.handleUserCodeMessage(
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
msg
msg,
completed
);

return (result as GetStateKeysEntryMessage_StateKeys).keys.map((b) =>
Expand Down
21 changes: 15 additions & 6 deletions test/protoutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,45 +157,54 @@ export function getStateMessage<T>(
GetStateEntryMessage.create({
key: Buffer.from(key),
empty: Empty.create({}),
})
}),
true
);
} else if (value !== undefined) {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
GetStateEntryMessage.create({
key: Buffer.from(key),
value: Buffer.from(jsonSerialize(value)),
})
}),
true
);
} else if (failure !== undefined) {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
GetStateEntryMessage.create({
key: Buffer.from(key),
failure: failure,
})
}),
true
);
} else {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
GetStateEntryMessage.create({
key: Buffer.from(key),
})
}),
false
);
}
}

export function getStateKeysMessage(value?: Array<string>): Message {
if (value === undefined) {
return new Message(GET_STATE_KEYS_ENTRY_MESSAGE_TYPE, {});
return new Message(
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
GetStateKeysEntryMessage.create({}),
false
);
} else {
return new Message(
GET_STATE_KEYS_ENTRY_MESSAGE_TYPE,
GetStateKeysEntryMessage.create({
value: {
keys: value.map((b) => Buffer.from(b)),
},
})
}),
true
);
}
}
Expand Down

0 comments on commit bb1dad8

Please sign in to comment.