Skip to content

Commit

Permalink
Add documentation about receiving data
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Apr 29, 2024
1 parent 3d45534 commit e7457e7
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/getting-started/directory.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ laika.navigationOrder = [
publishing.md
subscribing.md
administration.md
serialization.md
]
11 changes: 7 additions & 4 deletions docs/getting-started/publishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

Publishing data is made using a @:api(com.commercetools.queue.QueuePublisher). You can acquire one through a @:api(com.commercetools.queue.QueueClient) by using the `publish()` method. A `QueuePublisher` is associated to a specific queue, which is provided when creating the publisher.

The publisher also requires a [data serializer][doc-serializer] upon creation for the type of data you want to publish to it.

```scala mdoc
import cats.effect.IO

Expand Down Expand Up @@ -39,8 +41,8 @@ Several `Stream`s can safely publish to the same sink concurrently, so you can r

## Explicit publish

If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the @:api(com.commercetools.queue.QueuePusher) lower level API, which exposes a way to publish a single message or a single batch.
This abstraction comes in handy when the messages you produces do not come from a `Stream`, otherwise you should prefer the `sink()` pipe presented above.
If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the @:api(com.commercetools.queue.QueuePusher) lower level API, which exposes ways to publish a single message or a single batch.
This abstraction comes in handy when the messages you produce do not come from a `Stream`, otherwise you should prefer the `sink()` pipe presented above.

A `QueuePusher` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly.

Expand All @@ -60,8 +62,8 @@ publisher.pusher.use { queuePusher =>
```

@:callout(warning)
Make sure that `IO`s publishing to the `queuePusher` do not outlive the `use` scope, otherwise you will be using the a closed resource after the `use` block returns.
If you need to spawn background fibers using the `queuePusher`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested withing the `queuePusher` one.
Make sure that `IO`s publishing to the `queuePusher` do not outlive the `use` scope, otherwise you will be using a closed resource after the `use` block returns.
If you need to spawn background fibers using the `queuePusher`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested within the `queuePusher` one.

```scala mdoc:compile-only
import cats.effect.std.Supervisor
Expand All @@ -85,3 +87,4 @@ publisher.pusher.use { queuePusher =>

[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource
[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor
[doc-serializer]: serialization.md#data-serializer
6 changes: 6 additions & 0 deletions docs/getting-started/serialization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{% nav = true %}
# Data Serialization

## Data `Serializer`

## Data `Deserializer`
156 changes: 156 additions & 0 deletions docs/getting-started/subscribing.md
Original file line number Diff line number Diff line change
@@ -1 +1,157 @@
{% nav = true %}
# Receiving Data

Receiving data is achieved through a @:api(com.commercetools.queue.QueueSubscriber). You can acquire one throuh a @:api(com.commercetools.queue.QueueClient) by using the `subscribe()` method. A `QueueSubscriber` is associated with a specific queue, which is provided when creating the subscriber.

The subscriber also requires a [data deserializer][doc-deserializer] upon creation for the of data you want to receive from the queue.

```scala mdoc
import cats.effect.IO

import com.commercetools.queue.{QueueClient, QueueSubscriber}

def client: QueueClient[IO] = ???

// returns a subscriber for the queue named `my-queue`,
// which can receive messages of type String
def subscriber: QueueSubscriber[IO, String] =
client.subscribe[String]("my-queue")
```

When a message is received by a subscriber, it is _locked_ (or _leased_) for a queue level configured amount of time (see more on the [Managing Queues] page). This means that only one subscriber receives (and can process) a given message in this amount of time. A message needs to be settled once processed (or if processing fails) to ensure it is either removed from the queue or made available again. This is part of the message control flow.

In the following, we explain what kind of control flow handling is provided by the library.

## Processors

The @:api(com.commercetools.queue.QueueSubscriber) abstraction provides a `processWithAutoAck()` method, which automatically handles the control flow part for you. You only need to provide the processing function, allowing you to focus on your business logic.

```scala mdoc:compile-only
import scala.concurrent.duration._

subscriber.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds) { message =>
IO.println(s"Received ${message.payload}").as(message.messageId)
}
```

The processing function receives a @:api(com.commercetools.queue.Message), which gives access to the content and some metadata of the received messages.

The result is a `Stream` of the processing results, emitted in the order the messages where received. Only the successfully processed messages are emitted down-stream. The stream is failed upon the first failed processing.

The `processWithAutoAck` method performs automatic acking/nacking for you depending on the processing outcome. It comes in handy to implement at-most once delivery startegies, releasing the message to be reprocessed by another subscriber in case of error.

If you wish to implement a stream that does not fail upon error, you can use the `attemptProcessWithAutoAck()` methods, which emits the results of the processing as an `Either[Throwable, T]`. The resulting stream does not fail if some processing fails. Otherwise it has the same behavior as the stream above.

## Raw message stream

If you want more fine-grained tuning of the control flow part, you can resort to the `messages()` stream available via the `QueueSubscriber`.
The stream emits a @:api(com.commercetools.queue.MessageContext) for each received message, giving access to the message content and metadata, as well as to message control methods.

Using this stream, you can implement your processing strategy.

@:callout(warning)
Using this stream, you need to explicitly settle the message (either by acking or nacking it) once processed. Failing to do so, results in the message being made available to other subscribers once the lock expires.

It is up to you to ensure that all control flow paths lead to explicit settlement of the messages you received.

The recommendation is to only resort to this stream if you need to implement complex custom control flow for messages.
@:@

A simplified `processWithAutoAck` method described above could be implemented this way.

@:callout(info)
The real implementation is more complex to ensure that all successfully processed messages are actually emitted down-stream and is more efficient.
@:@

```scala mdoc:compile-only
import cats.effect.Outcome

import scala.concurrent.duration._

subscriber
.messages(batchSize = 10, waitingTime = 20.seconds)
.evalMap { context =>
IO.println(s"Received ${context.payload}")
.as(context.messageId)
.guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
}
```

### `MessageContext` control flow

There are three different methods that can be used to control the message lifecycle from the subscriber point of view:

1. `MessageContext.ack()` acknowledges the message, and marks it as successfully processed in the queue system. It will be permanently removed and no other subscriber will ever receive it.
2. `MessageContext.nack()` marks the message as not processed, releasing the lock in the queue system. It will be viewable for other subscribers to receive and process.
3. `MessageContext.extendLock()` extends the currently owned lock by the queue level configured duration. This can be called as many times as you want, as long as you still own the lock. As long as the lock is extended, the message will not be distributed to any other subscriber by the queue system.

## Explicit pull

If you are integrated the library with an existing code base that performs explicit pulls from the queue, you can access the @:api(com.commercetools.queue.QueuePuller) lower level API, which exposes ways to pull batch of messages.
This abstraction comes in handy when your processing code is based on a callback approach and is not implemented as a `Stream`, otherwise you should prefer the streams presented above.

A `QueuePuller` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly.

```scala mdoc:compile-only
import cats.effect.Outcome

import scala.concurrent.duration._

import cats.syntax.foldable._

subscriber.puller.use { queuePuller =>

queuePuller
.pullBatch(batchSize = 10, waitingTime = 20.seconds)
.flatMap { chunk =>
chunk.traverse_ { context =>
IO.println(s"Received ${context.payload}").guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
}
}

}
```

@:callout(warning)
Make sure that `IO`s pulling from the `queuePuller` do not outlive the `use` scope, otherwise you will be using a closed resource after the `use` block returns.
If you need to spawn background fibers using the `queuePuller`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested within the `queuePuller` one.

```scala mdoc:compile-only
import cats.effect.Outcome
import cats.effect.std.Supervisor

import scala.concurrent.duration._

import cats.syntax.foldable._

subscriber.puller.use { queuePuller =>
// create a supervisor that waits for supervised spawn fibers
// to finish before being released
Supervisor[IO](await = true).use { supervisor =>
queuePuller
.pullBatch(batchSize = 10, waitingTime = 20.seconds)
.flatMap { chunk =>
chunk.traverse_ { context =>
supervisor.supervise {
IO.println(s"Received ${context.payload}")
.guaranteeCase {
case Outcome.Succeeded(_) => context.ack()
case _ => context.nack()
}
}
}
}
}
}
```
@:@

[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource
[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor
[doc-deserializer]: serialization.md#data-deserializer

0 comments on commit e7457e7

Please sign in to comment.