Skip to content

Commit

Permalink
fix: transform streams could possibly hang (#29)
Browse files Browse the repository at this point in the history
This works around a bug where you cannot call done twice in a single event loop
https://github.com/spion/promise-streams/blob/master/index.js#L70-L73
We've been able to observe this bug in production but not reproduce it in tests
A more performance solution would be a queue of some sort because we don't always have to wait until next tick
  • Loading branch information
reconbot authored Apr 19, 2018
1 parent 76938be commit bde2b1a
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions lib/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ import { IBluestream } from './interfaces'
import { defer, maybeResume } from './utils'

async function transformHandler (data, encoding, done) {
// This works around a bug where you cannot call done twice in a single event loop
// https://github.com/spion/promise-streams/blob/master/index.js#L70-L73
// We've been able to observe this bug in production but not reproduce it in tests
// A more performance solution would be a queue of some sort because we don't always have to wait until next tick
const finish = () => setImmediate(done)
const processed = this.asyncTransform(data, encoding)
.then(async transformedData => {
.then(transformedData => {
if (transformedData !== undefined) {
await this.push(transformedData)
this.push(transformedData)
}
this.queue.delete(processed)
}, e => {
Expand All @@ -15,9 +20,9 @@ async function transformHandler (data, encoding, done) {
this.queue.add(processed)

if (this.queue.size < this.concurrent) {
return done()
return finish()
}
Promise.race(this.queue).then(() => done(), e => this.emitError(e))
Promise.race(this.queue).then(finish, e => this.emitError(e))
}

function flushHandler (done) {
Expand All @@ -43,6 +48,7 @@ export class TransformStream extends Transform implements IBluestream {
private handlingErrors: boolean
private queue: Set<Promise<any>>
private streamEnd
private doneThisTick: boolean
// tslint:disable-next-line
private _flush: (done: (err, data) => void) => void;

Expand Down Expand Up @@ -83,6 +89,7 @@ export class TransformStream extends Transform implements IBluestream {
this.handlingErrors = false
this.concurrent = opts.concurrent
this.queue = new Set()
this.doneThisTick = false
this.once('finish', () => this.streamEnd.resolve())
}

Expand Down

0 comments on commit bde2b1a

Please sign in to comment.