Skip to content

Commit

Permalink
chore: basic framework (not tested)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn committed Mar 26, 2024
1 parent 20731ed commit 1264d04
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Build CloudFlare Queues applications without the boilerplate. Just define an asy

Based on [sqs-consumer](https://github.com/bbc/sqs-consumer).

> **Note:** This package is still in development and should be used with caution.
## Installation

To install this package, simply enter the following command into your terminal (or the variant of whatever package manager you are using):
Expand Down
153 changes: 153 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { TypedEventEmitter } from "./emitter.js";
import type {
ConsumerOptions,
Message,
PullMessagesResponse,
AckMessageResponse,
} from "./types.js";
import { assertOptions } from "./validation.js";
import { queuesClient } from "./lib/cloudflare.js";
import { logger } from "./logger.js";

/**
* [Usage](https://bbc.github.io/cloudflare-queue-consumer/index.html#usage)
*/
export class Consumer extends TypedEventEmitter {
private accountId: string;
private queueId: string;
private handleMessage: (message: Message) => Promise<Message | void>;
private batchSize: number;
private visibilityTimeoutMs: number;
private pollingWaitTimeMs: number;
private stopped = true;
private isPolling = false;

/**
* Create a new consumer
* @param options The options for the consumer
*/
constructor(options) {
super();
assertOptions(options);
this.accountId = options.accountId;
this.queueId = options.queueId;
this.handleMessage = options.handleMessage;
this.batchSize = options.batchSize ?? 10;
this.visibilityTimeoutMs = options.visibilityTimeoutMs ?? 1000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 1000;
}

/**
* Creates a new SQS consumer.
*/
public static create(options: ConsumerOptions): Consumer {
return new Consumer(options);
}

/**
* Returns the current status of the consumer.
* This includes whether it is running or currently polling.
*/
public get status(): {
isRunning: boolean;
isPolling: boolean;
} {
return {
isRunning: !this.stopped,
isPolling: this.isPolling,
};
}

/**
* Start polling the queue.
*/
public start(): void {
if (!this.stopped) {
return;
}
this.stopped = false;
this.poll();
}

/**
* Stop polling the queue.
*/
public stop(): void {
this.stopped = true;
}

/**
* Poll the queue for messages.
*/
private async poll(): Promise<void> {
if (this.stopped) {
logger.debug("cancelling_poll", {
detail:
"Poll was called while consumer was stopped, cancelling poll...",
});
return;
}

logger.debug("polling");

this.isPolling = true;

try {
const response = await queuesClient<PullMessagesResponse>({
path: "messages/pull",
method: "POST",
body: {
batch_size: this.batchSize,
visibility_timeout_ms: this.visibilityTimeoutMs,
},
accountId: this.accountId,
queueId: this.queueId,
});

const messages = response.result;

if (!messages || messages.length === 0) {
this.emit("empty");
this.isPolling = false;
return;
}

const successfulMessages: string[] = [];
const failedMessages: string[] = [];

for (const message of messages) {
this.emit("message_received", message);
try {
const result = await this.handleMessage(message);
if (result) {
successfulMessages.push(message.lease_id);
this.emit("message_processed", message);
}
} catch (e) {
failedMessages.push(message.lease_id);
this.emit("processing_error", e, message);
}
}

logger.debug("acknowledging_messages", {
successfulMessages,
failedMessages,
});

await queuesClient<AckMessageResponse>({
path: "messages/ack",
method: "POST",
body: { acks: successfulMessages, retries: failedMessages },
accountId: this.accountId,
queueId: this.queueId,
});

this.emit("response_processed");
} catch (e) {
this.emit("error", e);
}

this.isPolling = false;
setTimeout(() => this.poll(), this.pollingWaitTimeMs);
}
}
37 changes: 37 additions & 0 deletions src/emitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { EventEmitter } from "node:events";

import { logger } from "./logger.js";
import type { Events } from "./types.js";

export class TypedEventEmitter extends EventEmitter {
/**
* Trigger a listener on all emitted events
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
on<E extends keyof Events>(
event: E,
listener: (...args: Events[E]) => void,
): this {
return super.on(event, listener);
}
/**
* Trigger a listener only once for an emitted event
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
once<E extends keyof Events>(
event: E,
listener: (...args: Events[E]) => void,
): this {
return super.once(event, listener);
}
/**
* Emits an event with the provided arguments
* @param event The name of the event to emit
*/
emit<E extends keyof Events>(event: E, ...args: Events[E]): boolean {
logger.debug(event, ...args);
return super.emit(event, ...args);
}
}
5 changes: 2 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
export function consumer() {
console.log("Hello World!");
}
export { Consumer } from "./consumer.js";
export * from "./types.js";
43 changes: 43 additions & 0 deletions src/lib/cloudflare.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { throwErrorIfResponseNotOk } from "./fetch.js";

const CLOUDFLARE_HOST = "https://api.cloudflare.com/client/v4";

export function getCredentials() {
const QUEUES_API_TOKEN = process.env.QUEUES_API_TOKEN;

if (!QUEUES_API_TOKEN) {
throw new Error("Missing Cloudflare credentials");
}

return {
QUEUES_API_TOKEN,
};
}

export async function queuesClient<T = unknown>({
path,
method,
body,
accountId,
queueId,
}): Promise<T> {
const { QUEUES_API_TOKEN } = getCredentials();

const response = await fetch(
`${CLOUDFLARE_HOST}/accounts/${accountId}/queues/${queueId}/${path}`,
{
method,
headers: {
"content-type": "application/json",
authorization: `Bearer ${QUEUES_API_TOKEN}`,
},
body: JSON.stringify(body),
},
);

throwErrorIfResponseNotOk(response);

const data = (await response.json()) as T;

return data;
}
15 changes: 15 additions & 0 deletions src/lib/fetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export function throwErrorIfResponseNotOk(response: Response): void {
const { ok, status, statusText, url } = response;

if (!ok) {
let error: Error;

if (status) {
error = new Error(`[${status} - ${statusText}] ${url}`);
} else {
error = new Error(`[${statusText}] ${url}`);
}

throw error;
}
}
6 changes: 6 additions & 0 deletions src/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import createDebug from "debug";
const debug = createDebug("cloudflare-queue-consumer");

export const logger = {
debug,
};
99 changes: 99 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* The options for the consumer.
*/
export interface ConsumerOptions {
batchSize: number;
visibilityTimeoutMs: number;
accountId: string;
queueId: string;
}

export type Message = {
body: string;
id: string;
timestamp_ms: number;
attempts: number;
lease_id: string;
};

export type CloudFlareError = {
code: number;
message: string;
};

export type CloudFlareResultInfo = {
page: number;
per_page: number;
total_pages: number;
count: number;
total_count: number;
};

export type PullMessagesResponse = {
errors: CloudFlareError[];
messages: CloudFlareError[];
result: Message[];
success: boolean;
result_info: CloudFlareResultInfo;
};

export type AckMessageResponse = {
errors: CloudFlareError[];
messages: CloudFlareError[];
result: {
ackCount: number;
retryCount: number;
warnings: string[];
};
success: boolean;
result_info: CloudFlareResultInfo;
};

/**
* These are the events that the consumer emits.
*/
export interface Events {
/**
* Fired after one batch of items (up to `batchSize`) has been successfully processed.
*/
response_processed: [];
/**
* Fired when the queue is empty (All messages have been consumed).
*/
empty: [];
/**
* Fired when a message is received.
*/
message_received: [Message];
/**
* Fired when a message is successfully processed and removed from the queue.
*/
message_processed: [Message];
/**
* Fired when an error occurs interacting with the queue.
*
* If the error correlates to a message, that message is included in Params
*/
error: [Error, void | Message | Message[]];
/**
* Fired when `handleMessageTimeout` is supplied as an option and if
* `handleMessage` times out.
*/
timeout_error: [Error, Message];
/**
* Fired when an error occurs processing the message.
*/
processing_error: [Error, Message];
/**
* Fired when requests to SQS were aborted.
*/
aborted: [];
/**
* Fired when the consumer starts its work..
*/
started: [];
/**
* Fired when the consumer finally stops its work.
*/
stopped: [];
}
Loading

0 comments on commit 1264d04

Please sign in to comment.