[Suggestion] Create an operator that merges multiple ordered Fluxes into a single flow, with optional fields for Fluxes with gaps in their keys #3645
Comments
Hi, @vibbix! Thanks for sharing your interesting use case! Cheers,
Wondering whether the existing API ideas can be used to achieve this result with the combinator variants, e.g.
when made configurable to wait for the last active source instead of terminating upon the first finishing one. The notion of zipping on key is potentially quite limiting.
@OlegDokuka Having a
This would be great, and if there was something like this where I can request that the output tuple "replenish" the producer slot that I consumed, I would build these sorts of functions on top of that. In my case it's more about keeping a row-level/horizontal data structure intact. Making this a built-in Reactor operator would ensure the entire workflow is type-safe, that each source publisher has its backpressure dealt with correctly, and that buffer bloat is minimized.
It could be a great shortcut for this common use case. I tend to have different types in each
@vibbix would you be so kind as to provide some more test cases, including some corner cases, to help us better understand and consider a possible design? For one, I'm wondering if the keys can appear more than once. If you'd be willing to provide the code for the implementation that works so far, that would also be beneficial.
@chemicL I created an example of what I tend to use now here: vibbix/rx-experiments. The attached
In my design, I assume that any publisher with multiple incoming elements for the same key has to be grouped beforehand.
@vibbix thank you. We appreciate your input. We are in the planning process currently and will get back when we have some priorities. Just to get a sense of the work involved: are you interested in contributing something once we settle on a design, or would you expect the team or community to provide an implementation?
Combine a Flux.zip-akin operator with a key-selecting variant of Flux.mergeComparing for publishers that should be merged based on keys, for both finite and unbounded sources, of any combination in length.
Motivation
I use Reactor every day in my data pipeline work, to pretty great success. The lazy operators are amazing at handling complex merge operations across many distinct sources. One of the things I run into, however, is the case where I am trying to fan in multiple sources of data that have different lengths and mismatched (but ordered) keys.
Example use-case
An example of this would be merging 4 different JSON arrays where a "match-key" is missing from some of the sets, or where some of the sets have totally different lengths and would short-circuit early.
I have used Flux.groupBy in the past, but that doesn't work in an unbounded Flux case. I tend to create a custom interleave for these situations, but a generic solution would be incredibly helpful.
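To make the use case above concrete, here is a minimal sketch of the row-building semantics using plain in-memory lists instead of publishers. It is not Reactor code and not the implementation from this issue: the class KeyedZipSketch and the method zipByKey are hypothetical names, and the sketch assumes every source is already sorted by key with at most one element per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration class; not part of Reactor.
class KeyedZipSketch {

    /**
     * Merges sources that are each sorted by key (assumed: unique keys per source)
     * into rows keyed by the match-key. Slot i of a row holds the value from
     * source i, or Optional.empty() when that source has no entry for the key.
     */
    static <K extends Comparable<K>, V> Map<K, List<Optional<V>>> zipByKey(
            List<List<Map.Entry<K, V>>> sources) {
        int n = sources.size();
        int[] pos = new int[n]; // read cursor per source
        Map<K, List<Optional<V>>> rows = new LinkedHashMap<>();
        while (true) {
            // Pick the smallest key among the current heads of all sources.
            K min = null;
            for (int i = 0; i < n; i++) {
                if (pos[i] < sources.get(i).size()) {
                    K head = sources.get(i).get(pos[i]).getKey();
                    if (min == null || head.compareTo(min) < 0) {
                        min = head;
                    }
                }
            }
            if (min == null) {
                break; // every source is exhausted
            }
            // Consume the head of each source that matches; leave a gap otherwise.
            List<Optional<V>> row = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                List<Map.Entry<K, V>> src = sources.get(i);
                if (pos[i] < src.size() && src.get(pos[i]).getKey().compareTo(min) == 0) {
                    row.add(Optional.of(src.get(pos[i]).getValue()));
                    pos[i]++;
                } else {
                    row.add(Optional.empty());
                }
            }
            rows.put(min, row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<Integer, List<Optional<String>>> rows = zipByKey(List.of(
                List.of(Map.entry(1, "a1"), Map.entry(2, "a2"), Map.entry(4, "a4")),
                List.of(Map.entry(2, "b2"), Map.entry(3, "b3")),
                List.of(Map.entry(1, "c1"), Map.entry(4, "c4"), Map.entry(5, "c5"))));
        rows.forEach((key, row) -> System.out.println(key + " -> " + row));
    }
}
```

With the three sample sources in main, key 3 exists only in the second source, so its row has empty first and third slots. Shorter sources simply stop contributing once exhausted rather than ending the whole merge. A reactive operator would additionally have to request from each source only when its head has been consumed, which this pull-based sketch does not model.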
Desired solution
An example signature for this kind of operator that I have experimented with:
Desired output
Test Case
Considered alternatives
Flux.groupBy doesn't work in unbounded / infinite publisher situations.
Mono.zip
Flux.mergeComparingDelayError(...)
Flux.windowUntilChanged to group the entities
flatMap with reduceWith accumulator to build the tuple out
Optional.empty() for unmatched fields
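Reading the last four alternatives as one pipeline (merge everything ordered by key, window on key change, reduce each window into a row seeded with Optional.empty()), the shape of that workaround can be sketched with plain collections standing in for the Flux operators. This is an assumption about how the pieces fit together, not the code from the issue: MergeThenWindowSketch, Tagged, and rows are hypothetical names, and the keys and values are fixed to int and String for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration class; plain collections stand in for Flux operators.
class MergeThenWindowSketch {

    /** An element tagged with the index of its source (hypothetical helper type). */
    static final class Tagged {
        final int source;
        final int key;
        final String value;

        Tagged(int source, int key, String value) {
            this.source = source;
            this.key = key;
            this.value = value;
        }
    }

    static Map<Integer, List<Optional<String>>> rows(List<Tagged> all, int sourceCount) {
        // Step 1 (stands in for mergeComparing): one sequence ordered by key.
        List<Tagged> merged = new ArrayList<>(all);
        merged.sort(Comparator.comparingInt((Tagged t) -> t.key));

        Map<Integer, List<Optional<String>>> out = new LinkedHashMap<>();
        Integer currentKey = null;
        List<Optional<String>> row = null;
        for (Tagged t : merged) {
            // Step 2 (stands in for windowUntilChanged): a new key opens a new row.
            if (currentKey == null || currentKey != t.key) {
                // Step 3 seed (stands in for reduceWith): every slot starts empty.
                row = new ArrayList<>(Collections.nCopies(sourceCount, Optional.<String>empty()));
                out.put(t.key, row);
                currentKey = t.key;
            }
            // Step 3 accumulate: fill the slot that belongs to this element's source.
            row.set(t.source, Optional.of(t.value));
        }
        return out;
    }
}
```

The sketch also shows why this workaround is awkward: every element has to carry a source tag, and the row slots lose their individual types because all sources pass through one merged sequence.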