diff --git a/src/network/response-stream.ts b/src/network/response-stream.ts index 3e46985f..22a264ef 100644 --- a/src/network/response-stream.ts +++ b/src/network/response-stream.ts @@ -5,6 +5,9 @@ import type { BothValueHeaders } from '../@types'; import { type ILogger, NO_OP, parseHeaders } from '../core'; import { getString } from './utils'; +// header or data crlf +const crlfBuffer = Buffer.from('\r\n'); + const endChunked = '0\r\n\r\n'; const headerEnd = '\r\n\r\n'; const endStatusSeparator = '\r\n'; @@ -55,11 +58,8 @@ export class ServerlessStreamResponse extends ServerResponse { this.chunkedEncoding = true; let internalWritable: Writable | null = null; - let isFirstCall = true; - // this ignore is used because I need to ignore these write calls: - // https://github.com/nodejs/node/blob/main/lib/_http_outgoing.js#L934-L935 - // https://github.com/nodejs/node/blob/main/lib/_http_outgoing.js#L937 - let writesToIgnore = 0; + let firstCrlfBufferEncountered = false; + let chunkEncountered = false; const socket: Partial & { _writableState: any } = { _writableState: {}, @@ -86,23 +86,7 @@ export class ServerlessStreamResponse extends ServerResponse { encoding, })); - if (!isFirstCall && internalWritable) { - if (data === endChunked) { - internalWritable.end(cb); - - return true; - } - - if (writesToIgnore > 0) { - writesToIgnore--; - return true; - } - - internalWritable.write(data, cb); - writesToIgnore = 3; - } else if (isFirstCall) { - isFirstCall = false; - + if (!internalWritable) { const stringData = getString(data); const endStatusIndex = stringData.indexOf(endStatusSeparator); const status = +stringData.slice(0, endStatusIndex).split(' ')[1]; @@ -120,14 +104,54 @@ export class ServerlessStreamResponse extends ServerResponse { }), ); - writesToIgnore = 1; internalWritable = onReceiveHeaders(status, headers); // If we get an endChunked right after header which means the response body is empty, we need to immediately end the writable if (stringData.substring(endHeaderIndex + 4) === endChunked) internalWritable.end(); + + return true; + } + + // node sends the last chunk crlf as a string: + // https://github.com/nodejs/node/blob/v22.8.0/lib/_http_outgoing.js#L1131 + if (data === endChunked) { + internalWritable.end(cb); + return true; + } + + // check for header or data crlf + // node sends the header and data crlf as a buffer + // below code is aligned to following node implementation of the HTTP/1.1 chunked transfer coding: + // https://github.com/nodejs/node/blob/v22.8.0/lib/_http_outgoing.js#L1012-L1015 + // for reference: https://datatracker.ietf.org/doc/html/rfc9112#section-7 + if (Buffer.isBuffer(data) && crlfBuffer.equals(data)) { + const isHeaderCrlf = !firstCrlfBufferEncountered; + if (isHeaderCrlf) { + firstCrlfBufferEncountered = true; + return true; + } + + const isDataCrlf = firstCrlfBufferEncountered && chunkEncountered; + if (isDataCrlf) { + // done with chunk + firstCrlfBufferEncountered = false; + chunkEncountered = false; + return true; + } + + // the crlf *is* the chunk + } + + const isContentLength = !firstCrlfBufferEncountered; + if (isContentLength) { + // discard content length + return true; } + // write chunk + chunkEncountered = true; + internalWritable.write(data, cb); return true; }, }; diff --git a/test/handlers/aws-stream.handler.spec.ts b/test/handlers/aws-stream.handler.spec.ts index af871d4b..c289676a 100644 --- a/test/handlers/aws-stream.handler.spec.ts +++ b/test/handlers/aws-stream.handler.spec.ts @@ -83,6 +83,46 @@ describe('AwsStreamHandler', () => { expect(Buffer.byteLength(finalBuffer)).toBe(Buffer.byteLength(file)); }); + it('should return the correct bytes of chunked stream with eagerly flushed headers', async () => { + const app = express(); + const file = readFileSync(join(__dirname, 'bitcoin.pdf')); + + app.get('/', (_, res) => { + const readable = createReadStream(join(__dirname, 'bitcoin.pdf')); + + res.statusCode = 200; + res.setHeader('content-type', 'application/pdf'); + res.flushHeaders(); + readable.pipe(res); + }); + + const expressFramework = new ExpressFramework(); + + const handler = awsStreamHandler.getHandler( + app, + expressFramework, + adapters, + resolver, + binarySettings, + respondWithErrors, + logger, + ); + + const event = createApiGatewayV2('GET', '/', {}, { test: 'true' }); + const context = { test: Symbol('unique') }; + + const writable = new WritableMock(); + + await handler(event, writable, context); + + expect(getCurrentInvoke()).toHaveProperty('event', event); + expect(getCurrentInvoke()).toHaveProperty('context', context); + + const finalBuffer = Buffer.concat(writable.data); + + expect(Buffer.byteLength(finalBuffer)).toBe(Buffer.byteLength(file)); + }); + it('should return the correct bytes of json', async () => { const app = express();