Skip to content

Commit

Permalink
[Java] Add info on different reactive APIs. (#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
stefano-ottolenghi committed Feb 17, 2025
1 parent 0c2cc3d commit 085ea2f
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions java-manual/modules/ROOT/pages/reactive.adoc
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
= Control results flow with reactive streams

[NOTE]
====
The Reactive API is recommended for applications that already work in a reactive programming style, and which have needs that only Reactive workflows can address.
For all other cases, the xref:query-simple.adoc[sync] and xref:async.adoc[async] APIs are recommended.
The async API works well with virtual threads on Java 21+.
====

In a reactive flow, consumers dictate the rate at which they consume records from queries, and the driver in turn manages the rate at which records are requested from the server.

An example use-case is an application fetching records from a Neo4j server and doing some very time-consuming post-processing on each one.
If the server were allowed to push records to the client as soon as it has them available, the client may be overflown with a lot of entries while its processing is still lagging behind.
The Reactive API ensures that the receiving side is not forced to buffer arbitrary amounts of data.

The driver's reactive implementation lives in the link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactivestreams/package-summary.html[`reactivestreams` sub-package] and relies on the link:https://projectreactor.io/docs/core/release/reference/[`reactor-core` package] from link:https://projectreactor.io/[Project Reactor].
The driver provides two implementations of reactive features:

[NOTE]
====
The Reactive API is recommended for applications that already work in a reactive programming style, and which have needs that only Reactive workflows can address.
For all other cases, the xref:query-simple.adoc[sync] and xref:async.adoc[async] APIs are recommended.
====
- link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactivestreams/package-summary.html[`org.neo4j.driver.reactivestreams`] uses the Reactive Streams API for Java.
It depends on the link:https://projectreactor.io/docs/core/release/reference/[`reactor-core` package] from link:https://projectreactor.io/[Project Reactor].
- link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactive/package-summary.html[`org.neo4j.driver.reactive`] uses the native Java Flow API introduced in Java 9 (such as link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Flow.Publisher.html[`Flow.Publisher`]).
The examples on this page use `org.neo4j.driver.reactivestreams`.


== Install dependencies
Expand Down Expand Up @@ -103,7 +110,8 @@ public class App {
----

<1> link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#usingWhen-org.reactivestreams.Publisher-java.util.function.Function-java.util.function.Function-[`Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction)`] is used to create a new session, run queries using it, and finally close it.
It will ensure the resource is alive for the time it is needed, and allows to specify the cleanup operation to undertake at the end.
It will ensure the resource is alive for the time it is needed for, and allows to specify the cleanup operation to undertake at the end.
Read more on this pattern in xref:session-creation[Always defer session creation].
<2> `.usingWhen()` takes a _resource supplier_ in the form of a `Publisher`, hence why session creation is wrapped in a link:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#just-T-[`Mono.just()`] call, which spawns a `Mono` from any value.
<3> The session creation is similar to the async case, and xref:transactions.adoc#_session_configuration[the same configuration methods] apply.
The difference is that the first argument must be `ReactiveSession.class`, and the return value is a link:https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/reactivestreams/ReactiveSession.html[`ReactiveSession`] object.
Expand All @@ -115,7 +123,7 @@ The difference is that the first argument must be `ReactiveSession.class`, and t
In a real application you wouldn't block but rather forward the records publisher to your framework of choice, which would process them in a meaningful way.

[NOTE]
You may run several queries within the same reactive session through several calls to `executeRead/Write()` within the `workerClosure`.
You can run several queries within the same reactive session through several calls to `executeRead/Write()` within the `workerClosure`.

=== Implicit transaction with reactive sessions

Expand Down Expand Up @@ -169,16 +177,17 @@ public class App {
}
----


[#session-creation]
== Always defer session creation

It's important to remember that in reactive programming *a Publisher doesn't come to life until a Subscriber attaches to it*.
A Publisher is just an abstract description of your asynchronous process, but it's only the act of subscribing that triggers the flow of data in the whole chain.
In reactive programming, *a Publisher doesn't come to life until a Subscriber attaches to it*: a Publisher is just an abstract description of your asynchronous process, but it's only the act of subscribing that triggers the flow of data in the whole chain.

For this reason, always be mindful to make session creation/destruction part of this chain, and not to create sessions separately from the query Publisher chain.
For this reason, be mindful to make session creation/destruction part of this chain, and not to create sessions separately from the query Publisher chain.
Doing so may result in many open sessions, none doing work and all waiting for a Publisher to use them, potentially exhausting the number of available sessions for your application.
The previous examples use `Flux.usingWhen()` to address this.

.Bad practice example -- session is created but nobody uses it
.Bad practice -- Session is created but nobody uses it
[source, java]
----
ReactiveSession rxSession = driver.session(ReactiveSession.class);
Expand Down

0 comments on commit 085ea2f

Please sign in to comment.