Skip to content

Commit

Permalink
Add support for dead letter queues
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed May 27, 2024
1 parent 46f5792 commit 80372bb
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,85 @@ import cats.syntax.functor._
import cats.syntax.functorFilter._
import cats.syntax.monadError._
import cats.syntax.option._
import cats.syntax.traverse._
import com.commercetools.queue.aws.sqs.makeQueueException
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueDoesNotExistException}
import com.commercetools.queue.{DeadletterQueueConfiguration, MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueCreationConfiguration, QueueDoesNotExistException}
import software.amazon.awssdk.protocols.jsoncore.JsonNodeParser
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, GetQueueAttributesRequest, QueueAttributeName, SetQueueAttributesRequest}

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F])
class SQSAdministration[F[_]](
client: SqsAsyncClient,
getQueueUrl: String => F[String],
makeDQLName: String => String
)(implicit F: Async[F])
extends QueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
/**
* In SQS, a dead letter queue is a standard queue with a `RedriveAllowPolicy` attribute.
* Its ARN will be referenced in the attributes of the source queue, so it is returned
* after creation.
*/
private def createDeadLetterQueue(baseName: String): F[String] = {
val dlqName = makeDQLName(baseName)
F.fromCompletableFuture {
F.delay {
client.createQueue(
CreateQueueRequest
.builder()
.queueName(name)
.queueName(dlqName)
.attributes(Map(
QueueAttributeName.MESSAGE_RETENTION_PERIOD -> messageTTL.toSeconds.toString(),
QueueAttributeName.VISIBILITY_TIMEOUT -> lockTTL.toSeconds.toString()).asJava)
QueueAttributeName.REDRIVE_ALLOW_POLICY -> """{"redrivePermission": "allowAll"}""",
// SQS has a maximum retention period of 14 days
// see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html
QueueAttributeName.MESSAGE_RETENTION_PERIOD -> 14.days.toSeconds.toString()
).asJava)
.build())
}
}.void
.adaptError(makeQueueException(_, name))
}.flatMap { response =>
F.fromCompletableFuture {
F.delay {
client.getQueueAttributes(
GetQueueAttributesRequest
.builder()
.queueUrl(response.queueUrl())
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build())
}
}.flatMap { response =>
val arn = response.attributes().get(QueueAttributeName.QUEUE_ARN)
F.raiseWhen(arn == null)(
MalformedQueueConfigurationException(dlqName, QueueAttributeName.QUEUE_ARN.toString(), "<missing>"))
.as(arn)
}
}
}

override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] =
configuration.deadletter
.traverse(maxAttempts => createDeadLetterQueue(name).map(_ -> maxAttempts))
.flatMap { dlq =>
F.fromCompletableFuture {
F.delay {
client.createQueue(
CreateQueueRequest
.builder()
.queueName(name)
.attributes(Map(
QueueAttributeName.MESSAGE_RETENTION_PERIOD -> Some(configuration.messageTTL.toSeconds.toString()),
QueueAttributeName.VISIBILITY_TIMEOUT -> Some(configuration.lockTTL.toSeconds.toString()),
QueueAttributeName.REDRIVE_POLICY -> dlq.map { case (dlqArn, maxAttempts) =>
s"""{"deadLetterTargetArn":"$dlqArn","maxReceiveCount":$maxAttempts}"""
}
).flattenOption.asJava)
.build())
}
}.void
.adaptError(makeQueueException(_, name))
}

override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] =
getQueueUrl(name)
Expand Down Expand Up @@ -77,7 +131,10 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S
GetQueueAttributesRequest
.builder()
.queueUrl(queueUrl)
.attributeNames(QueueAttributeName.MESSAGE_RETENTION_PERIOD, QueueAttributeName.VISIBILITY_TIMEOUT)
.attributeNames(
QueueAttributeName.MESSAGE_RETENTION_PERIOD,
QueueAttributeName.VISIBILITY_TIMEOUT,
QueueAttributeName.REDRIVE_POLICY)
.build())
}
}
Expand All @@ -101,7 +158,14 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S
ttl.toIntOption
.map(_.seconds)
.liftTo[F](MalformedQueueConfigurationException(name, "lockTTL", ttl)))
} yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL)
deadletter <- attributes.get(QueueAttributeName.REDRIVE_POLICY).traverse { policy =>
for {
bag <- F.delay(JsonNodeParser.create().parse(policy))
dlq <- F.delay(bag.field("deadLetterTargetArn").get().asString().split(":").last)
maxAttempts <- F.delay(bag.field("maxReceiveCount").get().asNumber().toInt)
} yield DeadletterQueueConfiguration(dlq, maxAttempts)
}
} yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter)
}
.adaptError(makeQueueException(_, name))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest

import java.net.URI

class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) extends QueueClient[F] {
class SQSClient[F[_]] private (client: SqsAsyncClient, makeDLQName: String => String)(implicit F: Async[F])
extends QueueClient[F] {

private def getQueueUrl(name: String): F[String] =
F.fromCompletableFuture {
Expand All @@ -39,7 +40,7 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext
.adaptError(makeQueueException(_, name))

override def administration: QueueAdministration[F] =
new SQSAdministration(client, getQueueUrl(_))
new SQSAdministration(client, getQueueUrl(_), makeDLQName)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new SQSPublisher(name, client, getQueueUrl(name))
Expand All @@ -51,9 +52,26 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext

object SQSClient {

private def defaultMakeDLQName(name: String): String =
s"$name-dlq"

/**
* Creates a new [[SQSClient]].
*
* @param region the region to use
* @param credentials the credentials to use
* @param makeDLQName how the dead letter queue name is derived from the queue name
* by default it suffixes the queue name with `-dlq`
* @param httpClient the existing HTTP client to use.
* '''Note:''' if provided, it is not closed when resource is released,
* otherwise the client manages its own client and will close it when
* released
* @param endpoint the service endpoint to use.
*/
def apply[F[_]](
region: Region,
credentials: AwsCredentialsProvider,
makeDLQName: String => String = defaultMakeDLQName,
endpoint: Option[URI] = None,
httpClient: Option[SdkAsyncHttpClient] = None
)(implicit F: Async[F]
Expand All @@ -71,6 +89,6 @@ object SQSClient {
builder.build()
}
}
.map(new SQSClient(_))
.map(new SQSClient(_, makeDLQName))

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,34 @@ import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions
import com.commercetools.queue.{QueueAdministration, QueueConfiguration}
import com.commercetools.queue.{DeadletterQueueConfiguration, QueueAdministration, QueueConfiguration, QueueCreationConfiguration}

import java.time.Duration
import scala.concurrent.duration._

class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(implicit F: Async[F])
extends QueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
F.blocking(
override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] =
F.blocking {
val options = new CreateQueueOptions()
.setDefaultMessageTimeToLive(Duration.ofMillis(configuration.messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(configuration.lockTTL.toMillis))
configuration.deadletter match {
case Some(configuration) =>
options
.setMaxDeliveryCount(configuration.maxAttempts)
.setDeadLetteringOnMessageExpiration(true)
case None =>
options
.setMaxDeliveryCount(Int.MaxValue)
.setDeadLetteringOnMessageExpiration(false)
}
client.createQueue(
name,
new CreateQueueOptions()
.setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(lockTTL.toMillis))))
.void
options
)
}.void
.adaptError(makeQueueException(_, name))

override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] =
Expand All @@ -45,15 +57,19 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp
messageTTL.foreach(ttl => properties.setDefaultMessageTimeToLive(Duration.ofMillis(ttl.toMillis)))
lockTTL.foreach(ttl => properties.setLockDuration(Duration.ofMillis(ttl.toMillis)))
val _ = client.updateQueue(properties)
}
}.adaptError(makeQueueException(_, name))

override def configuration(name: String): F[QueueConfiguration] =
F.blocking {
val properties = client.getQueue(name)
val messageTTL = properties.getDefaultMessageTimeToLive().toMillis.millis
val lockTTL = properties.getLockDuration().toMillis.millis
QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL)
}
val deadletter =
Option.when(properties.isDeadLetteringOnMessageExpiration())(
DeadletterQueueConfiguration(properties.getForwardDeadLetteredMessagesTo(), properties.getMaxDeliveryCount()))

QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter = deadletter)
}.adaptError(makeQueueException(_, name))

override def delete(name: String): F[Unit] =
F.blocking(client.deleteQueue(name))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.commercetools.queue

/**
* @param name the name of the deadletter queue
* @param maxAttempts the maximum number of delivery attempts made before forwarding a message to the dead letter queue
*/
final case class DeadletterQueueConfiguration(name: String, maxAttempts: Int)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.commercetools.queue

final case class DeadletterQueueCreationConfiguration(maxAttempts: Int)
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ import scala.concurrent.duration.FiniteDuration
*/
trait QueueAdministration[F[_]] {

/** Creates a queue with the given name, message TTL and lock TTL. */
def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit]
/**
* Creates a queue with the given name and configuration.
* If the configuration contains a `deadletter` element, a dead letter
* queue is created and associated to the main queue with the configured
* maximum delivery attempt.
*/
def create(name: String, configuration: QueueCreationConfiguration): F[Unit]

/**
* Updates the queue with the given name, with provided message TTL and/or lock TTL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ package com.commercetools.queue

import scala.concurrent.duration.FiniteDuration

final case class QueueConfiguration(messageTTL: FiniteDuration, lockTTL: FiniteDuration)
/**
* @param messageTTL the time a message is guaranteed to stay in the queue before being discarded by the underlying system
* @param lockTTL the time a message is locked (or leased) upon reception by a subscriber before being eligible to redelivery
* @param deadletter the dead-letter queue configuration if any
*/
final case class QueueConfiguration(
messageTTL: FiniteDuration,
lockTTL: FiniteDuration,
deadletter: Option[DeadletterQueueConfiguration])
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.commercetools.queue

import scala.concurrent.duration.FiniteDuration

/**
* Configuration provided upon queue creation.
*
* @param messageTTL the time a message is kept in the queue before being discarded
* @param lockTTL the time a message is locked (or leased) after having been delivered
* to a consumer and before being made available to other again
* @param deadletter whether to create a dead letter queue associated to this queue
* with the configured amount of delivery tries before moving a message
* to the dead letter queue
*/
final case class QueueCreationConfiguration(
messageTTL: FiniteDuration,
lockTTL: FiniteDuration,
deadletter: Option[DeadletterQueueCreationConfiguration])
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.commercetools.queue.gcp.pubsub

import cats.effect.{Async, Resource}
import cats.syntax.all._
import com.commercetools.queue.{QueueAdministration, QueueConfiguration}
import com.commercetools.queue.{DeadletterQueueConfiguration, QueueAdministration, QueueConfiguration, QueueCreationConfiguration}
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.{NotFoundException, TransportChannelProvider}
import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings, TopicAdminClient, TopicAdminSettings}
Expand All @@ -31,7 +31,8 @@ class PubSubAdministration[F[_]](
project: String,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
endpoint: Option[String]
endpoint: Option[String],
makeDLQName: String => String
)(implicit F: Async[F])
extends QueueAdministration[F] {

Expand All @@ -53,9 +54,9 @@ class PubSubAdministration[F[_]](
SubscriptionAdminClient.create(builder.build())
})

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = {
override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] = {
val topicName = TopicName.of(project, name)
val ttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build()
val ttl = Duration.newBuilder().setSeconds(configuration.messageTTL.toSeconds).build()
adminClient.use { client =>
wrapFuture(F.delay {
client
Expand All @@ -71,7 +72,7 @@ class PubSubAdministration[F[_]](
.newBuilder()
.setTopic(topicName.toString())
.setName(SubscriptionName.of(project, s"fs2-queue-$name").toString())
.setAckDeadlineSeconds(lockTTL.toSeconds.toInt)
.setAckDeadlineSeconds(configuration.lockTTL.toSeconds.toInt)
.setMessageRetentionDuration(ttl)
// An empty expiration policy (no TTL set) ensures the subscription is never deleted
.setExpirationPolicy(ExpirationPolicy.newBuilder().build())
Expand Down Expand Up @@ -146,21 +147,28 @@ class PubSubAdministration[F[_]](
}

override def configuration(name: String): F[QueueConfiguration] =
subscriptionClient.use { client =>
wrapFuture[F, Subscription](F.delay {
val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name")
client
.getSubscriptionCallable()
.futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build())
}).map { (sub: Subscription) =>
val messageTTL =
sub.getMessageRetentionDuration().getSeconds.seconds +
sub.getMessageRetentionDuration().getNanos().nanos
val lockTTL =
sub.getAckDeadlineSeconds().seconds
QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL)
subscriptionClient
.use { client =>
wrapFuture(F.delay {
val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name")
client
.getSubscriptionCallable()
.futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build())
}).map { (sub: Subscription) =>
val messageTTL =
sub.getMessageRetentionDuration().getSeconds.seconds +
sub.getMessageRetentionDuration().getNanos().nanos
val lockTTL =
sub.getAckDeadlineSeconds().seconds
val policy = sub.getDeadLetterPolicy()
val deadletter =
Option(TopicName.parse(policy.getDeadLetterTopic())).map { topicName =>
DeadletterQueueConfiguration(topicName.toString(), policy.getMaxDeliveryAttempts())
}
QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter = deadletter)
}
}
}
.adaptError(makeQueueException(_, name))

override def delete(name: String): F[Unit] = {
adminClient.use { client =>
Expand Down
Loading

0 comments on commit 80372bb

Please sign in to comment.