From 7ae10466a7ec0ad79cbc260aeacabe104fb200db Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 20 Feb 2024 09:22:32 +0100 Subject: [PATCH] Make auto-ack streams access the message This allows for these variant to use the metadata, without having control over the lifecycle of the message, which is managed by the stream itself. --- README.md | 8 ++--- .../com/commercetools/queue/Message.scala | 30 +++++++++++++++++++ .../commercetools/queue/MessageContext.scala | 23 +------------- .../commercetools/queue/QueueSubscriber.scala | 9 +++--- .../commercetools/queue/SubscriberSuite.scala | 4 +-- docs/README.md | 8 ++--- 6 files changed, 46 insertions(+), 36 deletions(-) create mode 100644 core/src/main/scala/com/commercetools/queue/Message.scala diff --git a/README.md b/README.md index 17bcd43..eee0c26 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ import cats.effect.IO import cats.effect.std.Random import scala.concurrent.duration._ -import de.commercetools.queue._ +import com.commercetools.queue._ def publishStream(publisher: QueuePublisher[String]): Stream[IO, Nothing] = Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random => @@ -41,7 +41,7 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] = // waiting max for 20 seconds // print every received message, // and ack automatically - .processWithAutoAck(5, 20.seconds)(IO.println(_)) + .processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload)) // results are non important .drain @@ -61,7 +61,7 @@ def program(client: QueueClient): IO[Unit] = { ## Working with Azure Service Bus queues ```scala -import de.commercetools.queue.azure.servicebus._ +import com.commercetools.queue.azure.servicebus._ import com.azure.identity.DefaultAzureCredentialBuilder val namespace = "{namespace}.servicebus.windows.net" // your namespace @@ -74,7 +74,7 @@ ServiceBusClient(namespace, credentials).use(program(_)) ```scala -import de.commercetools.queue.aws.sqs._ +import com.commercetools.queue.aws.sqs._ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider diff --git a/core/src/main/scala/com/commercetools/queue/Message.scala b/core/src/main/scala/com/commercetools/queue/Message.scala new file mode 100644 index 0000000..2dcbc82 --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/Message.scala @@ -0,0 +1,30 @@ +package com.commercetools.queue + +import java.time.Instant + +/** + * Interface to access message data received from a queue. + */ +trait Message[T] { + + /** + * Unique message identifier + */ + def messageId: String + + /** + * The message payload + */ + def payload: T + + /** + * When the message was put into the queue. + */ + def enqueuedAt: Instant + + /** + * Raw message metadata (depending on the underlying queue system). + */ + def metadata: Map[String, String] + +} diff --git a/core/src/main/scala/com/commercetools/queue/MessageContext.scala b/core/src/main/scala/com/commercetools/queue/MessageContext.scala index c198e7b..faa16c6 100644 --- a/core/src/main/scala/com/commercetools/queue/MessageContext.scala +++ b/core/src/main/scala/com/commercetools/queue/MessageContext.scala @@ -3,34 +3,13 @@ package com.commercetools.queue import cats.effect.IO import java.time -import java.time.Instant import scala.concurrent.duration._ /** * Interface to interact with a message received from a queue. * The messages must be explicitly aknowledged after having been processed. */ -trait MessageContext[T] { - - /** - * Unique message identifier - */ - def messageId: String - - /** - * The message payload - */ - def payload: T - - /** - * When the message was put into the queue. - */ - def enqueuedAt: Instant - - /** - * Raw message metadata (depending on the underlying queue system). - */ - def metadata: Map[String, String] +trait MessageContext[T] extends Message[T] { /** * Acknowledges the message. It will be removed from the queue, so that diff --git a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala index ff8fe95..f7f8041 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala @@ -41,7 +41,8 @@ trait QueueSubscriber[T] { * Messages in a batch are processed sequentially, stopping at the first error. * All results up to the error will be emitted downstream before failing. */ - def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: T => IO[Res]): Stream[IO, Res] = { + def processWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => IO[Res]) + : Stream[IO, Res] = { // to have full control over nacking things in time after a failure, and emitting // results up to the error, we resort to a `Pull`, which allows this fine graind control // over pulling/emitting/failing @@ -53,7 +54,7 @@ trait QueueSubscriber[T] { } else { val ctx = chunk(idx) Pull - .eval(f(ctx.payload).guaranteeCase { + .eval(f(ctx).guaranteeCase { case Outcome.Succeeded(_) => ctx.ack() case _ => // if it was cancelled or errored, let's nack this and up to the end of the chunk @@ -86,10 +87,10 @@ trait QueueSubscriber[T] { * Messages in a batch are processed in parallel but result is emitted in * order the messages were received. */ - def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: T => IO[Res]) + def attemptProcessWithAutoAck[Res](batchSize: Int, waitingTime: FiniteDuration)(f: Message[T] => IO[Res]) : Stream[IO, Either[Throwable, Res]] = messages(batchSize, waitingTime).parEvalMap(batchSize)(ctx => - f(ctx.payload).attempt.flatTap { + f(ctx).attempt.flatTap { case Right(_) => ctx.ack() case Left(_) => ctx.nack() }) diff --git a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala b/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala index 7a32b80..a3ebec7 100644 --- a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala +++ b/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala @@ -64,7 +64,7 @@ class SubscriberSuite extends CatsEffectSuite { result <- subscriber // take all messages in one big batch .processWithAutoAck(batchSize = 100, waitingTime = 40.millis)(m => - IO.raiseWhen(m == "message-43")(new Exception("BOOM")).as(m)) + IO.raiseWhen(m.payload == "message-43")(new Exception("BOOM")).as(m)) .attempt .compile .toList @@ -72,7 +72,7 @@ class SubscriberSuite extends CatsEffectSuite { .flatMap { case (originals, result) => for { // check that all messages were consumed up to message #43 - _ <- assertIO(IO.pure(result.init), originals.take(43).map(m => Right(m.payload))) + _ <- assertIO(IO.pure(result.init.map(_.map(_.payload))), originals.take(43).map(m => Right(m.payload))) _ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM")) _ <- assertIO(queue.getAvailableMessages, originals.drop(43)) _ <- assertIO(queue.getLockedMessages, Nil) diff --git a/docs/README.md b/docs/README.md index 83fd46b..87e2456 100644 --- a/docs/README.md +++ b/docs/README.md @@ -22,7 +22,7 @@ import cats.effect.IO import cats.effect.std.Random import scala.concurrent.duration._ -import de.commercetools.queue._ +import com.commercetools.queue._ def publishStream(publisher: QueuePublisher[String]): Stream[IO, Nothing] = Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random => @@ -41,7 +41,7 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] = // waiting max for 20 seconds // print every received message, // and ack automatically - .processWithAutoAck(5, 20.seconds)(IO.println(_)) + .processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload)) // results are non important .drain @@ -61,7 +61,7 @@ def program(client: QueueClient): IO[Unit] = { ## Working with Azure Service Bus queues ```scala mdoc:compile-only -import de.commercetools.queue.azure.servicebus._ +import com.commercetools.queue.azure.servicebus._ import com.azure.identity.DefaultAzureCredentialBuilder val namespace = "{namespace}.servicebus.windows.net" // your namespace @@ -74,7 +74,7 @@ ServiceBusClient(namespace, credentials).use(program(_)) ```scala mdoc:compile-only -import de.commercetools.queue.aws.sqs._ +import com.commercetools.queue.aws.sqs._ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider