Hi, I'm trying to do a PoC on reactive backpressure using pgadba, and I get the impression it isn't honoring backpressure. I'm running a simple query against a database using a rowPublisherOperation:
DataSource ds = DataSourceFactory.newFactory("org.postgresql.adba.PgDataSourceFactory")
    .builder()
    .url("jdbc:postgresql://localhost:5432/dvdrental")
    .username("postgres")
    .password("mysecretpassword")
    .build();
CompletableFuture<String> result = new CompletableFuture<>();
Session session = ds.getSession();
// Very basic SubmissionPublisher (and a Flow.Processor), so I can expose the result as a publisher
RowProcessor rp = new RowProcessor();
final CompletableFuture<String> completableFuture = session.<String>rowPublisherOperation("select title from film limit 1000")
    .subscribe(rp, result)
    .submit()
    .getCompletionStage()
    .toCompletableFuture();
// Convert to RxJava2, because I'm most comfortable with that
long count = FlowInterop.fromFlowPublisher(rp)
    .doOnTerminate(() -> session.close())
    .map(e -> (String) e.at("title").get())
    .doOnNext(e -> System.err.println("Title: " + e))
    .count()
    .blockingGet();
System.err.println("Count: "+count);
Where RowProcessor is this:
public class RowProcessor extends SubmissionPublisher<Result.RowColumn> implements Flow.Processor<Result.RowColumn, Result.RowColumn> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(RowColumn item) {
        submit(item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.err.println("completed");
        close();
    }
}
Now what happens is this: If I limit to about 500 it works fine.
select title from film limit 500
results in:
completed
Count: 500
All good. If I increase to 900 I see this:
completed
Count: 895
Along with 5 IllegalStateExceptions:
java.lang.IllegalStateException: failed to offer item to subscriber
    at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.lambda$0(ProcessorSubmission.java:82)
    at java.base/java.util.concurrent.SubmissionPublisher.retryOffer(SubmissionPublisher.java:445)
    at java.base/java.util.concurrent.SubmissionPublisher.doOffer(SubmissionPublisher.java:422)
    at java.base/java.util.concurrent.SubmissionPublisher.offer(SubmissionPublisher.java:550)
    at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.addRow(ProcessorSubmission.java:81)
    at org.postgresql.adba/org.postgresql.adba.communication.network.Portal.addDataRow(Portal.java:152)
    at org.postgresql.adba/org.postgresql.adba.communication.network.ExecuteResponse.read(ExecuteResponse.java:31)
    at org.postgresql.adba/org.postgresql.adba.communication.NetworkConnection.handleRead(NetworkConnection.java:407)
    at org.postgresql.adba/org.postgresql.adba.execution.DefaultNioLoop.run(DefaultNioLoop.java:127)
    at java.base/java.lang.Thread.run(Thread.java:844)
If I do more with the results (like printing the rows to the console) the problem gets worse, so I get the impression that the pg driver does not slow down reading the result when the consumer can't keep up.
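For reference, the stack trace goes through SubmissionPublisher.offer and retryOffer into a lambda in ProcessorSubmission, which suggests rows are offered to a bounded buffer and the drop handler fires once that buffer is full. The following is a minimal sketch of that JDK behaviour only, independent of pgadba: the class name OfferDropDemo and the method countDrops are made up for illustration, and the handler here counts drops where the driver's handler appears to throw. The subscriber never requests anything, standing in for a consumer that can't keep up.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of pgadba.
class OfferDropDemo {

    /**
     * Offers `items` values to a subscriber that never signals demand and
     * returns how many times the onDrop handler was invoked.
     */
    static int countDrops(int items, int bufferCapacity) {
        AtomicInteger drops = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {

            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    // Deliberately never call s.request(n): a stalled consumer.
                }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });

            for (int i = 0; i < items; i++) {
                // offer() does not block; when the subscriber's buffer is full
                // the handler is called and decides whether to retry once.
                publisher.offer(i, (subscriber, item) -> {
                    drops.incrementAndGet();
                    return false; // no retry, the item is dropped
                });
            }
        }
        return drops.get();
    }

    public static void main(String[] args) {
        System.err.println("Dropped: " + countDrops(100, 8));
    }
}
```

If this reading is right, offer itself never slows the producer down. Backpressure would have to come from the producer pausing its reads while the buffer is full, for example by checking estimateMaximumLag() or by using the blocking submit().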