diff --git a/src/worker.c b/src/worker.c index 7e1414c3..0c3bd45f 100644 --- a/src/worker.c +++ b/src/worker.c @@ -26,6 +26,7 @@ #include "private.h" #include "tjs.h" +#include #include extern const uint8_t tjs__worker_bootstrap[]; @@ -46,12 +47,24 @@ typedef struct { uv_stream_t stream; uv_tcp_t tcp; } h; + struct { + union { + uint64_t u64; + uint8_t u8[8]; + } total_size; + uint8_t *data; + uint64_t nread; + } reading; JSValue events[MSGPIPE_EVENT_MAX]; } TJSMessagePipe; typedef struct { uv_write_t req; uint8_t *data; + union { + uint64_t u64; + uint8_t u8[8]; + } data_size; } TJSMessagePipeWriteReq; static void uv__close_cb(uv_handle_t *handle) { @@ -117,8 +130,14 @@ static void uv__alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *b TJSMessagePipe *p = handle->data; CHECK_NOT_NULL(p); - buf->base = js_malloc(p->ctx, suggested_size); - buf->len = suggested_size; + if (p->reading.data) { + buf->base = (char *) p->reading.data + p->reading.nread; + uint64_t remaining = p->reading.total_size.u64 - p->reading.nread; + buf->len = remaining > suggested_size ? suggested_size : remaining; + } else { + buf->base = (char *) p->reading.total_size.u8; + buf->len = sizeof(p->reading.total_size.u8); + } } static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { @@ -129,7 +148,9 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) if (nread < 0) { uv_read_stop(&p->h.stream); - js_free(ctx, buf->base); + if (p->reading.data) + js_free(ctx, p->reading.data); + memset(&p->reading, 0, sizeof(p->reading)); if (nread != UV_EOF) { JSValue error = tjs_new_error(ctx, nread); emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, error); @@ -138,15 +159,43 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } - // TODO: the entire object might not have come in a single packet. + if (!p->reading.data) { + size_t len_size = sizeof(p->reading.total_size.u8); + + /* This is a bogus read, likely a zero-read. Just return the buffer. */ + if (nread != len_size) + return; + + uint64_t total_size = p->reading.total_size.u64; + CHECK_GE(total_size, 0); + p->reading.data = js_malloc(ctx, total_size); + + return; + } + + /* We are continuing a partial read. */ + uint64_t total_size = p->reading.total_size.u64; + p->reading.nread += nread; + + if (p->reading.nread < total_size) { + /* We still need to read more. */ + + return; + } + + CHECK_EQ(p->reading.nread, total_size); + + /* We have a complete buffer now. */ int flags = JS_READ_OBJ_REFERENCE; - JSValue obj = JS_ReadObject(ctx, (const uint8_t *) buf->base, buf->len, flags); + JSValue obj = JS_ReadObject(ctx, (const uint8_t *) p->reading.data, total_size, flags); if (JS_IsException(obj)) emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx)); else emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj); JS_FreeValue(ctx, obj); - js_free(ctx, buf->base); + + js_free(ctx, p->reading.data); + memset(&p->reading, 0, sizeof(p->reading)); } static JSValue tjs_new_msgpipe(JSContext *ctx, uv_os_sock_t fd) { @@ -211,9 +260,11 @@ static JSValue tjs_msgpipe_postmessage(JSContext *ctx, JSValue this_val, int arg wr->req.data = wr; wr->data = buf; + wr->data_size.u64 = len; - uv_buf_t b = uv_buf_init((char *) buf, len); - int r = uv_write(&wr->req, &p->h.stream, &b, 1, uv__write_cb); + uv_buf_t bufs[2] = { uv_buf_init((char *) wr->data_size.u8, sizeof(wr->data_size.u8)), + uv_buf_init((char *) buf, len) }; + int r = uv_write(&wr->req, &p->h.stream, bufs, 2, uv__write_cb); if (r != 0) { js_free(ctx, buf); js_free(ctx, wr); diff --git a/tests/helpers/worker-echo.js b/tests/helpers/worker-echo.js index 5dd2e859..10def5d9 100644 --- a/tests/helpers/worker-echo.js +++ b/tests/helpers/worker-echo.js @@ -2,3 +2,6 @@ addEventListener('message', function(e) { postMessage(e.data); }); +addEventListener('messageerror', function(e) { + throw new Error(`Opps! ${e}`); +}); diff --git a/tests/test-worker-large-payload.js b/tests/test-worker-large-payload.js new file mode 100644 index 00000000..ea68fb47 --- /dev/null +++ b/tests/test-worker-large-payload.js @@ -0,0 +1,26 @@ +import assert from 'tjs:assert'; +import path from 'tjs:path'; + + +const data = { + x: new Array(65536).fill('x').join(''), + y: new Array(65536).fill('y').join(''), + z: 1234 +}; +const w = new Worker(path.join(import.meta.dirname, 'helpers', 'worker-echo.js')); +const timer = setTimeout(() => { + w.terminate(); + assert.fail('Timeout out waiting for worker'); +}, 10000); +w.onmessage = event => { + clearTimeout(timer); + const recvData = event.data; + assert.eq(data.x, recvData.x, 'Message received matches'); + assert.eq(data.y, recvData.y, 'Message received matches'); + assert.eq(data.z, recvData.z, 'Message received matches'); + w.terminate(); +}; +w.onmessageerror = event => { + assert.fail(`Error receiving message from worker: ${event}`); +}; +w.postMessage(data);