Skip to content

Commit

Permalink
worker: allow sending huge messages
Browse files Browse the repository at this point in the history
When sending a payload, prefix it with its length. Then on the receiving
end, we read the length and the entire buffer in chunks.
  • Loading branch information
saghul authored Jul 15, 2024
1 parent 8f7e3e6 commit 84267a1
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 8 deletions.
67 changes: 59 additions & 8 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "private.h"
#include "tjs.h"

#include <string.h>
#include <unistd.h>

extern const uint8_t tjs__worker_bootstrap[];
Expand All @@ -46,12 +47,24 @@ typedef struct {
uv_stream_t stream;
uv_tcp_t tcp;
} h;
struct {
union {
uint64_t u64;
uint8_t u8[8];
} total_size;
uint8_t *data;
uint64_t nread;
} reading;
JSValue events[MSGPIPE_EVENT_MAX];
} TJSMessagePipe;

typedef struct {
uv_write_t req;
uint8_t *data;
union {
uint64_t u64;
uint8_t u8[8];
} data_size;
} TJSMessagePipeWriteReq;

static void uv__close_cb(uv_handle_t *handle) {
Expand Down Expand Up @@ -117,8 +130,14 @@ static void uv__alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *b
TJSMessagePipe *p = handle->data;
CHECK_NOT_NULL(p);

buf->base = js_malloc(p->ctx, suggested_size);
buf->len = suggested_size;
if (p->reading.data) {
buf->base = (char *) p->reading.data + p->reading.nread;
uint64_t remaining = p->reading.total_size.u64 - p->reading.nread;
buf->len = remaining > suggested_size ? suggested_size : remaining;
} else {
buf->base = (char *) p->reading.total_size.u8;
buf->len = sizeof(p->reading.total_size.u8);
}
}

static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
Expand All @@ -129,7 +148,9 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)

if (nread < 0) {
uv_read_stop(&p->h.stream);
js_free(ctx, buf->base);
if (p->reading.data)
js_free(ctx, p->reading.data);
memset(&p->reading, 0, sizeof(p->reading));
if (nread != UV_EOF) {
JSValue error = tjs_new_error(ctx, nread);
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, error);
Expand All @@ -138,15 +159,43 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
return;
}

// TODO: the entire object might not have come in a single packet.
if (!p->reading.data) {
size_t len_size = sizeof(p->reading.total_size.u8);

/* This is a bogus read, likely a zero-read. Just return the buffer. */
if (nread != len_size)
return;

uint64_t total_size = p->reading.total_size.u64;
CHECK_GE(total_size, 0);
p->reading.data = js_malloc(ctx, total_size);

return;
}

/* We are continuing a partial read. */
uint64_t total_size = p->reading.total_size.u64;
p->reading.nread += nread;

if (p->reading.nread < total_size) {
/* We still need to read more. */

return;
}

CHECK_EQ(p->reading.nread, total_size);

/* We have a complete buffer now. */
int flags = JS_READ_OBJ_REFERENCE;
JSValue obj = JS_ReadObject(ctx, (const uint8_t *) buf->base, buf->len, flags);
JSValue obj = JS_ReadObject(ctx, (const uint8_t *) p->reading.data, total_size, flags);
if (JS_IsException(obj))
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx));
else
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj);
JS_FreeValue(ctx, obj);
js_free(ctx, buf->base);

js_free(ctx, p->reading.data);
memset(&p->reading, 0, sizeof(p->reading));
}

static JSValue tjs_new_msgpipe(JSContext *ctx, uv_os_sock_t fd) {
Expand Down Expand Up @@ -211,9 +260,11 @@ static JSValue tjs_msgpipe_postmessage(JSContext *ctx, JSValue this_val, int arg

wr->req.data = wr;
wr->data = buf;
wr->data_size.u64 = len;

uv_buf_t b = uv_buf_init((char *) buf, len);
int r = uv_write(&wr->req, &p->h.stream, &b, 1, uv__write_cb);
uv_buf_t bufs[2] = { uv_buf_init((char *) wr->data_size.u8, sizeof(wr->data_size.u8)),
uv_buf_init((char *) buf, len) };
int r = uv_write(&wr->req, &p->h.stream, bufs, 2, uv__write_cb);
if (r != 0) {
js_free(ctx, buf);
js_free(ctx, wr);
Expand Down
3 changes: 3 additions & 0 deletions tests/helpers/worker-echo.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ addEventListener('message', function(e) {
postMessage(e.data);
});

addEventListener('messageerror', function(e) {
throw new Error(`Opps! ${e}`);
});
26 changes: 26 additions & 0 deletions tests/test-worker-large-payload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import assert from 'tjs:assert';
import path from 'tjs:path';


const data = {
x: new Array(65536).fill('x').join(''),
y: new Array(65536).fill('y').join(''),
z: 1234
};
const w = new Worker(path.join(import.meta.dirname, 'helpers', 'worker-echo.js'));
const timer = setTimeout(() => {
w.terminate();
assert.fail('Timeout out waiting for worker');
}, 10000);
w.onmessage = event => {
clearTimeout(timer);
const recvData = event.data;
assert.eq(data.x, recvData.x, 'Message received matches');
assert.eq(data.y, recvData.y, 'Message received matches');
assert.eq(data.z, recvData.z, 'Message received matches');
w.terminate();
};
w.onmessageerror = event => {
assert.fail(`Error receiving message from worker: ${event}`);
};
w.postMessage(data);

0 comments on commit 84267a1

Please sign in to comment.