Messages sent to instances of the Observer
object follow the following grammar:
onNext* (onCompleted | onError)?
This grammar allows observable sequences to send any amount (0 or more) of onNext
messages to the subscribed observer instance, optionally followed by a single success (onCompleted
) or failure (onError
) message.
The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations.
A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences.
var count = 0;
xs.subscribe(
() => count++,
err => console.log('Error: %s', err.message),
() => console.log('OnNext has been called %d times', count)
);
In this sample we safely assume that the total amount of calls to the OnNext method won’t change once the OnCompleted method is called as the observable sequence follows the Rx grammar.
Ignore this guideline only when working with a non-conforming implementation of the Observable object.
Paragraph 3.1 states that no more messages should arrive after an onError
or onCompleted
message. This makes it possible to cleanup any resource used by the subscription the moment an onError
or onCompleted
arrives. Cleaning up resources immediately will make sure that any side-effect occurs in a predictable fashion. It also makes sure that the runtime can reclaim these resources.
var fs = require('fs');
var Rx = require('rx');
function appendAsync(fd, buffer) { /* impl */ }
function openFile(path, flags) {
var fd = fs.openSync(path, flags);
return Rx.Disposable.create(() => fs.closeSync(fd));
}
Rx.Observable.
using(
() => openFile('temp.txt', 'w+'),
fd => Rx.Observable.range(0, 10000).map(v => Buffer(v)).flatMap(buffer => appendAsync(fd, buffer))
).subscribe();
In this sample the using
operator creates a resource that will be disposed upon unsubscription. The Rx contract for cleanup ensures that unsubscription will be called automatically once an onError
or onCompleted
message is sent.
There are currently no known cases where to ignore this guideline.
When unsubscribe is called on an observable subscription, the observable sequence will make a best effort attempt to stop all outstanding work. This means that any queued work that has not been started will not start.
Any work that is already in progress might still complete as it is not always safe to abort work that is in progress. Results from this work will not be signaled to any previously subscribed observer instances.
Observable.timer(2000).subscribe(...).dispose()
In this sample subscribing to the observable sequence generated by Timer will queue an action on the Scheduler.timeout
scheduler to send out an onNext
message in 2 seconds. The subscription then gets canceled immediately. As the scheduled action has not started yet, it will be removed from the scheduler.
Rx.Observable.startAsync(() => Q.delay(2000)).subscribe(...).dispose();
In this sample the startAsync
operator will immediately schedule the execution of the lambda provided as its argument. The subscription registers the observer instance as a listener to this execution. As the lambda is already running once the subscription is disposed, it will keep running and its return value is ignored.