Skip to content

Commit

Permalink
Merge branch 'master' of github.com:josdejong/workerpool into fix/abo…
Browse files Browse the repository at this point in the history
…rt-listener-execution
  • Loading branch information
joshLong145 committed Jan 1, 2025
2 parents 7c1e099 + 100f512 commit 29c2a51
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 64 deletions.
6 changes: 6 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
https://github.com/josdejong/workerpool


## not yet published, version 9.3.0

- Feat: support for events and std streams to parents from an abort handler.
Thanks @joshLong145.


## 2024-10-11, version 9.2.0

- Feat: implement support for abort handlers in the workers (#448).
Expand Down
94 changes: 63 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ The following options are available:
- In case of `'process'`, `child_process` will be used. Only available in a node.js environment.
- In case of `'thread'`, `worker_threads` will be used. If `worker_threads` are not available, an error is thrown. Only available in a node.js environment.
- `workerTerminateTimeout: number`. The timeout in milliseconds to wait for a worker to cleanup it's resources on termination before stopping it forcefully. Default value is `1000`.
- `abortListenerTimeout: number`. The timeout in milliseconds to wait for abort listener's before stopping it forcefully, triggering cleanup. Default value is `1000`.
- `forkArgs: String[]`. For `process` worker type. An array passed as `args` to [child_process.fork](https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options)
- `forkOpts: Object`. For `process` worker type. An object passed as `options` to [child_process.fork](https://nodejs.org/api/child_process.html#child_processforkmodulepath-args-options). See nodejs documentation for available options.
- `workerOpts: Object`. For `web` worker type. An object passed to the [constructor of the web worker](https://html.spec.whatwg.org/multipage/workers.html#dom-worker). See [WorkerOptions specification](https://html.spec.whatwg.org/multipage/workers.html#workeroptions) for available options.
Expand Down Expand Up @@ -393,7 +394,62 @@ workerpool.worker({
});
```

Tasks may configure an `abort handler` to perform cleanup operations when `timeout` or `cancel` is called on a `task`. the `abortListenerTimeout` option can be configured to control when cleanup should be aborted in the case an `abortHandler` never resolves. This timeout trigger will cause the given worker to be cleaned up. Allowing a new worker to be created if need be.
### Events

You can send data back from workers to the pool while the task is being executed using the `workerEmit` function:

`workerEmit(payload: any) : unknown`

This function only works inside a worker **and** during a task.

Example:

```js
// file myWorker.js
const workerpool = require('workerpool');

function eventExample(delay) {
workerpool.workerEmit({
status: 'in_progress',
});

workerpool.workerEmit({
status: 'complete',
});

return true;
}

// create a worker and register functions
workerpool.worker({
eventExample: eventExample,
});
```

To receive those events, you can use the `on` option of the pool `exec` method:

```js
pool.exec('eventExample', [], {
on: function (payload) {
if (payload.status === 'in_progress') {
console.log('In progress...');
} else if (payload.status === 'complete') {
console.log('Done!');
}
},
});
```

### Worker API
Workers have access to a `worker` api which contains the following methods

- `emit: (payload: unknown | Transfer): void`
- `addAbortListener: (listener: () => Promise<void>): void`


Worker termination may be recoverable through `abort listeners` which are registered through `worker.addAbortListener`. If all registered listeners resolve then the worker will not be terminated, allowing for worker reuse in some cases.

NOTE: For operations to successfully clean up, a worker implementation should be *async*. If the worker thread is blocked, then the worker will be killed.

```js
function asyncTimeout() {
Expand All @@ -402,11 +458,9 @@ function asyncTimeout() {
let timeout = setTimeout(() => {
resolve();
}, 5000);

// An abort listener allows for cleanup for a given worker
// such that it may be resused for future tasks
// if an execption is thrown within scope of the handler
// the worker instance will be destroyed.

// Register a listener which will resolve before the time out
// above triggers.
me.worker.addAbortListener(async function () {
clearTimeout(timeout);
resolve();
Expand All @@ -425,25 +479,17 @@ workerpool.worker(
);
```

### Events

You can send data back from workers to the pool while the task is being executed using the `workerEmit` function:

`workerEmit(payload: any) : unknown`

This function only works inside a worker **and** during a task.

Example:
Events may also be emitted from the `worker` api through `worker.emit`

```js
// file myWorker.js
const workerpool = require('workerpool');

function eventExample(delay) {
workerpool.workerEmit({
status: 'in_progress',
this.worker.emit({
status: "in_progress",
});

workerpool.workerEmit({
status: 'complete',
});
Expand All @@ -457,20 +503,6 @@ workerpool.worker({
});
```

To receive those events, you can use the `on` option of the pool `exec` method:

```js
pool.exec('eventExample', [], {
on: function (payload) {
if (payload.status === 'in_progress') {
console.log('In progress...');
} else if (payload.status === 'complete') {
console.log('Done!');
}
},
});
```

### Utilities

Following properties are available for convenience:
Expand Down
11 changes: 8 additions & 3 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ var worker = {
// works in both node.js and the browser
var publicWorker = {
/**
*
* @param {() => Promise<void>} listener
*/
* Registers listeners which will trigger when a task is timed out or cancled. If all listeners resolve, the worker executing the given task will not be terminated.
* *Note*: If there is a blocking operation within a listener, the worker will be terminated.
* @param {() => Promise<void>} listener
*/
addAbortListener: function(listener) {
worker.abortListeners.push(listener);
},

/**
* Emit an event from the worker thread to the main thread.
* @param {any} payload
*/
emit: worker.emit
};

Expand Down
48 changes: 25 additions & 23 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ function add(a, b) {
}

describe('Pool', function () {

// Creating pool with this function ensures that the pool is terminated
// at the end of the test, which avoid hanging the test suite if terminate()
// hadn't been called for some reasons
// hadn't been called for some reason
let createdPools = []
function createPool(script, options) {
var pool = new Pool(script, options);

after(() => {
return pool.terminate();
});

const pool = new Pool(script, options);
createdPools.push(pool);
return pool;
}

afterEach(async () => {
while (createdPools.length > 0) {
await createdPools.shift().terminate();
}
});

describe('nodeWorker', function() {
function add(a,b) {
return a+b;
Expand Down Expand Up @@ -209,7 +211,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
done(err);
});
})
Expand All @@ -232,7 +234,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
done(err);
});
})
Expand All @@ -259,7 +261,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
done(err);
});
})
Expand All @@ -282,7 +284,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
done(err);
});
})
Expand Down Expand Up @@ -387,7 +389,7 @@ describe('Pool', function () {
done();
})
.catch(function () {
assert('Should not throw an error');
assert.fail('Should not throw an error');
});
});
});
Expand Down Expand Up @@ -416,7 +418,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
});
});
});
Expand All @@ -434,7 +436,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
});
});
});
Expand Down Expand Up @@ -471,7 +473,7 @@ describe('Pool', function () {
done();
})
.catch(function () {
assert('Should not throw an error');
assert.fail('Should not throw an error');
});
});

Expand All @@ -484,7 +486,7 @@ describe('Pool', function () {

pool.exec(testAsync)
.then(function () {
assert('Should not resolve');
assert.fail('Should not resolve');
})
.catch(function (err) {
assert.strictEqual(err.toString(), 'Error: I reject!');
Expand Down Expand Up @@ -653,7 +655,7 @@ describe('Pool', function () {
return pool.exec(forever)
.timeout(50)
.then(function (result) {
assert('promise should never resolve');
assert.fail('promise should never resolve');
})
//.catch(Promise.CancellationError, function (err) { // TODO: not yet supported
.catch(function (err) {
Expand Down Expand Up @@ -698,7 +700,7 @@ describe('Pool', function () {
.catch(done);
})
.catch(function (err) {
assert('promise should not throw');
assert.fail('promise should not throw');
});
});

Expand All @@ -721,7 +723,7 @@ describe('Pool', function () {
pool.exec(sleep)
.timeout(delay)
.then(function (result) {
assert('promise should never resolve');
assert.fail('promise should never resolve');
})
.catch(function (err) {
assert(err instanceof Promise.TimeoutError);
Expand Down Expand Up @@ -1211,7 +1213,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
done(err);
});
});
Expand All @@ -1234,7 +1236,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
done(err);
});
});
Expand All @@ -1252,7 +1254,7 @@ describe('Pool', function () {
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
assert.fail('Should not throw an error');
done(err);
});
});
Expand Down
Loading

0 comments on commit 29c2a51

Please sign in to comment.