From 0b739c5abcbbd5ea00c4a8f7b21a647fb42ad4bc Mon Sep 17 00:00:00 2001 From: Ivan Chou Date: Thu, 3 Oct 2024 15:44:32 +0800 Subject: [PATCH] fix(fetch): Fix memory leak when handling endless streaming (#13809) Co-authored-by: Luca Forstner --- .../react-router-6/tests/sse.test.ts | 5 +- packages/utils/src/instrument/fetch.ts | 74 ++++++++++++------- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts b/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts index 92c06543c0b8..942e67ca4551 100644 --- a/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts +++ b/dev-packages/e2e-tests/test-applications/react-router-6/tests/sse.test.ts @@ -45,7 +45,6 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa await fetchButton.click(); const rootSpan = await transactionPromise; - console.log(JSON.stringify(rootSpan, null, 2)); const sseFetchCall = rootSpan.spans?.filter(span => span.description === 'sse fetch call')[0] as SpanJSON; const httpGet = rootSpan.spans?.filter(span => span.description === 'GET http://localhost:8080/sse')[0] as SpanJSON; @@ -71,7 +70,7 @@ test('Waits for sse streaming when sse has been explicitly aborted', async ({ pa expect(consoleBreadcrumb?.message).toBe('Could not fetch sse AbortError: BodyStreamBuffer was aborted'); }); -test('Aborts when stream takes longer than 5s', async ({ page }) => { +test('Aborts when stream takes longer than 5s, by not updating the span duration', async ({ page }) => { await page.goto('/sse'); const transactionPromise = waitForTransaction('react-router-6', async transactionEvent => { @@ -102,5 +101,5 @@ test('Aborts when stream takes longer than 5s', async ({ page }) => { const resolveBodyDuration = Math.round((httpGet.timestamp as number) - httpGet.start_timestamp); expect(resolveDuration).toBe(0); - expect(resolveBodyDuration).toBe(7); + expect(resolveBodyDuration).toBe(0); }); diff --git a/packages/utils/src/instrument/fetch.ts b/packages/utils/src/instrument/fetch.ts index a161b8db79bb..ad28edf81e3f 100644 --- a/packages/utils/src/instrument/fetch.ts +++ b/packages/utils/src/instrument/fetch.ts @@ -116,40 +116,57 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat } async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise { - if (res && res.body && res.body.getReader) { - const responseReader = res.body.getReader(); - - // eslint-disable-next-line no-inner-declarations - async function consumeChunks({ done }: { done: boolean }): Promise { - if (!done) { - try { - // abort reading if read op takes more than 5s - const result = await Promise.race([ - responseReader.read(), - new Promise<{ done: boolean }>(res => { - setTimeout(() => { - res({ done: true }); - }, 5000); - }), - ]); - await consumeChunks(result); - } catch (error) { - // handle error if needed + if (res && res.body) { + const body = res.body; + const responseReader = body.getReader(); + + // Define a maximum duration after which we just cancel + const maxFetchDurationTimeout = setTimeout( + () => { + body.cancel().then(null, () => { + // noop + }); + }, + 90 * 1000, // 90s + ); + + let readingActive = true; + while (readingActive) { + let chunkTimeout; + try { + // abort reading if read op takes more than 5s + chunkTimeout = setTimeout(() => { + body.cancel().then(null, () => { + // noop on error + }); + }, 5000); + + // This .read() call will reject/throw when we abort due to timeouts through `body.cancel()` + const { done } = await responseReader.read(); + + clearTimeout(chunkTimeout); + + if (done) { + onFinishedResolving(); + readingActive = false; } - } else { - return Promise.resolve(); + } catch (error) { + readingActive = false; + } finally { + clearTimeout(chunkTimeout); } } - return responseReader - .read() - .then(consumeChunks) - .then(onFinishedResolving) - .catch(() => undefined); + clearTimeout(maxFetchDurationTimeout); + + responseReader.releaseLock(); + body.cancel().then(null, () => { + // noop on error + }); } } -async function streamHandler(response: Response): Promise { +function streamHandler(response: Response): void { // clone response for awaiting stream let clonedResponseForResolving: Response; try { @@ -158,7 +175,8 @@ async function streamHandler(response: Response): Promise { return; } - await resolveResponse(clonedResponseForResolving, () => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + resolveResponse(clonedResponseForResolving, () => { triggerHandlers('fetch-body-resolved', { endTimestamp: timestampInSeconds() * 1000, response,