From 80372bbb61e6ace3f6deab6ebbbad151d0ff8589 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 27 May 2024 16:21:09 +0200 Subject: [PATCH] Add support for dead letter queues --- .../queue/aws/sqs/SQSAdministration.scala | 84 ++++++++++++++++--- .../queue/aws/sqs/SQSClient.scala | 24 +++++- .../servicebus/ServiceBusAdministration.scala | 36 +++++--- .../queue/DeadletterQueueConfiguration.scala | 7 ++ ...DeadletterQueueCreationConfiguration.scala | 3 + .../queue/QueueAdministration.scala | 9 +- .../queue/QueueConfiguration.scala | 10 ++- .../queue/QueueCreationConfiguration.scala | 18 ++++ .../gcp/pubsub/PubSubAdministration.scala | 46 +++++----- .../queue/gcp/pubsub/PubSubClient.scala | 44 ++++++++-- .../otel4s/MeasuringQueueAdministration.scala | 6 +- .../queue/testkit/QueueClientSuite.scala | 10 +-- 12 files changed, 239 insertions(+), 58 deletions(-) create mode 100644 core/src/main/scala/com/commercetools/queue/DeadletterQueueConfiguration.scala create mode 100644 core/src/main/scala/com/commercetools/queue/DeadletterQueueCreationConfiguration.scala create mode 100644 core/src/main/scala/com/commercetools/queue/QueueCreationConfiguration.scala diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala index a4c7295..e1ab44f 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala @@ -23,31 +23,85 @@ import cats.syntax.functor._ import cats.syntax.functorFilter._ import cats.syntax.monadError._ import cats.syntax.option._ +import cats.syntax.traverse._ import com.commercetools.queue.aws.sqs.makeQueueException -import com.commercetools.queue.{MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueDoesNotExistException} +import com.commercetools.queue.{DeadletterQueueConfiguration, MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueCreationConfiguration, QueueDoesNotExistException} +import software.amazon.awssdk.protocols.jsoncore.JsonNodeParser import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, GetQueueAttributesRequest, QueueAttributeName, SetQueueAttributesRequest} import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ -class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F]) +class SQSAdministration[F[_]]( + client: SqsAsyncClient, + getQueueUrl: String => F[String], + makeDQLName: String => String +)(implicit F: Async[F]) extends QueueAdministration[F] { - override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = + /** + * In SQS, a dead letter queue is a standard queue with a `RedriveAllowPolicy` attribute. + * Its ARN will be referenced in the attributes of the source queue, so it is returned + * after creation. + */ + private def createDeadLetterQueue(baseName: String): F[String] = { + val dlqName = makeDQLName(baseName) F.fromCompletableFuture { F.delay { client.createQueue( CreateQueueRequest .builder() - .queueName(name) + .queueName(dlqName) .attributes(Map( - QueueAttributeName.MESSAGE_RETENTION_PERIOD -> messageTTL.toSeconds.toString(), - QueueAttributeName.VISIBILITY_TIMEOUT -> lockTTL.toSeconds.toString()).asJava) + QueueAttributeName.REDRIVE_ALLOW_POLICY -> """{"redrivePermission": "allowAll"}""", + // SQS has a maximum retention period of 14 days + // see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html + QueueAttributeName.MESSAGE_RETENTION_PERIOD -> 14.days.toSeconds.toString() + ).asJava) .build()) } - }.void - .adaptError(makeQueueException(_, name)) + }.flatMap { response => + F.fromCompletableFuture { + F.delay { + client.getQueueAttributes( + GetQueueAttributesRequest + .builder() + .queueUrl(response.queueUrl()) + .attributeNames(QueueAttributeName.QUEUE_ARN) + .build()) + } + }.flatMap { response => + val arn = response.attributes().get(QueueAttributeName.QUEUE_ARN) + F.raiseWhen(arn == null)( + MalformedQueueConfigurationException(dlqName, QueueAttributeName.QUEUE_ARN.toString(), "")) + .as(arn) + } + } + } + + override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] = + configuration.deadletter + .traverse(maxAttempts => createDeadLetterQueue(name).map(_ -> maxAttempts)) + .flatMap { dlq => + F.fromCompletableFuture { + F.delay { + client.createQueue( + CreateQueueRequest + .builder() + .queueName(name) + .attributes(Map( + QueueAttributeName.MESSAGE_RETENTION_PERIOD -> Some(configuration.messageTTL.toSeconds.toString()), + QueueAttributeName.VISIBILITY_TIMEOUT -> Some(configuration.lockTTL.toSeconds.toString()), + QueueAttributeName.REDRIVE_POLICY -> dlq.map { case (dlqArn, maxAttempts) => + s"""{"deadLetterTargetArn":"$dlqArn","maxReceiveCount":$maxAttempts}""" + } + ).flattenOption.asJava) + .build()) + } + }.void + .adaptError(makeQueueException(_, name)) + } override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = getQueueUrl(name) @@ -77,7 +131,10 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S GetQueueAttributesRequest .builder() .queueUrl(queueUrl) - .attributeNames(QueueAttributeName.MESSAGE_RETENTION_PERIOD, QueueAttributeName.VISIBILITY_TIMEOUT) + .attributeNames( + QueueAttributeName.MESSAGE_RETENTION_PERIOD, + QueueAttributeName.VISIBILITY_TIMEOUT, + QueueAttributeName.REDRIVE_POLICY) .build()) } } @@ -101,7 +158,14 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S ttl.toIntOption .map(_.seconds) .liftTo[F](MalformedQueueConfigurationException(name, "lockTTL", ttl))) - } yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL) + deadletter <- attributes.get(QueueAttributeName.REDRIVE_POLICY).traverse { policy => + for { + bag <- F.delay(JsonNodeParser.create().parse(policy)) + dlq <- F.delay(bag.field("deadLetterTargetArn").get().asString().split(":").last) + maxAttempts <- F.delay(bag.field("maxReceiveCount").get().asNumber().toInt) + } yield DeadletterQueueConfiguration(dlq, maxAttempts) + } + } yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter) } .adaptError(makeQueueException(_, name)) diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala index 4f82290..fe792af 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala @@ -28,7 +28,8 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest import java.net.URI -class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) extends QueueClient[F] { +class SQSClient[F[_]] private (client: SqsAsyncClient, makeDLQName: String => String)(implicit F: Async[F]) + extends QueueClient[F] { private def getQueueUrl(name: String): F[String] = F.fromCompletableFuture { @@ -39,7 +40,7 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext .adaptError(makeQueueException(_, name)) override def administration: QueueAdministration[F] = - new SQSAdministration(client, getQueueUrl(_)) + new SQSAdministration(client, getQueueUrl(_), makeDLQName) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = new SQSPublisher(name, client, getQueueUrl(name)) @@ -51,9 +52,26 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext object SQSClient { + private def defaultMakeDLQName(name: String): String = + s"$name-dlq" + + /** + * Creates a new [[SQSClient]]. + * + * @param region the region to use + * @param credentials the credentials to use + * @param makeDLQName how the dead letter queue name is derived from the queue name + * by default it suffixes the queue name with `-dlq` + * @param httpClient the existing HTTP client to use. + * '''Note:''' if provided, it is not closed when resource is released, + * otherwise the client manages its own client and will close it when + * released + * @param endpoint the service endpoint to use. + */ def apply[F[_]]( region: Region, credentials: AwsCredentialsProvider, + makeDLQName: String => String = defaultMakeDLQName, endpoint: Option[URI] = None, httpClient: Option[SdkAsyncHttpClient] = None )(implicit F: Async[F] @@ -71,6 +89,6 @@ object SQSClient { builder.build() } } - .map(new SQSClient(_)) + .map(new SQSClient(_, makeDLQName)) } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala index 1372eac..733aad2 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala @@ -21,7 +21,7 @@ import cats.syntax.functor._ import cats.syntax.monadError._ import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient import com.azure.messaging.servicebus.administration.models.CreateQueueOptions -import com.commercetools.queue.{QueueAdministration, QueueConfiguration} +import com.commercetools.queue.{DeadletterQueueConfiguration, QueueAdministration, QueueConfiguration, QueueCreationConfiguration} import java.time.Duration import scala.concurrent.duration._ @@ -29,14 +29,26 @@ import scala.concurrent.duration._ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(implicit F: Async[F]) extends QueueAdministration[F] { - override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = - F.blocking( + override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] = + F.blocking { + val options = new CreateQueueOptions() + .setDefaultMessageTimeToLive(Duration.ofMillis(configuration.messageTTL.toMillis)) + .setLockDuration(Duration.ofMillis(configuration.lockTTL.toMillis)) + configuration.deadletter match { + case Some(configuration) => + options + .setMaxDeliveryCount(configuration.maxAttempts) + .setDeadLetteringOnMessageExpiration(true) + case None => + options + .setMaxDeliveryCount(Int.MaxValue) + .setDeadLetteringOnMessageExpiration(false) + } client.createQueue( name, - new CreateQueueOptions() - .setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis)) - .setLockDuration(Duration.ofMillis(lockTTL.toMillis)))) - .void + options + ) + }.void .adaptError(makeQueueException(_, name)) override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = @@ -45,15 +57,19 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp messageTTL.foreach(ttl => properties.setDefaultMessageTimeToLive(Duration.ofMillis(ttl.toMillis))) lockTTL.foreach(ttl => properties.setLockDuration(Duration.ofMillis(ttl.toMillis))) val _ = client.updateQueue(properties) - } + }.adaptError(makeQueueException(_, name)) override def configuration(name: String): F[QueueConfiguration] = F.blocking { val properties = client.getQueue(name) val messageTTL = properties.getDefaultMessageTimeToLive().toMillis.millis val lockTTL = properties.getLockDuration().toMillis.millis - QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL) - } + val deadletter = + Option.when(properties.isDeadLetteringOnMessageExpiration())( + DeadletterQueueConfiguration(properties.getForwardDeadLetteredMessagesTo(), properties.getMaxDeliveryCount())) + + QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter = deadletter) + }.adaptError(makeQueueException(_, name)) override def delete(name: String): F[Unit] = F.blocking(client.deleteQueue(name)) diff --git a/core/src/main/scala/com/commercetools/queue/DeadletterQueueConfiguration.scala b/core/src/main/scala/com/commercetools/queue/DeadletterQueueConfiguration.scala new file mode 100644 index 0000000..e9013fd --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/DeadletterQueueConfiguration.scala @@ -0,0 +1,7 @@ +package com.commercetools.queue + +/** + * @param name the name of the deadletter queue + * @param maxAttempts the maximum number of delivery attempts made before forwarding a message to the dead letter queue + */ +final case class DeadletterQueueConfiguration(name: String, maxAttempts: Int) diff --git a/core/src/main/scala/com/commercetools/queue/DeadletterQueueCreationConfiguration.scala b/core/src/main/scala/com/commercetools/queue/DeadletterQueueCreationConfiguration.scala new file mode 100644 index 0000000..c730a3f --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/DeadletterQueueCreationConfiguration.scala @@ -0,0 +1,3 @@ +package com.commercetools.queue + +final case class DeadletterQueueCreationConfiguration(maxAttempts: Int) diff --git a/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala b/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala index 947bf54..8905cba 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala @@ -23,8 +23,13 @@ import scala.concurrent.duration.FiniteDuration */ trait QueueAdministration[F[_]] { - /** Creates a queue with the given name, message TTL and lock TTL. */ - def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] + /** + * Creates a queue with the given name and configuration. + * If the configuration contains a `deadletter` element, a dead letter + * queue is created and associated to the main queue with the configured + * maximum delivery attempt. + */ + def create(name: String, configuration: QueueCreationConfiguration): F[Unit] /** * Updates the queue with the given name, with provided message TTL and/or lock TTL. diff --git a/core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala b/core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala index f46a0ee..7a179f2 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala @@ -18,4 +18,12 @@ package com.commercetools.queue import scala.concurrent.duration.FiniteDuration -final case class QueueConfiguration(messageTTL: FiniteDuration, lockTTL: FiniteDuration) +/** + * @param messageTTL the time a message is guaranteed to stay in the queue before being discarded by the underlying system + * @param lockTTL the time a message is locked (or leased) upon reception by a subscriber before being eligible to redelivery + * @param deadletter the dead-letter queue configuration if any + */ +final case class QueueConfiguration( + messageTTL: FiniteDuration, + lockTTL: FiniteDuration, + deadletter: Option[DeadletterQueueConfiguration]) diff --git a/core/src/main/scala/com/commercetools/queue/QueueCreationConfiguration.scala b/core/src/main/scala/com/commercetools/queue/QueueCreationConfiguration.scala new file mode 100644 index 0000000..98bf991 --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/QueueCreationConfiguration.scala @@ -0,0 +1,18 @@ +package com.commercetools.queue + +import scala.concurrent.duration.FiniteDuration + +/** + * Configuration provided upon queue creation. + * + * @param messageTTL the time a message is kept in the queue before being discarded + * @param lockTTL the time a message is locked (or leased) after having been delivered + * to a consumer and before being made available to other again + * @param deadletter whether to create a dead letter queue associated to this queue + * with the configured amount of delivery tries before moving a message + * to the dead letter queue + */ +final case class QueueCreationConfiguration( + messageTTL: FiniteDuration, + lockTTL: FiniteDuration, + deadletter: Option[DeadletterQueueCreationConfiguration]) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala index 234c24a..ff237b0 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -18,7 +18,7 @@ package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} import cats.syntax.all._ -import com.commercetools.queue.{QueueAdministration, QueueConfiguration} +import com.commercetools.queue.{DeadletterQueueConfiguration, QueueAdministration, QueueConfiguration, QueueCreationConfiguration} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.rpc.{NotFoundException, TransportChannelProvider} import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings, TopicAdminClient, TopicAdminSettings} @@ -31,7 +31,8 @@ class PubSubAdministration[F[_]]( project: String, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, - endpoint: Option[String] + endpoint: Option[String], + makeDLQName: String => String )(implicit F: Async[F]) extends QueueAdministration[F] { @@ -53,9 +54,9 @@ class PubSubAdministration[F[_]]( SubscriptionAdminClient.create(builder.build()) }) - override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = { + override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] = { val topicName = TopicName.of(project, name) - val ttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() + val ttl = Duration.newBuilder().setSeconds(configuration.messageTTL.toSeconds).build() adminClient.use { client => wrapFuture(F.delay { client @@ -71,7 +72,7 @@ class PubSubAdministration[F[_]]( .newBuilder() .setTopic(topicName.toString()) .setName(SubscriptionName.of(project, s"fs2-queue-$name").toString()) - .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) + .setAckDeadlineSeconds(configuration.lockTTL.toSeconds.toInt) .setMessageRetentionDuration(ttl) // An empty expiration policy (no TTL set) ensures the subscription is never deleted .setExpirationPolicy(ExpirationPolicy.newBuilder().build()) @@ -146,21 +147,28 @@ class PubSubAdministration[F[_]]( } override def configuration(name: String): F[QueueConfiguration] = - subscriptionClient.use { client => - wrapFuture[F, Subscription](F.delay { - val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name") - client - .getSubscriptionCallable() - .futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build()) - }).map { (sub: Subscription) => - val messageTTL = - sub.getMessageRetentionDuration().getSeconds.seconds + - sub.getMessageRetentionDuration().getNanos().nanos - val lockTTL = - sub.getAckDeadlineSeconds().seconds - QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL) + subscriptionClient + .use { client => + wrapFuture(F.delay { + val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name") + client + .getSubscriptionCallable() + .futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build()) + }).map { (sub: Subscription) => + val messageTTL = + sub.getMessageRetentionDuration().getSeconds.seconds + + sub.getMessageRetentionDuration().getNanos().nanos + val lockTTL = + sub.getAckDeadlineSeconds().seconds + val policy = sub.getDeadLetterPolicy() + val deadletter = + Option(TopicName.parse(policy.getDeadLetterTopic())).map { topicName => + DeadletterQueueConfiguration(topicName.toString(), policy.getMaxDeliveryAttempts()) + } + QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter = deadletter) + } } - } + .adaptError(makeQueueException(_, name)) override def delete(name: String): F[Unit] = { adminClient.use { client => diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index 252d586..54646e8 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -28,11 +28,12 @@ class PubSubClient[F[_]: Async] private ( channelProvider: TransportChannelProvider, useGrpc: Boolean, credentials: CredentialsProvider, - endpoint: Option[String]) + endpoint: Option[String], + makeDLQName: String => String) extends QueueClient[F] { override def administration: QueueAdministration[F] = - new PubSubAdministration[F](project, channelProvider, credentials, endpoint) + new PubSubAdministration[F](project, channelProvider, credentials, endpoint, makeDLQName) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint) @@ -54,27 +55,60 @@ object PubSubClient { HttpJsonTransportChannel.create( ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build()) + private def makeDefaultDLQName(name: String): String = + s"$name-dlq" + + /** + * Creates a [[PubSubClient]]. + * + * @param project the project to use + * @param credentials the credentials to use + * @param makeDLQName how the dead letter queue name is derived from the queue name + * by default it suffixes the queue name with `-dlq` + * @param endpoint the service endpoint to use + * @param mkTransportChannel how the HTTP transport channel is created + * by default it uses the `NetHttpTransport` client + */ def apply[F[_]]( project: String, credentials: CredentialsProvider, + makeDLQName: String => String = makeDefaultDLQName, endpoint: Option[String] = None, - mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _ + mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel )(implicit F: Async[F] ): Resource[F, PubSubClient[F]] = Resource .fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))) .map { channel => - new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint) + new PubSubClient[F]( + project, + FixedTransportChannelProvider.create(channel), + false, + credentials, + endpoint, + makeDLQName) } + /** + * Creates an unmanaged [[PubSubClient]]. + * + * @param project the project to use + * @param credentials the credentials to use + * @param makeDLQName how the dead letter queue name is derived from the queue name + * by default it suffixes the queue name with `-dlq` + * @param channelProvider the channel provider to use, needs to be managed by caller + * @param useGrpc whether the channel provider uses gRPC + * @param endpoint the service endpoint to use + */ def unmanaged[F[_]]( project: String, credentials: CredentialsProvider, channelProvider: TransportChannelProvider, useGrpc: Boolean, + makeDLQName: String => String = makeDefaultDLQName, endpoint: Option[String] = None )(implicit F: Async[F] ): PubSubClient[F] = - new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint) + new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint, makeDLQName) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala index 4713e94..b55b0ee 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala @@ -18,7 +18,7 @@ package com.commercetools.queue.otel4s import cats.effect.MonadCancel import cats.effect.syntax.monadCancel._ -import com.commercetools.queue.{QueueAdministration, QueueConfiguration} +import com.commercetools.queue.{QueueAdministration, QueueConfiguration, QueueCreationConfiguration} import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.metrics.Counter import org.typelevel.otel4s.trace.Tracer @@ -32,11 +32,11 @@ class MeasuringQueueAdministration[F[_]]( )(implicit F: MonadCancel[F, Throwable]) extends QueueAdministration[F] { - override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = + override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] = tracer .span("queue.create") .surround { - underlying.create(name, messageTTL, lockTTL) + underlying.create(name, configuration) } .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.create, requestCounter)) diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala index 5781a3b..5a5cdf4 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala @@ -2,7 +2,7 @@ package com.commercetools.queue.testkit import cats.effect.std.Random import cats.effect.{IO, Ref, Resource} -import com.commercetools.queue.{QueueClient, QueueConfiguration} +import com.commercetools.queue.{QueueClient, QueueConfiguration, QueueCreationConfiguration} import fs2.{Chunk, Stream} import munit.CatsEffectSuite @@ -34,7 +34,7 @@ abstract class QueueClientSuite extends CatsEffectSuite { .map(uuid => s"queue-$uuid") .flatTap { queueName => clientFixture().administration - .create(queueName, originalMessageTTL, originalLockTTL) + .create(queueName, QueueCreationConfiguration(originalMessageTTL, originalLockTTL, None)) })(queueName => clientFixture().administration.delete(queueName))) withQueue.test("published messages are received by a processor") { queueName => @@ -91,15 +91,15 @@ abstract class QueueClientSuite extends CatsEffectSuite { } withQueue.test("configuration can be updated") { queueName => - assume(queueUpdateSupported, "The test environment does not support queue update") val client = clientFixture() val admin = client.administration for { - _ <- assertIO(admin.configuration(queueName), QueueConfiguration(originalMessageTTL, originalLockTTL)) + _ <- assertIO(admin.configuration(queueName), QueueConfiguration(originalMessageTTL, originalLockTTL, None)) + _ = assume(queueUpdateSupported, "The test environment does not support queue update") _ <- admin.update(queueName, Some(originalMessageTTL + 1.minute), Some(originalLockTTL + 10.seconds)) _ <- assertIO( admin.configuration(queueName), - QueueConfiguration(originalMessageTTL + 1.minute, originalLockTTL + 10.seconds)) + QueueConfiguration(originalMessageTTL + 1.minute, originalLockTTL + 10.seconds, None)) } yield () }