diff --git a/README.md b/README.md index c419ef1..be67706 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,41 @@ const text = await retrier.retry(() => fs.readFile("README.md", "utf8")); The `retry()` method will either pass through the result on success or wait and retry on failure. Any error that isn't caught by the retrier is automatically rejected so the end result is a transparent passing through of both success and failure. +### Setting a Timeout + +You can control how long a task will attempt to retry before giving up by passing the `timeout` option to the `Retrier` constructor. By default, the timeout is one minute. + +```js +import fs from "fs/promises"; + +const retrier = new Retrier(error => { + return error.code === "ENFILE" || error.code === "EMFILE"; +}, { timeout: 100_000 }); + +const text = await retrier.retry(() => fs.readFile("README.md", "utf8")); +``` + +When a call times out, it rejects the first error that was received from calling the function. + +### Setting a Concurrency Limit + +When processing a large number of function calls, you can limit the number of concurrent function calls by passing the `concurrency` option to the `Retrier` constructor. By default, `concurrency` is 1000. + +```js +import fs from "fs/promises"; + +const retrier = new Retrier(error => { + return error.code === "ENFILE" || error.code === "EMFILE"; +}, { concurrency: 100 }); + +const filenames = getFilenames(); +const contents = await Promise.all( + filenames.map(filename => retrier.retry(() => fs.readFile(filename, "utf8")) +); +``` + +### Aborting with `AbortSignal` + You can also pass an `AbortSignal` to cancel a retry: ```js diff --git a/package-lock.json b/package-lock.json index 0a76f0d..552d2d0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -622,12 +622,12 @@ } }, "node_modules/debug": { - "version": "4.3.4", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", - "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", "dev": true, "dependencies": { - "ms": "2.1.2" + "ms": "^2.1.3" }, "engines": { "node": ">=6.0" @@ -638,12 +638,6 @@ } } }, - "node_modules/debug/node_modules/ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true - }, "node_modules/diff": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/diff/-/diff-5.0.0.tgz", @@ -1438,6 +1432,29 @@ "url": "https://github.com/chalk/chalk?sponsor=1" } }, + "node_modules/lint-staged/node_modules/debug": { + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dev": true, + "dependencies": { + "ms": "2.1.2" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/lint-staged/node_modules/ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true + }, "node_modules/listr2": { "version": "8.0.1", "resolved": "https://registry.npmjs.org/listr2/-/listr2-8.0.1.tgz", @@ -1876,6 +1893,29 @@ "node": ">= 14.0.0" } }, + "node_modules/mocha/node_modules/debug": { + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dev": true, + "dependencies": { + "ms": "2.1.2" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/mocha/node_modules/debug/node_modules/ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true + }, "node_modules/mocha/node_modules/glob": { "version": "8.1.0", "resolved": "https://registry.npmjs.org/glob/-/glob-8.1.0.tgz", diff --git a/src/retrier.js b/src/retrier.js index 1a66a8a..3358c46 100644 --- a/src/retrier.js +++ b/src/retrier.js @@ -10,11 +10,23 @@ const MAX_TASK_TIMEOUT = 60000; const MAX_TASK_DELAY = 100; +const MAX_CONCURRENCY = 1000; //----------------------------------------------------------------------------- // Helpers //----------------------------------------------------------------------------- +/** + * Logs a message to the console if the DEBUG environment variable is set. + * @param {string} message The message to log. + * @returns {void} + */ +function debug(message) { + if (globalThis?.process?.env.DEBUG) { + console.log(message); + } +} + /* * The following logic has been extracted from graceful-fs. * @@ -59,6 +71,29 @@ function isTimeToBail(task, timeout) { return task.age > timeout; } +/** + * Creates a new promise with resolve and reject functions. + * @returns {{promise:Promise, resolve:(value:any) => any, reject: (value:any) => any}} A new promise. + */ +function createPromise() { + if (Promise.withResolvers) { + return Promise.withResolvers(); + } + + let resolve, reject; + + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + if (resolve === undefined || reject === undefined) { + throw new Error("Promise executor did not initialize resolve or reject."); + } + + return { promise, resolve, reject }; +} + /** * A class to represent a task in the retry queue. @@ -154,7 +189,19 @@ export class Retrier { * Represents the queue for processing tasks. * @type {Array} */ - #queue = []; + #retrying = []; + + /** + * Represents the queue for pending tasks. + * @type {Array} + */ + #pending = []; + + /** + * The number of tasks currently being processed. + * @type {number} + */ + #working = 0; /** * The timeout for the queue. @@ -180,14 +227,21 @@ export class Retrier { */ #check; + /** + * The maximum number of concurrent tasks. + * @type {number} + */ + #concurrency; + /** * Creates a new instance. * @param {Function} check The function to call. * @param {object} [options] The options for the instance. * @param {number} [options.timeout] The timeout for the queue. * @param {number} [options.maxDelay] The maximum delay for the queue. + * @param {number} [options.concurrency] The maximum number of concurrent tasks. */ - constructor(check, { timeout = MAX_TASK_TIMEOUT, maxDelay = MAX_TASK_DELAY } = {}) { + constructor(check, { timeout = MAX_TASK_TIMEOUT, maxDelay = MAX_TASK_DELAY, concurrency = MAX_CONCURRENCY } = {}) { if (typeof check !== "function") { throw new Error("Missing function to check errors"); @@ -196,50 +250,154 @@ export class Retrier { this.#check = check; this.#timeout = timeout; this.#maxDelay = maxDelay; + this.#concurrency = concurrency; } /** - * Adds a new retry job to the queue. + * Gets the number of tasks waiting to be retried. + * @returns {number} The number of tasks in the retry queue. + */ + get retrying() { + return this.#retrying.length; + } + + /** + * Gets the number of tasks waiting to be processed in the pending queue. + * @returns {number} The number of tasks in the pending queue. + */ + get pending() { + return this.#pending.length; + } + + /** + * Gets the number of tasks currently being processed. + * @returns {number} The number of tasks currently being processed. + */ + get working() { + return this.#working; + } + + /** + * Calls the function and retries if it fails. * @param {Function} fn The function to call. - * @param {object} [options] The options for the job. + * @param {Object} options The options for the job. * @param {AbortSignal} [options.signal] The AbortSignal to monitor for cancellation. - * @returns {Promise} A promise that resolves when the queue is - * processed. + * @param {Promise} options.promise The promise to return when the function settles. + * @param {Function} options.resolve The resolve function for the promise. + * @param {Function} options.reject The reject function for the promise. + * @returns {Promise} A promise that resolves when the function is + * called successfully. */ - retry(fn, { signal } = {}) { - - signal?.throwIfAborted(); + #call(fn, { signal, promise, resolve, reject }) { let result; try { result = fn(); } catch (/** @type {any} */ error) { - return Promise.reject(new Error(`Synchronous error: ${error.message}`, { cause: error })); + reject(new Error(`Synchronous error: ${error.message}`, { cause: error })); + return promise; } // if the result is not a promise then reject an error if (!result || typeof result.then !== "function") { - return Promise.reject(new Error("Result is not a promise.")); + reject(new Error("Result is not a promise.")); + return promise; } + this.#working++; + promise.finally(() => { + this.#working--; + this.#processPending(); + }); + // call the original function and catch any ENFILE or EMFILE errors // @ts-ignore because we know it's any - return Promise.resolve(result).catch(error => { - if (!this.#check(error)) { - throw error; - } + return Promise.resolve(result) + .then(value => { + debug("Function called successfully without retry."); + resolve(value); + return promise; + }) + .catch(error => { + if (!this.#check(error)) { + reject(error); + return promise; + } - return new Promise((resolve, reject) => { - this.#queue.push(new RetryTask(fn, error, resolve, reject, signal)); + const task = new RetryTask(fn, error, resolve, reject, signal); + + debug(`Function failed, queuing for retry with task ${task.id}.`); + this.#retrying.push(task); signal?.addEventListener("abort", () => { + debug(`Task ${task.id} was aborted due to AbortSignal.`); reject(signal.reason); }); this.#processQueue(); + + return promise; }); - }); + } + + /** + * Adds a new retry job to the queue. + * @param {Function} fn The function to call. + * @param {object} [options] The options for the job. + * @param {AbortSignal} [options.signal] The AbortSignal to monitor for cancellation. + * @returns {Promise} A promise that resolves when the queue is + * processed. + */ + retry(fn, { signal } = {}) { + + signal?.throwIfAborted(); + + const { promise, resolve, reject } = createPromise(); + + this.#pending.push(() => this.#call(fn, { signal, promise, resolve, reject })); + this.#processPending(); + + return promise; + } + + + /** + * Processes the pending queue and the retry queue. + * @returns {void} + */ + #processAll() { + if (this.pending) { + this.#processPending(); + } + + if (this.retrying) { + this.#processQueue(); + } + } + + /** + * Processes the pending queue to see which tasks can be started. + * @returns {void} + */ + #processPending() { + + debug(`Processing pending tasks: ${this.pending} pending, ${this.working} working.`); + + const available = this.#concurrency - this.working; + + if (available <= 0) { + return; + } + + const count = Math.min(this.pending, available); + + for (let i = 0; i < count; i++) { + const task = this.#pending.shift(); + task?.(); + } + + debug(`Processed pending tasks: ${this.pending} pending, ${this.working} working.`); } /** @@ -251,17 +409,26 @@ export class Retrier { clearTimeout(this.#timerId); this.#timerId = undefined; + debug(`Processing retry queue: ${this.retrying} retrying, ${this.working} working.`); + + const processAgain = () => { + this.#timerId = setTimeout(() => this.#processAll(), 0); + }; + // if there's nothing in the queue, we're done - const task = this.#queue.shift(); + const task = this.#retrying.shift(); if (!task) { + debug("Queue is empty, exiting."); + + if (this.pending) { + processAgain(); + } return; } - const processAgain = () => { - this.#timerId = setTimeout(() => this.#processQueue(), 0); - }; // if it's time to bail, then bail if (isTimeToBail(task, this.#timeout)) { + debug(`Task ${task.id} was abandoned due to timeout.`); task.reject(task.error); processAgain(); return; @@ -269,7 +436,8 @@ export class Retrier { // if it's not time to retry, then wait and try again if (!isTimeToRetry(task, this.#maxDelay)) { - this.#queue.push(task); + debug(`Task ${task.id} is not ready to retry, skipping.`); + this.#retrying.push(task); processAgain(); return; } @@ -280,20 +448,26 @@ export class Retrier { // Promise.resolve needed in case it's a thenable but not a Promise Promise.resolve(task.fn()) // @ts-ignore because we know it's any - .then(result => task.resolve(result)) + .then(result => { + debug(`Task ${task.id} succeeded after ${task.age}ms.`); + task.resolve(result); + }) // @ts-ignore because we know it's any .catch(error => { if (!this.#check(error)) { + debug(`Task ${task.id} failed with non-retryable error: ${error.message}.`); task.reject(error); return; } // update the task timestamp and push to back of queue to try again task.lastAttempt = Date.now(); - this.#queue.push(task); - + this.#retrying.push(task); + debug(`Task ${task.id} failed, requeueing to try again.`); }) - .finally(() => this.#processQueue()); + .finally(() => { + this.#processAll(); + }); } } diff --git a/tests/retrier.test.js b/tests/retrier.test.js index c593e22..9bfffc4 100644 --- a/tests/retrier.test.js +++ b/tests/retrier.test.js @@ -88,104 +88,306 @@ describe("Retrier", () => { assert.equal(result, 5); }); - it("should reject an error when the function is synchronous", () => { + describe("Concurrency", () => { - const retrier = new Retrier(error => error.message === "foo"); - return assert.rejects(() => { - return retrier.retry(() => { - throw new Error("foo"); + it("should retry a function that rejects an error with default concurrency", async () => { + + let count1 = 0; + let count2 = 0; + const retrier = new Retrier(error => error.message === "foo"); + const promise1 = retrier.retry(async () => { + count1++; + + if (count1 === 1) { + throw new Error("foo"); + } + + return count1; + }); + + const promise2 = retrier.retry(async () => { + count2++; + + if (count2 < 3) { + throw new Error("foo"); + } + + return count2; + }); + + const promise3 = retrier.retry(async () => { + return 42; }); - }, { - message: "Synchronous error: foo" + + + assert.strictEqual(retrier.working, 3); + assert.strictEqual(retrier.pending, 0); + + const result1 = await promise1; + const result2 = await promise2; + const result3 = await promise3; + + assert.strictEqual(retrier.working, 0); + assert.strictEqual(retrier.pending, 0); + + assert.strictEqual(result1, 2); + assert.strictEqual(result2, 3); + assert.strictEqual(result3, 42); }); - }); - it("should reject an error that Retrier isn't expecting after expected errors", () => { + it("should retry a function that rejects an error with concurrency: 1", async () => { - const retrier = new Retrier(error => error.message === "foo"); - let callCount = 0; + let count1 = 0; + let count2 = 0; + const retrier = new Retrier(error => error.message === "foo", { + concurrency: 1 + }); + const promise1 = retrier.retry(async () => { + count1++; + + if (count1 === 1) { + throw new Error("foo"); + } + + return count1; + }); - return assert.rejects(async () => { - await retrier.retry(async () => { - callCount++; + const promise2 = retrier.retry(async () => { + count2++; - if (callCount < 3) { + if (count2 < 3) { throw new Error("foo"); } - throw new Error("bar"); + return count2; }); - }, /bar/); - }); - it("should reject an error when the function doesn't return a promise", () => { + const promise3 = retrier.retry(async () => { + return 42; + }); - const retrier = new Retrier(error => error.message === "foo"); + + assert.strictEqual(retrier.working, 1); + assert.strictEqual(retrier.pending, 2); - return assert.rejects(async () => { - // @ts-expect-error - await retrier.retry(() => {}); - }, /Result is not a promise/); - }); + const result1 = await promise1; + const result2 = await promise2; + const result3 = await promise3; - it("should cancel a function when an AbortSignal starts out aborted", async () => { + assert.strictEqual(retrier.working, 0); + assert.strictEqual(retrier.pending, 0); - let count = 0; - const retrier = new Retrier(error => error.message === "foo"); - await assert.rejects(async () => { - await retrier.retry(async () => { - count++; + assert.strictEqual(result1, 2); + assert.strictEqual(result2, 3); + assert.strictEqual(result3, 42); + }); + + it("should retry 100 functions that reject an error with concurrency", async () => { + + let count = 0; + const retrier = new Retrier(error => error.message === "foo", { + concurrency: 10 + }); + + const values = Array.from({ length: 100 }, (_, i) => i); + const promises = []; + for (const value of values) { + promises.push(retrier.retry(async () => { + count++; + + if (count < value) { + throw new Error("foo"); + } - if (count < 5) { + return count; + })); + } + + assert.strictEqual(retrier.working, 10); + assert.strictEqual(retrier.pending, 90); + + const results = await Promise.all(promises); + assert.strictEqual(retrier.working, 0); + assert.strictEqual(retrier.pending, 0); + + for (let i = 0; i < 100; i++) { + assert.strictEqual(results[i], i + 1); + } + }); + + it("should retry 100 functions that that don't reject an error with concurrency", async () => { + + const retrier = new Retrier(error => error.message === "foo", { + concurrency: 5 + }); + + const values = Array.from({ length: 10000 }, (_, i) => i); + const promises = []; + for (const value of values) { + promises.push(retrier.retry(async () => value)); + } + + assert.strictEqual(retrier.working, 5); + assert.strictEqual(retrier.pending, 9995); + assert.strictEqual(retrier.retrying, 0); + + const results = await Promise.all(promises); + assert.strictEqual(retrier.working, 0); + assert.strictEqual(retrier.pending, 0); + assert.strictEqual(retrier.retrying, 0); + + for (let i = 0; i < 100; i++) { + assert.strictEqual(results[i], i); + } + }); + + it("should retry a function until it throws an unknown error", async () => { + + let count1 = 0; + let count2 = 0; + const retrier = new Retrier(error => error.message === "foo"); + const promise1 = retrier.retry(async () => { + count1++; + + if (count1 === 1) { + throw new Error("foo"); + } + + return count1; + }); + + const promise2 = retrier.retry(async () => { + count2++; + + if (count2 < 3) { throw new Error("foo"); } - return count; - }, { signal: AbortSignal.abort()}); - }, /AbortError/); + return count2; + }); + + const promise3 = retrier.retry(async () => { + throw new TypeError("Whatever"); + }); + + + assert.strictEqual(retrier.working, 3); + assert.strictEqual(retrier.pending, 0); + + const result1 = await promise1; + const result2 = await promise2; + await assert.rejects(promise3, /Whatever/); + + assert.strictEqual(result1, 2); + assert.strictEqual(result2, 3); + }); }); - it("should cancel a function when an AbortSignal times out", async () => { + describe("Errors", () => { + it("should reject an error when the function is synchronous", () => { - let count = 0; - const retrier = new Retrier(error => error.message === "foo"); - await assert.rejects(async () => { - await retrier.retry(async () => { - count++; + const retrier = new Retrier(error => error.message === "foo"); - if (count < 5) { + return assert.rejects(() => { + return retrier.retry(() => { throw new Error("foo"); - } + }); + }, { + message: "Synchronous error: foo" + }); + }); + + it("should reject an error that Retrier isn't expecting after expected errors", () => { + + const retrier = new Retrier(error => error.message === "foo"); + let callCount = 0; + + return assert.rejects(async () => { + await retrier.retry(async () => { + callCount++; + + if (callCount < 3) { + throw new Error("foo"); + } + + throw new Error("bar"); + }); + }, /bar/); + }); + + it("should reject an error when the function doesn't return a promise", () => { - return count; - }, { signal: AbortSignal.timeout(0)}); - }, /TimeoutError/); + const retrier = new Retrier(error => error.message === "foo"); + + return assert.rejects(async () => { + // @ts-expect-error + await retrier.retry(() => { }); + }, /Result is not a promise/); + }); }); - it("should cancel a function when an AbortSignal is triggered", async () => { + describe("AbortSignal", () => { - let count = 0; - const controller = new AbortController(); - const retrier = new Retrier(error => error.message === "foo", { - maxDelay: 500 + it("should cancel a function when an AbortSignal starts out aborted", async () => { + + let count = 0; + const retrier = new Retrier(error => error.message === "foo"); + await assert.rejects(async () => { + await retrier.retry(async () => { + count++; + + if (count < 5) { + throw new Error("foo"); + } + + return count; + }, { signal: AbortSignal.abort() }); + }, /AbortError/); }); - setTimeout(() => { - controller.abort(); - }, 0); + it("should cancel a function when an AbortSignal times out", async () => { - await assert.rejects(async () => { - await retrier.retry(async () => { - count++; + let count = 0; + const retrier = new Retrier(error => error.message === "foo"); + await assert.rejects(async () => { + await retrier.retry(async () => { + count++; - if (count < 5) { - throw new Error("foo"); - } + if (count < 5) { + throw new Error("foo"); + } + + return count; + }, { signal: AbortSignal.timeout(0) }); + }, /TimeoutError/); + }); + + it("should cancel a function when an AbortSignal is triggered", async () => { + + let count = 0; + const controller = new AbortController(); + const retrier = new Retrier(error => error.message === "foo", { + maxDelay: 500 + }); - return count; - }, { signal: controller.signal }); - }, /AbortError/); + setTimeout(() => { + controller.abort(); + }, 0); + + await assert.rejects(async () => { + await retrier.retry(async () => { + count++; + + if (count < 5) { + throw new Error("foo"); + } + + return count; + }, { signal: controller.signal }); + }, /AbortError/); + }); + }); });