From 1264d0469ec2971bb32495784f6ae0dc97224e28 Mon Sep 17 00:00:00 2001 From: Nicholas Date: Tue, 26 Mar 2024 21:54:55 +0000 Subject: [PATCH] chore: basic framework (not tested) --- README.md | 2 + src/consumer.ts | 153 ++++++++++++++++++++++++++++++++++++++++++ src/emitter.ts | 37 ++++++++++ src/index.ts | 5 +- src/lib/cloudflare.ts | 43 ++++++++++++ src/lib/fetch.ts | 15 +++++ src/logger.ts | 6 ++ src/types.ts | 99 +++++++++++++++++++++++++++ src/validation.ts | 41 +++++++++++ 9 files changed, 398 insertions(+), 3 deletions(-) create mode 100644 src/consumer.ts create mode 100644 src/emitter.ts create mode 100644 src/lib/cloudflare.ts create mode 100644 src/lib/fetch.ts create mode 100644 src/logger.ts create mode 100644 src/types.ts create mode 100644 src/validation.ts diff --git a/README.md b/README.md index 06136dd..aac33cd 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,8 @@ Build CloudFlare Queues applications without the boilerplate. Just define an asy Based on [sqs-consumer](https://github.com/bbc/sqs-consumer). +> **Note:** This package is still in development and should be used with caution. + ## Installation To install this package, simply enter the following command into your terminal (or the variant of whatever package manager you are using): diff --git a/src/consumer.ts b/src/consumer.ts new file mode 100644 index 0000000..dd119f1 --- /dev/null +++ b/src/consumer.ts @@ -0,0 +1,153 @@ +import { TypedEventEmitter } from "./emitter.js"; +import type { + ConsumerOptions, + Message, + PullMessagesResponse, + AckMessageResponse, +} from "./types.js"; +import { assertOptions } from "./validation.js"; +import { queuesClient } from "./lib/cloudflare.js"; +import { logger } from "./logger.js"; + +/** + * [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage) + */ +export class Consumer extends TypedEventEmitter { + private accountId: string; + private queueId: string; + private handleMessage: (message: Message) => Promise; + private batchSize: number; + private visibilityTimeoutMs: number; + private pollingWaitTimeMs: number; + private stopped = true; + private isPolling = false; + + /** + * Create a new consumer + * @param options The options for the consumer + */ + constructor(options) { + super(); + assertOptions(options); + this.accountId = options.accountId; + this.queueId = options.queueId; + this.handleMessage = options.handleMessage; + this.batchSize = options.batchSize ?? 10; + this.visibilityTimeoutMs = options.visibilityTimeoutMs ?? 1000; + this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000; + } + + /** + * Creates a new SQS consumer. + */ + public static create(options: ConsumerOptions): Consumer { + return new Consumer(options); + } + + /** + * Returns the current status of the consumer. + * This includes whether it is running or currently polling. + */ + public get status(): { + isRunning: boolean; + isPolling: boolean; + } { + return { + isRunning: !this.stopped, + isPolling: this.isPolling, + }; + } + + /** + * Start polling the queue. + */ + public start(): void { + if (!this.stopped) { + return; + } + this.stopped = false; + this.poll(); + } + + /** + * Stop polling the queue. + */ + public stop(): void { + this.stopped = true; + } + + /** + * Poll the queue for messages. + */ + private async poll(): Promise { + if (this.stopped) { + logger.debug("cancelling_poll", { + detail: + "Poll was called while consumer was stopped, cancelling poll...", + }); + return; + } + + logger.debug("polling"); + + this.isPolling = true; + + try { + const response = await queuesClient({ + path: "messages/pull", + method: "POST", + body: { + batch_size: this.batchSize, + visibility_timeout_ms: this.visibilityTimeoutMs, + }, + accountId: this.accountId, + queueId: this.queueId, + }); + + const messages = response.result; + + if (!messages || messages.length === 0) { + this.emit("empty"); + this.isPolling = false; + return; + } + + const successfulMessages: string[] = []; + const failedMessages: string[] = []; + + for (const message of messages) { + this.emit("message_received", message); + try { + const result = await this.handleMessage(message); + if (result) { + successfulMessages.push(message.lease_id); + this.emit("message_processed", message); + } + } catch (e) { + failedMessages.push(message.lease_id); + this.emit("processing_error", e, message); + } + } + + logger.debug("acknowledging_messages", { + successfulMessages, + failedMessages, + }); + + await queuesClient({ + path: "messages/ack", + method: "POST", + body: { acks: successfulMessages, retries: failedMessages }, + accountId: this.accountId, + queueId: this.queueId, + }); + + this.emit("response_processed"); + } catch (e) { + this.emit("error", e); + } + + this.isPolling = false; + setTimeout(() => this.poll(), this.pollingWaitTimeMs); + } +} diff --git a/src/emitter.ts b/src/emitter.ts new file mode 100644 index 0000000..5eb5336 --- /dev/null +++ b/src/emitter.ts @@ -0,0 +1,37 @@ +import { EventEmitter } from "node:events"; + +import { logger } from "./logger.js"; +import type { Events } from "./types.js"; + +export class TypedEventEmitter extends EventEmitter { + /** + * Trigger a listener on all emitted events + * @param event The name of the event to listen to + * @param listener A function to trigger when the event is emitted + */ + on( + event: E, + listener: (...args: Events[E]) => void, + ): this { + return super.on(event, listener); + } + /** + * Trigger a listener only once for an emitted event + * @param event The name of the event to listen to + * @param listener A function to trigger when the event is emitted + */ + once( + event: E, + listener: (...args: Events[E]) => void, + ): this { + return super.once(event, listener); + } + /** + * Emits an event with the provided arguments + * @param event The name of the event to emit + */ + emit(event: E, ...args: Events[E]): boolean { + logger.debug(event, ...args); + return super.emit(event, ...args); + } +} diff --git a/src/index.ts b/src/index.ts index 6329f89..78dc2b2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,2 @@ -export function consumer() { - console.log("Hello World!"); -} +export { Consumer } from "./consumer.js"; +export * from "./types.js"; diff --git a/src/lib/cloudflare.ts b/src/lib/cloudflare.ts new file mode 100644 index 0000000..23303fe --- /dev/null +++ b/src/lib/cloudflare.ts @@ -0,0 +1,43 @@ +import { throwErrorIfResponseNotOk } from "./fetch.js"; + +const CLOUDFLARE_HOST = "https://api.cloudflare.com/client/v4"; + +export function getCredentials() { + const QUEUES_API_TOKEN = process.env.QUEUES_API_TOKEN; + + if (!QUEUES_API_TOKEN) { + throw new Error("Missing Cloudflare credentials"); + } + + return { + QUEUES_API_TOKEN, + }; +} + +export async function queuesClient({ + path, + method, + body, + accountId, + queueId, +}): Promise { + const { QUEUES_API_TOKEN } = getCredentials(); + + const response = await fetch( + `${CLOUDFLARE_HOST}/accounts/${accountId}/queues/${queueId}/${path}`, + { + method, + headers: { + "content-type": "application/json", + authorization: `Bearer ${QUEUES_API_TOKEN}`, + }, + body: JSON.stringify(body), + }, + ); + + throwErrorIfResponseNotOk(response); + + const data = (await response.json()) as T; + + return data; +} diff --git a/src/lib/fetch.ts b/src/lib/fetch.ts new file mode 100644 index 0000000..08eeaf3 --- /dev/null +++ b/src/lib/fetch.ts @@ -0,0 +1,15 @@ +export function throwErrorIfResponseNotOk(response: Response): void { + const { ok, status, statusText, url } = response; + + if (!ok) { + let error: Error; + + if (status) { + error = new Error(`[${status} - ${statusText}] ${url}`); + } else { + error = new Error(`[${statusText}] ${url}`); + } + + throw error; + } +} diff --git a/src/logger.ts b/src/logger.ts new file mode 100644 index 0000000..0fbd47a --- /dev/null +++ b/src/logger.ts @@ -0,0 +1,6 @@ +import createDebug from "debug"; +const debug = createDebug("cloudflare-queue-consumer"); + +export const logger = { + debug, +}; diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..4a04cff --- /dev/null +++ b/src/types.ts @@ -0,0 +1,99 @@ +/** + * The options for the consumer. + */ +export interface ConsumerOptions { + batchSize: number; + visibilityTimeoutMs: number; + accountId: string; + queueId: string; +} + +export type Message = { + body: string; + id: string; + timestamp_ms: number; + attempts: number; + lease_id: string; +}; + +export type CloudFlareError = { + code: number; + message: string; +}; + +export type CloudFlareResultInfo = { + page: number; + per_page: number; + total_pages: number; + count: number; + total_count: number; +}; + +export type PullMessagesResponse = { + errors: CloudFlareError[]; + messages: CloudFlareError[]; + result: Message[]; + success: boolean; + result_info: CloudFlareResultInfo; +}; + +export type AckMessageResponse = { + errors: CloudFlareError[]; + messages: CloudFlareError[]; + result: { + ackCount: number; + retryCount: number; + warnings: string[]; + }; + success: boolean; + result_info: CloudFlareResultInfo; +}; + +/** + * These are the events that the consumer emits. + */ +export interface Events { + /** + * Fired after one batch of items (up to `batchSize`) has been successfully processed. + */ + response_processed: []; + /** + * Fired when the queue is empty (All messages have been consumed). + */ + empty: []; + /** + * Fired when a message is received. + */ + message_received: [Message]; + /** + * Fired when a message is successfully processed and removed from the queue. + */ + message_processed: [Message]; + /** + * Fired when an error occurs interacting with the queue. + * + * If the error correlates to a message, that message is included in Params + */ + error: [Error, void | Message | Message[]]; + /** + * Fired when `handleMessageTimeout` is supplied as an option and if + * `handleMessage` times out. + */ + timeout_error: [Error, Message]; + /** + * Fired when an error occurs processing the message. + */ + processing_error: [Error, Message]; + /** + * Fired when requests to SQS were aborted. + */ + aborted: []; + /** + * Fired when the consumer starts its work.. + */ + started: []; + /** + * Fired when the consumer finally stops its work. + */ + stopped: []; +} diff --git a/src/validation.ts b/src/validation.ts new file mode 100644 index 0000000..522aed5 --- /dev/null +++ b/src/validation.ts @@ -0,0 +1,41 @@ +import type { ConsumerOptions } from "./types.js"; + +const requiredOptions = ["accountId", "queueId", "handleMessage"]; + +function validateOption(option: string, value: number, strict?: boolean): void { + switch (option) { + case "batchSize": + if (value < 1) { + throw new Error("batchSize must be at least 1."); + } + break; + case "visibilityTimeoutMs": + break; + default: + if (strict) { + throw new Error(`The update ${option} cannot be updated`); + } + break; + } +} + +/** + * Ensure that the required options have been set. + * @param options The options that have been set by the application. + */ +function assertOptions(options: ConsumerOptions): void { + requiredOptions.forEach((option) => { + const possibilities = option.split("|"); + if (!possibilities.find((p) => options[p])) { + throw new Error( + `Missing consumer option [ ${possibilities.join(" or ")} ].`, + ); + } + }); + + if (options.batchSize) { + validateOption("batchSize", options.batchSize, false); + } +} + +export { assertOptions, validateOption };