Skip to content

Commit

Permalink
fix(fetch): Fix memory leak when handling endless streaming (#13809)
Browse files Browse the repository at this point in the history
Co-authored-by: Luca Forstner <[email protected]>
  • Loading branch information
soapproject and lforst authored Oct 3, 2024
1 parent febdfc6 commit 0b739c5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa
await fetchButton.click();

const rootSpan = await transactionPromise;
console.log(JSON.stringify(rootSpan, null, 2));
const sseFetchCall = rootSpan.spans?.filter(span => span.description === 'sse fetch call')[0] as SpanJSON;
const httpGet = rootSpan.spans?.filter(span => span.description === 'GET http://localhost:8080/sse')[0] as SpanJSON;

Expand All @@ -71,7 +70,7 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa
expect(consoleBreadcrumb?.message).toBe('Could not fetch sse AbortError: BodyStreamBuffer was aborted');
});

test('Aborts when stream takes longer than 5s', async ({ page }) => {
test('Aborts when stream takes longer than 5s, by not updating the span duration', async ({ page }) => {
await page.goto('/sse');

const transactionPromise = waitForTransaction('react-router-6', async transactionEvent => {
Expand Down Expand Up @@ -102,5 +101,5 @@ test('Aborts when stream takes longer than 5s', async ({ page }) => {
const resolveBodyDuration = Math.round((httpGet.timestamp as number) - httpGet.start_timestamp);

expect(resolveDuration).toBe(0);
expect(resolveBodyDuration).toBe(7);
expect(resolveBodyDuration).toBe(0);
});
74 changes: 46 additions & 28 deletions packages/utils/src/instrument/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,40 +116,57 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat
}

async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise<void> {
if (res && res.body && res.body.getReader) {
const responseReader = res.body.getReader();

// eslint-disable-next-line no-inner-declarations
async function consumeChunks({ done }: { done: boolean }): Promise<void> {
if (!done) {
try {
// abort reading if read op takes more than 5s
const result = await Promise.race([
responseReader.read(),
new Promise<{ done: boolean }>(res => {
setTimeout(() => {
res({ done: true });
}, 5000);
}),
]);
await consumeChunks(result);
} catch (error) {
// handle error if needed
if (res && res.body) {
const body = res.body;
const responseReader = body.getReader();

// Define a maximum duration after which we just cancel
const maxFetchDurationTimeout = setTimeout(
() => {
body.cancel().then(null, () => {
// noop
});
},
90 * 1000, // 90s
);

let readingActive = true;
while (readingActive) {
let chunkTimeout;
try {
// abort reading if read op takes more than 5s
chunkTimeout = setTimeout(() => {
body.cancel().then(null, () => {
// noop on error
});
}, 5000);

// This .read() call will reject/throw when we abort due to timeouts through `body.cancel()`
const { done } = await responseReader.read();

clearTimeout(chunkTimeout);

if (done) {
onFinishedResolving();
readingActive = false;
}
} else {
return Promise.resolve();
} catch (error) {
readingActive = false;
} finally {
clearTimeout(chunkTimeout);
}
}

return responseReader
.read()
.then(consumeChunks)
.then(onFinishedResolving)
.catch(() => undefined);
clearTimeout(maxFetchDurationTimeout);

responseReader.releaseLock();
body.cancel().then(null, () => {
// noop on error
});
}
}

async function streamHandler(response: Response): Promise<void> {
function streamHandler(response: Response): void {
// clone response for awaiting stream
let clonedResponseForResolving: Response;
try {
Expand All @@ -158,7 +175,8 @@ async function streamHandler(response: Response): Promise<void> {
return;
}

await resolveResponse(clonedResponseForResolving, () => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
resolveResponse(clonedResponseForResolving, () => {
triggerHandlers('fetch-body-resolved', {
endTimestamp: timestampInSeconds() * 1000,
response,
Expand Down

0 comments on commit 0b739c5

Please sign in to comment.