diff --git a/README.md b/README.md index cfc809e..43ff4e3 100644 --- a/README.md +++ b/README.md @@ -19,3 +19,161 @@ dependencies { # Features + +## Convert between Flowables + +### via static methods + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.Flowable f2 = RxJavaBridge.toV2Flowable(io.reactivex.rxjava3.core.Flowable) + +io.reactivex.rxjava3.core.Flowable f3 = RxJavaBridge.toV3Flowable(io.reactivex.Flowable) +``` + +### via FlowableConverter application + +```java +f3 = f2.as(RxJavaBridge.toV3Flowable()) + +f2 = f3.to(RxJavaBridge.toV2Flowable()) +``` + +## Convert between Observables + +### via static methods + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.Observable o2 = RxJavaBridge.toV2Observable(io.reactivex.rxjava3.core.Observable) + +io.reactivex.rxjava3.core.Observable o3 = RxJavaBridge.toV3Observable(io.reactivex.Observable) +``` + +### via ObservableConverter application + +```java +o3 = o2.as(RxJavaBridge.toV3Observable()) + +o2 = o3.to(RxJavaBridge.toV2Observable()) +``` + +## Convert between Maybes + +### via static methods + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.Maybe m2 = RxJavaBridge.toV2Maybe(io.reactivex.rxjava3.core.Maybe) + +io.reactivex.rxjava3.core.Maybe m3 = RxJavaBridge.toV3Maybe(io.reactivex.Maybe) +``` + +### via MaybeConverter application + +```java +m3 = m2.as(RxJavaBridge.toV3Maybe()) + +m2 = m3.to(RxJavaBridge.toV2Maybe()) +``` + +## Convert between Singles + +### via static methods + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.Single s2 = RxJavaBridge.toV2Single(io.reactivex.rxjava3.core.Single) + +io.reactivex.rxjava3.core.Single s3 = RxJavaBridge.toV3Single(io.reactivex.Single) +``` + +### via SingleConverter application + +```java +s3 = s2.as(RxJavaBridge.toV3Single()) + +s2 = s3.to(RxJavaBridge.toV2Single()) +``` + + +## Convert between Completables + +### via static methods + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.Completable c2 = RxJavaBridge.toV2Completable(io.reactivex.rxjava3.core.Completable) + +io.reactivex.rxjava3.core.Completable c3 = RxJavaBridge.toV3Completable(io.reactivex.Completable) +``` + +### via CompletableConverter application + +```java +c3 = c2.as(RxJavaBridge.toV3Completable()) + +c2 = c3.to(RxJavaBridge.toV2Completable()) +``` + + +## Convert between Disposables + +### via static methods + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.disposables.Disposable d2 = RxJavaBridge.toV2Disposable(io.reactivex.rxjava3.disposables.Disposable) + +io.reactivex.rxjava3.disosables.Observable d3 = RxJavaBridge.toV3Disposable(io.reactivex.disposables.Disposable) +``` + +## Convert between Schedulers + +### via static methods + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.Scheduler sch2 = RxJavaBridge.toV2Scheduler(io.reactivex.rxjava3.core.Scheduler) + +io.reactivex.rxjava3.core.Scheduler sch3 = RxJavaBridge.toV3Scheduler(io.reactivex.Scheduler) +``` + +### use 3.x standard schedulers in 2.x + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.schedulers.Schedulers.shutdown(); + +RxJavaBridge.startUsingV3Schedulers(); + +// when done + +RxJavaBridge.stopUsingV3Schedulers(); + +io.reactivex.schedulers.Schedulers.start(); +``` + +### use 2.x standard schedulers in 3.x + +```java +import hu.akarnokd.rxjava3.bridge.RxJavaBridge; + +io.reactivex.rxjava3.schedulers.Schedulers.shutdown(); + +RxJavaBridge.startUsingV2Schedulers(); + +// when done + +RxJavaBridge.stopUsingV2Schedulers(); + +io.reactivex.rxjava3.schedulers.Schedulers.start(); +``` diff --git a/build.gradle b/build.gradle index ed79e69..7126cdf 100644 --- a/build.gradle +++ b/build.gradle @@ -26,6 +26,15 @@ build.dependsOn jacocoTestReport check.dependsOn jacocoTestReport +javadoc { + failOnError = false + + options.links( + "http://docs.oracle.com/javase/7/docs/api/", + "http://reactivex.io/RxJava/2.x/javadoc", + "http://reactivex.io/RxJava/3.x/javadoc" + ) +} group = "com.github.akarnokd" diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/CompletableV2toV3.java b/src/main/java/hu/akarnokd/rxjava3/bridge/CompletableV2toV3.java new file mode 100644 index 0000000..3a15d99 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/CompletableV2toV3.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class CompletableV2toV3 extends io.reactivex.rxjava3.core.Completable +implements io.reactivex.CompletableConverter { + + final io.reactivex.Completable source; + + static final CompletableV2toV3 CONVERTER = new CompletableV2toV3(null); + + CompletableV2toV3(io.reactivex.Completable source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.rxjava3.core.CompletableObserver s) { + source.subscribe(new CompletableObserverV3toV2(s)); + } + + @Override + public io.reactivex.rxjava3.core.Completable apply(io.reactivex.Completable upstream) { + return new CompletableV2toV3(upstream); + } + + static final class CompletableObserverV3toV2 + implements io.reactivex.CompletableObserver, io.reactivex.rxjava3.disposables.Disposable { + + final io.reactivex.rxjava3.core.CompletableObserver downstream; + + io.reactivex.disposables.Disposable upstream; + + CompletableObserverV3toV2(io.reactivex.rxjava3.core.CompletableObserver downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/CompletableV3toV2.java b/src/main/java/hu/akarnokd/rxjava3/bridge/CompletableV3toV2.java new file mode 100644 index 0000000..f6fc6b9 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/CompletableV3toV2.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class CompletableV3toV2 extends io.reactivex.Completable +implements io.reactivex.rxjava3.core.CompletableConverter { + + final io.reactivex.rxjava3.core.Completable source; + + static final CompletableV3toV2 CONVERTER = new CompletableV3toV2(null); + + CompletableV3toV2(io.reactivex.rxjava3.core.Completable source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.CompletableObserver s) { + source.subscribe(new CompletableObserverV2toV3(s)); + } + + @Override + public io.reactivex.Completable apply(io.reactivex.rxjava3.core.Completable upstream) { + return new CompletableV3toV2(upstream); + } + + static final class CompletableObserverV2toV3 + implements io.reactivex.rxjava3.core.CompletableObserver, io.reactivex.disposables.Disposable { + + final io.reactivex.CompletableObserver downstream; + + io.reactivex.rxjava3.disposables.Disposable upstream; + + CompletableObserverV2toV3(io.reactivex.CompletableObserver downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/DisposableV2toV3.java b/src/main/java/hu/akarnokd/rxjava3/bridge/DisposableV2toV3.java new file mode 100644 index 0000000..5cc224b --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/DisposableV2toV3.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class DisposableV2toV3 implements io.reactivex.rxjava3.disposables.Disposable { + + final io.reactivex.disposables.Disposable disposable; + + DisposableV2toV3(io.reactivex.disposables.Disposable disposable) { + this.disposable = disposable; + } + + @Override + public boolean isDisposed() { + return disposable.isDisposed(); + } + + @Override + public void dispose() { + disposable.dispose(); + } + + static io.reactivex.rxjava3.disposables.Disposable wrap(io.reactivex.disposables.Disposable disposable) { + if (disposable == io.reactivex.internal.disposables.DisposableHelper.DISPOSED) { + return io.reactivex.rxjava3.internal.disposables.DisposableHelper.DISPOSED; + } + if (disposable == io.reactivex.internal.disposables.EmptyDisposable.INSTANCE) { + return io.reactivex.rxjava3.internal.disposables.EmptyDisposable.INSTANCE; + } + return new DisposableV2toV3(disposable); + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/DisposableV3toV2.java b/src/main/java/hu/akarnokd/rxjava3/bridge/DisposableV3toV2.java new file mode 100644 index 0000000..e66a4a4 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/DisposableV3toV2.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class DisposableV3toV2 implements io.reactivex.disposables.Disposable { + + final io.reactivex.rxjava3.disposables.Disposable disposable; + + DisposableV3toV2(io.reactivex.rxjava3.disposables.Disposable disposable) { + this.disposable = disposable; + } + + @Override + public boolean isDisposed() { + return disposable.isDisposed(); + } + + @Override + public void dispose() { + disposable.dispose(); + } + + static io.reactivex.disposables.Disposable wrap(io.reactivex.rxjava3.disposables.Disposable disposable) { + if (disposable == io.reactivex.rxjava3.internal.disposables.DisposableHelper.DISPOSED) { + return io.reactivex.internal.disposables.DisposableHelper.DISPOSED; + } + if (disposable == io.reactivex.rxjava3.internal.disposables.EmptyDisposable.INSTANCE) { + return io.reactivex.internal.disposables.EmptyDisposable.INSTANCE; + } + return new DisposableV3toV2(disposable); + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableSubscriberBridge.java b/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableSubscriberBridge.java new file mode 100644 index 0000000..0c1d3c6 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableSubscriberBridge.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class FlowableSubscriberBridge +implements io.reactivex.FlowableSubscriber, + io.reactivex.rxjava3.core.FlowableSubscriber, + org.reactivestreams.Subscription { + + final org.reactivestreams.Subscriber downstream; + + org.reactivestreams.Subscription upstream; + + FlowableSubscriberBridge(org.reactivestreams.Subscriber downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(org.reactivestreams.Subscription s) { + this.upstream = s; + downstream.onSubscribe(this); + } + + @Override + public void onNext(T t) { + downstream.onNext(t); + } + + @Override + public void onError(Throwable t) { + downstream.onError(t); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } +} \ No newline at end of file diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableV2toV3.java b/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableV2toV3.java new file mode 100644 index 0000000..55c4993 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableV2toV3.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class FlowableV2toV3 extends io.reactivex.rxjava3.core.Flowable +implements io.reactivex.FlowableConverter> { + + final io.reactivex.Flowable source; + + static final FlowableV2toV3 CONVERTER = new FlowableV2toV3(null); + + FlowableV2toV3(io.reactivex.Flowable source) { + this.source = source; + } + + @Override + protected void subscribeActual(org.reactivestreams.Subscriber s) { + source.subscribe(new FlowableSubscriberBridge(s)); + } + + @Override + public io.reactivex.rxjava3.core.Flowable apply(io.reactivex.Flowable upstream) { + return new FlowableV2toV3(upstream); + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableV3toV2.java b/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableV3toV2.java new file mode 100644 index 0000000..5b22387 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/FlowableV3toV2.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class FlowableV3toV2 extends io.reactivex.Flowable +implements io.reactivex.rxjava3.core.FlowableConverter> { + + final io.reactivex.rxjava3.core.Flowable source; + + static final FlowableV3toV2 CONVERTER = new FlowableV3toV2(null); + + FlowableV3toV2(io.reactivex.rxjava3.core.Flowable source) { + this.source = source; + } + + @Override + protected void subscribeActual(org.reactivestreams.Subscriber s) { + source.subscribe(new FlowableSubscriberBridge(s)); + } + + @Override + public io.reactivex.Flowable apply(io.reactivex.rxjava3.core.Flowable upstream) { + return new FlowableV3toV2(upstream); + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/MaybeV2toV3.java b/src/main/java/hu/akarnokd/rxjava3/bridge/MaybeV2toV3.java new file mode 100644 index 0000000..d7d2ea2 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/MaybeV2toV3.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class MaybeV2toV3 extends io.reactivex.rxjava3.core.Maybe +implements io.reactivex.MaybeConverter> { + + final io.reactivex.Maybe source; + + static final MaybeV2toV3 CONVERTER = new MaybeV2toV3(null); + + MaybeV2toV3(io.reactivex.Maybe source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.rxjava3.core.MaybeObserver s) { + source.subscribe(new MaybeObserverV3toV2(s)); + } + + @Override + public io.reactivex.rxjava3.core.Maybe apply(io.reactivex.Maybe upstream) { + return new MaybeV2toV3(upstream); + } + + static final class MaybeObserverV3toV2 + implements io.reactivex.MaybeObserver, io.reactivex.rxjava3.disposables.Disposable { + + final io.reactivex.rxjava3.core.MaybeObserver downstream; + + io.reactivex.disposables.Disposable upstream; + + MaybeObserverV3toV2(io.reactivex.rxjava3.core.MaybeObserver downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onSuccess(T t) { + downstream.onSuccess(t); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/MaybeV3toV2.java b/src/main/java/hu/akarnokd/rxjava3/bridge/MaybeV3toV2.java new file mode 100644 index 0000000..f0a5665 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/MaybeV3toV2.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class MaybeV3toV2 extends io.reactivex.Maybe +implements io.reactivex.rxjava3.core.MaybeConverter> { + + final io.reactivex.rxjava3.core.Maybe source; + + static final MaybeV3toV2 CONVERTER = new MaybeV3toV2(null); + + MaybeV3toV2(io.reactivex.rxjava3.core.Maybe source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.MaybeObserver s) { + source.subscribe(new MaybeObserverV2toV3(s)); + } + + @Override + public io.reactivex.Maybe apply(io.reactivex.rxjava3.core.Maybe upstream) { + return new MaybeV3toV2(upstream); + } + + static final class MaybeObserverV2toV3 + implements io.reactivex.rxjava3.core.MaybeObserver, io.reactivex.disposables.Disposable { + + final io.reactivex.MaybeObserver downstream; + + io.reactivex.rxjava3.disposables.Disposable upstream; + + MaybeObserverV2toV3(io.reactivex.MaybeObserver downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onSuccess(T t) { + downstream.onSuccess(t); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/ObservableV2toV3.java b/src/main/java/hu/akarnokd/rxjava3/bridge/ObservableV2toV3.java new file mode 100644 index 0000000..f770dff --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/ObservableV2toV3.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class ObservableV2toV3 extends io.reactivex.rxjava3.core.Observable +implements io.reactivex.ObservableConverter> { + + final io.reactivex.Observable source; + + static final ObservableV2toV3 CONVERTER = new ObservableV2toV3(null); + + ObservableV2toV3(io.reactivex.Observable source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.rxjava3.core.Observer s) { + source.subscribe(new ObserverV3toV2(s)); + } + + @Override + public io.reactivex.rxjava3.core.Observable apply(io.reactivex.Observable upstream) { + return new ObservableV2toV3(upstream); + } + + static final class ObserverV3toV2 + implements io.reactivex.Observer, io.reactivex.rxjava3.disposables.Disposable { + + final io.reactivex.rxjava3.core.Observer downstream; + + io.reactivex.disposables.Disposable upstream; + + ObserverV3toV2(io.reactivex.rxjava3.core.Observer downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onNext(T t) { + downstream.onNext(t); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/ObservableV3toV2.java b/src/main/java/hu/akarnokd/rxjava3/bridge/ObservableV3toV2.java new file mode 100644 index 0000000..c93778d --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/ObservableV3toV2.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class ObservableV3toV2 extends io.reactivex.Observable +implements io.reactivex.rxjava3.core.ObservableConverter> { + + final io.reactivex.rxjava3.core.Observable source; + + static final ObservableV3toV2 CONVERTER = new ObservableV3toV2(null); + + ObservableV3toV2(io.reactivex.rxjava3.core.Observable source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.Observer s) { + source.subscribe(new ObserverV2toV3(s)); + } + + @Override + public io.reactivex.Observable apply(io.reactivex.rxjava3.core.Observable upstream) { + return new ObservableV3toV2(upstream); + } + + static final class ObserverV2toV3 + implements io.reactivex.rxjava3.core.Observer, io.reactivex.disposables.Disposable { + + final io.reactivex.Observer downstream; + + io.reactivex.rxjava3.disposables.Disposable upstream; + + ObserverV2toV3(io.reactivex.Observer downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onNext(T t) { + downstream.onNext(t); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/RxJavaBridge.java b/src/main/java/hu/akarnokd/rxjava3/bridge/RxJavaBridge.java index 46d63e3..d30987e 100644 --- a/src/main/java/hu/akarnokd/rxjava3/bridge/RxJavaBridge.java +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/RxJavaBridge.java @@ -21,9 +21,396 @@ * @since 3.0.0 */ public final class RxJavaBridge { - + private RxJavaBridge() { throw new IllegalStateException("No instances!"); } + // ----------------------------------------------------- + // Flowable conversions + // ----------------------------------------------------- + + /** + * Wraps a V3 Flowable and exposes it as a V2 Flowable. + * @param the element type of the sequence + * @param source the source V3 Flowable + * @return the wrapper V2 Flowable + */ + public static io.reactivex.Flowable toV2Flowable(io.reactivex.rxjava3.core.Flowable source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.plugins.RxJavaPlugins.onAssembly(new FlowableV3toV2(source)); + } + + /** + * Wraps a V2 Flowable and exposes it as a V3 Flowable. + * @param the element type of the sequence + * @param source the source V2 Flowable + * @return the wrapper V3 Flowable + */ + public static io.reactivex.rxjava3.core.Flowable toV3Flowable(io.reactivex.Flowable source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.rxjava3.plugins.RxJavaPlugins.onAssembly(new FlowableV2toV3(source)); + } + + /** + * Returns the shared singleton instance of a V2 Flowable converter + * that when applied in V2 {@link io.reactivex.Flowable#as(io.reactivex.FlowableConverter)}-based + * fluent conversions, produces a V3 Flowable instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.FlowableConverter> toV3Flowable() { + return (FlowableV2toV3)FlowableV2toV3.CONVERTER; + } + + /** + * Returns the shared singleton instance of a V3 Flowable converter + * that when applied in V3 {@link io.reactivex.rxjava3.core.Flowable#to(io.reactivex.rxjava3.core.FlowableConverter)}-based + * fluent conversions, produces a V2 Flowable instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.rxjava3.core.FlowableConverter> toV2Flowable() { + return (FlowableV3toV2)FlowableV3toV2.CONVERTER; + } + + // ----------------------------------------------------- + // Observable conversions + // ----------------------------------------------------- + + /** + * Wraps a V3 Observable and exposes it as a V2 Observable. + * @param the element type of the sequence + * @param source the source V3 Observable + * @return the wrapper V2 Observable + */ + public static io.reactivex.Observable toV2Observable(io.reactivex.rxjava3.core.Observable source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.plugins.RxJavaPlugins.onAssembly(new ObservableV3toV2(source)); + } + + /** + * Wraps a V2 Observable and exposes it as a V3 Observable. + * @param the element type of the sequence + * @param source the source V2 Observable + * @return the wrapper V3 Observable + */ + public static io.reactivex.rxjava3.core.Observable toV3Observable(io.reactivex.Observable source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.rxjava3.plugins.RxJavaPlugins.onAssembly(new ObservableV2toV3(source)); + } + + /** + * Returns the shared singleton instance of a V2 Observable converter + * that when applied in V2 {@link io.reactivex.Observable#as(io.reactivex.ObservableConverter)}-based + * fluent conversions, produces a V3 Observable instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.ObservableConverter> toV3Observable() { + return (ObservableV2toV3)ObservableV2toV3.CONVERTER; + } + + /** + * Returns the shared singleton instance of a V3 Observable converter + * that when applied in V3 {@link io.reactivex.rxjava3.core.Observable#to(io.reactivex.rxjava3.core.ObservableConverter)}-based + * fluent conversions, produces a V2 Observable instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.rxjava3.core.ObservableConverter> toV2Observable() { + return (ObservableV3toV2)ObservableV3toV2.CONVERTER; + } + + // ----------------------------------------------------- + // Maybe conversions + // ----------------------------------------------------- + + /** + * Wraps a V3 Maybe and exposes it as a V2 Maybe. + * @param the element type of the sequence + * @param source the source V3 Maybe + * @return the wrapper V2 Maybe + */ + public static io.reactivex.Maybe toV2Maybe(io.reactivex.rxjava3.core.Maybe source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.plugins.RxJavaPlugins.onAssembly(new MaybeV3toV2(source)); + } + + /** + * Wraps a V2 Maybe and exposes it as a V3 Maybe. + * @param the element type of the sequence + * @param source the source V2 Maybe + * @return the wrapper V3 Maybe + */ + public static io.reactivex.rxjava3.core.Maybe toV3Maybe(io.reactivex.Maybe source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.rxjava3.plugins.RxJavaPlugins.onAssembly(new MaybeV2toV3(source)); + } + + /** + * Returns the shared singleton instance of a V2 Maybe converter + * that when applied in V2 {@link io.reactivex.Maybe#as(io.reactivex.MaybeConverter)}-based + * fluent conversions, produces a V3 Maybe instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.MaybeConverter> toV3Maybe() { + return (MaybeV2toV3)MaybeV2toV3.CONVERTER; + } + + /** + * Returns the shared singleton instance of a V3 Maybe converter + * that when applied in V3 {@link io.reactivex.rxjava3.core.Maybe#to(io.reactivex.rxjava3.core.MaybeConverter)}-based + * fluent conversions, produces a V2 Maybe instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.rxjava3.core.MaybeConverter> toV2Maybe() { + return (MaybeV3toV2)MaybeV3toV2.CONVERTER; + } + + // ----------------------------------------------------- + // Single conversions + // ----------------------------------------------------- + + /** + * Wraps a V3 Single and exposes it as a V2 Single. + * @param the element type of the sequence + * @param source the source V3 Single + * @return the wrapper V2 Single + */ + public static io.reactivex.Single toV2Single(io.reactivex.rxjava3.core.Single source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.plugins.RxJavaPlugins.onAssembly(new SingleV3toV2(source)); + } + + /** + * Wraps a V2 Single and exposes it as a V3 Single. + * @param the element type of the sequence + * @param source the source V2 Single + * @return the wrapper V3 Single + */ + public static io.reactivex.rxjava3.core.Single toV3Single(io.reactivex.Single source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.rxjava3.plugins.RxJavaPlugins.onAssembly(new SingleV2toV3(source)); + } + + /** + * Returns the shared singleton instance of a V2 Single converter + * that when applied in V2 {@link io.reactivex.Single#as(io.reactivex.SingleConverter)}-based + * fluent conversions, produces a V3 Single instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.SingleConverter> toV3Single() { + return (SingleV2toV3)SingleV2toV3.CONVERTER; + } + + /** + * Returns the shared singleton instance of a V3 Single converter + * that when applied in V3 {@link io.reactivex.rxjava3.core.Single#to(io.reactivex.rxjava3.core.SingleConverter)}-based + * fluent conversions, produces a V2 Single instance. + * @param the element type of the sequence + * @return the shared singleton converter instance + */ + @SuppressWarnings("unchecked") + public static io.reactivex.rxjava3.core.SingleConverter> toV2Single() { + return (SingleV3toV2)SingleV3toV2.CONVERTER; + } + + // ----------------------------------------------------- + // Completable conversions + // ----------------------------------------------------- + + /** + * Wraps a V3 Completable and exposes it as a V2 Completable. + * @param source the source V3 Completable + * @return the wrapper V2 Completable + */ + public static io.reactivex.Completable toV2Completable(io.reactivex.rxjava3.core.Completable source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.plugins.RxJavaPlugins.onAssembly(new CompletableV3toV2(source)); + } + + /** + * Wraps a V2 Completable and exposes it as a V3 Completable. + * @param source the source V2 Completable + * @return the wrapper V3 Completable + */ + public static io.reactivex.rxjava3.core.Completable toV3Completable(io.reactivex.Completable source) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(source, "source is null"); + return io.reactivex.rxjava3.plugins.RxJavaPlugins.onAssembly(new CompletableV2toV3(source)); + } + + /** + * Returns the shared singleton instance of a V2 Completable converter + * that when applied in V2 {@link io.reactivex.Completable#as(io.reactivex.CompletableConverter)}-based + * fluent conversions, produces a V3 Completable instance. + * @return the shared singleton converter instance + */ + public static io.reactivex.CompletableConverter toV3Completable() { + return CompletableV2toV3.CONVERTER; + } + + /** + * Returns the shared singleton instance of a V3 Completable converter + * that when applied in V3 {@link io.reactivex.rxjava3.core.Completable#to(io.reactivex.rxjava3.core.CompletableConverter)}-based + * fluent conversions, produces a V2 Completable instance. + * @return the shared singleton converter instance + */ + public static io.reactivex.rxjava3.core.CompletableConverter toV2Completable() { + return CompletableV3toV2.CONVERTER; + } + + // ----------------------------------------------------- + // Disposable conversions + // ----------------------------------------------------- + + /** + * Wraps a V3 Disposable and exposes it as a V2 Disposable. + * @param disposable the source V3 Disposable + * @return the wrapper V2 Disposable + */ + public static io.reactivex.disposables.Disposable toV2Disposable(io.reactivex.rxjava3.disposables.Disposable disposable) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(disposable, "disposable is null"); + return DisposableV3toV2.wrap(disposable); + } + + /** + * Wraps a V2 Disposable and exposes it as a V3 Disposable. + * @param disposable the source V2 Disposable + * @return the wrapper V3 Disposable + */ + public static io.reactivex.rxjava3.disposables.Disposable toV3Disposable(io.reactivex.disposables.Disposable disposable) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(disposable, "disposable is null"); + return DisposableV2toV3.wrap(disposable); + } + + // ----------------------------------------------------- + // Scheduler conversions + // ----------------------------------------------------- + + /** + * Wraps a V3 Scheduler and exposes it as a V2 Scheduler. + * @param scheduler the source V3 Scheduler + * @return the wrapper V2 Scheduler + */ + public static io.reactivex.Scheduler toV2Scheduler(io.reactivex.rxjava3.core.Scheduler scheduler) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return new SchedulerV3toV2(scheduler); + } + + /** + * Wraps a V2 Scheduler and exposes it as a V3 Scheduler. + * @param scheduler the source V2 Scheduler + * @return the wrapper V3 Scheduler + */ + public static io.reactivex.rxjava3.core.Scheduler toV3Scheduler(io.reactivex.Scheduler scheduler) { + io.reactivex.rxjava3.internal.functions.ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return new SchedulerV2toV3(scheduler); + } + + /** + * Wraps all standard V2 Schedulers into V3 Schedulers and installs + * scheduler hooks in the V3 RxJavaPlugins so that both RxJava 2 and + * RxJava 3 use the same backing scheduler implementations. + */ + public static void startUsingV2Schedulers() { + final io.reactivex.rxjava3.core.Scheduler computation = toV3Scheduler(io.reactivex.schedulers.Schedulers.computation()); + final io.reactivex.rxjava3.core.Scheduler ios = toV3Scheduler(io.reactivex.schedulers.Schedulers.io()); + final io.reactivex.rxjava3.core.Scheduler single = toV3Scheduler(io.reactivex.schedulers.Schedulers.single()); + final io.reactivex.rxjava3.core.Scheduler newThread = toV3Scheduler(io.reactivex.schedulers.Schedulers.newThread()); + + io.reactivex.rxjava3.plugins.RxJavaPlugins.setComputationSchedulerHandler(new io.reactivex.rxjava3.functions.Function() { + @Override + public io.reactivex.rxjava3.core.Scheduler apply(io.reactivex.rxjava3.core.Scheduler v) throws Throwable { + return computation; + } + }); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setIoSchedulerHandler(new io.reactivex.rxjava3.functions.Function() { + @Override + public io.reactivex.rxjava3.core.Scheduler apply(io.reactivex.rxjava3.core.Scheduler v) throws Throwable { + return ios; + } + }); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setSingleSchedulerHandler(new io.reactivex.rxjava3.functions.Function() { + @Override + public io.reactivex.rxjava3.core.Scheduler apply(io.reactivex.rxjava3.core.Scheduler v) throws Throwable { + return single; + } + }); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(new io.reactivex.rxjava3.functions.Function() { + @Override + public io.reactivex.rxjava3.core.Scheduler apply(io.reactivex.rxjava3.core.Scheduler v) throws Throwable { + return newThread; + } + }); + } + + /** + * Stop using the V2 standard schedulers by resetting the V3 scheduler hooks to their + * default (null) handlers in the V3 RxJavaPlugins. + */ + public static void stopUsingV2Schedulers() { + io.reactivex.rxjava3.plugins.RxJavaPlugins.setComputationSchedulerHandler(null); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setIoSchedulerHandler(null); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setSingleSchedulerHandler(null); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(null); + } + + /** + * Wraps all standard V3 Schedulers into V2 Schedulers and installs + * scheduler hooks in the V2 RxJavaPlugins so that both RxJava 2 and + * RxJava 3 use the same backing scheduler implementations. + */ + public static void startUsingV3Schedulers() { + final io.reactivex.Scheduler computation = toV2Scheduler(io.reactivex.rxjava3.schedulers.Schedulers.computation()); + final io.reactivex.Scheduler ios = toV2Scheduler(io.reactivex.rxjava3.schedulers.Schedulers.io()); + final io.reactivex.Scheduler single = toV2Scheduler(io.reactivex.rxjava3.schedulers.Schedulers.single()); + final io.reactivex.Scheduler newThread = toV2Scheduler(io.reactivex.rxjava3.schedulers.Schedulers.newThread()); + + io.reactivex.plugins.RxJavaPlugins.setComputationSchedulerHandler(new io.reactivex.functions.Function() { + @Override + public io.reactivex.Scheduler apply(io.reactivex.Scheduler v) throws Exception { + return computation; + } + }); + io.reactivex.plugins.RxJavaPlugins.setIoSchedulerHandler(new io.reactivex.functions.Function() { + @Override + public io.reactivex.Scheduler apply(io.reactivex.Scheduler v) throws Exception { + return ios; + } + }); + io.reactivex.plugins.RxJavaPlugins.setSingleSchedulerHandler(new io.reactivex.functions.Function() { + @Override + public io.reactivex.Scheduler apply(io.reactivex.Scheduler v) throws Exception { + return single; + } + }); + io.reactivex.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(new io.reactivex.functions.Function() { + @Override + public io.reactivex.Scheduler apply(io.reactivex.Scheduler v) throws Exception { + return newThread; + } + }); + } + + /** + * Stop using the V3 standard schedulers by resetting the V2 scheduler hooks to their + * default (null) handlers in the V2 RxJavaPlugins. + */ + public static void stopUsingV3Schedulers() { + io.reactivex.plugins.RxJavaPlugins.setComputationSchedulerHandler(null); + io.reactivex.plugins.RxJavaPlugins.setIoSchedulerHandler(null); + io.reactivex.plugins.RxJavaPlugins.setSingleSchedulerHandler(null); + io.reactivex.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(null); + } } diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/SchedulerV2toV3.java b/src/main/java/hu/akarnokd/rxjava3/bridge/SchedulerV2toV3.java new file mode 100644 index 0000000..31751d5 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/SchedulerV2toV3.java @@ -0,0 +1,102 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +import java.util.concurrent.TimeUnit; + +final class SchedulerV2toV3 extends io.reactivex.rxjava3.core.Scheduler { + + final io.reactivex.Scheduler scheduler; + + SchedulerV2toV3(io.reactivex.Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public io.reactivex.rxjava3.disposables.Disposable scheduleDirect(Runnable run) { + return DisposableV2toV3.wrap(scheduler.scheduleDirect(run)); + } + + @Override + public io.reactivex.rxjava3.disposables.Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + return DisposableV2toV3.wrap(scheduler.scheduleDirect(run, delay, unit)); + } + + @Override + public io.reactivex.rxjava3.disposables.Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + return DisposableV2toV3.wrap(scheduler.schedulePeriodicallyDirect(run, initialDelay, period, unit)); + } + + @Override + public void start() { + scheduler.start(); + } + + @Override + public void shutdown() { + scheduler.shutdown(); + } + + @Override + public long now(TimeUnit unit) { + return scheduler.now(unit); + } + + @Override + public Worker createWorker() { + return new WorkerV2toV3(scheduler.createWorker()); + } + + static final class WorkerV2toV3 extends Worker { + + final io.reactivex.Scheduler.Worker worker; + + WorkerV2toV3(io.reactivex.Scheduler.Worker worker) { + this.worker = worker; + } + + @Override + public io.reactivex.rxjava3.disposables.Disposable schedule(Runnable run) { + return DisposableV2toV3.wrap(worker.schedule(run)); + } + + @Override + public io.reactivex.rxjava3.disposables.Disposable schedule(Runnable run, long delay, TimeUnit unit) { + return DisposableV2toV3.wrap(worker.schedule(run, delay, unit)); + } + + @Override + public io.reactivex.rxjava3.disposables.Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) { + return DisposableV2toV3.wrap(worker.schedulePeriodically(run, initialDelay, period, unit)); + } + + @Override + public long now(TimeUnit unit) { + return worker.now(unit); + } + + @Override + public boolean isDisposed() { + return worker.isDisposed(); + } + + @Override + public void dispose() { + worker.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/SchedulerV3toV2.java b/src/main/java/hu/akarnokd/rxjava3/bridge/SchedulerV3toV2.java new file mode 100644 index 0000000..6d294ba --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/SchedulerV3toV2.java @@ -0,0 +1,102 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +import java.util.concurrent.TimeUnit; + +final class SchedulerV3toV2 extends io.reactivex.Scheduler { + + final io.reactivex.rxjava3.core.Scheduler scheduler; + + SchedulerV3toV2(io.reactivex.rxjava3.core.Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public io.reactivex.disposables.Disposable scheduleDirect(Runnable run) { + return DisposableV3toV2.wrap(scheduler.scheduleDirect(run)); + } + + @Override + public io.reactivex.disposables.Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + return DisposableV3toV2.wrap(scheduler.scheduleDirect(run, delay, unit)); + } + + @Override + public io.reactivex.disposables.Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + return DisposableV3toV2.wrap(scheduler.schedulePeriodicallyDirect(run, initialDelay, period, unit)); + } + + @Override + public void start() { + scheduler.start(); + } + + @Override + public void shutdown() { + scheduler.shutdown(); + } + + @Override + public long now(TimeUnit unit) { + return scheduler.now(unit); + } + + @Override + public Worker createWorker() { + return new WorkerV3toV2(scheduler.createWorker()); + } + + static final class WorkerV3toV2 extends Worker { + + final io.reactivex.rxjava3.core.Scheduler.Worker worker; + + WorkerV3toV2(io.reactivex.rxjava3.core.Scheduler.Worker worker) { + this.worker = worker; + } + + @Override + public io.reactivex.disposables.Disposable schedule(Runnable run) { + return DisposableV3toV2.wrap(worker.schedule(run)); + } + + @Override + public io.reactivex.disposables.Disposable schedule(Runnable run, long delay, TimeUnit unit) { + return DisposableV3toV2.wrap(worker.schedule(run, delay, unit)); + } + + @Override + public io.reactivex.disposables.Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) { + return DisposableV3toV2.wrap(worker.schedulePeriodically(run, initialDelay, period, unit)); + } + + @Override + public long now(TimeUnit unit) { + return worker.now(unit); + } + + @Override + public boolean isDisposed() { + return worker.isDisposed(); + } + + @Override + public void dispose() { + worker.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/SingleV2toV3.java b/src/main/java/hu/akarnokd/rxjava3/bridge/SingleV2toV3.java new file mode 100644 index 0000000..8f388bf --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/SingleV2toV3.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class SingleV2toV3 extends io.reactivex.rxjava3.core.Single +implements io.reactivex.SingleConverter> { + + final io.reactivex.Single source; + + static final SingleV2toV3 CONVERTER = new SingleV2toV3(null); + + SingleV2toV3(io.reactivex.Single source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.rxjava3.core.SingleObserver s) { + source.subscribe(new SingleObserverV3toV2(s)); + } + + @Override + public io.reactivex.rxjava3.core.Single apply(io.reactivex.Single upstream) { + return new SingleV2toV3(upstream); + } + + static final class SingleObserverV3toV2 + implements io.reactivex.SingleObserver, io.reactivex.rxjava3.disposables.Disposable { + + final io.reactivex.rxjava3.core.SingleObserver downstream; + + io.reactivex.disposables.Disposable upstream; + + SingleObserverV3toV2(io.reactivex.rxjava3.core.SingleObserver downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onSuccess(T t) { + downstream.onSuccess(t); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/main/java/hu/akarnokd/rxjava3/bridge/SingleV3toV2.java b/src/main/java/hu/akarnokd/rxjava3/bridge/SingleV3toV2.java new file mode 100644 index 0000000..9efc18c --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/bridge/SingleV3toV2.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.bridge; + +final class SingleV3toV2 extends io.reactivex.Single +implements io.reactivex.rxjava3.core.SingleConverter> { + + final io.reactivex.rxjava3.core.Single source; + + static final SingleV3toV2 CONVERTER = new SingleV3toV2(null); + + SingleV3toV2(io.reactivex.rxjava3.core.Single source) { + this.source = source; + } + + @Override + protected void subscribeActual(io.reactivex.SingleObserver s) { + source.subscribe(new SingleObserverV2toV3(s)); + } + + @Override + public io.reactivex.Single apply(io.reactivex.rxjava3.core.Single upstream) { + return new SingleV3toV2(upstream); + } + + static final class SingleObserverV2toV3 + implements io.reactivex.rxjava3.core.SingleObserver, io.reactivex.disposables.Disposable { + + final io.reactivex.SingleObserver downstream; + + io.reactivex.rxjava3.disposables.Disposable upstream; + + SingleObserverV2toV3(io.reactivex.SingleObserver downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { + this.upstream = d; + downstream.onSubscribe(this); + } + + @Override + public void onSuccess(T t) { + downstream.onSuccess(t); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + } +} diff --git a/src/test/java/hu/akarnokd/rxjava3/bridge/RxJavaBridgeTest.java b/src/test/java/hu/akarnokd/rxjava3/bridge/RxJavaBridgeTest.java index f4b5f1b..1adc515 100644 --- a/src/test/java/hu/akarnokd/rxjava3/bridge/RxJavaBridgeTest.java +++ b/src/test/java/hu/akarnokd/rxjava3/bridge/RxJavaBridgeTest.java @@ -16,6 +16,12 @@ package hu.akarnokd.rxjava3.bridge; +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; public class RxJavaBridgeTest { @@ -24,4 +30,723 @@ public class RxJavaBridgeTest { public void utilityClass() { TestHelper.checkUtilityClass(RxJavaBridge.class); } + + // ----------------------------------------------------------- + // Flowable + // ----------------------------------------------------------- + + @Test + public void flowableV2toV3Normal() { + RxJavaBridge.toV3Flowable(io.reactivex.Flowable.range(1, 5)) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void flowableV2toV3NormalConverter() { + io.reactivex.Flowable.range(1, 5).as(RxJavaBridge.toV3Flowable()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void flowableV2toV3Error() { + RxJavaBridge.toV3Flowable(io.reactivex.Flowable.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void flowableV2toV3Take() { + RxJavaBridge.toV3Flowable(io.reactivex.Flowable.range(1, 5)) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void flowableV3toV2Normal() { + RxJavaBridge.toV2Flowable(io.reactivex.rxjava3.core.Flowable.range(1, 5)) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void flowableV3toV2NormalConverter() { + io.reactivex.rxjava3.core.Flowable.range(1, 5).to(RxJavaBridge.toV2Flowable()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void flowableV3toV2Error() { + RxJavaBridge.toV2Flowable(io.reactivex.rxjava3.core.Flowable.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void flowableV3toV2Take() { + RxJavaBridge.toV2Flowable(io.reactivex.rxjava3.core.Flowable.range(1, 5)) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + // ----------------------------------------------------------- + // Observable + // ----------------------------------------------------------- + + @Test + public void observableV2toV3Normal() { + RxJavaBridge.toV3Observable(io.reactivex.Observable.range(1, 5)) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void observableV2toV3NormalConverter() { + io.reactivex.Observable.range(1, 5).as(RxJavaBridge.toV3Observable()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void observableV2toV3Error() { + RxJavaBridge.toV3Observable(io.reactivex.Observable.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void observableV2toV3Take() { + RxJavaBridge.toV3Observable(io.reactivex.Observable.range(1, 5)) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void observableV2toV3IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV3Observable(io.reactivex.subjects.PublishSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + @Test + public void observableV3toV2Normal() { + RxJavaBridge.toV2Observable(io.reactivex.rxjava3.core.Observable.range(1, 5)) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void observableV3toV2NormalConverter() { + io.reactivex.rxjava3.core.Observable.range(1, 5).to(RxJavaBridge.toV2Observable()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void observableV3toV2Error() { + RxJavaBridge.toV2Observable(io.reactivex.rxjava3.core.Observable.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void observableV3toV2Take() { + RxJavaBridge.toV2Observable(io.reactivex.rxjava3.core.Observable.range(1, 5)) + .take(3) + .test() + .assertResult(1, 2, 3); + } + + + @Test + public void observableV3toV2IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV2Observable(io.reactivex.rxjava3.subjects.PublishSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + // ----------------------------------------------------------- + // Maybe + // ----------------------------------------------------------- + + @Test + public void maybeV2toV3Success() { + RxJavaBridge.toV3Maybe(io.reactivex.Maybe.just(1)) + .test() + .assertResult(1); + } + + @Test + public void maybeV2toV3Completed() { + RxJavaBridge.toV3Maybe(io.reactivex.Maybe.empty()) + .test() + .assertResult(); + } + + @Test + public void maybeV2toV3SuccessConverter() { + io.reactivex.Maybe.just(1).as(RxJavaBridge.toV3Maybe()) + .test() + .assertResult(1); + } + + @Test + public void maybeV2toV3Error() { + RxJavaBridge.toV3Maybe(io.reactivex.Maybe.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void maybeV2toV3Dispose() { + io.reactivex.subjects.MaybeSubject subject = io.reactivex.subjects.MaybeSubject.create(); + io.reactivex.rxjava3.observers.TestObserver to = RxJavaBridge.toV3Maybe(subject) + .test(); + + assertTrue(subject.hasObservers()); + + to.dispose(); + + assertFalse(subject.hasObservers()); + } + + @Test + public void maybeV2toV3IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV3Maybe(io.reactivex.subjects.MaybeSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + + @Test + public void maybeV3toV2Success() { + RxJavaBridge.toV2Maybe(io.reactivex.rxjava3.core.Maybe.just(1)) + .test() + .assertResult(1); + } + + @Test + public void maybeV3toV2Completed() { + RxJavaBridge.toV2Maybe(io.reactivex.rxjava3.core.Maybe.empty()) + .test() + .assertResult(); + } + + @Test + public void maybeV3toV2SuccessConverter() { + io.reactivex.rxjava3.core.Maybe.just(1).to(RxJavaBridge.toV2Maybe()) + .test() + .assertResult(1); + } + + @Test + public void maybeV3toV2Error() { + RxJavaBridge.toV2Maybe(io.reactivex.rxjava3.core.Maybe.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void maybeV3toV2Dispose() { + io.reactivex.rxjava3.subjects.MaybeSubject subject = io.reactivex.rxjava3.subjects.MaybeSubject.create(); + io.reactivex.observers.TestObserver to = RxJavaBridge.toV2Maybe(subject) + .test(); + + assertTrue(subject.hasObservers()); + + to.dispose(); + + assertFalse(subject.hasObservers()); + } + + + @Test + public void maybeV3toV2IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV2Maybe(io.reactivex.rxjava3.subjects.MaybeSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + // ----------------------------------------------------------- + // Single + // ----------------------------------------------------------- + + @Test + public void singleV2toV3Success() { + RxJavaBridge.toV3Single(io.reactivex.Single.just(1)) + .test() + .assertResult(1); + } + + @Test + public void singleV2toV3SuccessConverter() { + io.reactivex.Single.just(1).as(RxJavaBridge.toV3Single()) + .test() + .assertResult(1); + } + + @Test + public void singleV2toV3Error() { + RxJavaBridge.toV3Single(io.reactivex.Single.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void singleV2toV3Dispose() { + io.reactivex.subjects.SingleSubject subject = io.reactivex.subjects.SingleSubject.create(); + io.reactivex.rxjava3.observers.TestObserver to = RxJavaBridge.toV3Single(subject) + .test(); + + assertTrue(subject.hasObservers()); + + to.dispose(); + + assertFalse(subject.hasObservers()); + } + + + @Test + public void singleV2toV3IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV3Single(io.reactivex.subjects.SingleSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + @Test + public void singleV3toV2Success() { + RxJavaBridge.toV2Single(io.reactivex.rxjava3.core.Single.just(1)) + .test() + .assertResult(1); + } + + @Test + public void singleV3toV2SuccessConverter() { + io.reactivex.rxjava3.core.Single.just(1).to(RxJavaBridge.toV2Single()) + .test() + .assertResult(1); + } + + @Test + public void singleV3toV2Error() { + RxJavaBridge.toV2Single(io.reactivex.rxjava3.core.Single.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void singleV3toV2Dispose() { + io.reactivex.rxjava3.subjects.SingleSubject subject = io.reactivex.rxjava3.subjects.SingleSubject.create(); + io.reactivex.observers.TestObserver to = RxJavaBridge.toV2Single(subject) + .test(); + + assertTrue(subject.hasObservers()); + + to.dispose(); + + assertFalse(subject.hasObservers()); + } + + + @Test + public void singleV3toV2IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV2Single(io.reactivex.rxjava3.subjects.SingleSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + // ----------------------------------------------------------- + // Completable + // ----------------------------------------------------------- + + @Test + public void completableV2toV3Completed() { + RxJavaBridge.toV3Completable(io.reactivex.Completable.complete()) + .test() + .assertResult(); + } + + @Test + public void completableV2toV3CompletedConverter() { + io.reactivex.Completable.complete().as(RxJavaBridge.toV3Completable()) + .test() + .assertResult(); + } + + @Test + public void completableV2toV3Error() { + RxJavaBridge.toV3Completable(io.reactivex.Completable.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void completableV2toV3Dispose() { + io.reactivex.subjects.CompletableSubject subject = io.reactivex.subjects.CompletableSubject.create(); + io.reactivex.rxjava3.observers.TestObserver to = RxJavaBridge.toV3Completable(subject) + .test(); + + assertTrue(subject.hasObservers()); + + to.dispose(); + + assertFalse(subject.hasObservers()); + } + + @Test + public void completableV2toV3IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV3Completable(io.reactivex.subjects.CompletableSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + @Test + public void completableV3toV2Completed() { + RxJavaBridge.toV2Completable(io.reactivex.rxjava3.core.Completable.complete()) + .test() + .assertResult(); + } + + @Test + public void completableV3toV2SuccessConverter() { + io.reactivex.rxjava3.core.Completable.complete().to(RxJavaBridge.toV2Completable()) + .test() + .assertResult(); + } + + @Test + public void completableV3toV2Error() { + RxJavaBridge.toV2Completable(io.reactivex.rxjava3.core.Completable.error(new IOException())) + .test() + .assertFailure(IOException.class); + } + + @Test + public void completableV3toV2Dispose() { + io.reactivex.rxjava3.subjects.CompletableSubject subject = io.reactivex.rxjava3.subjects.CompletableSubject.create(); + io.reactivex.observers.TestObserver to = RxJavaBridge.toV2Completable(subject) + .test(); + + assertTrue(subject.hasObservers()); + + to.dispose(); + + assertFalse(subject.hasObservers()); + } + + @Test + public void completableV3toV2IsDisposed() { + IsDisposed d = new IsDisposed(); + + RxJavaBridge.toV2Completable(io.reactivex.rxjava3.subjects.CompletableSubject.create()) + .subscribe(d); + + assertFalse(d.before); + assertTrue(d.after); + } + + // ----------------------------------------------------------- + // Disposable + // ----------------------------------------------------------- + + @Test + public void disposableV2toV3() { + io.reactivex.disposables.Disposable empty = io.reactivex.disposables.Disposables.empty(); + io.reactivex.rxjava3.disposables.Disposable disposable = RxJavaBridge.toV3Disposable(empty); + + assertFalse(disposable.isDisposed()); + assertFalse(empty.isDisposed()); + + disposable.dispose(); + + assertTrue(disposable.isDisposed()); + assertTrue(empty.isDisposed()); + } + + @Test + public void disposableV2toV3InternalDisposed() { + io.reactivex.disposables.Disposable empty = io.reactivex.internal.disposables.DisposableHelper.DISPOSED; + io.reactivex.rxjava3.disposables.Disposable disposable = RxJavaBridge.toV3Disposable(empty); + + assertSame(disposable, io.reactivex.rxjava3.internal.disposables.DisposableHelper.DISPOSED); + } + + @Test + public void disposableV2toV3InternalEmpty() { + io.reactivex.disposables.Disposable empty = io.reactivex.internal.disposables.EmptyDisposable.INSTANCE; + io.reactivex.rxjava3.disposables.Disposable disposable = RxJavaBridge.toV3Disposable(empty); + + assertSame(disposable, io.reactivex.rxjava3.internal.disposables.EmptyDisposable.INSTANCE); + } + + @Test + public void disposableV3toV2() { + io.reactivex.rxjava3.disposables.Disposable empty = io.reactivex.rxjava3.disposables.Disposables.empty(); + io.reactivex.disposables.Disposable disposable = RxJavaBridge.toV2Disposable(empty); + + assertFalse(disposable.isDisposed()); + assertFalse(empty.isDisposed()); + + disposable.dispose(); + + assertTrue(disposable.isDisposed()); + assertTrue(empty.isDisposed()); + } + + @Test + public void disposableV3toV2InternalDisposed() { + io.reactivex.rxjava3.disposables.Disposable empty = io.reactivex.rxjava3.internal.disposables.DisposableHelper.DISPOSED; + io.reactivex.disposables.Disposable disposable = RxJavaBridge.toV2Disposable(empty); + + assertSame(disposable, io.reactivex.internal.disposables.DisposableHelper.DISPOSED); + } + + @Test + public void disposableV3toV2InternalEmpty() { + io.reactivex.rxjava3.disposables.Disposable empty = io.reactivex.rxjava3.internal.disposables.EmptyDisposable.INSTANCE; + io.reactivex.disposables.Disposable disposable = RxJavaBridge.toV2Disposable(empty); + + assertSame(disposable, io.reactivex.internal.disposables.EmptyDisposable.INSTANCE); + } + + // ----------------------------------------------------------- + // Disposable + // ----------------------------------------------------------- + + @Test + public void schedulerUseV2toV3() { + final io.reactivex.schedulers.TestScheduler scheduler = new io.reactivex.schedulers.TestScheduler(); + io.reactivex.functions.Function handler = new io.reactivex.functions.Function() { + @Override + public io.reactivex.Scheduler apply(io.reactivex.Scheduler s) throws Exception { + return scheduler; + } + }; + final AtomicInteger count = new AtomicInteger(); + Runnable r = new Runnable() { + @Override + public void run() { + count.getAndIncrement(); + } + }; + + try { + io.reactivex.plugins.RxJavaPlugins.setComputationSchedulerHandler(handler); + io.reactivex.plugins.RxJavaPlugins.setIoSchedulerHandler(handler); + io.reactivex.plugins.RxJavaPlugins.setSingleSchedulerHandler(handler); + io.reactivex.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(handler); + + RxJavaBridge.startUsingV2Schedulers(); + + io.reactivex.rxjava3.schedulers.Schedulers.shutdown(); + io.reactivex.rxjava3.schedulers.Schedulers.start(); + + checkScheduler(scheduler, io.reactivex.rxjava3.schedulers.Schedulers.computation(), r); + checkScheduler(scheduler, io.reactivex.rxjava3.schedulers.Schedulers.io(), r); + checkScheduler(scheduler, io.reactivex.rxjava3.schedulers.Schedulers.single(), r); + checkScheduler(scheduler, io.reactivex.rxjava3.schedulers.Schedulers.newThread(), r); + + assertEquals(4 * 5 * 2, count.get()); + + RxJavaBridge.stopUsingV2Schedulers(); + + assertNull(io.reactivex.rxjava3.plugins.RxJavaPlugins.getComputationSchedulerHandler()); + assertNull(io.reactivex.rxjava3.plugins.RxJavaPlugins.getIoSchedulerHandler()); + assertNull(io.reactivex.rxjava3.plugins.RxJavaPlugins.getSingleSchedulerHandler()); + assertNull(io.reactivex.rxjava3.plugins.RxJavaPlugins.getNewThreadSchedulerHandler()); + } finally { + io.reactivex.plugins.RxJavaPlugins.reset(); + io.reactivex.rxjava3.plugins.RxJavaPlugins.reset(); + } + } + + @Test + public void schedulerUseV3toV2() { + final io.reactivex.rxjava3.schedulers.TestScheduler scheduler = new io.reactivex.rxjava3.schedulers.TestScheduler(); + io.reactivex.rxjava3.functions.Function handler = new io.reactivex.rxjava3.functions.Function() { + @Override + public io.reactivex.rxjava3.core.Scheduler apply(io.reactivex.rxjava3.core.Scheduler s) throws Exception { + return scheduler; + } + }; + final AtomicInteger count = new AtomicInteger(); + Runnable r = new Runnable() { + @Override + public void run() { + count.getAndIncrement(); + } + }; + + try { + io.reactivex.rxjava3.plugins.RxJavaPlugins.setComputationSchedulerHandler(handler); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setIoSchedulerHandler(handler); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setSingleSchedulerHandler(handler); + io.reactivex.rxjava3.plugins.RxJavaPlugins.setNewThreadSchedulerHandler(handler); + + RxJavaBridge.startUsingV3Schedulers(); + + io.reactivex.schedulers.Schedulers.shutdown(); + io.reactivex.schedulers.Schedulers.start(); + + checkScheduler(scheduler, io.reactivex.schedulers.Schedulers.computation(), r); + checkScheduler(scheduler, io.reactivex.schedulers.Schedulers.io(), r); + checkScheduler(scheduler, io.reactivex.schedulers.Schedulers.single(), r); + checkScheduler(scheduler, io.reactivex.schedulers.Schedulers.newThread(), r); + + assertEquals(4 * 5 * 2, count.get()); + + RxJavaBridge.stopUsingV3Schedulers(); + + assertNull(io.reactivex.plugins.RxJavaPlugins.getComputationSchedulerHandler()); + assertNull(io.reactivex.plugins.RxJavaPlugins.getIoSchedulerHandler()); + assertNull(io.reactivex.plugins.RxJavaPlugins.getSingleSchedulerHandler()); + assertNull(io.reactivex.plugins.RxJavaPlugins.getNewThreadSchedulerHandler()); + } finally { + io.reactivex.plugins.RxJavaPlugins.reset(); + io.reactivex.rxjava3.plugins.RxJavaPlugins.reset(); + } + } + + static void checkScheduler(io.reactivex.schedulers.TestScheduler tester, io.reactivex.rxjava3.core.Scheduler scheduler, Runnable r) { + assertEquals(tester.now(TimeUnit.SECONDS), scheduler.now(TimeUnit.SECONDS)); + + scheduler.scheduleDirect(r); + scheduler.scheduleDirect(r, 1, TimeUnit.SECONDS); + + io.reactivex.rxjava3.disposables.Disposable d = scheduler.schedulePeriodicallyDirect(r, 2, 1, TimeUnit.SECONDS); + + tester.advanceTimeBy(4, TimeUnit.SECONDS); + + assertEquals(tester.now(TimeUnit.SECONDS), scheduler.now(TimeUnit.SECONDS)); + + d.dispose(); + + io.reactivex.rxjava3.core.Scheduler.Worker worker = scheduler.createWorker(); + try { + assertEquals(tester.now(TimeUnit.SECONDS), worker.now(TimeUnit.SECONDS)); + + worker.schedule(r); + worker.schedule(r, 1, TimeUnit.SECONDS); + + worker.schedulePeriodically(r, 2, 1, TimeUnit.SECONDS); + + tester.advanceTimeBy(4, TimeUnit.SECONDS); + + assertEquals(tester.now(TimeUnit.SECONDS), scheduler.now(TimeUnit.SECONDS)); + } finally { + worker.dispose(); + assertTrue(worker.isDisposed()); + } + } + + static void checkScheduler(io.reactivex.rxjava3.schedulers.TestScheduler tester, io.reactivex.Scheduler scheduler, Runnable r) { + assertEquals(tester.now(TimeUnit.SECONDS), scheduler.now(TimeUnit.SECONDS)); + + scheduler.scheduleDirect(r); + scheduler.scheduleDirect(r, 1, TimeUnit.SECONDS); + + io.reactivex.disposables.Disposable d = scheduler.schedulePeriodicallyDirect(r, 2, 1, TimeUnit.SECONDS); + + tester.advanceTimeBy(4, TimeUnit.SECONDS); + + d.dispose(); + + assertEquals(tester.now(TimeUnit.SECONDS), scheduler.now(TimeUnit.SECONDS)); + + io.reactivex.Scheduler.Worker worker = scheduler.createWorker(); + try { + assertEquals(tester.now(TimeUnit.SECONDS), worker.now(TimeUnit.SECONDS)); + + worker.schedule(r); + worker.schedule(r, 1, TimeUnit.SECONDS); + + worker.schedulePeriodically(r, 2, 1, TimeUnit.SECONDS); + + tester.advanceTimeBy(4, TimeUnit.SECONDS); + + assertEquals(tester.now(TimeUnit.SECONDS), scheduler.now(TimeUnit.SECONDS)); + } finally { + worker.dispose(); + assertTrue(worker.isDisposed()); + } + } + + static final class IsDisposed implements + io.reactivex.Observer, + io.reactivex.rxjava3.core.Observer, + io.reactivex.MaybeObserver, + io.reactivex.rxjava3.core.MaybeObserver, + io.reactivex.SingleObserver, + io.reactivex.rxjava3.core.SingleObserver, + io.reactivex.CompletableObserver, + io.reactivex.rxjava3.core.CompletableObserver + { + + boolean before; + + boolean after; + + @Override + public void onSuccess(Integer t) { + } + + @Override + public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { + before = d.isDisposed(); + d.dispose(); + after = d.isDisposed(); + } + + @Override + public void onSubscribe(io.reactivex.disposables.Disposable d) { + before = d.isDisposed(); + d.dispose(); + after = d.isDisposed(); + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + } }