diff --git a/core/src/main/scala/com/commercetools/queue/ErrorHandler.scala b/core/src/main/scala/com/commercetools/queue/ErrorHandler.scala index 525b47b..b22068c 100644 --- a/core/src/main/scala/com/commercetools/queue/ErrorHandler.scala +++ b/core/src/main/scala/com/commercetools/queue/ErrorHandler.scala @@ -27,10 +27,11 @@ trait MessageHandler[F[_], T, O] { } sealed trait Decision[+O] +sealed trait ImmediateDecision[+O] extends Decision[O] object Decision { - case class Ok[O](res: O) extends Decision[O] - case object Drop extends Decision[Nothing] - case class Fail(t: Throwable, ack: Boolean) extends Decision[Nothing] + case class Ok[O](res: O) extends ImmediateDecision[O] + case object Drop extends ImmediateDecision[Nothing] + case class Fail(t: Throwable, ack: Boolean) extends ImmediateDecision[Nothing] case object DeadLetter extends Decision[Nothing] case class Reenqueue(delay: Option[FiniteDuration]) extends Decision[Nothing] } diff --git a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala index eeac65c..d3dc148 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala @@ -25,8 +25,9 @@ import scala.concurrent.duration.FiniteDuration /** * The base interface to subscribe to a queue. + * @param pusher used to handle the messages to be `reenqueued` */ -abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) { +abstract class QueueSubscriber[F[_], T](private val pusher: Resource[F, QueuePusher[F, T]])(implicit F: Concurrent[F]) { /** The queue name to which this subscriber subscribes. */ def queueName: String @@ -126,4 +127,34 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) { case Left(_) => ctx.nack() }) + /** + * Processes the messages with the provided message handler. + * The messages are ack'ed, nack'ed, reenqueued or moved to a DLQ based on the decision returned from the handler. + * The stream emits results or errors down-stream and does not fail on business logic errors, + * allowing you to build error recovery logic. + * + * Messages in a batch are processed in parallel but result is emitted in order the messages were received, + * with the exclusion of the messages that have been reenqueued, dropped and DLQ'ed + */ + final def processWithHandler[Res](batchSize: Int, waitingTime: FiniteDuration)(handler: MessageHandler[F, T, Res]) + : Stream[F, Either[Throwable, Res]] = + Stream + .resource(pusher) + .flatMap { pusher => + messages(batchSize, waitingTime) + .parEvalMap(batchSize) { ctx => + handler.handle(ctx).flatMap[Option[Either[Throwable, Res]]] { + case Decision.Ok(res) => ctx.ack().as(res.asRight.some) + case Decision.Drop => ctx.ack().as(none) + case Decision.Fail(t, true) => ctx.ack().as(t.asLeft.some) + case Decision.Fail(t, false) => ctx.nack().as(t.asLeft.some) + case Decision.DeadLetter => ??? + case Decision.Reenqueue(delay) => + ctx.payload + .flatMap(p => pusher.push(p, delay)) + .as(none) + } + } + .flattenOption + } }