From 085ea2fee552e60a4e037780e272edbeeb65133c Mon Sep 17 00:00:00 2001 From: Stefano Ottolenghi Date: Mon, 17 Feb 2025 10:42:21 +0100 Subject: [PATCH] [Java] Add info on different reactive APIs. (#455) --- java-manual/modules/ROOT/pages/reactive.adoc | 33 +++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/java-manual/modules/ROOT/pages/reactive.adoc b/java-manual/modules/ROOT/pages/reactive.adoc index 4e5d55ef..4e5f7d63 100644 --- a/java-manual/modules/ROOT/pages/reactive.adoc +++ b/java-manual/modules/ROOT/pages/reactive.adoc @@ -1,18 +1,25 @@ = Control results flow with reactive streams +[NOTE] +==== +The Reactive API is recommended for applications that already work in a reactive programming style, and which have needs that only Reactive workflows can address. +For all other cases, the xref:query-simple.adoc[sync] and xref:async.adoc[async] APIs are recommended. +The async API works well with virtual threads on Java 21+. +==== + In a reactive flow, consumers dictate the rate at which they consume records from queries, and the driver in turn manages the rate at which records are requested from the server. An example use-case is an application fetching records from a Neo4j server and doing some very time-consuming post-processing on each one. If the server were allowed to push records to the client as soon as it has them available, the client may be overflown with a lot of entries while its processing is still lagging behind. The Reactive API ensures that the receiving side is not forced to buffer arbitrary amounts of data. -The driver's reactive implementation lives in the link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactivestreams/package-summary.html[`reactivestreams` sub-package] and relies on the link:https://projectreactor.io/docs/core/release/reference/[`reactor-core` package] from link:https://projectreactor.io/[Project Reactor]. +The driver provides two implementations of reactive features: -[NOTE] -==== -The Reactive API is recommended for applications that already work in a reactive programming style, and which have needs that only Reactive workflows can address. -For all other cases, the xref:query-simple.adoc[sync] and xref:async.adoc[async] APIs are recommended. -==== +- link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactivestreams/package-summary.html[`org.neo4j.driver.reactivestreams`] uses the Reactive Streams API for Java. +It depends on the link:https://projectreactor.io/docs/core/release/reference/[`reactor-core` package] from link:https://projectreactor.io/[Project Reactor]. +- link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactive/package-summary.html[`org.neo4j.driver.reactive`] uses the native Java Flow API introduced in Java 9 (such as link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Flow.Publisher.html[`Flow.Publisher`]). + +The examples on this page use `org.neo4j.driver.reactivestreams`. == Install dependencies @@ -103,7 +110,8 @@ public class App { ---- <1> link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#usingWhen-org.reactivestreams.Publisher-java.util.function.Function-java.util.function.Function-[`Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction)`] is used to create a new session, run queries using it, and finally close it. -It will ensure the resource is alive for the time it is needed, and allows to specify the cleanup operation to undertake at the end. +It will ensure the resource is alive for the time it is needed for, and allows to specify the cleanup operation to undertake at the end. +Read more on this pattern in xref:session-creation[Always defer session creation]. <2> `.usingWhen()` takes a _resource supplier_ in the form of a `Publisher`, hence why session creation is wrapped in a link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#just-T-[`Mono.just()`] call, which spawns a `Mono` from any value. <3> The session creation is similar to the async case, and xref:transactions.adoc#_session_configuration[the same configuration methods] apply. The difference is that the first argument must be `ReactiveSession.class`, and the return value is a link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactivestreams/ReactiveSession.html[`ReactiveSession`] object. @@ -115,7 +123,7 @@ The difference is that the first argument must be `ReactiveSession.class`, and t In a real application you wouldn't block but rather forward the records publisher to your framework of choice, which would process them in a meaningful way. [NOTE] -You may run several queries within the same reactive session through several calls to `executeRead/Write()` within the `workerClosure`. +You can run several queries within the same reactive session through several calls to `executeRead/Write()` within the `workerClosure`. === Implicit transaction with reactive sessions @@ -169,16 +177,17 @@ public class App { } ---- + +[#session-creation] == Always defer session creation -It's important to remember that in reactive programming *a Publisher doesn't come to life until a Subscriber attaches to it*. -A Publisher is just an abstract description of your asynchronous process, but it's only the act of subscribing that triggers the flow of data in the whole chain. +In reactive programming, *a Publisher doesn't come to life until a Subscriber attaches to it*: a Publisher is just an abstract description of your asynchronous process, but it's only the act of subscribing that triggers the flow of data in the whole chain. -For this reason, always be mindful to make session creation/destruction part of this chain, and not to create sessions separately from the query Publisher chain. +For this reason, be mindful to make session creation/destruction part of this chain, and not to create sessions separately from the query Publisher chain. Doing so may result in many open sessions, none doing work and all waiting for a Publisher to use them, potentially exhausting the number of available sessions for your application. The previous examples use `Flux.usingWhen()` to address this. -.Bad practice example -- session is created but nobody uses it +.Bad practice -- Session is created but nobody uses it [source, java] ---- ReactiveSession rxSession = driver.session(ReactiveSession.class);