From f1dab33125c838b8a2953e8ec13ceb0091885294 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 20 Dec 2024 15:36:14 +0100 Subject: [PATCH] Add doc for pausable channels --- docs/src/main/asciidoc/messaging.adoc | 74 +++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/docs/src/main/asciidoc/messaging.adoc b/docs/src/main/asciidoc/messaging.adoc index e7cc35d6d2b36..478123e5f3a0a 100644 --- a/docs/src/main/asciidoc/messaging.adoc +++ b/docs/src/main/asciidoc/messaging.adoc @@ -356,6 +356,80 @@ public class MyProfileBean { } ---- +==== Pausable Channels + +Injected `@Channel` streams are not subscribed to by default, so the flow of messages is controlled by the application code using reactive streams and Mutiny APIs. +But for `@Incoming` methods, the flow of messages is controlled by the runtime. + +Pausable channels provide a mechanism to control message flow programmatically. +This is useful in scenarios where producers or consumers need to stop temporarily due to managing the lifecycle or performing maintenance operations. + +To use pausable channels, you need to activate it with the configuration property `pausable` set to `true`. + +[source, properties] +---- +mp.messaging.incoming.my-channel.pausable=true +# optional, by default the channel is NOT paused initially +mp.messaging.outgoing.my-channel.initially-paused=true +---- + +If a channel is configured to be pausable, you can get the `PausableChannel` by channel name from the `ChannelRegistry` programmatically, and pause or resume the channel as needed: + +[source, java] +---- +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.PausableChannel; + +@ApplicationScoped +public class PausableController { + + @Inject + ChannelRegistry registry; + + @PostConstruct + public void resume() { + // Wait for the application to be ready + // Retrieve the pausable channel + PausableChannel pausable = registry.getPausable("my-channel"); + // Pause the processing of the messages + pausable.resume(); + } + + public void pause() { + // Retrieve the pausable channel + PausableChannel pausable = registry.getPausable("my-channel"); + // Pause the processing of the messages + pausable.pause(); + } + + @Incoming("my-channel") + void process(String message) { + // Process the message + } + +} +---- + +This feature is independent of connectors and can be in theory used with channels backed by any connector. +Note that pausing message consumption applies back-pressure on the underlying consumer which receives messages from the remote broker. + +[NOTE] +==== +Kafka consumers provide a similar feature to pause and resume the consumption of messages from topic-partitions. +The Quarkus Kafka connector allows xref:kafka.adoc#kafka-bare-clients[access to the underlying client] to pause/resume the consumption. + +However, by default, with the `pause-if-no-requests=true` configuration, +the connector handles automatically the back-pressure, +by the pausing and resuming the Kafka consumer based on downstream requests. +It is therefore recommended to use pausable channels with the default `pause-if-no-requests=true` configuration. +==== + ==== Multiple Outgoings and `@Broadcast` By default, messages transmitted in a channel are only dispatched to a single consumer.