Skip to content

Commit

Permalink
Merge pull request #261 from florian-g2/main
Browse files Browse the repository at this point in the history
Improve the identification of chunks in streaming responses (Fixes #260)
  • Loading branch information
H4ad authored Sep 9, 2024
2 parents 5022260 + d39db53 commit 8f3706e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 23 deletions.
70 changes: 47 additions & 23 deletions src/network/response-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import type { BothValueHeaders } from '../@types';
import { type ILogger, NO_OP, parseHeaders } from '../core';
import { getString } from './utils';

// header or data crlf
const crlfBuffer = Buffer.from('\r\n');

const endChunked = '0\r\n\r\n';
const headerEnd = '\r\n\r\n';
const endStatusSeparator = '\r\n';
Expand Down Expand Up @@ -55,11 +58,8 @@ export class ServerlessStreamResponse extends ServerResponse {
this.chunkedEncoding = true;

let internalWritable: Writable | null = null;
let isFirstCall = true;
// this ignore is used because I need to ignore these write calls:
// https://github.com/nodejs/node/blob/main/lib/_http_outgoing.js#L934-L935
// https://github.com/nodejs/node/blob/main/lib/_http_outgoing.js#L937
let writesToIgnore = 0;
let firstCrlfBufferEncountered = false;
let chunkEncountered = false;

const socket: Partial<Socket> & { _writableState: any } = {
_writableState: {},
Expand All @@ -86,23 +86,7 @@ export class ServerlessStreamResponse extends ServerResponse {
encoding,
}));

if (!isFirstCall && internalWritable) {
if (data === endChunked) {
internalWritable.end(cb);

return true;
}

if (writesToIgnore > 0) {
writesToIgnore--;
return true;
}

internalWritable.write(data, cb);
writesToIgnore = 3;
} else if (isFirstCall) {
isFirstCall = false;

if (!internalWritable) {
const stringData = getString(data);
const endStatusIndex = stringData.indexOf(endStatusSeparator);
const status = +stringData.slice(0, endStatusIndex).split(' ')[1];
Expand All @@ -120,14 +104,54 @@ export class ServerlessStreamResponse extends ServerResponse {
}),
);

writesToIgnore = 1;
internalWritable = onReceiveHeaders(status, headers);

// If we get an endChunked right after header which means the response body is empty, we need to immediately end the writable
if (stringData.substring(endHeaderIndex + 4) === endChunked)
internalWritable.end();

return true;
}

// node sends the last chunk crlf as a string:
// https://github.com/nodejs/node/blob/v22.8.0/lib/_http_outgoing.js#L1131
if (data === endChunked) {
internalWritable.end(cb);
return true;
}

// check for header or data crlf
// node sends the header and data crlf as a buffer
// below code is aligned to following node implementation of the HTTP/1.1 chunked transfer coding:
// https://github.com/nodejs/node/blob/v22.8.0/lib/_http_outgoing.js#L1012-L1015
// for reference: https://datatracker.ietf.org/doc/html/rfc9112#section-7
if (Buffer.isBuffer(data) && crlfBuffer.equals(data)) {
const isHeaderCrlf = !firstCrlfBufferEncountered;
if (isHeaderCrlf) {
firstCrlfBufferEncountered = true;
return true;
}

const isDataCrlf = firstCrlfBufferEncountered && chunkEncountered;
if (isDataCrlf) {
// done with chunk
firstCrlfBufferEncountered = false;
chunkEncountered = false;
return true;
}

// the crlf *is* the chunk
}

const isContentLength = !firstCrlfBufferEncountered;
if (isContentLength) {
// discard content length
return true;
}

// write chunk
chunkEncountered = true;
internalWritable.write(data, cb);
return true;
},
};
Expand Down
40 changes: 40 additions & 0 deletions test/handlers/aws-stream.handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,46 @@ describe('AwsStreamHandler', () => {
expect(Buffer.byteLength(finalBuffer)).toBe(Buffer.byteLength(file));
});

it('should return the correct bytes of chunked stream with eagerly flushed headers', async () => {
const app = express();
const file = readFileSync(join(__dirname, 'bitcoin.pdf'));

app.get('/', (_, res) => {
const readable = createReadStream(join(__dirname, 'bitcoin.pdf'));

res.statusCode = 200;
res.setHeader('content-type', 'application/pdf');
res.flushHeaders();
readable.pipe(res);
});

const expressFramework = new ExpressFramework();

const handler = awsStreamHandler.getHandler(
app,
expressFramework,
adapters,
resolver,
binarySettings,
respondWithErrors,
logger,
);

const event = createApiGatewayV2('GET', '/', {}, { test: 'true' });
const context = { test: Symbol('unique') };

const writable = new WritableMock();

await handler(event, writable, context);

expect(getCurrentInvoke()).toHaveProperty('event', event);
expect(getCurrentInvoke()).toHaveProperty('context', context);

const finalBuffer = Buffer.concat(writable.data);

expect(Buffer.byteLength(finalBuffer)).toBe(Buffer.byteLength(file));
});

it('should return the correct bytes of json', async () => {
const app = express();

Expand Down

0 comments on commit 8f3706e

Please sign in to comment.