
Nested queries get stuck when using transactions (bound single connection) #194

Closed
cambierr opened this issue Sep 24, 2019 · 15 comments
Labels
for: stackoverflow (A question that's better suited to stackoverflow.com)
status: invalid (An issue that we don't feel is valid)

Comments

@cambierr

cambierr commented Sep 24, 2019

Hi,

I just tried a very simple example, based on two reactive repositories:

Given br, an R2DBC CRUD repository, and cr, another R2DBC CRUD repository:

br.findAll()
   .flatMap(b -> cr.findById(b.getPropertyOne())   // look up the related entity for each row
            .doOnNext(c -> b.setProperty2(c))      // attach it to the parent entity
            .thenReturn(b))
   .collectList()
   .block();

This code sample never completes (only the first 250 or so entries reach the .collectList operator). After some digging, adding an onBackpressureXXX operator after the findAll seems to "fix" the issue by... well, dropping elements or buffering them.

At this point, my understanding is that the R2DBC reactive repositories don't use the consumer feedback mechanism, which removes a significant part of R2DBC's benefits.

Am I wrong? Is there a better way to achieve the same objective?

Thanks!

@mp911de mp911de changed the title from "Consumer feedback not taken into account ?" to "Backpressure not taken into account ?" Sep 25, 2019
@mp911de
Member

mp911de commented Sep 25, 2019

Can you provide more details? Do you use connection pooling and do you have a ReactiveTransactionManager set up along with @EnableTransactionManagement?

If so, then a couple of things play into that. First, the database connection is pinned to the Subscription, which causes DatabaseClient and R2DBC Repositories to use the same connection for each call.

Second, the code calls operations on a connection while the connection hasn't consumed all results from the previous query. Rows are emitted as data is received and decoded from the transport channel. That means the mapping function at flatMap(…) gets called as soon as the first rows arrive.

In the next step, the code tries to issue sub-queries on the same connection. What happens now is that queries are sent to the connection but sub-queries do not receive any data because the outer query isn't completed. Therefore, inner subscribers are left with the initial demand and lock up the connection.
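For illustration, here is a minimal sketch of that setup (entity and repository names are hypothetical). With @EnableTransactionManagement and a ReactiveTransactionManager in place, the whole method runs on one pinned connection, so the inner findById(…) calls are issued while the outer findAll() result stream is still open:

import java.util.List;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import reactor.core.publisher.Mono;

@Service
public class EnrichmentService {

    private final BroadcastRepository br;  // hypothetical ReactiveCrudRepository<Broadcast, Long>
    private final ChildRepository cr;      // hypothetical ReactiveCrudRepository<Child, Long>

    public EnrichmentService(BroadcastRepository br, ChildRepository cr) {
        this.br = br;
        this.cr = cr;
    }

    @Transactional
    public Mono<List<Broadcast>> enrichAll() {
        return br.findAll()                                     // outer query starts streaming rows
                .flatMap(b -> cr.findById(b.getPropertyOne())   // inner query on the same pinned connection
                        .doOnNext(b::setProperty2)
                        .thenReturn(b))
                .collectList();                                 // never completes: inner queries are starved
    }
}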

@cambierr
Author

Indeed, ReactiveTransactionManager and @EnableTransactionManagement are being used, as well as connection pooling.

Thank you for the explanation of what happens internally. Could you suggest a way to get this working? I'm not yet that familiar with (spring-data-)r2dbc and am trying to learn.

@cambierr
Author

I just tried the same code with a DefaultDatabaseClient instead, and using a breakpoint in inConnectionMany(...) I double-checked which connections are used.

A first one is used by the findAll, while the findById in the flatMap uses several others (always the same N thanks to the pool) but never the one used by the findAll.

So it should work, but I see the exact same behavior as before, so I have to admit that I'm a bit lost.

@cambierr
Author

Also, I added a

.onBackpressureDrop(new Consumer<Broadcast>() {
  @Override
  public void accept(Broadcast broadcast) {
    System.out.println("DROP");
  }
})
.doOnRequest(r -> System.out.println("RQ 1 " + r))

right after the findAll, which seems to drop thousands of events even though the request was for only two entries.

@mp911de
Member

mp911de commented Sep 25, 2019

As a general rule, avoid creating a stream while another stream is active (famous quote: "Do not cross the streams").

If you want to fetch related data, then ideally collect all results as a List and then run the subqueries. This way, the initial response stream is consumed and the connection is free to fetch additional results.

Something like the following snippet should do the job:

br.findAll().collectList()
		.flatMap(it -> {

			// the outer result stream is fully consumed here, so the connection is free
			List<Mono<Reference>> refs = new ArrayList<>();
			for (Person p : it) {
				refs.add(cr.findById(p.getPropertyOne()).doOnNext(…));
			}

			return Flux.concat(refs).then(Mono.just(it));
		});

@mp911de mp911de added the "for: stackoverflow" and "status: invalid" labels Sep 25, 2019
@cambierr
Author

I'll continue on SO then: https://stackoverflow.com/questions/58094931/spring-data-r2dbc-backpressure-not-taken-into-account

Mainly, my issue with your suggestion is that we need to keep everything in memory before starting to process the data, while I kinda like the "stream" approach that doesn't require me to store millions of rows in memory.

@mp911de
Member

mp911de commented Sep 25, 2019

Moving to Stack Overflow is a good idea to share our discussion with the community.

we need to keep all in memory before starting to process the data

Yes, that is the case. In imperative programming, we typically consume data as a List to consume all protocol frames from the transport. You can run into a similar situation with JDBC when working directly with ResultSet: there you typically see an exception complaining about the protocol state.
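For comparison, a hypothetical JDBC analogue (made-up table and column names, java.sql imports assumed); depending on the driver and fetch mode, the inner query on the same connection fails with a protocol-state error because the outer ResultSet is still open:

void crossTheStreams(Connection con) throws SQLException {
    try (Statement outer = con.createStatement();
         ResultSet rs = outer.executeQuery("SELECT id, child_id FROM broadcast")) {

        while (rs.next()) {
            // sub-query on the same connection while the outer ResultSet is still open
            try (PreparedStatement inner =
                         con.prepareStatement("SELECT * FROM child WHERE id = ?")) {
                inner.setLong(1, rs.getLong("child_id"));
                try (ResultSet child = inner.executeQuery()) {
                    while (child.next()) { /* enrich the broadcast row ... */ }
                }
            }
        }
    }
}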

I'm going to close this issue as we now have an SO post.

@mp911de mp911de closed this as completed Sep 25, 2019
@cambierr
Author

cambierr commented Oct 1, 2019

@mp911de I know the SO thread should centralize this discussion, but no replies are coming and, even after testing a lot of things, I can't manage to get this working without keeping everything in memory.

Regarding

Do not cross the streams

I do not agree, or not in this case to be more precise. I personally think that the notion of a Flux / Stream / pipeline / ... is exactly this: stream-process a set of "events", transform them, enrich them, ... and finally, in the last subscriber, save them / print them / ... whatever.

My requirement is also quite simple: I'm trying to implement some sort of "lazy loading" of a child entity, and this looks like a good use case for stream processing (read a stream of source entities, enrich them, then write them to an output stream).

After your suggestion regarding transaction management, I did a lot of tests using a non-transactional database client but cannot figure out why this doesn't work... My observation (I use r2dbc-pool, and a flatMap with concurrency limited to 2) is that a first connection is used for the initial find, while two others are used for the flatMap enrichment operations... and this works for about the first 80 root entities before everything blocks.

Given that this works for some time, I think the overall logic is good (we use it with many other technologies such as Couchbase and Cassandra), but there is probably something wrong in the connection reset or something like that.

What are your thoughts on this?

@mp911de
Member

mp911de commented Oct 1, 2019

You need to take into account on which connection you run queries. In a transactional context, you're operating on a single connection that maps to the I/O stream of database wire protocol frames.
Without transactions, you typically operate on multiple independent connections.

When SQL query results are not fully consumed from the connection before you issue further commands, then a lockup is almost guaranteed.

So

Do not cross the streams

Should probably become:

Do not cross the stream with itself.

From a high-level perspective, here's what happens:

  1. You run a query A (send SQL to the database)
  2. The database responds with a stream of rows for query A
  3. You get the first row emitted for query A
  4. You issue another query B using flatMap(…)
  5. Next row for query A gets emitted
  6. As in step 4, you issue another query (C)
  7. Meanwhile, the database responds with results for queries B, C and all the following that result from the flatMap(…) looking up related rows.
  8. flatMap(…) from step 4 has a limited concurrency/parallelism setting that translates into backpressure. Once the concurrency limit is reached, flatMap(…) awaits completion. This can only happen once queries B and C complete. Since B and C can only be consumed after query A's results are consumed, you experience the lockup.
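In code, the outline corresponds roughly to this sketch (same hypothetical repositories as above; the explicit concurrency of 2 is just an example, Reactor's default for flatMap is 256, which lines up with the roughly 250 rows that initially reached collectList):

br.findAll()                                           // query A
   .flatMap(b -> cr.findById(b.getPropertyOne())       // queries B, C, ... issued per row of A
                   .doOnNext(b::setProperty2)
                   .thenReturn(b),
            2)                                         // bounded concurrency -> backpressure
   .collectList()
   .block();                                           // locks up: B and C wait for A, A waits for flatMap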

Let me know if that outline makes sense.

@cambierr
Author

cambierr commented Oct 1, 2019

This makes perfect sense in a transactional context, since a single connection is used, but what I don't get, even when thinking hard about it, is why this happens even without the transactional client, which means on other connections...

I'll do some tests using the 1.0 RC1 release.

@mp911de mp911de removed the "status: invalid" label Oct 1, 2019
@mp911de mp911de reopened this Oct 1, 2019
@mp911de
Member

mp911de commented Oct 1, 2019

Can you provide a reproducer? Without transactions, that issue should be gone. Not sure whether this is a driver or a Spring Data R2DBC issue, but we'll figure it out.

@cambierr
Author

cambierr commented Oct 2, 2019

Well... you know what? Using the latest version (1.0 RC1), I got it working fine in a non-transactional context... and the expected blocking behavior in a transactional one.

Unless you think investigating this on an old version is meaningful, I'll just stick with the latest r2dbc + spring-data-r2dbc versions :)

@mp911de
Member

mp911de commented Oct 2, 2019

Thanks a lot for checking. Then let's close this ticket. I'm going to change the title to reflect the transactional behavior so future users can find this issue more easily.

@mp911de mp911de closed this as completed Oct 2, 2019
@mp911de mp911de added the "status: invalid" label Oct 2, 2019
@mp911de mp911de changed the title from "Backpressure not taken into account ?" to "Nested queries get stuck when using transactions (bound single connection)" Oct 2, 2019
@Lexas228

@cambierr Hi, have you found any solution?

@cambierr
Author

cambierr commented Oct 1, 2023

Hi @Lexas228,

I went for the simplest solution, which is basically to process my dataset in batches of 10k rows... that is, load 10k, buffer them, process them, and repeat.

This goes slightly against the "streaming" approach of Reactor, but it does the trick :-)
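For reference, a minimal sketch of that batching approach with the same hypothetical repositories (non-transactional, so the sub-queries run on pooled connections):

br.findAll()
   .buffer(10_000)                                        // drain the outer stream in chunks of 10k rows
   .concatMap(batch -> Flux.fromIterable(batch)
           .flatMap(b -> cr.findById(b.getPropertyOne())  // enrich one buffered batch at a time
                   .doOnNext(b::setProperty2)
                   .thenReturn(b)))
   .subscribe();                                          // or write the enriched entities downstream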
