-
-
Notifications
You must be signed in to change notification settings - Fork 139
Migrating from RxJS
If you choose to use xstream but have an existing application written in RxJS, here are some hints that may help you convert the code.
RxJS | xstream |
---|---|
Cold by default | Hot only |
The biggest difference between RxJS and xstream is the cold/hot issue. When migrating, you will notice this by how you won't need to .share()
in xstream code. There is no .share()
in xstream because all streams are already "shared".
Sometimes, though, a chain of cold streams in RxJS won't "work" when converted to xstream. This happens specially if you have a Y-shaped dependency. For instance:
a$ -> b$ -> c$
and
a$ -> b$ -> d$
where all of these are cold. The most common case for this is where b$
is a state$
, returned from a model()
function. In RxJS, the entire chain is cold, so there are actually two separate executions of b$
, and that's why both c$
and d$
get incoming events.
In xstream, there would be just one shared execution of b$
, and if it sent out an initial value, only the first chain with c$
would see it, while d$
would miss it. This will happen if b$
has a .startWith()
or something similar, which emits an initial event synchronously. Usually this is solved by the equivalent of .shareReplay()
in xstream, which is called .remember()
. This operator returns a MemoryStream, which is like an RxJS ReplaySubject.
As a rule of thumb, if a stream represents a "value over time", you should make sure to apply .remember() to make it a MemoryStream. You can do this for every stream that acts like a "value over time" in your code. You don't need to wait for a bug to happen to only then apply remember()
. A "value over time" is different to an event stream because at any point in time you always expect some value to exist. An example of a "value over time" is a person's age, while an example of an event stream is a person's birthday events. Every living person has an age value at any point in time. However, there is no point in talking about "your current birthday", because these are just a stream of events that happen periodically every year. It's possible to convert from one to the other, though: age$ = birthday$.remember()
. A typical "value over time" in a Cycle.js app is state$
, so make sure these are defined with .remember()
in the end.
RxJS | xstream |
---|---|
.shareReplay(1) |
.remember() |
Those are the largest obstacles. Otherwise, the operator API in xstream is well compatible with RxJS. Compare these:
RxJS | xstream |
---|---|
.map(x => x * 10) |
.map(x => x * 10) |
.map(10) |
.mapTo(10) |
.filter(x => x === 1) |
.filter(x => x === 1) |
.take(1) |
.take(1) |
.last() |
.last() |
.startWith('init') |
.startWith('init') |
Observable.never() |
xs.never() |
Observable.empty() |
xs.empty() |
Observable.throw(err) |
xs.throw(err) |
Observable.of(1, 2, 3) |
xs.of(1, 2, 3) |
Observable.merge(a$, b$, c$) |
xs.merge(a$, b$, c$) |
Observable.fromPromise(p) |
xs.fromPromise(p) |
Some operators and methods, though, are slightly different or have different names:
RxJS | xstream |
---|---|
Observable.interval(1000) |
xs.periodic(1000) |
.subscribe(observer) |
.addListener(listener) |
subscription.unsubscribe() |
.removeListener(listener) |
.skip(3) |
.drop(3) |
.takeUntil(b$) |
.endWhen(b$) |
.catch(fn) |
.replaceError(fn) |
.do(fn) |
.debug(fn) |
.let(fn) |
.compose(fn) |
.scan(fn, seed) |
.fold(fn, seed) |
Observable.combineLatest |
xs.combine |
switch |
flatten |
mergeAll |
flattenConcurrently |
concatAll |
flattenSequentially |
Subject onNext
|
shamefullySendNext |
Subject onError
|
shamefullySendError |
Subject onComplete
|
shamefullySendComplete |
It's very important to note the difference between scan
and fold
is not just naming. xstream fold
has startWith(seed)
embedded internally. So xstream a$.fold((acc, x) => acc + x, 0)
is equivalent to RxJS a$.startWith(0).scan((acc, x) => acc + x)
. We noticed that in most cases where RxJS scan was used in Cycle.js apps, it was preceded by startWith
, so we built fold
so that it has both together. If you don't want the seed value emitted initially, then just apply .drop(1)
after fold
.
RxJS | xstream |
---|---|
a$.startWith(0).scan((acc, x) => acc + x) |
a$.fold((acc, x) => acc + x, 0) |
combineLatest in xstream is a bit different. It's called combine
, and only takes streams as arguments. The output of combine is an array of values, so it usually requires a map
operation after combine
to take the array of values and apply a transformation. It's usually a good idea to use ES2015 array destructuring on the parameter of the transformation function. E.g. .map(([a,b]) => a+b)
not .map(arr => arr[0] + arr[1])
.
RxJS | xstream |
---|---|
Observable.combineLatest(a$, b$, (a,b) => a+b)) |
xs.combine(a$, b$).map(([a,b]) => a+b) |
Also important to note that xstream has no flatMap
nor flatMapLatest
/switchMap
, but instead you should apply two operators: map
+ flatten
or map
+ flattenConcurrently
:
// RxJS
var b$ = a$.flatMap(x =>
Observable.of(x+1, x+2)
);
// xstream
var b$ = a$.map(x =>
xs.of(x+1, x+2)
).compose(flattenConcurrently);
// RxJS
var b$ = a$.flatMapLatest(x =>
Observable.of(x+1, x+2)
);
// xstream
var b$ = a$.map(x =>
xs.of(x+1, x+2)
).flatten();
Pay careful attention to the difference in naming:
RxJS | xstream |
---|---|
flatMapLatest | map + flatten |
flatMap | map + flattenConcurrently |
concatMap | map + flattenSequentially |
If you were using the Proxy Subject technique in Cycle.js for building circularly dependent Observables, xstream makes that easier with imitate()
, built in the library specifically for circularly dependent streams:
-var proxy$ = new Rx.Subject();
+var proxy$ = xs.create();
var childSinks = Child({DOM: sources.DOM, foo: proxy$});
-childSinks.actions.subscribe(proxy$);
+proxy$.imitate(childSinks.actions);