Skip to content

Commit

Permalink
Implemented interop variants + test
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Aug 19, 2016
1 parent fc34fdd commit eb3aefc
Show file tree
Hide file tree
Showing 10 changed files with 1,337 additions and 1 deletion.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,29 @@ dependencies {
Maven search:

[http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.akarnokd%22)

# Usage

```java
import static hu.akarnokd.rxjava.interop.RxJavaInterop.*;

// convert from 1.x to 2.x

io.reactivex.Flowable f2 = RxJavaInterop.toV2Flowable(rx.Observable);

io.reactivex.Observable o2 = RxJavaInterop.toV2Observabe(rx.Observable);

io.reactive.Single s2 = RxJavaInterop.toV2Single(rx.Single);

io.reactivex.Completable c2 = RxJavaInterop.toV2Completable(rx.Completable);

// convert from 2.x to 1.x

rx.Observable o1 = RxJavaInterop.toV1Observable(Publisher);

rx.Observable q1 = RxJavaInterop.toV1Observable(ObservableSource, BackpressureStrategy);

rx.Single s1 = RxJavaInterop.toV1Single(SingleSource);

rx.Completable c1 = RxJavaInterop.toV1Completable(CompletableSource);
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2016 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package hu.akarnokd.rxjava.interop;

/**
* Convert a V1 Completable into a V2 Completable, composing cancellation.
*/
final class CompletableV1ToCompletableV2 extends io.reactivex.Completable {

final rx.Completable source;

public CompletableV1ToCompletableV2(rx.Completable source) {
this.source = source;
}

@Override
protected void subscribeActual(io.reactivex.CompletableObserver observer) {
source.subscribe(new SourceCompletableSubscriber(observer));
}

static final class SourceCompletableSubscriber
implements rx.Completable.CompletableSubscriber, io.reactivex.disposables.Disposable {

final io.reactivex.CompletableObserver observer;

rx.Subscription s;

public SourceCompletableSubscriber(io.reactivex.CompletableObserver observer) {
this.observer = observer;
}

@Override
public void onSubscribe(rx.Subscription d) {
this.s = d;
observer.onSubscribe(this);
}

@Override
public void onCompleted() {
observer.onComplete();
}

@Override
public void onError(Throwable error) {
observer.onError(error);
}

@Override
public void dispose() {
s.unsubscribe();
}

@Override
public boolean isDisposed() {
return s.isUnsubscribed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2016 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package hu.akarnokd.rxjava.interop;

/**
* Convert a V2 Completable into a V1 Completable, composing cancellation.
*/
final class CompletableV2ToCompletableV1 implements rx.Completable.CompletableOnSubscribe {

final io.reactivex.CompletableSource source;

public CompletableV2ToCompletableV1(io.reactivex.CompletableSource source) {
this.source = source;
}

@Override
public void call(rx.Completable.CompletableSubscriber observer) {
source.subscribe(new SourceCompletableSubscriber(observer));
}

static final class SourceCompletableSubscriber
implements io.reactivex.CompletableObserver, rx.Subscription {

final rx.Completable.CompletableSubscriber observer;

io.reactivex.disposables.Disposable d;

public SourceCompletableSubscriber(rx.Completable.CompletableSubscriber observer) {
this.observer = observer;
}

@Override
public void onSubscribe(io.reactivex.disposables.Disposable d) {
this.d = d;
observer.onSubscribe(this);
}

@Override
public void onComplete() {
observer.onCompleted();
}

@Override
public void onError(Throwable error) {
observer.onError(error);
}

@Override
public void unsubscribe() {
d.dispose();
}

@Override
public boolean isUnsubscribed() {
return d.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2016 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hu.akarnokd.rxjava.interop;

import java.util.concurrent.atomic.*;

/**
* Convert a V2 Flowable into a V1 Observable, composing backpressure and cancellation.
*
* @param <T> the value type
*/
final class FlowableV2ToObservableV1<T> implements rx.Observable.OnSubscribe<T> {

final org.reactivestreams.Publisher<T> source;

public FlowableV2ToObservableV1(org.reactivestreams.Publisher<T> source) {
this.source = source;
}

@Override
public void call(rx.Subscriber<? super T> t) {
SourceSubscriber<T> parent = new SourceSubscriber<T>(t);

t.add(parent);
t.setProducer(parent);

source.subscribe(parent);
}

static final class SourceSubscriber<T>
extends AtomicReference<org.reactivestreams.Subscription>
implements org.reactivestreams.Subscriber<T>, rx.Subscription, rx.Producer {

/** */
private static final long serialVersionUID = -6567012932544037069L;

final rx.Subscriber<? super T> actual;

final AtomicLong requested;

public SourceSubscriber(rx.Subscriber<? super T> actual) {
this.actual = actual;
this.requested = new AtomicLong();
}

@Override
public void request(long n) {
io.reactivex.internal.subscriptions.SubscriptionHelper.deferredRequest(this, requested, n);
}

@Override
public void unsubscribe() {
io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(this);
}

@Override
public boolean isUnsubscribed() {
return io.reactivex.internal.subscriptions.SubscriptionHelper.isCancelled(get());
}

@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
io.reactivex.internal.subscriptions.SubscriptionHelper.deferredSetOnce(this, requested, s);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
actual.onCompleted();
}
}
}
109 changes: 109 additions & 0 deletions src/main/java/hu/akarnokd/rxjava/interop/ObservableV1ToFlowableV2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2016 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package hu.akarnokd.rxjava.interop;

/**
* Convert a V1 Observable into a V2 Flowable, composing backpressure and cancellation.
*
* @param <T> the value type
*/
final class ObservableV1ToFlowableV2<T> extends io.reactivex.Flowable<T> {

final rx.Observable<T> source;

public ObservableV1ToFlowableV2(rx.Observable<T> source) {
this.source = source;
}

@Override
protected void subscribeActual(org.reactivestreams.Subscriber<? super T> s) {
ObservableSubscriber<T> parent = new ObservableSubscriber<T>(s);
ObservableSubscriberSubscription parentSubscription = new ObservableSubscriberSubscription(parent);
s.onSubscribe(parentSubscription);

source.unsafeSubscribe(parent);
}

static final class ObservableSubscriber<T> extends rx.Subscriber<T> {

final org.reactivestreams.Subscriber<? super T> actual;

boolean done;

public ObservableSubscriber(org.reactivestreams.Subscriber<? super T> actual) {
this.actual = actual;
this.request(0L); // suppress starting out with Long.MAX_VALUE
}

@Override
public void onNext(T t) {
if (done) {
return;
}
if (t == null) {
unsubscribe();
onError(new NullPointerException(
"The upstream 1.x Observable signalled a null value which is not supported in 2.x"));
} else {
actual.onNext(t);
}
}

@Override
public void onError(Throwable e) {
if (done) {
io.reactivex.plugins.RxJavaPlugins.onError(e);
return;
}
done = true;
actual.onError(e);
}

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
actual.onComplete();
}

void requestMore(long n) {
request(n);
}
}

static final class ObservableSubscriberSubscription implements org.reactivestreams.Subscription {

final ObservableSubscriber<?> parent;

public ObservableSubscriberSubscription(ObservableSubscriber<?> parent) {
this.parent = parent;
}

@Override
public void request(long n) {
parent.requestMore(n);
}

@Override
public void cancel() {
parent.unsubscribe();
}
}

}
Loading

0 comments on commit eb3aefc

Please sign in to comment.