Skip to content

Commit

Permalink
Merge pull request #13 from commercetools/errors/deserialization
Browse files Browse the repository at this point in the history
Wrap deserialization errors in `DeserializationException`
  • Loading branch information
satabin authored Apr 5, 2024
2 parents 86ac289 + 44acbb6 commit 0d4db6a
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
Expand Down Expand Up @@ -57,20 +56,22 @@ class SQSPuller[F[_], T](
Chunk
.iterator(response.messages().iterator().asScala)
.traverse { message =>
deserializer.deserialize(message.body()).liftTo[F].map { payload =>
new SQSMessageContext(
payload = payload,
enqueuedAt =
Instant.ofEpochMilli(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP).toLong),
metadata = message.attributesAsStrings().asScala.toMap,
receiptHandle = message.receiptHandle(),
messageId = message.messageId(),
lockTTL = lockTTL,
queueName = queueName,
queueUrl = queueUrl,
client = client
)
}
deserializer
.deserializeF(message.body())
.map { payload =>
new SQSMessageContext(
payload = payload,
enqueuedAt =
Instant.ofEpochMilli(message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP).toLong),
metadata = message.attributesAsStrings().asScala.toMap,
receiptHandle = message.receiptHandle(),
messageId = message.messageId(),
lockTTL = lockTTL,
queueName = queueName,
queueUrl = queueUrl,
client = client
)
}
}
}.widen[Chunk[MessageContext[F, T]]]
.adaptError(makePullQueueException(_, queueName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ package object sqs {
new CannotPushException(queueName, makeQueueException(t, queueName))

def makePullQueueException(t: Throwable, queueName: String): QueueException =
new CannotPullException(queueName, makeQueueException(t, queueName))
t match {
case t: QueueException => t
case _ => new CannotPullException(queueName, makeQueueException(t, queueName))
}

def makeMessageException(t: Throwable, queueName: String, msgId: String, action: Action): QueueException =
new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName))
t match {
case t: QueueException => t
case _ => new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
Expand All @@ -43,10 +42,13 @@ class ServiceBusPuller[F[_], Data](
.iterator(receiver.receiveMessages(batchSize, Duration.ofMillis(waitingTime.toMillis)).iterator().asScala)
}
.flatMap { chunk =>
chunk.traverse(sbMessage =>
deserializer.deserialize(sbMessage.getBody().toString()).liftTo[F].map { data =>
new ServiceBusMessageContext(data, sbMessage, receiver)
})
chunk.traverse { sbMessage =>
deserializer
.deserializeF(sbMessage.getBody().toString())
.map { data =>
new ServiceBusMessageContext(data, sbMessage, receiver)
}
}
}
.widen[Chunk[MessageContext[F, Data]]]
.adaptError(makePullQueueException(_, queueName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ package object servicebus {
new CannotPushException(queueName, makeQueueException(t, queueName))

def makePullQueueException(t: Throwable, queueName: String): QueueException =
new CannotPullException(queueName, makeQueueException(t, queueName))
t match {
case t: QueueException => t
case _ => new CannotPullException(queueName, makeQueueException(t, queueName))
}

def makeMessageException(t: Throwable, queueName: String, msgId: String, action: Action): QueueException =
new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName))
t match {
case t: QueueException => t
case _ => new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@

package com.commercetools.queue

import cats.MonadThrow
import cats.syntax.either._
import cats.syntax.monadError._

/**
* Abstraction over how to deserialize data from string.
*/
trait Deserializer[T] {
def deserialize(s: String): Either[Throwable, T]

def deserializeF[F[_]: MonadThrow](s: String): F[T] =
deserialize(s).liftTo[F].adaptError(DeserializationException(s, _))
}

object Deserializer {
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/com/commercetools/queue/errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import cats.syntax.show._
*/
sealed abstract class QueueException(msg: String, inner: Throwable) extends Exception(msg, inner)

case class DeserializationException(body: String, inner: Throwable)
extends QueueException(show"Something went wrong when deserializing '$body'", inner)

case class QueueDoesNotExistException(name: String, inner: Throwable)
extends QueueException(show"Queue $name does not exist", inner)

Expand Down

0 comments on commit 0d4db6a

Please sign in to comment.