Skip to content

Commit

Permalink
[expo] fix fetch abort error for streaming requests (expo#33577)
Browse files Browse the repository at this point in the history
# Why

i unregistered the abort event listener earlier before the request
finished.
fixes expo#33553

# How

unregister the abort event listener when native request is finished. i
introduced a new `readyForJSFinalization` event that emits from both
streaming and non-streaming requests.
  • Loading branch information
Kudo authored Dec 13, 2024
1 parent 92b4b84 commit 04c6ae2
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 34 deletions.
4 changes: 2 additions & 2 deletions apps/bare-expo/scripts/start-ios-e2e-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ appId: dev.expo.Payments
- openLink: bareexpo://test-suite/run?tests=${testCase}
- extendedWaitUntil:
visible:
id: "test_suite_container"
timeout: 30000
id: "test_suite_text_results"
timeout: 120000
- assertVisible:
text: "Success!"
`);
Expand Down
60 changes: 60 additions & 0 deletions apps/test-suite/tests/Fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,66 @@ export function test({ describe, expect, it, ...t }) {
}
expect(error).not.toBeNull();
});

it('should abort streaming request', async () => {
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
let error: Error | null = null;
let hasReceivedChunk = false;
try {
const resp = await fetch('https://httpbin.test.k6.io/drip?numbytes=512&duration=60', {
signal: controller.signal,
headers: {
Accept: 'text/event-stream',
},
});
const reader = resp.body.getReader();
while (true) {
const { done } = await reader.read();
hasReceivedChunk = true;
if (done) {
break;
}
}
} catch (e: unknown) {
if (e instanceof Error) {
error = e;
}
}
expect(error).not.toBeNull();
expect(hasReceivedChunk).toBe(true);
});

// Same as the previous test but abort at 0ms,
// that to ensure the request is aborted before receiving any chunks.
it('should abort streaming request before receiving chunks', async () => {
const controller = new AbortController();
setTimeout(() => controller.abort(), 0);
let error: Error | null = null;
let hasReceivedChunk = false;
try {
const resp = await fetch('https://httpbin.test.k6.io/drip?numbytes=512&duration=60', {
signal: controller.signal,
headers: {
Accept: 'text/event-stream',
},
});
const reader = resp.body.getReader();
while (true) {
const { done } = await reader.read();
hasReceivedChunk = true;
if (done) {
break;
}
}
} catch (e: unknown) {
if (e instanceof Error) {
error = e;
}
}
expect(error).not.toBeNull();
expect(hasReceivedChunk).toBe(false);
});
});

describe('Streaming', () => {
Expand Down
1 change: 1 addition & 0 deletions packages/expo/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

- Fix sending a blob as fetch body not setting correct content-type. ([#33405](https://github.com/expo/expo/pull/33405) by [@aleqsio](https://github.com/aleqsio))
- Use nullish assignment operator to assign entries in FormData. ([#33445](https://github.com/expo/expo/pull/33445) by [@j-piasecki](https://github.com/j-piasecki))
- Fixed streaming requests with `AbortController` doesn't work on `expo/fetch`. ([#33577](https://github.com/expo/expo/pull/33577) by [@kudo](https://github.com/kudo))

### 💡 Others

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
}

fun emitRequestCancelled() {
error = FetchRequestCancelledException()
val error = FetchRequestCancelledException()
this.error = error
if (state == ResponseState.BODY_STREAMING_STARTED) {
emit("didFailWithError", error)
}
state = ResponseState.ERROR_RECEIVED
emit("readyForJSFinalization")
}

fun waitForStates(states: List<ResponseState>, callback: (ResponseState) -> Unit) {
Expand Down Expand Up @@ -108,6 +113,7 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
}
error = e
state = ResponseState.ERROR_RECEIVED
emit("readyForJSFinalization")
}

override fun onResponse(call: Call, response: Response) {
Expand All @@ -123,6 +129,7 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
emit("didComplete")
}
this@NativeResponse.state = ResponseState.BODY_COMPLETED
emit("readyForJSFinalization")
}
}

Expand Down Expand Up @@ -158,22 +165,30 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
}

private fun pumpResponseBodyStream(stream: BufferedSource) {
while (!stream.exhausted()) {
if (isInvalidState(
ResponseState.RESPONSE_RECEIVED,
ResponseState.BODY_STREAMING_STARTED,
ResponseState.BODY_STREAMING_CANCELLED
)
) {
break
try {
while (!stream.exhausted()) {
if (isInvalidState(
ResponseState.RESPONSE_RECEIVED,
ResponseState.BODY_STREAMING_STARTED,
ResponseState.BODY_STREAMING_CANCELLED
)
) {
break
}
if (state == ResponseState.RESPONSE_RECEIVED) {
sink.appendBufferBody(stream.buffer.readByteArray())
} else if (state == ResponseState.BODY_STREAMING_STARTED) {
emit("didReceiveResponseData", stream.buffer.readByteArray())
} else {
break
}
}
if (state == ResponseState.RESPONSE_RECEIVED) {
sink.appendBufferBody(stream.buffer.readByteArray())
} else if (state == ResponseState.BODY_STREAMING_STARTED) {
emit("didReceiveResponseData", stream.buffer.readByteArray())
} else {
break
} catch (e: IOException) {
this.error = e
if (state == ResponseState.BODY_STREAMING_STARTED) {
emit("didFailWithError", e)
}
state = ResponseState.ERROR_RECEIVED
}
}

Expand Down
5 changes: 4 additions & 1 deletion packages/expo/build/winter/fetch/FetchResponse.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/expo/build/winter/fetch/FetchResponse.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/expo/build/winter/fetch/NativeRequest.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/expo/build/winter/fetch/NativeRequest.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/expo/build/winter/fetch/fetch.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion packages/expo/ios/Fetch/NativeResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ internal final class NativeResponse: SharedObject, ExpoURLSessionTaskDelegate {
}

func emitRequestCanceled() {
error = FetchRequestCanceledException()
let error = FetchRequestCanceledException()
self.error = error
if state == .bodyStreamingStarted {
emit(event: "didFailWithError", arguments: error.localizedDescription)
}
state = .errorReceived
emit(event: "readyForJSFinalization")
}

/**
Expand Down Expand Up @@ -197,5 +202,7 @@ internal final class NativeResponse: SharedObject, ExpoURLSessionTaskDelegate {
} else {
state = .bodyCompleted
}

emit(event: "readyForJSFinalization")
}
}
18 changes: 13 additions & 5 deletions packages/expo/src/winter/fetch/FetchResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ import type { NativeResponse } from './NativeRequest';

const ConcreteNativeResponse = ExpoFetchModule.NativeResponse as typeof NativeResponse;

export type AbortSubscriptionCleanupFunction = () => void;

/**
* A response implementation for the `fetch.Response` API.
*/
export class FetchResponse extends ConcreteNativeResponse implements Response {
private streamingStarted = false;

constructor(private readonly abortCleanupFunction: AbortSubscriptionCleanupFunction) {
super();
this.addListener('readyForJSFinalization', this.finalize);
}

get body(): ReadableStream<Uint8Array> | null {
const response = this;
return new ReadableStream({
Expand All @@ -20,12 +27,10 @@ export class FetchResponse extends ConcreteNativeResponse implements Response {
});

response.addListener('didComplete', () => {
response.removeAllRegisteredListeners();
controller.close();
});

response.addListener('didFailWithError', (error: string) => {
response.removeAllRegisteredListeners();
controller.error(new Error(error));
});
},
Expand All @@ -36,7 +41,6 @@ export class FetchResponse extends ConcreteNativeResponse implements Response {
}
},
cancel(reason) {
response.removeAllRegisteredListeners();
response.cancelStreaming(String(reason));
},
});
Expand Down Expand Up @@ -91,9 +95,13 @@ export class FetchResponse extends ConcreteNativeResponse implements Response {
throw new Error('Not implemented');
}

private removeAllRegisteredListeners() {
private finalize = (): void => {
this.removeListener('readyForJSFinalization', this.finalize);

this.abortCleanupFunction();

this.removeAllListeners('didReceiveResponseData');
this.removeAllListeners('didComplete');
this.removeAllListeners('didFailWithError');
}
};
}
1 change: 1 addition & 0 deletions packages/expo/src/winter/fetch/NativeRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export type NativeResponseEvents = {
didReceiveResponseData(data: Uint8Array): void;
didComplete(): void;
didFailWithError(error: string): void;
readyForJSFinalization(): void;
};

export declare class NativeResponse extends SharedObject<NativeResponseEvents> {
Expand Down
28 changes: 21 additions & 7 deletions packages/expo/src/winter/fetch/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { ExpoFetchModule } from './ExpoFetchModule';
import { FetchError } from './FetchErrors';
import { FetchResponse } from './FetchResponse';
import { FetchResponse, type AbortSubscriptionCleanupFunction } from './FetchResponse';
import { NativeRequest, NativeRequestInit } from './NativeRequest';
import { normalizeBodyInitAsync, normalizeHeadersInit, overrideHeaders } from './RequestUtils';
import type { FetchRequestInit } from './fetch.types';

export async function fetch(url: string, init?: FetchRequestInit): Promise<FetchResponse> {
const response = new FetchResponse();
let abortSubscription: AbortSubscriptionCleanupFunction | null = null;

const response = new FetchResponse(() => {
abortSubscription?.();
});
const request = new ExpoFetchModule.NativeRequest(response) as NativeRequest;

let headers = normalizeHeadersInit(init?.headers);
Expand All @@ -25,10 +29,9 @@ export async function fetch(url: string, init?: FetchRequestInit): Promise<Fetch
if (init?.signal && init.signal.aborted) {
throw new FetchError('The operation was aborted.');
}
const abortHandler = () => {
abortSubscription = addAbortSignalListener(init?.signal, () => {
request.cancel();
};
init?.signal?.addEventListener('abort', abortHandler);
});
try {
await request.start(url, nativeRequestInit, requestBody);
} catch (e: unknown) {
Expand All @@ -37,8 +40,19 @@ export async function fetch(url: string, init?: FetchRequestInit): Promise<Fetch
} else {
throw new FetchError(String(e));
}
} finally {
init?.signal?.removeEventListener('abort', abortHandler);
}
return response;
}

/**
* A wrapper of `AbortSignal.addEventListener` that returns a cleanup function.
*/
function addAbortSignalListener(
signal: AbortSignal | undefined,
listener: Parameters<AbortSignal['addEventListener']>[1]
): AbortSubscriptionCleanupFunction {
signal?.addEventListener('abort', listener);
return () => {
signal?.removeEventListener('abort', listener);
};
}

0 comments on commit 04c6ae2

Please sign in to comment.