From b9cd634c1dfbfc7b5d079a69067b10fb551a412f Mon Sep 17 00:00:00 2001 From: Marius Kleidl Date: Thu, 7 Dec 2023 10:28:15 +0100 Subject: [PATCH] Emit progress events in Node.js again --- lib/node/httpStack.js | 58 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/lib/node/httpStack.js b/lib/node/httpStack.js index c3f84fb2..629f26fb 100644 --- a/lib/node/httpStack.js +++ b/lib/node/httpStack.js @@ -88,7 +88,16 @@ class Request { }) if (body instanceof Readable) { + // Readable stream are piped through a PassThrough instance, which + // counts the number of bytes passed through. This is used, for example, + // when an fs.ReadStream is provided to tus-js-client. body.pipe(new ProgressEmitter(this._progressHandler)).pipe(req) + } else if (body instanceof Uint8Array) { + // For Buffers and Uint8Arrays (in Node.js all buffers are instances of Uint8Array), + // we write chunks of the buffer to the stream and use that to track the progress. + // This is used when either a Buffer or a normal readable stream is provided + // to tus-js-client. + writeBufferToStreamWithProgress(req, body, this._progressHandler) } else { req.end(body) } @@ -153,3 +162,52 @@ class ProgressEmitter extends Transform { callback(null, chunk) } } + +// writeBufferToStreamWithProgress writes chunks from `source` (either a +// Buffer or Uint8Array) to the readable stream `stream`. +// The size of the chunk depends on the stream's highWaterMark to fill the +// stream's internal buffer as best as possible. +// If the internal buffer is full, the callback `onprogress` will be invoked +// to notify about the write progress. Writing will be resumed once the internal +// buffer is empty, as indicated by the emitted `drain` event. +// See https://nodejs.org/docs/latest/api/stream.html#buffering for more details +// on the buffering behavior of streams. +const writeBufferToStreamWithProgress = (stream, source, onprogress) => { + onprogress = throttle(onprogress, 100, { + leading: true, + trailing: false, + }) + + let offset = 0 + + function writeNextChunk() { + // Take at most the amount of bytes from highWaterMark. This should fill the streams + // internal buffer already. + const chunkSize = Math.min(stream.writableHighWaterMark, source.length - offset) + + // Note: We use subarray instead of slice because it works without copying data for + // Buffers and Uint8Arrays. + const chunk = source.subarray(offset, offset + chunkSize) + offset += chunk.length + + // `write` returns true if the internal buffer is not full and we should write more. + // If the stream is destroyed because the request is aborted, it will return false + // and no 'drain' event is emitted, so won't continue writing data. + const canContinue = stream.write(chunk) + + if (!canContinue) { + // If the buffer is full, wait for the 'drain' event to write more data. + stream.once('drain', writeNextChunk) + onprogress(offset) + } else if (offset < source.length) { + // If there's still data to write and the buffer is not full, write next chunk. + writeNextChunk() + } else { + // If all data has been written, close the stream if needed, and emit a 'finish' event. + stream.end() + } + } + + // Start writing the first chunk. + writeNextChunk() +}