Skip to content

Commit

Permalink
Docs
Browse files Browse the repository at this point in the history
  • Loading branch information
mladens committed Sep 26, 2024
1 parent 0fcc27a commit 42a8175
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.effect.syntax.concurrent.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.monadError.*
import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
Expand All @@ -29,7 +29,7 @@ import software.amazon.awssdk.services.sqs.model.{MessageSystemAttributeName, Re
import java.time.Instant
import scala.annotation.nowarn
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.*
import scala.jdk.CollectionConverters._

private class SQSPuller[F[_], T](
val queueName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import cats.effect.syntax.concurrent.*
import cats.syntax.flatMap.*
import cats.syntax.functor.*
import cats.syntax.monadError.*
import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import fs2.Chunk

import java.time.Duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.*
import scala.jdk.CollectionConverters._

private class ServiceBusPuller[F[_], Data](
val queueName: String,
Expand Down
50 changes: 49 additions & 1 deletion docs/getting-started/subscribing.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ subscriber
}
```

For high throughput scenarios where acknowledging individual messages wouldn't be optimal, consider using `messageBatches()`.
Batching method exposes `MessageBatch` giving user control over entire batch as a whole allowing for batched acknowledgement if the implementation supports it.
Chunked messages can be accessed via `messages`.

```scala mdoc:compile-only
subscriber
.messageBatches(batchSize = 10, waitingTime = 20.seconds)
.evalMap { batch =>
batch.messages.parTraverse_ { msg =>
msg.payload.flatTap { payload =>
IO.println(s"Received $payload")
}
}.guaranteeCase {
case Outcome.Succeeded(_) => batch.ackAll()
case _ => batch.nackAll
}
}
```


### `MessageContext` control flow

There are three different methods that can be used to control the message lifecycle from the subscriber point of view:
Expand All @@ -140,6 +160,14 @@ There are three different methods that can be used to control the message lifecy
2. `MessageContext.nack()` marks the message as not processed, releasing the lock in the queue system. It will be viewable for other subscribers to receive and process.
3. `MessageContext.extendLock()` extends the currently owned lock by the queue level configured duration. This can be called as many times as you want, as long as you still own the lock. As long as the lock is extended, the message will not be distributed to any other subscriber by the queue system.

### `MessageBatch` control flow

Methods have the same semantics to `MessageContext` ones with the difference that they act on all messages from the batch at once. Whether the action is atomic across all messages depends on the underlying implementation.

1. `MessageContext.ackAll()` acknowledges the message, and marks it as successfully processed in the queue system. It will be permanently removed and no other subscriber will ever receive it.
2. `MessageContext.nackAll()` marks the message as not processed, releasing the lock in the queue system. It will be viewable for other subscribers to receive and process.


## Explicit pull

If you are integrating this library with an existing code base that performs explicit pulls from the queue, you can access the @:api(QueuePuller) lower level API, which exposes ways to pull batch of messages.
Expand Down Expand Up @@ -168,7 +196,6 @@ subscriber.puller.use { queuePuller =>
}
}
}

}
```

Expand Down Expand Up @@ -208,6 +235,27 @@ subscriber.puller.use { queuePuller =>
```
@:@

To pull batches that can be acknowledged in batches, use `pullMessageBatch()`

```scala mdoc:compile-only
subscriber.puller.use { queuePuller =>

queuePuller
.pullMessageBatch(batchSize = 10, waitingTime = 20.seconds)
.flatMap { batch =>
batch.messages.traverse_ { message =>
message.payload.flatMap { payload =>
IO.println(s"Received $payload")
}.guaranteeCase {
case Outcome.Succeeded(_) => batch.ackAll()
case _ => batch.nackAll()
}
}
}

}
```

[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource
[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor
[doc-deserializer]: serialization.md#data-deserializer
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package com.commercetools.queue.gcp.pubsub

import cats.effect.Async
import cats.effect.syntax.concurrent.*
import cats.syntax.all.*
import cats.effect.syntax.concurrent._
import cats.syntax.all._
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import com.google.api.gax.grpc.GrpcCallContext
import com.google.api.gax.retrying.RetrySettings
Expand All @@ -30,7 +30,7 @@ import org.threeten.bp.Duration

import java.time
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.*
import scala.jdk.CollectionConverters._

private class PubSubPuller[F[_], T](
val queueName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.commercetools.queue.testing

import cats.effect.IO
import cats.syntax.foldable.*
import cats.syntax.foldable._
import com.commercetools.queue.{Message, MessageBatch, MessageContext, QueuePuller, UnsealedMessageBatch, UnsealedQueuePuller}
import fs2.Chunk

Expand Down

0 comments on commit 42a8175

Please sign in to comment.