From bde2b1ab79df65e31876eb41681780097c2b12a6 Mon Sep 17 00:00:00 2001 From: Francis Gulotta Date: Thu, 19 Apr 2018 12:00:11 -0400 Subject: [PATCH] fix: transform streams could possibly hang (#29) This works around a bug where you cannot call done twice in a single event loop https://github.com/spion/promise-streams/blob/master/index.js#L70-L73 We've been able to observe this bug in production but not reproduce it in tests A more performance solution would be a queue of some sort because we don't always have to wait until next tick --- lib/transform.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/transform.ts b/lib/transform.ts index 53f5a47..a17c171 100644 --- a/lib/transform.ts +++ b/lib/transform.ts @@ -3,10 +3,15 @@ import { IBluestream } from './interfaces' import { defer, maybeResume } from './utils' async function transformHandler (data, encoding, done) { + // This works around a bug where you cannot call done twice in a single event loop + // https://github.com/spion/promise-streams/blob/master/index.js#L70-L73 + // We've been able to observe this bug in production but not reproduce it in tests + // A more performance solution would be a queue of some sort because we don't always have to wait until next tick + const finish = () => setImmediate(done) const processed = this.asyncTransform(data, encoding) - .then(async transformedData => { + .then(transformedData => { if (transformedData !== undefined) { - await this.push(transformedData) + this.push(transformedData) } this.queue.delete(processed) }, e => { @@ -15,9 +20,9 @@ async function transformHandler (data, encoding, done) { this.queue.add(processed) if (this.queue.size < this.concurrent) { - return done() + return finish() } - Promise.race(this.queue).then(() => done(), e => this.emitError(e)) + Promise.race(this.queue).then(finish, e => this.emitError(e)) } function flushHandler (done) { @@ -43,6 +48,7 @@ export class TransformStream extends Transform implements IBluestream { private handlingErrors: boolean private queue: Set> private streamEnd + private doneThisTick: boolean // tslint:disable-next-line private _flush: (done: (err, data) => void) => void; @@ -83,6 +89,7 @@ export class TransformStream extends Transform implements IBluestream { this.handlingErrors = false this.concurrent = opts.concurrent this.queue = new Set() + this.doneThisTick = false this.once('finish', () => this.streamEnd.resolve()) }