From f58aff2854af0ecad1888a7a3c1f147390274508 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 22 Mar 2024 11:20:40 +0100 Subject: [PATCH 1/6] Add common error model --- .../com/commercetools/queue/Settlement.scala | 33 ++++++++++++++ .../com/commercetools/queue/errors.scala | 44 +++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 core/src/main/scala/com/commercetools/queue/Settlement.scala create mode 100644 core/src/main/scala/com/commercetools/queue/errors.scala diff --git a/core/src/main/scala/com/commercetools/queue/Settlement.scala b/core/src/main/scala/com/commercetools/queue/Settlement.scala new file mode 100644 index 0000000..ce376c9 --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/Settlement.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +import cats.Show + +sealed trait Settlement + +object Settlement { + case object Ack extends Settlement + case object Nack extends Settlement + case object ExtendLock extends Settlement + + implicit val show: Show[Settlement] = Show.show { + case Ack => "ack" + case Nack => "nack" + case ExtendLock => "extend lock" + } +} diff --git a/core/src/main/scala/com/commercetools/queue/errors.scala b/core/src/main/scala/com/commercetools/queue/errors.scala new file mode 100644 index 0000000..c1de280 --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/errors.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +import cats.syntax.show._ + +/** + * The base exception thrown by the clients. + * It defines some well known common exception and avoid leaking the underlying + * queue system exception model when failing. + */ +sealed abstract class QueueException(msg: String, inner: Throwable) extends Exception(msg, inner) + +case class QueueDoesNotExistException(name: String, inner: Throwable) + extends QueueException(show"Queue $name does not exist", inner) + +case class QueueAlreadyExistException(name: String, inner: Throwable) + extends QueueException(show"Queue $name does not exist", inner) + +case class CannotPushException(name: String, inner: Throwable) + extends QueueException(show"Cannot push messages to queue $name", inner) + +case class CannotPullException(name: String, inner: Throwable) + extends QueueException(show"Cannot pull messages from queue $name", inner) + +case class CannotSettleException(msgId: String, action: Settlement, inner: Throwable) + extends QueueException(show"Cannot $action message $msgId", inner) + +case class UnknownQueueException(name: String, inner: Throwable) + extends QueueException(show"Something wrong happened when interacting with queue $name", inner) From bb98221b32a05f998f0ee4f59991b9144b49f51e Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 22 Mar 2024 11:21:37 +0100 Subject: [PATCH 2/6] Adapt errors from AWS to specific `QueueException` --- .../queue/aws/sqs/SQSAdministration.scala | 27 ++++++++----- .../queue/aws/sqs/SQSClient.scala | 6 ++- .../queue/aws/sqs/SQSMessageContext.scala | 7 +++- .../queue/aws/sqs/SQSPublisher.scala | 3 +- .../queue/aws/sqs/SQSPuller.scala | 39 +++++++++++------- .../queue/aws/sqs/SQSPusher.scala | 11 ++++- .../queue/aws/sqs/SQSSubscriber.scala | 8 ++-- .../commercetools/queue/aws/sqs/package.scala | 40 +++++++++++++++++++ 8 files changed, 108 insertions(+), 33 deletions(-) create mode 100644 aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala index 69575d2..f5fbf64 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala @@ -41,21 +41,28 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S .build()) } }.void + .adaptError(makeQueueException(_, name)) override def delete(name: String): F[Unit] = - getQueueUrl(name).flatMap { queueUrl => - F.fromCompletableFuture { - F.delay { - client.deleteQueue( - DeleteQueueRequest - .builder() - .queueUrl(queueUrl) - .build()) + getQueueUrl(name) + .flatMap { queueUrl => + F.fromCompletableFuture { + F.delay { + client.deleteQueue( + DeleteQueueRequest + .builder() + .queueUrl(queueUrl) + .build()) + } } } - }.void + .void + .adaptError(makeQueueException(_, name)) override def exists(name: String): F[Boolean] = - getQueueUrl(name).as(true).recover { case _: QueueDoesNotExistException => false } + getQueueUrl(name) + .as(true) + .recover { case _: QueueDoesNotExistException => false } + .adaptError(makeQueueException(_, name)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala index 7732729..a41ada6 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala @@ -18,6 +18,7 @@ package com.commercetools.queue.aws.sqs import cats.effect.{Async, Resource} import cats.syntax.functor._ +import cats.syntax.monadError._ import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider import software.amazon.awssdk.http.async.SdkAsyncHttpClient @@ -35,15 +36,16 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext client.getQueueUrl(GetQueueUrlRequest.builder().queueName(name).build()) } }.map(_.queueUrl) + .adaptError(makeQueueException(_, name)) override def administration: QueueAdministration[F] = new SQSAdministration(client, getQueueUrl(_)) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = - new SQSPublisher(client, getQueueUrl(name)) + new SQSPublisher(client, name, getQueueUrl(name)) override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = - new SQSSubscriber[F, T](getQueueUrl(name), client) + new SQSSubscriber[F, T](client, name, getQueueUrl(name)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala index 5bdef97..63dccec 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala @@ -18,7 +18,8 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async import cats.syntax.functor._ -import com.commercetools.queue.MessageContext +import cats.syntax.monadError._ +import com.commercetools.queue.{MessageContext, Settlement} import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityRequest, DeleteMessageRequest} @@ -31,6 +32,7 @@ class SQSMessageContext[F[_], T]( receiptHandle: String, val messageId: String, lockTTL: Int, + queueName: String, queueUrl: String, client: SqsAsyncClient )(implicit F: Async[F]) @@ -42,6 +44,7 @@ class SQSMessageContext[F[_], T]( client.deleteMessage(DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle).build()) } }.void + .adaptError(makeSettlementException(_, queueName, messageId, Settlement.Ack)) override def nack(): F[Unit] = F.fromCompletableFuture { @@ -55,6 +58,7 @@ class SQSMessageContext[F[_], T]( .build()) } }.void + .adaptError(makeSettlementException(_, queueName, messageId, Settlement.Nack)) override def extendLock(): F[Unit] = F.fromCompletableFuture { @@ -68,5 +72,6 @@ class SQSMessageContext[F[_], T]( .build()) } }.void + .adaptError(makeSettlementException(_, queueName, messageId, Settlement.ExtendLock)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala index c70e9c6..ba351d7 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala @@ -22,6 +22,7 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient class SQSPublisher[F[_], T]( client: SqsAsyncClient, + queueName: String, getQueueUrl: F[String] )(implicit F: Async[F], @@ -29,6 +30,6 @@ class SQSPublisher[F[_], T]( extends QueuePublisher[F, T] { override def pusher: Resource[F, QueuePusher[F, T]] = - Resource.eval(getQueueUrl).map(new SQSPusher(client, _)) + Resource.eval(getQueueUrl).map(new SQSPusher(client, queueName, _)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala index c85fc95..cae83c7 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala @@ -17,7 +17,10 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async -import cats.syntax.all._ +import cats.syntax.either._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import cats.syntax.monadError._ import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller} import fs2.Chunk import software.amazon.awssdk.services.sqs.SqsAsyncClient @@ -29,6 +32,7 @@ import scala.jdk.CollectionConverters._ class SQSPuller[F[_], T]( client: SqsAsyncClient, + queueName: String, queueUrl: String, lockTTL: Int )(implicit @@ -50,19 +54,24 @@ class SQSPuller[F[_], T]( .build()) } }.flatMap { response => - Chunk.iterator(response.messages().iterator().asScala).traverse { message => - deserializer.deserialize(message.body()).liftTo[F].map { payload => - new SQSMessageContext( - payload, - Instant.ofEpochMilli(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP).toLong), - message.attributesAsStrings().asScala.toMap, - message.receiptHandle(), - message.messageId(), - lockTTL, - queueUrl, - client - ) + Chunk + .iterator(response.messages().iterator().asScala) + .traverse { message => + deserializer.deserialize(message.body()).liftTo[F].map { payload => + new SQSMessageContext( + payload = payload, + enqueuedAt = + Instant.ofEpochMilli(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP).toLong), + metadata = message.attributesAsStrings().asScala.toMap, + receiptHandle = message.receiptHandle(), + messageId = message.messageId(), + lockTTL = lockTTL, + queueName = queueName, + queueUrl = queueUrl, + client = client + ) + } } - } - } + }.widen[Chunk[MessageContext[F, T]]] + .adaptError(makePullQueueException(_, queueName)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala index 4005e1f..c34f09a 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala @@ -18,6 +18,7 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async import cats.syntax.functor._ +import cats.syntax.monadError._ import com.commercetools.queue.{QueuePusher, Serializer} import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest} @@ -25,7 +26,13 @@ import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendM import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ -class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit serializer: Serializer[T], F: Async[F]) +class SQSPusher[F[_], T]( + client: SqsAsyncClient, + queueName: String, + queueUrl: String +)(implicit + serializer: Serializer[T], + F: Async[F]) extends QueuePusher[F, T] { override def push(message: T, delay: Option[FiniteDuration]): F[Unit] = @@ -40,6 +47,7 @@ class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit seri .build()) } }.void + .adaptError(makePushQueueException(_, queueName)) override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] = F.fromCompletableFuture { @@ -59,5 +67,6 @@ class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit seri .build()) } }.void + .adaptError(makePushQueueException(_, queueName)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala index 3f53ae1..333ba1a 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala @@ -23,8 +23,9 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName} class SQSSubscriber[F[_], T]( - getQueueUrl: F[String], - client: SqsAsyncClient + client: SqsAsyncClient, + queueName: String, + getQueueUrl: F[String] )(implicit F: Async[F], deserializer: Deserializer[T]) @@ -41,13 +42,14 @@ class SQSSubscriber[F[_], T]( .build()) } }.map(_.attributes().get(QueueAttributeName.VISIBILITY_TIMEOUT).toInt) + .adaptError(makeQueueException(_, queueName)) override def puller: Resource[F, QueuePuller[F, T]] = Resource.eval { for { queueUrl <- getQueueUrl lockTTL <- getLockTTL(queueUrl) - } yield new SQSPuller(client, queueUrl, lockTTL) + } yield new SQSPuller(client, queueName, queueUrl, lockTTL) } } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala new file mode 100644 index 0000000..6194ed8 --- /dev/null +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.aws + +import com.commercetools.queue.{CannotPullException, CannotPushException, CannotSettleException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, Settlement, UnknownQueueException} +import software.amazon.awssdk.services.sqs.model.{QueueDoesNotExistException => AwsQueueDoesNotExistException, QueueNameExistsException} + +package object sqs { + + def makeQueueException(t: Throwable, queueName: String): QueueException = t match { + case _: AwsQueueDoesNotExistException => QueueDoesNotExistException(queueName, t) + case _: QueueNameExistsException => QueueAlreadyExistException(queueName, t) + case t: QueueException => t + case _ => UnknownQueueException(queueName, t) + } + + def makePushQueueException(t: Throwable, queueName: String): QueueException = + new CannotPushException(queueName, makeQueueException(t, queueName)) + + def makePullQueueException(t: Throwable, queueName: String): QueueException = + new CannotPullException(queueName, makeQueueException(t, queueName)) + + def makeSettlementException(t: Throwable, queueName: String, msgId: String, action: Settlement): QueueException = + new CannotSettleException(msgId = msgId, action = action, inner = makeQueueException(t, queueName)) + +} From b6df1eaff204cb7eacd11beecd61a4880f586ab3 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 22 Mar 2024 12:05:40 +0100 Subject: [PATCH 3/6] Adapt errors from Azure to specific `QueueException` --- .../servicebus/ServiceBusAdministration.scala | 10 ++++- .../azure/servicebus/ServiceBusPuller.scala | 12 ++++-- .../azure/servicebus/ServiceBusPusher.scala | 15 +++++-- .../servicebus/ServiceBusQueuePublisher.scala | 2 +- .../ServiceBusQueueSubscriber.scala | 2 +- .../queue/azure/servicebus/package.scala | 41 +++++++++++++++++++ 6 files changed, 72 insertions(+), 10 deletions(-) create mode 100644 azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala index ff13c2d..00707e8 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala @@ -18,6 +18,7 @@ package com.commercetools.queue.azure.servicebus import cats.effect.Async import cats.syntax.functor._ +import cats.syntax.monadError._ import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient import com.azure.messaging.servicebus.administration.models.CreateQueueOptions import com.commercetools.queue.QueueAdministration @@ -36,11 +37,16 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp .setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis)) .setLockDuration(Duration.ofMillis(lockTTL.toMillis)))) .void + .adaptError(makeQueueException(_, name)) override def delete(name: String): F[Unit] = - F.blocking(client.deleteQueue(name)).void + F.blocking(client.deleteQueue(name)) + .void + .adaptError(makeQueueException(_, name)) override def exists(name: String): F[Boolean] = - F.blocking(client.getQueueExists(name)).map(_.booleanValue) + F.blocking(client.getQueueExists(name)) + .map(_.booleanValue) + .adaptError(makeQueueException(_, name)) } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala index 205d19e..d29f4ac 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala @@ -16,8 +16,11 @@ package com.commercetools.queue.azure.servicebus -import cats.effect.kernel.Async -import cats.syntax.all._ +import cats.effect.Async +import cats.syntax.either._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import cats.syntax.monadError._ import com.azure.messaging.servicebus.ServiceBusReceiverClient import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller} import fs2.Chunk @@ -27,7 +30,8 @@ import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ class ServiceBusPuller[F[_], Data]( - receiver: ServiceBusReceiverClient + receiver: ServiceBusReceiverClient, + queueName: String )(implicit F: Async[F], deserializer: Deserializer[Data]) @@ -44,5 +48,7 @@ class ServiceBusPuller[F[_], Data]( new ServiceBusMessageContext(data, sbMessage, receiver) }) } + .widen[Chunk[MessageContext[F, Data]]] + .adaptError(makePullQueueException(_, queueName)) } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala index 24fa039..128c46c 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala @@ -25,7 +25,12 @@ import java.time.ZoneOffset import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ -class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit serializer: Serializer[Data], F: Async[F]) +class ServiceBusPusher[F[_], Data]( + sender: ServiceBusSenderClient, + queueName: String +)(implicit + serializer: Serializer[Data], + F: Async[F]) extends QueuePusher[F, Data] { override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = { @@ -34,7 +39,9 @@ class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit seri F.realTimeInstant .map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC))) } *> - F.blocking(sender.sendMessage(sbMessage)).void + F.blocking(sender.sendMessage(sbMessage)) + .void + .adaptError(makePushQueueException(_, queueName)) } override def push(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = { @@ -46,7 +53,9 @@ class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit seri } } } *> - F.blocking(sender.sendMessages(sbMessages.asJava)).void + F.blocking(sender.sendMessages(sbMessages.asJava)) + .void + .adaptError(makePushQueueException(_, queueName)) } } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala index df0ef70..9864047 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala @@ -35,6 +35,6 @@ class ServiceBusQueuePublisher[F[_], Data]( } { s => F.delay(s.close()) } - .map(new ServiceBusPusher(_)) + .map(new ServiceBusPusher(_, queueName)) } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala index 54d37c6..6c54b40 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala @@ -40,7 +40,7 @@ class ServiceBusQueueSubscriber[F[_], Data]( } } .map { receiver => - new ServiceBusPuller(receiver) + new ServiceBusPuller(receiver, name) } } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala new file mode 100644 index 0000000..a150356 --- /dev/null +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.azure + +import com.azure.core.exception.{ResourceExistsException, ResourceNotFoundException} +import com.commercetools.queue.{CannotPullException, CannotPushException, CannotSettleException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, Settlement, UnknownQueueException} + +package object servicebus { + + def makeQueueException(t: Throwable, queueName: String): QueueException = + t match { + case _: ResourceNotFoundException => QueueDoesNotExistException(queueName, t) + case _: ResourceExistsException => QueueAlreadyExistException(queueName, t) + case t: QueueException => t + case _ => UnknownQueueException(queueName, t) + } + + def makePushQueueException(t: Throwable, queueName: String): QueueException = + new CannotPushException(queueName, makeQueueException(t, queueName)) + + def makePullQueueException(t: Throwable, queueName: String): QueueException = + new CannotPullException(queueName, makeQueueException(t, queueName)) + + def makeSettlementException(t: Throwable, queueName: String, msgId: String, action: Settlement): QueueException = + new CannotSettleException(msgId = msgId, action = action, inner = makeQueueException(t, queueName)) + +} From 68ca822111aa6018c58673ffe1a24430755e2845 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 29 Mar 2024 14:32:04 +0100 Subject: [PATCH 4/6] Rephrase error message --- core/src/main/scala/com/commercetools/queue/errors.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/commercetools/queue/errors.scala b/core/src/main/scala/com/commercetools/queue/errors.scala index c1de280..8b3bec0 100644 --- a/core/src/main/scala/com/commercetools/queue/errors.scala +++ b/core/src/main/scala/com/commercetools/queue/errors.scala @@ -41,4 +41,4 @@ case class CannotSettleException(msgId: String, action: Settlement, inner: Throw extends QueueException(show"Cannot $action message $msgId", inner) case class UnknownQueueException(name: String, inner: Throwable) - extends QueueException(show"Something wrong happened when interacting with queue $name", inner) + extends QueueException(show"Something went wrong when interacting with queue $name", inner) From 77cceaefd083d0bea23c0ecac0910eb0b842f864 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 29 Mar 2024 14:35:50 +0100 Subject: [PATCH 5/6] Improve naming around message exceptions --- .../queue/aws/sqs/SQSMessageContext.scala | 8 ++++---- .../com/commercetools/queue/aws/sqs/package.scala | 6 +++--- .../queue/azure/servicebus/package.scala | 6 +++--- .../scala/com/commercetools/queue/Settlement.scala | 12 ++++++------ .../main/scala/com/commercetools/queue/errors.scala | 2 +- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala index 63dccec..9fb6ae9 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala @@ -19,7 +19,7 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async import cats.syntax.functor._ import cats.syntax.monadError._ -import com.commercetools.queue.{MessageContext, Settlement} +import com.commercetools.queue.{Action, MessageContext} import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityRequest, DeleteMessageRequest} @@ -44,7 +44,7 @@ class SQSMessageContext[F[_], T]( client.deleteMessage(DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle).build()) } }.void - .adaptError(makeSettlementException(_, queueName, messageId, Settlement.Ack)) + .adaptError(makeMessageException(_, queueName, messageId, Action.Ack)) override def nack(): F[Unit] = F.fromCompletableFuture { @@ -58,7 +58,7 @@ class SQSMessageContext[F[_], T]( .build()) } }.void - .adaptError(makeSettlementException(_, queueName, messageId, Settlement.Nack)) + .adaptError(makeMessageException(_, queueName, messageId, Action.Nack)) override def extendLock(): F[Unit] = F.fromCompletableFuture { @@ -72,6 +72,6 @@ class SQSMessageContext[F[_], T]( .build()) } }.void - .adaptError(makeSettlementException(_, queueName, messageId, Settlement.ExtendLock)) + .adaptError(makeMessageException(_, queueName, messageId, Action.ExtendLock)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala index 6194ed8..aca1c54 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/package.scala @@ -16,7 +16,7 @@ package com.commercetools.queue.aws -import com.commercetools.queue.{CannotPullException, CannotPushException, CannotSettleException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, Settlement, UnknownQueueException} +import com.commercetools.queue.{Action, CannotPullException, CannotPushException, MessageException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, UnknownQueueException} import software.amazon.awssdk.services.sqs.model.{QueueDoesNotExistException => AwsQueueDoesNotExistException, QueueNameExistsException} package object sqs { @@ -34,7 +34,7 @@ package object sqs { def makePullQueueException(t: Throwable, queueName: String): QueueException = new CannotPullException(queueName, makeQueueException(t, queueName)) - def makeSettlementException(t: Throwable, queueName: String, msgId: String, action: Settlement): QueueException = - new CannotSettleException(msgId = msgId, action = action, inner = makeQueueException(t, queueName)) + def makeMessageException(t: Throwable, queueName: String, msgId: String, action: Action): QueueException = + new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName)) } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala index a150356..d140c17 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/package.scala @@ -17,7 +17,7 @@ package com.commercetools.queue.azure import com.azure.core.exception.{ResourceExistsException, ResourceNotFoundException} -import com.commercetools.queue.{CannotPullException, CannotPushException, CannotSettleException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, Settlement, UnknownQueueException} +import com.commercetools.queue.{Action, CannotPullException, CannotPushException, MessageException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, UnknownQueueException} package object servicebus { @@ -35,7 +35,7 @@ package object servicebus { def makePullQueueException(t: Throwable, queueName: String): QueueException = new CannotPullException(queueName, makeQueueException(t, queueName)) - def makeSettlementException(t: Throwable, queueName: String, msgId: String, action: Settlement): QueueException = - new CannotSettleException(msgId = msgId, action = action, inner = makeQueueException(t, queueName)) + def makeMessageException(t: Throwable, queueName: String, msgId: String, action: Action): QueueException = + new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName)) } diff --git a/core/src/main/scala/com/commercetools/queue/Settlement.scala b/core/src/main/scala/com/commercetools/queue/Settlement.scala index ce376c9..6b33db6 100644 --- a/core/src/main/scala/com/commercetools/queue/Settlement.scala +++ b/core/src/main/scala/com/commercetools/queue/Settlement.scala @@ -18,14 +18,14 @@ package com.commercetools.queue import cats.Show -sealed trait Settlement +sealed trait Action -object Settlement { - case object Ack extends Settlement - case object Nack extends Settlement - case object ExtendLock extends Settlement +object Action { + case object Ack extends Action + case object Nack extends Action + case object ExtendLock extends Action - implicit val show: Show[Settlement] = Show.show { + implicit val show: Show[Action] = Show.show { case Ack => "ack" case Nack => "nack" case ExtendLock => "extend lock" diff --git a/core/src/main/scala/com/commercetools/queue/errors.scala b/core/src/main/scala/com/commercetools/queue/errors.scala index 8b3bec0..4851ad8 100644 --- a/core/src/main/scala/com/commercetools/queue/errors.scala +++ b/core/src/main/scala/com/commercetools/queue/errors.scala @@ -37,7 +37,7 @@ case class CannotPushException(name: String, inner: Throwable) case class CannotPullException(name: String, inner: Throwable) extends QueueException(show"Cannot pull messages from queue $name", inner) -case class CannotSettleException(msgId: String, action: Settlement, inner: Throwable) +case class MessageException(msgId: String, action: Action, inner: Throwable) extends QueueException(show"Cannot $action message $msgId", inner) case class UnknownQueueException(name: String, inner: Throwable) From 3af3cf254839bfec72e9395d730cdc84e9205e4e Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 4 Apr 2024 10:20:28 +0200 Subject: [PATCH 6/6] Rename file for consistency --- .../com/commercetools/queue/{Settlement.scala => Action.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename core/src/main/scala/com/commercetools/queue/{Settlement.scala => Action.scala} (100%) diff --git a/core/src/main/scala/com/commercetools/queue/Settlement.scala b/core/src/main/scala/com/commercetools/queue/Action.scala similarity index 100% rename from core/src/main/scala/com/commercetools/queue/Settlement.scala rename to core/src/main/scala/com/commercetools/queue/Action.scala