Skip to content

Commit

Permalink
Emit progress events in Node.js again
Browse files Browse the repository at this point in the history
  • Loading branch information
Acconut committed Dec 7, 2023
1 parent eed7b9a commit b9cd634
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions lib/node/httpStack.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,16 @@ class Request {
})

if (body instanceof Readable) {
// Readable stream are piped through a PassThrough instance, which
// counts the number of bytes passed through. This is used, for example,
// when an fs.ReadStream is provided to tus-js-client.
body.pipe(new ProgressEmitter(this._progressHandler)).pipe(req)
} else if (body instanceof Uint8Array) {
// For Buffers and Uint8Arrays (in Node.js all buffers are instances of Uint8Array),
// we write chunks of the buffer to the stream and use that to track the progress.
// This is used when either a Buffer or a normal readable stream is provided
// to tus-js-client.
writeBufferToStreamWithProgress(req, body, this._progressHandler)
} else {
req.end(body)
}
Expand Down Expand Up @@ -153,3 +162,52 @@ class ProgressEmitter extends Transform {
callback(null, chunk)
}
}

// writeBufferToStreamWithProgress writes chunks from `source` (either a
// Buffer or Uint8Array) to the readable stream `stream`.
// The size of the chunk depends on the stream's highWaterMark to fill the
// stream's internal buffer as best as possible.
// If the internal buffer is full, the callback `onprogress` will be invoked
// to notify about the write progress. Writing will be resumed once the internal
// buffer is empty, as indicated by the emitted `drain` event.
// See https://nodejs.org/docs/latest/api/stream.html#buffering for more details
// on the buffering behavior of streams.
const writeBufferToStreamWithProgress = (stream, source, onprogress) => {
onprogress = throttle(onprogress, 100, {
leading: true,
trailing: false,
})

let offset = 0

function writeNextChunk() {
// Take at most the amount of bytes from highWaterMark. This should fill the streams
// internal buffer already.
const chunkSize = Math.min(stream.writableHighWaterMark, source.length - offset)

// Note: We use subarray instead of slice because it works without copying data for
// Buffers and Uint8Arrays.
const chunk = source.subarray(offset, offset + chunkSize)
offset += chunk.length

// `write` returns true if the internal buffer is not full and we should write more.
// If the stream is destroyed because the request is aborted, it will return false
// and no 'drain' event is emitted, so won't continue writing data.
const canContinue = stream.write(chunk)

if (!canContinue) {
// If the buffer is full, wait for the 'drain' event to write more data.
stream.once('drain', writeNextChunk)
onprogress(offset)
} else if (offset < source.length) {
// If there's still data to write and the buffer is not full, write next chunk.
writeNextChunk()
} else {
// If all data has been written, close the stream if needed, and emit a 'finish' event.
stream.end()
}
}

// Start writing the first chunk.
writeNextChunk()
}

0 comments on commit b9cd634

Please sign in to comment.