-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flowable#groupBy
race leads to a back-pressure issue
#7100
Comments
There is no ordering between group emission and cancellation and the JavaDocs warns about MBEs due to lack of requesting more groups.
In the example, the |
It's more complicated than that. The request/delivery in the inner groups still feed back to the main source because they have to trigger progress if they become ready. Consequently, since the next upstream value may be creating a new group or would go to an existing group, the main group output may hold back the inner groups. You could buffer these new groups but then they can't get consumed and can clog up the main queue. |
Yeah. Noticed that. I guess I found another fix. Let me try it quickly for the Reactor to see if it works. Apologies for confusing with the first message |
Fix -> reactor/reactor-core#2450 |
Hi!
While debugging reactor/reactor-core#2352 we wanted to check whether RxJava has the same issue since, given the history of both projects :)
Apparently, with
3.0.7
, the same construction in RxJava fails with a very similar issue (although the failure is different):Gives (not 100% reliably, consider running in "rerun until failure" mode):
A few interesting observations:
observeOn
's buffer size to131
and higher makes it always pass130
would sometimes fail withUnable to emit a new group (#99) due to lack of requests
129
would sometimes fail withUnable to emit a new group (#98) due to lack of requests
128
would sometimes fail withUnable to emit a new group (#97) due to lack of requests
So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.
The text was updated successfully, but these errors were encountered: