Skip to content

Commit

Permalink
DOM: Implement the first() Observable operator
Browse files Browse the repository at this point in the history
See WICG/observable#131.

For WPTs:
Co-authored-by: [email protected]

[email protected]

Bug: 40282760
Change-Id: I81a2b482dcfdbc149d60ddbc5b98ddadb6dafc89
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5412443
Reviewed-by: Joey Arhar <[email protected]>
Commit-Queue: Dominic Farolino <[email protected]>
Cr-Commit-Position: refs/heads/main@{#1281639}
  • Loading branch information
domfarolino authored and Chromium LUCI CQ committed Apr 3, 2024
1 parent d53336d commit 4b50763
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 0 deletions.
98 changes: 98 additions & 0 deletions third_party/blink/renderer/core/dom/observable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,56 @@ class ToArrayInternalObserver final : public ObservableInternalObserver {
Member<AbortSignal::AlgorithmHandle> abort_algorithm_handle_;
};

// This is the internal observer associated with the `first()` operator. See
// https://wicg.github.io/observable/#dom-observable-first for its definition
// and spec prose quoted below.
class OperatorFirstInternalObserver final : public ObservableInternalObserver {
public:
OperatorFirstInternalObserver(ScriptPromiseResolver<IDLAny>* resolver,
AbortController* controller,
AbortSignal::AlgorithmHandle* handle)
: resolver_(resolver),
controller_(controller),
abort_algorithm_handle_(handle) {}

void Next(ScriptValue value) override {
abort_algorithm_handle_.Clear();

// "Resolve p with the passed in value."
resolver_->Resolve(value);
// "Signal abort controller".
controller_->abort(resolver_->GetScriptState());
}
void Error(ScriptState* script_state, ScriptValue error_value) override {
abort_algorithm_handle_.Clear();

// "Reject p with the passed in error."
resolver_->Reject(error_value);
}
void Complete() override {
abort_algorithm_handle_.Clear();

// "Reject p with a new RangeError."
v8::Isolate* isolate = resolver_->GetScriptState()->GetIsolate();
resolver_->Reject(
ScriptValue(isolate, V8ThrowException::CreateRangeError(
isolate, "No values in Observable")));
}

void Trace(Visitor* visitor) const override {
ObservableInternalObserver::Trace(visitor);

visitor->Trace(resolver_);
visitor->Trace(controller_);
visitor->Trace(abort_algorithm_handle_);
}

private:
Member<ScriptPromiseResolver<IDLAny>> resolver_;
Member<AbortController> controller_;
Member<AbortSignal::AlgorithmHandle> abort_algorithm_handle_;
};

class OperatorForEachInternalObserver final
: public ObservableInternalObserver {
public:
Expand Down Expand Up @@ -1612,6 +1662,54 @@ ScriptPromise<IDLUndefined> Observable::forEach(ScriptState* script_state,
return promise;
}

ScriptPromise<IDLAny> Observable::first(ScriptState* script_state,
SubscribeOptions* options) {
ScriptPromiseResolver<IDLAny>* resolver =
MakeGarbageCollected<ScriptPromiseResolver<IDLAny>>(script_state);
ScriptPromise<IDLAny> promise = resolver->Promise();

AbortController* controller = AbortController::Create(script_state);
HeapVector<Member<AbortSignal>> signals;

// The internal observer associated with this operator must have the ability
// to unsubscribe from `this`. This happens in the internal observer's
// `next()` handler, when the first value is emitted.
//
// This means we have to maintain a separate, internal `AbortController` that
// will abort the subscription. Consequently, this means we have to subscribe
// with an internal `SubscribeOptions`, whose signal is always present, and is
// a composite signal derived from:
// 1. The aforementioned controller.
signals.push_back(controller->signal());
// 2. The given `options`'s signal, if present.
if (options->hasSignal()) {
signals.push_back(options->signal());
}

SubscribeOptions* internal_options = MakeGarbageCollected<SubscribeOptions>();
internal_options->setSignal(
MakeGarbageCollected<AbortSignal>(script_state, signals));

if (internal_options->signal()->aborted()) {
resolver->Reject(options->signal()->reason(script_state));
return promise;
}

AbortSignal::AlgorithmHandle* algorithm_handle =
internal_options->signal()->AddAlgorithm(
MakeGarbageCollected<RejectPromiseAbortAlgorithm>(
resolver, internal_options->signal()));

OperatorFirstInternalObserver* internal_observer =
MakeGarbageCollected<OperatorFirstInternalObserver>(resolver, controller,
algorithm_handle);

SubscribeInternal(script_state, /*observer_union=*/nullptr, internal_observer,
internal_options);

return promise;
}

void Observable::Trace(Visitor* visitor) const {
visitor->Trace(subscribe_callback_);
visitor->Trace(subscribe_delegate_);
Expand Down
1 change: 1 addition & 0 deletions third_party/blink/renderer/core/dom/observable.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class CORE_EXPORT Observable final : public ScriptWrappable,
ScriptPromise<IDLUndefined> forEach(ScriptState*,
V8Visitor*,
SubscribeOptions*);
ScriptPromise<IDLAny> first(ScriptState*, SubscribeOptions*);

void Trace(Visitor*) const override;

Expand Down
1 change: 1 addition & 0 deletions third_party/blink/renderer/core/dom/observable.idl
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,6 @@ interface Observable {
// See https://wicg.github.io/observable/#promise-returning-operators.
[CallWith=ScriptState] Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
[CallWith=ScriptState] Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
[CallWith=ScriptState] Promise<any> first(optional SubscribeOptions options = {});
};

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
promise_test(async () => {
const results = [];

const source = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('teardown'));
subscriber.next(1);
results.push(subscriber.active ? 'active' : 'inactive');
results.push(subscriber.signal.aborted ? 'aborted' : 'not aborted')

// Ignored.
subscriber.next(2);
subscriber.complete();
});

const value = await source.first();

assert_array_equals(results, ['teardown', 'inactive', 'aborted']);
assert_equals(value, 1,
"Promise resolves with the first value from the source Observable");
}, "first(): Promise resolves with the first value from the source Observable");

promise_test(async () => {
const error = new Error("error from source");
const source = new Observable(subscriber => {
subscriber.error(error);
});

let rejection;
try {
await source.first();
} catch (e) {
rejection = e;
}

assert_equals(rejection, error, "Promise rejects with source Observable error");
}, "first(): Promise rejects with the error emitted from the source Observable");

promise_test(async () => {
const source = new Observable(subscriber => {
subscriber.complete();
});

let rejection;
try {
await source.first();
} catch (e) {
rejection = e;
}

assert_true(rejection instanceof RangeError,
"Upon complete(), first() Promise rejects with RangeError");
assert_equals(rejection.message, "No values in Observable");
}, "first(): Promise rejects with RangeError when source Observable " +
"completes without emitting any values");

promise_test(async () => {
const source = new Observable(subscriber => {});

const controller = new AbortController();
const promise = source.first({ signal: controller.signal });

controller.abort();

let rejection;
try {
await promise;
} catch (e) {
rejection = e;
}

assert_true(rejection instanceof DOMException,
"Promise rejects with a DOMException for abortion");
assert_equals(rejection.name, "AbortError",
"Rejected with 'AbortError' DOMException");
assert_equals(rejection.message, "signal is aborted without reason");
}, "first(): Aborting a signal rejects the Promise with an AbortError DOMException");

promise_test(async () => {
const results = [];

const source = new Observable(subscriber => {
results.push("source subscribe");
subscriber.addTeardown(() => results.push("source teardown"));
subscriber.signal.addEventListener("abort", () => results.push("source abort"));
results.push("before source next 1");
subscriber.next(1);
results.push("after source next 1");
});

results.push("calling first");
const promise = source.first();

assert_array_equals(results, [
"calling first",
"source subscribe",
"before source next 1",
"source teardown",
"source abort",
"after source next 1"
], "Array values after first() is called");

const firstValue = await promise;
results.push(`first resolved with: ${firstValue}`);

assert_array_equals(results, [
"calling first",
"source subscribe",
"before source next 1",
"source teardown",
"source abort",
"after source next 1",
"first resolved with: 1",
], "Array values after Promise is awaited");
}, "first(): Lifecycle");
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,7 @@ interface Observable
method constructor
method drop
method filter
method first
method flatMap
method forEach
method map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,7 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] method constructor
[Worker] method drop
[Worker] method filter
[Worker] method first
[Worker] method flatMap
[Worker] method forEach
[Worker] method map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6833,6 +6833,7 @@ interface Observable
method constructor
method drop
method filter
method first
method flatMap
method forEach
method map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,7 @@ Starting worker: resources/global-interface-listing-worker.js
[Worker] method constructor
[Worker] method drop
[Worker] method filter
[Worker] method first
[Worker] method flatMap
[Worker] method forEach
[Worker] method map
Expand Down

0 comments on commit 4b50763

Please sign in to comment.