Skip to content

Commit

Permalink
Copy internal helper methods from RxJava to avoid OSGi problems
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed May 13, 2020
1 parent b8a5acf commit bf9a541
Show file tree
Hide file tree
Showing 33 changed files with 4,207 additions and 56 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Check out the [https://github.com/akarnokd/RxJavaBridge](https://github.com/akar

```
dependencies {
compile "com.github.akarnokd:rxjava3-interop:3.0.0"
compile "com.github.akarnokd:rxjava3-interop:3.0.1"
}
```

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GROUP=com.github.akarnokd
VERSION_NAME=3.0.0
version=3.0.0
VERSION_NAME=3.0.1
version=3.0.1

POM_ARTIFACT_ID=rxjava3-interop
POM_NAME=Interop library between RxJava 1.x and 3.x
Expand Down
156 changes: 156 additions & 0 deletions src/main/java/hu/akarnokd/rxjava3/interop/BackpressureHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2016-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package hu.akarnokd.rxjava3.interop;

import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Utility class to help with backpressure-related operations such as request aggregation.
*/
final class BackpressureHelper {
/** Utility class. */
private BackpressureHelper() {
throw new IllegalStateException("No instances!");
}

/**
* Adds two long values and caps the sum at {@link Long#MAX_VALUE}.
* @param a the first value
* @param b the second value
* @return the sum capped at {@link Long#MAX_VALUE}
*/
public static long addCap(long a, long b) {
long u = a + b;
if (u < 0L) {
return Long.MAX_VALUE;
}
return u;
}

/**
* Multiplies two long values and caps the product at {@link Long#MAX_VALUE}.
* @param a the first value
* @param b the second value
* @return the product capped at {@link Long#MAX_VALUE}
*/
public static long multiplyCap(long a, long b) {
long u = a * b;
if (((a | b) >>> 31) != 0) {
if (u / a != b) {
return Long.MAX_VALUE;
}
}
return u;
}

/**
* Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
* caps the result at {@link Long#MAX_VALUE} and returns the previous value.
* @param requested the {@code AtomicLong} holding the current requested value
* @param n the value to add, must be positive (not verified)
* @return the original value before the add
*/
public static long add(@NonNull AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}

/**
* Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
* caps the result at {@link Long#MAX_VALUE} and returns the previous value and
* considers {@link Long#MIN_VALUE} as a cancel indication (no addition then).
* @param requested the {@code AtomicLong} holding the current requested value
* @param n the value to add, must be positive (not verified)
* @return the original value before the add
*/
public static long addCancel(@NonNull AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MIN_VALUE) {
return Long.MIN_VALUE;
}
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}

/**
* Atomically subtract the given number (positive, not validated) from the target field unless it contains {@link Long#MAX_VALUE}.
* @param requested the target field holding the current requested amount
* @param n the produced element count, positive (not validated)
* @return the new amount
*/
public static long produced(@NonNull AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}

/**
* Atomically subtract the given number (positive, not validated) from the target field if
* it doesn't contain {@link Long#MIN_VALUE} (indicating some cancelled state) or {@link Long#MAX_VALUE} (unbounded mode).
* @param requested the target field holding the current requested amount
* @param n the produced element count, positive (not validated)
* @return the new amount
*/
public static long producedCancel(@NonNull AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MIN_VALUE) {
return Long.MIN_VALUE;
}
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 David Karnok
* Copyright 2016-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 David Karnok
* Copyright 2016-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 David Karnok
* Copyright 2016-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
141 changes: 141 additions & 0 deletions src/main/java/hu/akarnokd/rxjava3/interop/DisposableHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2016-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package hu.akarnokd.rxjava3.interop;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.ProtocolViolationException;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Utility methods for working with Disposables atomically.
*/
enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
DISPOSED
;

/**
* Checks if the given Disposable is the common {@link #DISPOSED} enum value.
* @param d the disposable to check
* @return true if d is {@link #DISPOSED}
*/
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}

/**
* Atomically sets the field to the given non-null Disposable and returns true
* or returns false if the field is non-null.
* If the target field contains the common DISPOSED instance, the supplied disposable
* is disposed. If the field contains other non-null Disposable, an IllegalStateException
* is signalled to the RxJavaPlugins.onError hook.
*
* @param field the target field
* @param d the disposable to set, not null
* @return true if the operation succeeded, false
*/
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
Objects.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}

/**
* Atomically disposes the Disposable in the field if not already disposed.
* @param field the target field
* @return true if the current thread managed to dispose the Disposable
*/
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}

/**
* Verifies that current is null, next is not null, otherwise signals errors
* to the RxJavaPlugins and returns false.
* @param current the current Disposable, expected to be null
* @param next the next Disposable, expected to be non-null
* @return true if the validation succeeded
*/
public static boolean validate(Disposable current, Disposable next) {
if (next == null) {
RxJavaPlugins.onError(new NullPointerException("next is null"));
return false;
}
if (current != null) {
next.dispose();
reportDisposableSet();
return false;
}
return true;
}

/**
* Reports that the disposable is already set to the RxJavaPlugins error handler.
*/
public static void reportDisposableSet() {
RxJavaPlugins.onError(new ProtocolViolationException("Disposable already set!"));
}

/**
* Atomically tries to set the given Disposable on the field if it is null or disposes it if
* the field contains {@link #DISPOSED}.
* @param field the target field
* @param d the disposable to set
* @return true if successful, false otherwise
*/
public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {
if (!field.compareAndSet(null, d)) {
if (field.get() == DISPOSED) {
d.dispose();
}
return false;
}
return true;
}

@Override
public void dispose() {
// deliberately no-op
}

@Override
public boolean isDisposed() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 David Karnok
* Copyright 2016-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 David Karnok
* Copyright 2016-2020 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,23 +58,23 @@ static final class SourceSubscriber<T>
@Override
public void request(long n) {
if (n != 0L) {
io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.deferredRequest(this, requested, n);
SubscriptionHelper.deferredRequest(this, requested, n);
}
}

@Override
public void unsubscribe() {
io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.cancel(this);
SubscriptionHelper.cancel(this);
}

@Override
public boolean isUnsubscribed() {
return io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.CANCELLED == get();
return SubscriptionHelper.CANCELLED == get();
}

@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.deferredSetOnce(this, requested, s);
SubscriptionHelper.deferredSetOnce(this, requested, s);
}

@Override
Expand Down
Loading

0 comments on commit bf9a541

Please sign in to comment.