Skip to content

Commit

Permalink
Add communication from worker to main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
Elyahou authored and jasnell committed May 2, 2022
1 parent b9551af commit 52be44d
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 27 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ itself.

A `'drain'` event is emitted whenever the `queueSize` reaches `0`.

### Event: `'message'`

A `'message'` event is emitted whenever a message is received from a worker thread.

### Property: `completed` (readonly)

The current number of completed tasks.
Expand Down
16 changes: 16 additions & 0 deletions examples/messages/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict';

const Piscina = require('../../dist/src');
const { resolve } = require('path');

const piscina = new Piscina({
filename: resolve(__dirname, 'worker.js')
});

(async function () {
piscina.on('message', (event) => {
console.log("Messsage received from worker: ", event);
});

await piscina.run();
})();
6 changes: 6 additions & 0 deletions examples/messages/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict';
const { parentPort } = require('worker_threads')

module.exports = () => {
parentPort.postMessage('hello from the worker pool');
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "piscina",
"version": "3.2.0",
"version": "3.3.0",
"description": "A fast, efficient Node.js Worker Thread Pool implementation",
"main": "./dist/src/index.js",
"exports": {
Expand Down
5 changes: 3 additions & 2 deletions src/common.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { MessagePort } from 'worker_threads';

export const READY = '_WORKER_READY';

export interface StartupMessage {
filename : string | null;
name : string;
Expand All @@ -17,15 +19,14 @@ export interface RequestMessage {
}

export interface ReadyMessage {
ready: true
[READY]: true
};

export interface ResponseMessage {
taskId : number;
result : any;
error: Error | null;
}

export const commonState = {
isWorkerThread: false,
workerData: undefined
Expand Down
26 changes: 14 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Histogram, build } from 'hdr-histogram-js';
import { performance } from 'perf_hooks';
import hdrobj from 'hdr-histogram-percentiles-obj';
import {
ReadyMessage,
READY,
RequestMessage,
ResponseMessage,
StartupMessage,
Expand Down Expand Up @@ -616,20 +616,22 @@ class ThreadPool {
pool._processPendingMessages();
}

worker.on('message', (message : ReadyMessage) => {
if (message.ready === true) {
if (workerInfo.currentUsage() === 0) {
workerInfo.unref();
}
function onReady() {
if (workerInfo.currentUsage() === 0) {
workerInfo.unref();
}

if (!workerInfo.isReady()) {
workerInfo.markAsReady();
}
return;
if (!workerInfo.isReady()) {
workerInfo.markAsReady();
}
}

function onEventMessage(message: any) {
pool.publicInterface.emit('message', message);
}

worker.emit('error', new Error(
`Unexpected message on Worker: ${inspect(message)}`));
worker.on('message', (message : any) => {
message instanceof Object && READY in message ? onReady() : onEventMessage(message);
});

worker.on('error', (err : Error) => {
Expand Down
3 changes: 2 additions & 1 deletion src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'worker_threads';
import { pathToFileURL } from 'url';
import {
READY,
commonState,
ReadyMessage,
RequestMessage,
Expand Down Expand Up @@ -89,7 +90,7 @@ parentPort!.on('message', (message : StartupMessage) => {
await getHandler(filename, name);
}

const readyMessage : ReadyMessage = { ready: true };
const readyMessage : ReadyMessage = { [READY]: true };
parentPort!.postMessage(readyMessage);

port.on('message', onMessage.bind(null, port, sharedBuffer));
Expand Down
19 changes: 19 additions & 0 deletions test/messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import Piscina from '../dist/src';
import { test } from 'tap';
import { resolve } from 'path';
import { once } from 'events';

test('Pool receive message from workers', async ({ equal }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
});

const messagePromise = once(pool, 'message');

const taskResult = pool.runTask(`
require('worker_threads').parentPort.postMessage("some message");
42
`);
equal(await taskResult, 42);
equal((await messagePromise)[0], 'some message');
});
11 changes: 0 additions & 11 deletions test/test-uncaught-exception-from-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,3 @@ test('uncaught exception in immediate after task yields error event', async ({ e
// This is the main aassertion here.
equal((await errorEvent)[0].message, 'not_caught');
});

test('using parentPort is treated as an error', async ({ rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
});
await rejects(
pool.runTask(`
require('worker_threads').parentPort.postMessage("some message");
new Promise(() => {}) /* act as if we were doing some work */
`), /Unexpected message on Worker: 'some message'/);
});

0 comments on commit 52be44d

Please sign in to comment.