Skip to content

Commit

Permalink
Add support for message attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed May 27, 2024
1 parent 46f5792 commit f8610c6
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.monadError._
import cats.syntax.traverse._
import com.commercetools.queue.{QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}
import software.amazon.awssdk.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
Expand All @@ -36,21 +36,23 @@ class SQSPusher[F[_], T](
F: Async[F])
extends QueuePusher[F, T] {

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, attributes: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
client.sendMessage(
SendMessageRequest
.builder()
.queueUrl(queueUrl)
.messageBody(serializer.serialize(message))
.messageAttributes(
attributes.view.mapValues(v => MessageAttributeValue.builder().stringValue(v).build()).toMap.asJava)
.delaySeconds(delay.fold(0)(_.toSeconds.toInt))
.build())
}
}.void
.adaptError(makePushQueueException(_, queueName))

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
val delaySeconds = delay.fold(0)(_.toSeconds.toInt)
Expand All @@ -61,7 +63,9 @@ class SQSPusher[F[_], T](
.entries(messages.mapWithIndex { (message, idx) =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.messageBody(serializer.serialize(message._1))
.messageAttributes(
message._2.view.mapValues(v => MessageAttributeValue.builder().stringValue(v).build()).toMap.asJava)
.delaySeconds(delaySeconds)
.id(idx.toString())
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ class ServiceBusPusher[F[_], Data](
F: Async[F])
extends QueuePusher[F, Data] {

override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
override def push(message: Data, attributes: Map[String, String], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessage = new ServiceBusMessage(serializer.serialize(message))
sbMessage.getApplicationProperties.putAll(attributes.asJava)
delay.traverse_ { delay =>
F.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))
Expand All @@ -44,8 +45,12 @@ class ServiceBusPusher[F[_], Data](
.adaptError(makePushQueueException(_, queueName))
}

override def push(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map(msg => new ServiceBusMessage(serializer.serialize(msg)))
override def push(messages: List[(Data, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map { msg =>
val sbm = new ServiceBusMessage(serializer.serialize(msg._1))
sbm.getApplicationProperties.putAll(msg._2.asJava)
sbm
}
delay.traverse_ { delay =>
F.realTimeInstant.map { now =>
sbMessages.foreach { msg =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {
* produced data to the queue. The messages are published in batches, according
* to the `batchSize` parameter.
*/
def sink(batchSize: Int = 10)(upstream: Stream[F, T]): Stream[F, Nothing] =
def sink(batchSize: Int = 10)(upstream: Stream[F, (T, Map[String, String])]): Stream[F, Nothing] =
Stream.resource(pusher).flatMap { pusher =>
upstream.chunkN(batchSize).foreach { chunk =>
pusher.push(chunk.toList, None)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/com/commercetools/queue/QueuePusher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ trait QueuePusher[F[_], T] {
/**
* Publishes a single message to the queue, with an optional delay.
*/
def push(message: T, delay: Option[FiniteDuration]): F[Unit]
def push(message: T, attributes: Map[String, String], delay: Option[FiniteDuration]): F[Unit]

/**
* Publishes a bunch of messages to the queue, with an optional delay.
*/
def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit]
def push(messages: (List[(T, Map[String, String])]), delay: Option[FiniteDuration]): F[Unit]

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ class TestQueue[T](
Chunk.chain(newlyLocked.map(_._2)))
}

def enqeueMessages(messages: List[T], delay: Option[FiniteDuration]) =
def enqeueMessages(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]) =
state.evalUpdate { state =>
for {
now <- IO.realTimeInstant
state <- update(state)
} yield delay match {
case None =>
state.copy(available = state.available.addAll(messages.map(TestMessage(_, now))))
state.copy(available = state.available.addAll(messages.map(x => TestMessage(x._1, now))))
case Some(delay) =>
val delayed = now.plusMillis(delay.toMillis)
state.copy(delayed = messages.map(TestMessage(_, delayed)) reverse_::: state.delayed)
state.copy(delayed = messages.map(x => TestMessage(x._1, delayed)) reverse_::: state.delayed)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class TestQueuePusher[T](queue: TestQueue[T]) extends QueuePusher[IO, T] {

override val queueName: String = queue.name

override def push(message: T, delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(message :: Nil, delay)
override def push(message: T, attributes: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages((message, attributes) :: Nil, delay)

override def push(messages: List[T], delay: Option[FiniteDuration]): IO[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(messages, delay)

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ class PubSubPusher[F[_], T](
serializer: Serializer[T])
extends QueuePusher[F, T] {

private def makeMessage(payload: T, waitUntil: Option[Instant]): F[PubsubMessage] =
private def makeMessage(payload: T, attributes: Map[String, String], waitUntil: Option[Instant]): F[PubsubMessage] =
F.delay {
val builder = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(serializer.serialize(payload)))
builder.putAllAttributes(attributes.asJava)
waitUntil.foreach(waitUntil => builder.putAttributes(delayAttribute, waitUntil.toString()))
builder.build
}

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, attributes: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
(for {
waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis)))
msg <- makeMessage(message, waitUntil)
msg <- makeMessage(message, attributes, waitUntil)
_ <- wrapFuture(
F.delay(
publisher
Expand All @@ -58,10 +59,10 @@ class PubSubPusher[F[_], T](
} yield ())
.adaptError(makePushQueueException(_, queueName))

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
(for {
waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis)))
msgs <- messages.traverse(makeMessage(_, waitUntil))
msgs <- messages.traverse(x => makeMessage(x._1, x._2, waitUntil))
_ <- wrapFuture(
F.delay(publisher
.publishCallable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ class MeasuringQueuePusher[F[_], T](

override def queueName: String = underlying.queueName

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, attributes: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
tracer
.span("queue.pushMessage")
.surround {
underlying
.push(message, delay)
.push(message, attributes, delay)
}
.guaranteeCase(metrics.send)

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
tracer
.span("queue.pushMessages")
.surround {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ class MeasuringPusherSuite extends CatsEffectSuite {

override def queueName: String = self.queueName

override def push(message: String, delay: Option[FiniteDuration]): IO[Unit] = result
override def push(message: String, attributes: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] =
result

override def push(messages: List[String], delay: Option[FiniteDuration]): IO[Unit] = result
override def push(messages: List[(String, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] =
result

}

Expand All @@ -47,7 +49,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isSuccess), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -61,7 +63,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isSuccess), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -78,7 +80,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
new QueueMetrics(queueName, counter),
Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isError), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -95,7 +97,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
new QueueMetrics(queueName, counter),
Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isError), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -109,7 +111,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isCanceled), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -123,7 +125,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isCanceled), true)
_ <- assertIO(
counter.records.get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class QueueClientSuite extends CatsEffectSuite {
for {
random <- Random.scalaUtilRandom[IO]
size <- random.nextLongBounded(30L)
messages = List.range(0L, size).map(_.toString())
messages = List.range(0L, size).map(i => (i.toString(), Map.empty[String, String]))
received <- Ref[IO].of(List.empty[String])
client = clientFixture()
_ <- Stream
Expand All @@ -55,7 +55,7 @@ abstract class QueueClientSuite extends CatsEffectSuite {
)
.compile
.drain
_ <- assertIO(received.get.map(_.toSet), messages.toSet)
_ <- assertIO(received.get.map(_.toSet), messages.map(_._1).toSet)
} yield ()
}

Expand All @@ -79,7 +79,7 @@ abstract class QueueClientSuite extends CatsEffectSuite {
withQueue.test("delayed messages should not be pulled before deadline") { queueName =>
val client = clientFixture()
client.publish(queueName).pusher.use { pusher =>
pusher.push("delayed message", Some(2.seconds))
pusher.push("delayed message", Map.empty, Some(2.seconds))
} *> client.subscribe(queueName).puller.use { puller =>
for {
_ <- assertIO(puller.pullBatch(1, 1.second), Chunk.empty)
Expand Down

0 comments on commit f8610c6

Please sign in to comment.