Skip to content

Commit

Permalink
fix deprecations with rxjava 1.2.1, increase test timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Oct 25, 2016
1 parent c327ce9 commit afa71b8
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 54 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.target>1.6</maven.compiler.target>
<rxjava.version>1.1.10</rxjava.version>
<rxjava.version>1.2.1</rxjava.version>
<jmh.version>1.11.1</jmh.version>
<exec.version>1.4.0</exec.version>
<slf4j.version>1.7.12</slf4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ static abstract class BoundedSubscriber<T> extends Subscriber<T> {

final AtomicInteger wip;

final NotificationLite<T> nl;

final int limit;

List<T> buffer;
Expand All @@ -105,7 +103,6 @@ public BoundedSubscriber(Subscriber<? super List<T>> actual, int capacityHint,
buffer = new ArrayList<T>(capacityHint);
requested = new AtomicLong();
wip = new AtomicInteger();
nl = NotificationLite.instance();
limit = prefetch - (prefetch >> 2);
if (prefetch == Integer.MAX_VALUE) {
request(Long.MAX_VALUE);
Expand All @@ -116,7 +113,7 @@ public BoundedSubscriber(Subscriber<? super List<T>> actual, int capacityHint,

@Override
public void onNext(T t) {
if (!queue.offer(nl.next(t))) {
if (!queue.offer(NotificationLite.next(t))) {
unsubscribe();
onError(new MissingBackpressureException());
} else {
Expand Down Expand Up @@ -205,7 +202,7 @@ void drain() {
break;
}

T value = nl.getValue(notification);
T value = NotificationLite.getValue(notification);

localBuffer.add(value);
localConsumption++;
Expand Down Expand Up @@ -327,7 +324,7 @@ void drain() {
break;
}

T value = nl.getValue(o);
T value = NotificationLite.getValue(o);

boolean emit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ static final class MergeProducer<T> extends AtomicLong implements Producer {
/** */
private static final long serialVersionUID = -812969080497027108L;

final NotificationLite<T> nl = NotificationLite.instance();

final boolean delayErrors;
final Comparator<? super T> comparator;
@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -222,15 +220,15 @@ public void emit() {
// if we already found a value, compare it against the
// current
if (hasAtLeastOne) {
T v = nl.getValue(o);
T v = NotificationLite.getValue(o);
int c = comparator.compare(minimum, v);
if (c > 0) {
minimum = v;
toPoll = i;
}
} else {
// this is the first value found
minimum = nl.getValue(o);
minimum = NotificationLite.getValue(o);
hasAtLeastOne = true;
toPoll = i;
}
Expand Down Expand Up @@ -315,7 +313,7 @@ public void requestMore(long n) {
@Override
public void onNext(T t) {
try {
queue.onNext(parent.nl.next(t));
queue.onNext(NotificationLite.next(t));
} catch (MissingBackpressureException mbe) {
try {
onError(mbe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class ObservableServerSocketTest {

private static final Charset UTF_8 = Charset.forName("UTF-8");

// private static final int PORT = 12345;
// private static final int PORT = 12345;
private static final String TEXT = "hello there";

private static final int POOL_SIZE = 10;
Expand Down Expand Up @@ -174,13 +174,13 @@ public Observable<byte[]> call(Observable<byte[]> g) {
.compose(Bytes.collect()) //
.doOnNext(Actions.setAtomic(result)) //
.doOnNext(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
System.out.println(Thread.currentThread().getName()
+ ": " + new String(bytes));
}
}) //
.onErrorResumeNext(Observable.<byte[]>empty()) //
@Override
public void call(byte[] bytes) {
System.out.println(Thread.currentThread().getName() + ": "
+ new String(bytes));
}
}) //
.onErrorResumeNext(Observable.<byte[]> empty()) //
.subscribeOn(scheduler);
}
}) //
Expand Down Expand Up @@ -220,19 +220,18 @@ public Observable<String> call(Observable<byte[]> g) {
.compose(Bytes.collect()) //
.doOnNext(Actions.setAtomic(result)) //
.map(new Func1<byte[], String>() {
@Override
public String call(byte[] bytes) {
return new String(bytes, UTF_8);
}
}) //
@Override
public String call(byte[] bytes) {
return new String(bytes, UTF_8);
}
}) //
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(
Thread.currentThread().getName() + ": " + s);
}
}) //
.onErrorResumeNext(Observable.<String>empty()) //
@Override
public void call(String s) {
System.out.println(Thread.currentThread().getName() + ": " + s);
}
}) //
.onErrorResumeNext(Observable.<String> empty()) //
.subscribeOn(scheduler);
}
}).subscribeOn(scheduler) //
Expand Down Expand Up @@ -271,10 +270,9 @@ public void testLoad() throws InterruptedException {
try {
int bufferSize = 4;
IO.serverSocketAutoAllocatePort(Actions.setAtomic(port)) //
.readTimeoutMs(10000) //
.readTimeoutMs(30000) //
.bufferSize(bufferSize) //
.create()
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
.create().flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> g) {
return g //
Expand All @@ -293,7 +291,7 @@ public String call(byte[] bytes) {
}) //
.doOnNext(Actions.decrement1(connections)) //
.doOnError(Actions.printStackTrace1()) //
.doOnError(Actions.<Throwable>setToTrue1(errored)) //
.doOnError(Actions.<Throwable> setToTrue1(errored)) //
.subscribeOn(scheduler) //
.subscribe(ts);
TestSubscriber<Object> ts2 = TestSubscriber.create();
Expand Down Expand Up @@ -342,12 +340,13 @@ public Observable<Object> call() {
throw new RuntimeException(e);
} finally {
try {
socket.close();
if (socket != null)
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return Observable.<Object>just(1);
return Observable.<Object> just(1);
}
}) //
.timeout(5, TimeUnit.SECONDS) //
Expand Down Expand Up @@ -388,13 +387,13 @@ public Observable<byte[]> call(Observable<byte[]> g) {
.compose(Bytes.collect()) //
.doOnNext(Actions.setAtomic(result)) //
.doOnNext(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
System.out.println(Thread.currentThread().getName()
+ ": " + new String(bytes));
}
}) //
.onErrorResumeNext(Observable.<byte[]>empty()) //
@Override
public void call(byte[] bytes) {
System.out.println(Thread.currentThread().getName() + ": "
+ new String(bytes));
}
}) //
.onErrorResumeNext(Observable.<byte[]> empty()) //
.subscribeOn(scheduler);
}
}).subscribeOn(scheduler) //
Expand All @@ -421,7 +420,7 @@ public void call(byte[] bytes) {
ts.unsubscribe();
}
}

@Test
public void testAcceptSocketRejectsAlways()
throws UnknownHostException, IOException, InterruptedException {
Expand Down Expand Up @@ -452,7 +451,6 @@ public void testAcceptSocketRejectsAlways()
}
}


public static void main(String[] args) throws InterruptedException {
reset();
TestSubscriber<Object> ts = TestSubscriber.create();
Expand All @@ -464,13 +462,13 @@ public Observable<byte[]> call(Observable<byte[]> g) {
.compose(Bytes.collect()) //
.doOnNext(new Action1<byte[]>() {

@Override
public void call(byte[] bytes) {
System.out.println(Thread.currentThread().getName() + ": "
+ new String(bytes).trim());
}
}) //
.onErrorResumeNext(Observable.<byte[]>empty()) //
@Override
public void call(byte[] bytes) {
System.out.println(Thread.currentThread().getName() + ": "
+ new String(bytes).trim());
}
}) //
.onErrorResumeNext(Observable.<byte[]> empty()) //
.subscribeOn(scheduler);
}
}).subscribeOn(scheduler) //
Expand Down

0 comments on commit afa71b8

Please sign in to comment.