From 714767b12e9f7dbf2080e7f2cfd3c018ff593a19 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Wed, 4 Sep 2024 10:14:25 +0100 Subject: [PATCH] Add priority-queue example for TS (#182) * Add priority-queue example for TS * Named queues * Add actual prioritization * just queue * Split out methods * Add priority to readme * Move run into op --- .../priority-queue-typescript/README.md | 18 +++ .../priority-queue-typescript/package.json | 19 +++ .../priority-queue-typescript/src/app.ts | 17 +++ .../priority-queue-typescript/src/queue.ts | 116 ++++++++++++++++++ .../src/queue_client.ts | 59 +++++++++ .../priority-queue-typescript/src/service.ts | 44 +++++++ .../priority-queue-typescript/tsconfig.json | 18 +++ 7 files changed, 291 insertions(+) create mode 100644 patterns-use-cases/priority-queue/priority-queue-typescript/README.md create mode 100644 patterns-use-cases/priority-queue/priority-queue-typescript/package.json create mode 100644 patterns-use-cases/priority-queue/priority-queue-typescript/src/app.ts create mode 100644 patterns-use-cases/priority-queue/priority-queue-typescript/src/queue.ts create mode 100644 patterns-use-cases/priority-queue/priority-queue-typescript/src/queue_client.ts create mode 100644 patterns-use-cases/priority-queue/priority-queue-typescript/src/service.ts create mode 100644 patterns-use-cases/priority-queue/priority-queue-typescript/tsconfig.json diff --git a/patterns-use-cases/priority-queue/priority-queue-typescript/README.md b/patterns-use-cases/priority-queue/priority-queue-typescript/README.md new file mode 100644 index 00000000..d01854d3 --- /dev/null +++ b/patterns-use-cases/priority-queue/priority-queue-typescript/README.md @@ -0,0 +1,18 @@ +# Priority queue + +An example of implementing your own priority queue using Restate state and +awakeables. + +Run the example with `npm run app-dev`. + +You can simulate adding work to the queue like this: +```shell +# add a single entry +curl localhost:8080/myService/expensiveMethod/send --json '{"left": 1, "right": 2, "priority": 1}' +# add lots +for i in $(seq 1 30); do curl localhost:8080/myService/expensiveMethod/send --json '{"left": 1, "right": 2, "priority": 2}'; done +``` + +As you do so, you can observe the logs; in flight requests will increase up to 10, beyond which items will be enqueued. + +You can write your own queue item selection logic in `selectAndPopItem`; doing so is outside the scope of this example. diff --git a/patterns-use-cases/priority-queue/priority-queue-typescript/package.json b/patterns-use-cases/priority-queue/priority-queue-typescript/package.json new file mode 100644 index 00000000..a715782d --- /dev/null +++ b/patterns-use-cases/priority-queue/priority-queue-typescript/package.json @@ -0,0 +1,19 @@ +{ + "name": "@restatedev/example-pattern-priority-queue", + "version": "0.1.0", + "description": "A Restate example showing the implementation of a distributed priority queue", + "type": "commonjs", + "scripts": { + "build": "tsc --noEmitOnError", + "app-dev": "tsx --watch ./src/app.ts", + "app": "tsx ./src/app.ts" + }, + "dependencies": { + "@restatedev/restate-sdk": "^1.2.1" + }, + "devDependencies": { + "@types/node": "^20.12.7", + "tsx": "^4.17.0", + "typescript": "^5.0.2" + } +} diff --git a/patterns-use-cases/priority-queue/priority-queue-typescript/src/app.ts b/patterns-use-cases/priority-queue/priority-queue-typescript/src/app.ts new file mode 100644 index 00000000..cdf12319 --- /dev/null +++ b/patterns-use-cases/priority-queue/priority-queue-typescript/src/app.ts @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate Examples for the Node.js/TypeScript SDK, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/blob/main/LICENSE + */ + +import { endpoint } from "@restatedev/restate-sdk"; + +import { queue } from "./queue"; +import { myService } from "./service"; + +endpoint().bind(queue).bind(myService).listen(); diff --git a/patterns-use-cases/priority-queue/priority-queue-typescript/src/queue.ts b/patterns-use-cases/priority-queue/priority-queue-typescript/src/queue.ts new file mode 100644 index 00000000..d766b8ad --- /dev/null +++ b/patterns-use-cases/priority-queue/priority-queue-typescript/src/queue.ts @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate Examples for the Node.js/TypeScript SDK, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/blob/main/LICENSE + */ + +import { object, ObjectContext, TerminalError } from "@restatedev/restate-sdk"; + +type QueueState = { + items: QueueItem[]; + inFlight: number; +}; + +type QueueItem = { + awakeable: string; + priority: number; +}; + +type TickCause = + | { type: "done" } + | { type: "push"; item: QueueItem } + | { type: "drop"; awakeable: string }; + +// Put your super clever queue fairness algorithm here +function selectAndPopItem(items: QueueItem[]): QueueItem { + let lowest = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; + for (const [i, item] of items.entries()) { + if (item.priority < lowest.priority) { + lowest.priority = item.priority; + lowest.index = i; + } + } + const [item] = items.splice(lowest.index, 1); + return item; +} + +const MAX_IN_FLIGHT = 10; + +export const queue = object({ + name: "queue", + handlers: { + done: async (ctx: ObjectContext): Promise => { + const state = await getState(ctx); + + state.inFlight--; + + tick(ctx, state); + + setState(ctx, state); + }, + push: async ( + ctx: ObjectContext, + item: QueueItem, + ): Promise => { + const state = await getState(ctx); + + state.items.push(item); + + tick(ctx, state); + + setState(ctx, state); + }, + drop: async ( + ctx: ObjectContext, + awakeable: string, + ): Promise => { + const state = await getState(ctx); + + const index = state.items.findIndex( + (item) => item.awakeable == awakeable, + ); + if (index == -1) { + // we have already popped it; treat this as a 'done' + state.inFlight--; + } else { + // remove from the queue + state.items.splice(index, 1); + } + + tick(ctx, state); + + setState(ctx, state); + }, + }, +}); + +async function getState(ctx: ObjectContext): Promise { + return { + items: (await ctx.get("items")) ?? [], + inFlight: (await ctx.get("inFlight")) ?? 0, + }; +} + +function setState(ctx: ObjectContext, state: QueueState) { + ctx.set("items", state.items); + ctx.set("inFlight", state.inFlight); +} + +function tick(ctx: ObjectContext, state: QueueState) { + while (state.inFlight < MAX_IN_FLIGHT && state.items.length > 0) { + let item = selectAndPopItem(state.items); + state.inFlight++; + ctx.resolveAwakeable(item.awakeable); + } + + ctx.console.log( + `Tick end. Queue length: ${state.items.length}, In Flight: ${state.inFlight}`, + ); +} + +export type Queue = typeof queue; diff --git a/patterns-use-cases/priority-queue/priority-queue-typescript/src/queue_client.ts b/patterns-use-cases/priority-queue/priority-queue-typescript/src/queue_client.ts new file mode 100644 index 00000000..a32c0cd2 --- /dev/null +++ b/patterns-use-cases/priority-queue/priority-queue-typescript/src/queue_client.ts @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate Examples for the Node.js/TypeScript SDK, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/blob/main/LICENSE + */ + +import { Context, TerminalError } from "@restatedev/restate-sdk"; +import type { Queue as QueueObject } from "./queue"; + +export interface Queue { + run(priority: number, op: () => Promise): Promise; +} + +export namespace Queue { + export function fromContext(ctx: Context, name: string): Queue { + return { + async run(priority: number, op: () => Promise): Promise { + const client = ctx.objectSendClient( + { name: "queue" }, + name, + ); + + const awakeable = ctx.awakeable(); + client.push({ + awakeable: awakeable.id, + priority, + }); + + try { + await awakeable.promise; + } catch (e) { + if (e instanceof TerminalError) { + // this should only happen on cancellation; inform the queue that we no longer need to be scheduled + client.drop(awakeable.id); + } + throw e; + } + + try { + const result = await op(); + + client.done(); + + return result; + } catch (e) { + if (e instanceof TerminalError) { + client.done(); + } + throw e; + } + }, + }; + } +} diff --git a/patterns-use-cases/priority-queue/priority-queue-typescript/src/service.ts b/patterns-use-cases/priority-queue/priority-queue-typescript/src/service.ts new file mode 100644 index 00000000..41ec9a06 --- /dev/null +++ b/patterns-use-cases/priority-queue/priority-queue-typescript/src/service.ts @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate Examples for the Node.js/TypeScript SDK, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/blob/main/LICENSE + */ + +import { Context, service } from "@restatedev/restate-sdk"; +import { Queue } from "./queue_client"; + +const QUEUE_NAME = "myService/expensiveMethod"; + +export const myService = service({ + name: "myService", + handlers: { + expensiveMethod: async ( + ctx: Context, + params: { left: number; right: number; priority?: number }, + ): Promise => { + const queue = Queue.fromContext(ctx, QUEUE_NAME); + return queue.run(params.priority ?? 1, () => + expensiveOperation(ctx, params.left, params.right), + ); + }, + }, +}); + +async function expensiveOperation( + ctx: Context, + left: number, + right: number, +): Promise { + return ctx.run(async () => { + // very cpu heavy - important that the queue protects this + await new Promise((resolve) => setTimeout(resolve, 5_000)); + return left + right; + }); +} + +export type MyService = typeof myService; diff --git a/patterns-use-cases/priority-queue/priority-queue-typescript/tsconfig.json b/patterns-use-cases/priority-queue/priority-queue-typescript/tsconfig.json new file mode 100644 index 00000000..c2946b24 --- /dev/null +++ b/patterns-use-cases/priority-queue/priority-queue-typescript/tsconfig.json @@ -0,0 +1,18 @@ +{ + "compilerOptions": { + "target": "esnext", + "lib": ["esnext"], + "module": "nodenext", + "allowJs": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "outDir": "./dist", + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "skipDefaultLibCheck": true, + "skipLibCheck": true + } +}