Skip to content

Commit

Permalink
Process with message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed May 24, 2024
1 parent e1b614f commit 91f45ce
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ trait MessageHandler[F[_], T, O] {
}

sealed trait Decision[+O]
sealed trait ImmediateDecision[+O] extends Decision[O]
object Decision {
case class Ok[O](res: O) extends Decision[O]
case object Drop extends Decision[Nothing]
case class Fail(t: Throwable, ack: Boolean) extends Decision[Nothing]
case class Ok[O](res: O) extends ImmediateDecision[O]
case object Drop extends ImmediateDecision[Nothing]
case class Fail(t: Throwable, ack: Boolean) extends ImmediateDecision[Nothing]
case object DeadLetter extends Decision[Nothing]
case class Reenqueue(delay: Option[FiniteDuration]) extends Decision[Nothing]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import scala.concurrent.duration.FiniteDuration

/**
* The base interface to subscribe to a queue.
* @param pusher used to handle the messages to be `reenqueued`
*/
abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
abstract class QueueSubscriber[F[_], T](private val pusher: Resource[F, QueuePusher[F, T]])(implicit F: Concurrent[F]) {

/** The queue name to which this subscriber subscribes. */
def queueName: String
Expand Down Expand Up @@ -126,4 +127,34 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
case Left(_) => ctx.nack()
})

/**
* Processes the messages with the provided message handler.
* The messages are ack'ed, nack'ed, reenqueued or moved to a DLQ based on the decision returned from the handler.
* The stream emits results or errors down-stream and does not fail on business logic errors,
* allowing you to build error recovery logic.
*
* Messages in a batch are processed in parallel but result is emitted in order the messages were received,
* with the exclusion of the messages that have been reenqueued, dropped and DLQ'ed
*/
final def processWithHandler[Res](batchSize: Int, waitingTime: FiniteDuration)(handler: MessageHandler[F, T, Res])
: Stream[F, Either[Throwable, Res]] =
Stream
.resource(pusher)
.flatMap { pusher =>
messages(batchSize, waitingTime)
.parEvalMap(batchSize) { ctx =>
handler.handle(ctx).flatMap[Option[Either[Throwable, Res]]] {
case Decision.Ok(res) => ctx.ack().as(res.asRight.some)
case Decision.Drop => ctx.ack().as(none)
case Decision.Fail(t, true) => ctx.ack().as(t.asLeft.some)
case Decision.Fail(t, false) => ctx.nack().as(t.asLeft.some)
case Decision.DeadLetter => ???
case Decision.Reenqueue(delay) =>
ctx.payload
.flatMap(p => pusher.push(p, delay))
.as(none)
}
}
.flattenOption
}
}

0 comments on commit 91f45ce

Please sign in to comment.