Make auto-ack streams access the message
This allows these variants to use the message metadata without having
control over the lifecycle of the message, which is managed by the
stream itself.
satabin committed Feb 20, 2024
1 parent 2260776 commit 7ae1046
Showing 6 changed files with 46 additions and 36 deletions.
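In practical terms, the auto-ack handlers now receive the read-only `Message[T]` view of each message instead of the bare payload, so they can inspect the id, enqueue time, and metadata while acknowledgement stays under the stream's control. A minimal sketch of what this enables after the change (the `logAll` helper and the logged fields are illustrative, not part of the commit):

```scala
import scala.concurrent.duration._

import cats.effect.IO
import com.commercetools.queue._
import fs2.Stream

// Sketch only: the handler now gets a Message[String]; ack/nack is still performed by the stream.
def logAll(subscriber: QueueSubscriber[String]): Stream[IO, Unit] =
  subscriber.processWithAutoAck(batchSize = 10, waitingTime = 5.seconds) { msg =>
    IO.println(s"${msg.messageId} enqueued at ${msg.enqueuedAt}: ${msg.payload}")
  }
```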
8 changes: 4 additions & 4 deletions README.md
@@ -22,7 +22,7 @@ import cats.effect.IO
import cats.effect.std.Random
import scala.concurrent.duration._

-import de.commercetools.queue._
+import com.commercetools.queue._

def publishStream(publisher: QueuePublisher[String]): Stream[IO, Nothing] =
Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random =>
@@ -41,7 +41,7 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] =
// waiting max for 20 seconds
// print every received message,
// and ack automatically
-.processWithAutoAck(5, 20.seconds)(IO.println(_))
+.processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload))
// results are not important
.drain

@@ -61,7 +61,7 @@ def program(client: QueueClient): IO[Unit] = {
## Working with Azure Service Bus queues

```scala
-import de.commercetools.queue.azure.servicebus._
+import com.commercetools.queue.azure.servicebus._
import com.azure.identity.DefaultAzureCredentialBuilder

val namespace = "{namespace}.servicebus.windows.net" // your namespace
@@ -74,7 +74,7 @@ ServiceBusClient(namespace, credentials).use(program(_))


```scala
-import de.commercetools.queue.aws.sqs._
+import com.commercetools.queue.aws.sqs._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

30 changes: 30 additions & 0 deletions core/src/main/scala/com/commercetools/queue/Message.scala
@@ -0,0 +1,30 @@
+package com.commercetools.queue
+
+import java.time.Instant
+
+/**
+ * Interface to access message data received from a queue.
+ */
+trait Message[T] {
+
+  /**
+   * Unique message identifier
+   */
+  def messageId: String
+
+  /**
+   * The message payload
+   */
+  def payload: T
+
+  /**
+   * When the message was put into the queue.
+   */
+  def enqueuedAt: Instant
+
+  /**
+   * Raw message metadata (depending on the underlying queue system).
+   */
+  def metadata: Map[String, String]
+
+}
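Since `MessageContext` now extends this trait (see the next file), read-only helpers can be written once against `Message[T]` and reused from both the explicit-ack and auto-ack code paths. A hypothetical helper, not part of the commit:

```scala
import com.commercetools.queue.Message

// Hypothetical: formats any message (or message context) for logging without touching its lifecycle.
def describe[T](msg: Message[T]): String =
  s"[${msg.messageId}] enqueued at ${msg.enqueuedAt} with ${msg.metadata.size} metadata entries"
```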
23 changes: 1 addition & 22 deletions core/src/main/scala/com/commercetools/queue/MessageContext.scala
@@ -3,34 +3,13 @@ package com.commercetools.queue
import cats.effect.IO

import java.time
-import java.time.Instant
import scala.concurrent.duration._

/**
* Interface to interact with a message received from a queue.
* The messages must be explicitly acknowledged after having been processed.
*/
-trait MessageContext[T] {
-
-  /**
-   * Unique message identifier
-   */
-  def messageId: String
-
-  /**
-   * The message payload
-   */
-  def payload: T
-
-  /**
-   * When the message was put into the queue.
-   */
-  def enqueuedAt: Instant
-
-  /**
-   * Raw message metadata (depending on the underlying queue system).
-   */
-  def metadata: Map[String, String]
+trait MessageContext[T] extends Message[T] {

/**
* Acknowledges the message. It will be removed from the queue, so that
@@ -41,7 +41,8 @@ trait QueueSubscriber[T] {
* Messages in a batch are processed sequentially, stopping at the first error.
* All results up to the error will be emitted downstream before failing.
*/
-def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: T => IO[Res]): Stream[IO, Res] = {
+def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => IO[Res])
+  : Stream[IO, Res] = {
// to have full control over nacking things in time after a failure, and emitting
// results up to the error, we resort to a `Pull`, which allows this fine-grained control
// over pulling/emitting/failing
@@ -53,7 +54,7 @@
} else {
val ctx = chunk(idx)
Pull
-.eval(f(ctx.payload).guaranteeCase {
+.eval(f(ctx).guaranteeCase {
case Outcome.Succeeded(_) => ctx.ack()
case _ =>
// if it was cancelled or errored, let's nack this and up to the end of the chunk
@@ -86,10 +87,10 @@ trait QueueSubscriber[T] {
* Messages in a batch are processed in parallel, but results are emitted in
* the order the messages were received.
*/
-def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: T => IO[Res])
+def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => IO[Res])
: Stream[IO, Either[Throwable, Res]] =
messages(batchSize, waitingTime).parEvalMap(batchSize)(ctx =>
-f(ctx.payload).attempt.flatTap {
+f(ctx).attempt.flatTap {
case Right(_) => ctx.ack()
case Left(_) => ctx.nack()
})
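With the new `Message[T] => IO[Res]` handler, `attemptProcessWithAutoAck` callers can branch on metadata as well as the payload, while the stream still acks on `Right` and nacks on `Left`. A hedged usage sketch (the `routeBySource` helper and the `"source"` metadata key are assumptions, not part of the library):

```scala
import scala.concurrent.duration._

import cats.effect.IO
import com.commercetools.queue._
import fs2.Stream

// Sketch only: route on a hypothetical metadata key; failed messages are nacked by the stream.
def routeBySource(subscriber: QueueSubscriber[String]): Stream[IO, Either[Throwable, String]] =
  subscriber.attemptProcessWithAutoAck(batchSize = 20, waitingTime = 1.second) { msg =>
    msg.metadata.get("source") match {
      case Some(src) => IO.pure(s"$src: ${msg.payload}")
      case None => IO.raiseError(new IllegalStateException(s"missing 'source' on ${msg.messageId}"))
    }
  }
```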
@@ -64,15 +64,15 @@ class SubscriberSuite extends CatsEffectSuite {
result <- subscriber
// take all messages in one big batch
.processWithAutoAck(batchSize = 100, waitingTime = 40.millis)(m =>
-IO.raiseWhen(m == "message-43")(new Exception("BOOM")).as(m))
+IO.raiseWhen(m.payload == "message-43")(new Exception("BOOM")).as(m))
.attempt
.compile
.toList
} yield (messages, result))
.flatMap { case (originals, result) =>
for {
// check that all messages were consumed up to message #43
-_ <- assertIO(IO.pure(result.init), originals.take(43).map(m => Right(m.payload)))
+_ <- assertIO(IO.pure(result.init.map(_.map(_.payload))), originals.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM"))
_ <- assertIO(queue.getAvailableMessages, originals.drop(43))
_ <- assertIO(queue.getLockedMessages, Nil)
8 changes: 4 additions & 4 deletions docs/README.md
@@ -22,7 +22,7 @@ import cats.effect.IO
import cats.effect.std.Random
import scala.concurrent.duration._

-import de.commercetools.queue._
+import com.commercetools.queue._

def publishStream(publisher: QueuePublisher[String]): Stream[IO, Nothing] =
Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random =>
@@ -41,7 +41,7 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] =
// waiting max for 20 seconds
// print every received message,
// and ack automatically
-.processWithAutoAck(5, 20.seconds)(IO.println(_))
+.processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload))
// results are not important
.drain

@@ -61,7 +61,7 @@ def program(client: QueueClient): IO[Unit] = {
## Working with Azure Service Bus queues

```scala mdoc:compile-only
-import de.commercetools.queue.azure.servicebus._
+import com.commercetools.queue.azure.servicebus._
import com.azure.identity.DefaultAzureCredentialBuilder

val namespace = "{namespace}.servicebus.windows.net" // your namespace
@@ -74,7 +74,7 @@ ServiceBusClient(namespace, credentials).use(program(_))


```scala mdoc:compile-only
-import de.commercetools.queue.aws.sqs._
+import com.commercetools.queue.aws.sqs._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

