Consider not calling onNext(null) #33

Open
cretz opened this issue Oct 31, 2016 · 7 comments

Comments

@cretz
Contributor

cretz commented Oct 31, 2016

Null values can be difficult for different Reactive Streams implementations to handle (e.g. monix/monix#252). I understand it can raise compatibility concerns, but it may be cleaner to change com.github.pgasync.Transaction::commit to return Observable<?> or Observable<Object> instead of Observable<Void> and emit something like Optional.empty() (and to apply the same change to the other places where onNext(null) is invoked).

@alexandru

To add some context, the problem is that interfaces like java.util.Queue are using null to signal an empty queue on methods such as poll(). See: https://docs.oracle.com/javase/7/docs/api/java/util/Queue.html#poll()

This is then implemented by all concurrent queue implementations, like Java's own ConcurrentLinkedQueue, or the ones in JCTools.org.

And reactive streams implementations, such as Monix, use concurrent queues to implement buffering on asynchronous boundaries where needed. RxJava also uses the queue implementations in JCTools. Even if you've been allowed to push null values downstream and it has worked until now, pushing nulls is basically a minefield because of these queue implementations.
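To make the ambiguity concrete, here is a minimal sketch using only the JDK's ConcurrentLinkedQueue. The class name NullInQueueDemo and the helper rejectsNull are made up for illustration; the sketch does not reproduce how Monix or RxJava buffer internally, only the queue contract they build on.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of any library discussed in this thread.
final class NullInQueueDemo {

    // Returns true if the queue refused to accept a null element.
    static boolean rejectsNull(Queue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();

        // poll() reports "nothing here" by returning null, so a stored null
        // element would be indistinguishable from an empty queue.
        System.out.println("poll() on empty queue: " + queue.poll());

        // For that reason the concurrent queue does not accept null at all.
        System.out.println("offer(null) rejected: " + rejectsNull(queue));
    }
}
```

Any operator that buffers items in such a queue therefore has to either reject a null item or wrap it in a sentinel, and that is where behavior starts to differ between implementations.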

@alexandru

Here's another, clearer reason why null is not allowed: null parameters are forbidden by the Reactive Streams specification (see rule 2.13). If the Monix implementation didn't throw a NullPointerException, it wouldn't be compliant with that spec.
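As a rough illustration of what that rule asks of a subscriber, the sketch below uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams ones, so it needs no extra dependency. NullCheckingSubscriber is a hypothetical name; this is not Monix's or RxJava's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical subscriber that rejects null signals, in the spirit of rule 2.13.
final class NullCheckingSubscriber<T> implements Flow.Subscriber<T> {

    // Items accepted so far, kept only so the behavior can be inspected.
    final List<T> received = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription, "subscription");
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        // A null item is answered with a NullPointerException to the caller.
        received.add(Objects.requireNonNull(item, "item"));
    }

    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable, "throwable");
    }

    @Override
    public void onComplete() {
        // Nothing to release in this sketch.
    }
}
```

With a subscriber like this downstream, a publisher that calls onNext(null) gets the exception thrown back at it instead of delivering a value.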

@alaisi
Owner

alaisi commented Nov 9, 2016

Hi,

you're correct, but due to backward compatibility I'm hesitant to change observables emitting a single null to Observable.empty(). But for example returning something like Observable<Transaction.State> with onNext(Transaction.State.ACTIVE) from Transaction::begin would probably be a good compromise? The same goes for the other null-emitting observables.
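A sketch of that compromise, under several assumptions: it uses the JDK's SubmissionPublisher (Java 9+) rather than RxJava so that it stays dependency-free, the State enum here is a hypothetical stand-in for the proposed Transaction.State (only ACTIVE is named above; the other constants are invented), and emitOnce is an illustrative helper, not driver code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical sketch: emit one non-null marker value instead of null.
final class StateEmittingSketch {

    // Stand-in for the proposed Transaction.State; COMMITTED and ROLLED_BACK are assumptions.
    enum State { ACTIVE, COMMITTED, ROLLED_BACK }

    // Publishes exactly one state, completes the stream, and returns what the consumer saw.
    static List<State> emitOnce(State state) {
        List<State> seen = new CopyOnWriteArrayList<>();
        SubmissionPublisher<State> publisher = new SubmissionPublisher<>();

        // consume() subscribes and returns a future that completes on onComplete.
        CompletableFuture<Void> done = publisher.consume(seen::add);

        publisher.submit(state); // would throw NullPointerException for a null item
        publisher.close();       // signals onComplete after the single item
        done.join();             // wait until the consumer has processed everything
        return seen;
    }
}
```

Existing callers that only react to the single emission keep working, while downstream operators never see a null.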

@alexandru

@alaisi if that's some kind of enum, then it sounds good. IMO the problem is that Observables aren't really meant for single values. Those are usually modeled with Future abstractions, though granted, in Java those suck, at least until version 8. In @monix we have Task, and I believe RxJava has Single; you might want to check that out.
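For comparison, a single-result operation can be modeled with Java 8's CompletableFuture. This is only a sketch of the idea: SingleResultSketch, Outcome, commit and describeCommit are hypothetical names, and nothing here reflects the driver's real API or RxJava's Single.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a single-value asynchronous result.
final class SingleResultSketch {

    // Invented marker type for the result of a commit.
    enum Outcome { COMMITTED }

    // Stands in for an asynchronous commit that yields exactly one value.
    static CompletableFuture<Outcome> commit() {
        return CompletableFuture.supplyAsync(() -> Outcome.COMMITTED);
    }

    // Chains a follow-up step onto the single result.
    static CompletableFuture<String> describeCommit() {
        return commit().thenApply(outcome -> "transaction " + outcome);
    }
}
```

The type itself says "exactly one result or one failure", so there is no stream of items to buffer and no onNext call that could carry a null.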

@cretz
Contributor Author

cretz commented Nov 9, 2016

@alaisi - I'd say that's fine, just note you still have a compat issue if people saved off the Observable<Void> but I agree that it is less of a compat concern than not emitting anything. I don't think you need to change begin, just commit and rollback. I agree with @alexandru that you should utilize Single if you can stand the compat break (and everywhere else where it's a single job with a single emitted value), but otherwise an observable that emits a single non-null value should be fine.

@alaisi
Owner

alaisi commented Nov 14, 2016

Ok, great. As for Single, that will probably be adopted when porting to RxJava 2.0.

@cretz
Contributor Author

cretz commented Apr 16, 2018

For any others, note, this is causing me production issues at

. Here is the stack trace from my Play app:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[MissingBackpressureException: null]]
        at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:255)
        at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:182)
        at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:310)
        at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:308)
        at scala.concurrent.Future.$anonfun$recoverWith$1(Future.scala:414)
        at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
Caused by: rx.exceptions.MissingBackpressureException: null
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:353)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:379)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:361)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
        at com.github.pgasync.impl.PgConnection$1.onNext(PgConnection.java:130)
        at com.github.pgasync.impl.PgConnection$1.onNext(PgConnection.java:121)
        at com.github.pgasync.impl.netty.NettyPgProtocolStream$1.onNext(NettyPgProtocolStream.java:204)
        at com.github.pgasync.impl.netty.NettyPgProtocolStream$5.channelRead(NettyPgProtocolStream.java:304)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)

Not really sure what to do about it (working on moving to another lib) but figured I'd post the stack trace to help anyone else Googling.
