Skip to content

Commit

Permalink
Cleaner reenqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed May 24, 2024
1 parent 6904b63 commit 65fbca2
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext
new SQSPublisher(name, client, getQueueUrl(name))

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new SQSSubscriber[F, T](name, client, getQueueUrl(name))
new SQSSubscriber[F, T](name, client, getQueueUrl(name), this)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, Que

class SQSSubscriber[F[_], T](
val queueName: String,
client: SqsAsyncClient,
getQueueUrl: F[String]
asyncClient: SqsAsyncClient,
getQueueUrl: F[String],
client: SQSClient[F]
)(implicit
F: Async[F],
deserializer: Deserializer[T])
extends QueueSubscriber[F, T] {
extends QueueSubscriber[F, T](client) {

private def getLockTTL(queueUrl: String): F[Int] =
F.fromCompletableFuture {
F.delay {
client.getQueueAttributes(
asyncClient.getQueueAttributes(
GetQueueAttributesRequest
.builder()
.queueUrl(queueUrl)
Expand All @@ -49,7 +50,7 @@ class SQSSubscriber[F[_], T](
for {
queueUrl <- getQueueUrl
lockTTL <- getLockTTL(queueUrl)
} yield new SQSPuller(queueName, client, queueUrl, lockTTL)
} yield new SQSPuller(queueName, asyncClient, queueUrl, lockTTL)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ServiceBusClient[F[_]] private (
new ServiceBusQueuePublisher[F, T](name, clientBuilder)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new ServiceBusQueueSubscriber[F, T](name, clientBuilder)
new ServiceBusQueueSubscriber[F, T](name, clientBuilder, this)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber}

class ServiceBusQueueSubscriber[F[_], Data](
val queueName: String,
builder: ServiceBusClientBuilder
builder: ServiceBusClientBuilder,
client: ServiceBusClient[F]
)(implicit
F: Async[F],
deserializer: Deserializer[Data])
extends QueueSubscriber[F, Data] {
extends QueueSubscriber[F, Data](client) {

override def puller: Resource[F, QueuePuller[F, Data]] = Resource
.fromAutoCloseable {
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/com/commercetools/queue/MessageHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ import cats.syntax.functor._

import scala.concurrent.duration.FiniteDuration

trait MessageHandler[F[_], T, O, D[_] <: Decision[?]] {
trait MessageHandler[F[_], T, O, D[_] <: Decision[T, ?]] {
def handle(msg: Message[F, T]): F[D[O]]
}

trait ImmediateDecisionMessageHandler[F[_], T, O] extends MessageHandler[F, T, O, ImmediateDecision] {
override def handle(msg: Message[F, T]): F[ImmediateDecision[O]]
}

sealed trait Decision[+O]
sealed trait ImmediateDecision[+O] extends Decision[O]
sealed trait Decision[+I, +O]
sealed trait ImmediateDecision[+O] extends Decision[Nothing, O]
object Decision {
case class Ok[O](res: O) extends ImmediateDecision[O]
case object Drop extends ImmediateDecision[Nothing]
case class Fail(t: Throwable, ack: Boolean) extends ImmediateDecision[Nothing]
case object DeadLetter extends Decision[Nothing]
case class Reenqueue(delay: Option[FiniteDuration]) extends Decision[Nothing]
case object DeadLetter extends Decision[Nothing, Nothing]
case class Reenqueue[+I](message: I, delay: Option[FiniteDuration]) extends Decision[I, Nothing]
}

object MessageHandler {
Expand Down
18 changes: 8 additions & 10 deletions core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package com.commercetools.queue

import cats.effect.syntax.all._
import cats.effect.syntax.all.*
import cats.effect.{Concurrent, Outcome, Resource}
import cats.syntax.all._
import cats.syntax.all.*
import fs2.{Chunk, Pull, Stream}

import scala.concurrent.duration.FiniteDuration

/**
* The base interface to subscribe to a queue.
*/
abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
abstract class QueueSubscriber[F[_], T](private val client: QueueClient[F])(implicit F: Concurrent[F]) {

/** The queue name to which this subscriber subscribes. */
def queueName: String
Expand Down Expand Up @@ -134,17 +134,15 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
*
* Messages in a batch are processed in parallel but result is emitted in order the messages were received,
* with the exclusion of the messages that have been reenqueued, dropped and DLQ'ed.
*
* @param pusher used to reenqueue messages
*/
final def process[Res, D[_] <: Decision[Res]](
final def process[Res, D[_] <: Decision[T, Res]](
batchSize: Int,
waitingTime: FiniteDuration
)(pusher: Resource[F, QueuePusher[F, T]]
)(handler: MessageHandler[F, T, Res, D]
)(implicit serializer: Serializer[T]
): Stream[F, Either[Throwable, Res]] =
Stream
.resource(pusher)
.resource(client.publish[T](queueName).pusher)
.flatMap { pusher =>
messages(batchSize, waitingTime)
.parEvalMap(batchSize) { ctx =>
Expand All @@ -154,9 +152,9 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
case Decision.Fail(t, true) => ctx.ack().as(t.asLeft.some)
case Decision.Fail(t, false) => ctx.nack().as(t.asLeft.some)
case Decision.DeadLetter => ???
case Decision.Reenqueue(delay) =>
case Decision.Reenqueue(t, delay) =>
ctx.payload
.flatMap(p => pusher.push(p, delay))
.flatMap(p => pusher.push(t, delay))
.as(none)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.commercetools.queue.testing
import cats.effect.{IO, Resource}
import com.commercetools.queue.{QueuePuller, QueueSubscriber}

class TestQueueSubscriber[T](queue: TestQueue[T]) extends QueueSubscriber[IO, T] {
class TestQueueSubscriber[T](queue: TestQueue[T]) extends QueueSubscriber[IO, T](null) { // TODO

override val queueName: String = queue.name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ class PubSubClient[F[_]: Async] private (
SubscriptionName.of(project, s"fs2-queue-$name"),
channelProvider,
credentials,
endpoint)
endpoint,
this
)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.commercetools.queue.gcp.pubsub

import cats.effect.{Async, Resource}
import cats.syntax.functor._
import cats.syntax.functor.*
import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber}
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
Expand All @@ -30,11 +30,12 @@ class PubSubSubscriber[F[_], T](
subscriptionName: SubscriptionName,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
endpoint: Option[String]
endpoint: Option[String],
client: PubSubClient[F]
)(implicit
F: Async[F],
deserializer: Deserializer[T])
extends QueueSubscriber[F, T] {
extends QueueSubscriber[F, T](client) {

override def puller: Resource[F, QueuePuller[F, T]] =
Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class MeasuringQueueClient[F[_]](
new MeasuringQueuePublisher[F, T](underlying.publish(name), requestCounter, tracer)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new MeasuringQueueSubscriber[F, T](underlying.subscribe(name), requestCounter, tracer)
new MeasuringQueueSubscriber[F, T](underlying.subscribe(name), requestCounter, tracer, this)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
package com.commercetools.queue.otel4s

import cats.effect.{Resource, Temporal}
import com.commercetools.queue.{QueuePuller, QueueSubscriber}
import com.commercetools.queue.{QueueClient, QueuePuller, QueueSubscriber}
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

class MeasuringQueueSubscriber[F[_], T](
underlying: QueueSubscriber[F, T],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
tracer: Tracer[F],
client: QueueClient[F]
)(implicit F: Temporal[F])
extends QueueSubscriber[F, T] {
extends QueueSubscriber[F, T](client) {

override def queueName: String = underlying.queueName

Expand Down

0 comments on commit 65fbca2

Please sign in to comment.