Skip to content

Commit

Permalink
Merge pull request #22 from commercetools/publish-message-with-attrib…
Browse files Browse the repository at this point in the history
…utes

Publish messages with metadata
  • Loading branch information
satabin authored May 30, 2024
2 parents 46f5792 + 8d6db07 commit b6be5e6
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class SQSPuller[F[_], T](
.queueUrl(queueUrl)
.maxNumberOfMessages(batchSize)
.waitTimeSeconds(waitingTime.toSeconds.toInt)
.attributeNamesWithStrings(MessageSystemAttributeName.SENT_TIMESTAMP.toString())
.messageAttributeNames(".*")
.attributeNamesWithStrings(MessageSystemAttributeName.SENT_TIMESTAMP.toString)
.build(): @nowarn("msg=method attributeNamesWithStrings in trait Builder is deprecated")
)
}
Expand All @@ -72,7 +73,12 @@ class SQSPuller[F[_], T](
.attributes()
.get(MessageSystemAttributeName.SENT_TIMESTAMP)
.toLong),
metadata = message.attributesAsStrings().asScala.toMap,
metadata = message
.messageAttributes()
.asScala
.view
.collect { case (k, v) if v.dataType() == "String" => (k, v.stringValue()) }
.toMap,
receiptHandle = message.receiptHandle(),
messageId = message.messageId(),
lockTTL = lockTTL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.monadError._
import cats.syntax.traverse._
import com.commercetools.queue.{QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}
import software.amazon.awssdk.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}

import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
Expand All @@ -36,32 +36,34 @@ class SQSPusher[F[_], T](
F: Async[F])
extends QueuePusher[F, T] {

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
client.sendMessage(
SendMessageRequest
.builder()
.queueUrl(queueUrl)
.messageBody(serializer.serialize(message))
.messageAttributes(metadata.view.mapValues(mkStringAttributeValue).toMap.asJava)
.delaySeconds(delay.fold(0)(_.toSeconds.toInt))
.build())
}
}.void
.adaptError(makePushQueueException(_, queueName))

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
F.fromCompletableFuture {
F.delay {
val delaySeconds = delay.fold(0)(_.toSeconds.toInt)
client.sendMessageBatch(
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(messages.mapWithIndex { (message, idx) =>
.entries(messages.mapWithIndex { case ((payload, metadata), idx) =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.messageBody(serializer.serialize(payload))
.messageAttributes(metadata.view.mapValues(mkStringAttributeValue).toMap.asJava)
.delaySeconds(delaySeconds)
.id(idx.toString())
.build()
Expand All @@ -71,4 +73,7 @@ class SQSPusher[F[_], T](
}.void
.adaptError(makePushQueueException(_, queueName))

private def mkStringAttributeValue(s: String): MessageAttributeValue =
MessageAttributeValue.builder().dataType("String").stringValue(s).build()

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.azure.messaging.servicebus.{ServiceBusReceivedMessage, ServiceBusRece
import com.commercetools.queue.MessageContext

import java.time.Instant
import scala.jdk.CollectionConverters.MapHasAsScala

class ServiceBusMessageContext[F[_], T](
val payload: F[T],
Expand All @@ -34,8 +35,10 @@ class ServiceBusMessageContext[F[_], T](

override def enqueuedAt: Instant = underlying.getEnqueuedTime().toInstant()

override def metadata: Map[String, String] =
Map.empty
override lazy val metadata: Map[String, String] =
underlying.getRawAmqpMessage.getApplicationProperties.asScala.view.collect { case (k, v: String) =>
(k, v)
}.toMap

override def ack(): F[Unit] =
F.blocking(receiver.complete(underlying)).void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ class ServiceBusPusher[F[_], Data](
F: Async[F])
extends QueuePusher[F, Data] {

override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
override def push(message: Data, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessage = new ServiceBusMessage(serializer.serialize(message))
sbMessage.getApplicationProperties.putAll(metadata.asJava)
delay.traverse_ { delay =>
F.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))
Expand All @@ -44,8 +45,12 @@ class ServiceBusPusher[F[_], Data](
.adaptError(makePushQueueException(_, queueName))
}

override def push(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map(msg => new ServiceBusMessage(serializer.serialize(msg)))
override def push(messages: List[(Data, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map { case (payload, metadata) =>
val sbm = new ServiceBusMessage(serializer.serialize(payload))
sbm.getApplicationProperties.putAll(metadata.asJava)
sbm
}
delay.traverse_ { delay =>
F.realTimeInstant.map { now =>
sbMessages.foreach { msg =>
Expand Down
16 changes: 14 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ lazy val core = crossProject(JVMPlatform)
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueAdministration.configuration")
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueAdministration.configuration"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePusher.push"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePusher.push")
)
)

Expand Down Expand Up @@ -90,6 +92,10 @@ lazy val otel4s = crossProject(JVMPlatform)
description := "Support for metrics and tracing using otel4s",
libraryDependencies ++= List(
"org.typelevel" %%% "otel4s-core" % "0.7.0"
),
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.otel4s.MeasuringQueuePusher.push")
)
)
.dependsOn(core % "compile->compile;test->test")
Expand All @@ -114,6 +120,11 @@ lazy val azureServiceBus = crossProject(JVMPlatform)
name := "fs2-queues-azure-service-bus",
libraryDependencies ++= List(
"com.azure" % "azure-messaging-servicebus" % "7.17.0"
),
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.azure.servicebus.ServiceBusPusher.push")
)
)
.dependsOn(core, testkit % Test)
Expand All @@ -129,7 +140,8 @@ lazy val awsSQS = crossProject(JVMPlatform)
),
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSMessageContext.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSMessageContext.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSPusher.push")
)
)
.dependsOn(core)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {
* produced data to the queue. The messages are published in batches, according
* to the `batchSize` parameter.
*/
def sink(batchSize: Int = 10)(upstream: Stream[F, T]): Stream[F, Nothing] =
def sink(batchSize: Int = 10)(upstream: Stream[F, (T, Map[String, String])]): Stream[F, Nothing] =
Stream.resource(pusher).flatMap { pusher =>
upstream.chunkN(batchSize).foreach { chunk =>
pusher.push(chunk.toList, None)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/com/commercetools/queue/QueuePusher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ trait QueuePusher[F[_], T] {
/**
* Publishes a single message to the queue, with an optional delay.
*/
def push(message: T, delay: Option[FiniteDuration]): F[Unit]
def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit]

/**
* Publishes a bunch of messages to the queue, with an optional delay.
*/
def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit]
def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit]

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ class TestQueue[T](
Chunk.chain(newlyLocked.map(_._2)))
}

def enqeueMessages(messages: List[T], delay: Option[FiniteDuration]) =
def enqeueMessages(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]) =
state.evalUpdate { state =>
for {
now <- IO.realTimeInstant
state <- update(state)
} yield delay match {
case None =>
state.copy(available = state.available.addAll(messages.map(TestMessage(_, now))))
state.copy(available = state.available.addAll(messages.map(x => TestMessage(x._1, now))))
case Some(delay) =>
val delayed = now.plusMillis(delay.toMillis)
state.copy(delayed = messages.map(TestMessage(_, delayed)) reverse_::: state.delayed)
state.copy(delayed = messages.map(x => TestMessage(x._1, delayed)) reverse_::: state.delayed)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class TestQueuePusher[T](queue: TestQueue[T]) extends QueuePusher[IO, T] {

override val queueName: String = queue.name

override def push(message: T, delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(message :: Nil, delay)
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages((message, metadata) :: Nil, delay)

override def push(messages: List[T], delay: Option[FiniteDuration]): IO[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(messages, delay)

}
8 changes: 4 additions & 4 deletions docs/getting-started/publishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ The pipe takes a parameter allowing for batching publications.
```scala mdoc:compile-only
import fs2.{Pipe, Stream}

val input: Stream[IO, String] = ???
val input: Stream[IO, (String, Map[String, String])] = ???

// messages are published in batch of 10
val publicationSink: Pipe[IO, String, Nothing] = publisher.sink(batchSize = 10)
val publicationSink: Pipe[IO, (String, Map[String, String]), Nothing] = publisher.sink(batchSize = 10)

// pipe the message producing stream through the publication sink
input.through(publicationSink)
Expand All @@ -48,7 +48,7 @@ A `QueuePusher` is accessed as a [`Resource`][cats-effect-resource] as it usuall

```scala mdoc:compile-only
publisher.pusher.use { queuePusher =>
val produceMessages: IO[List[String]] = ???
val produceMessages: IO[List[(String, Map[String, String])]] = ???

// produce a batch
produceMessages
Expand All @@ -69,7 +69,7 @@ If you need to spawn background fibers using the `queuePusher`, you can for inst
import cats.effect.std.Supervisor

publisher.pusher.use { queuePusher =>
val produceMessages: IO[List[String]] = ???
val produceMessages: IO[List[(String, Map[String, String])]] = ???

// create a supervisor that waits for supervised spawn fibers
// to finish before being released
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ class PubSubPusher[F[_], T](
serializer: Serializer[T])
extends QueuePusher[F, T] {

private def makeMessage(payload: T, waitUntil: Option[Instant]): F[PubsubMessage] =
private def makeMessage(payload: T, metadata: Map[String, String], waitUntil: Option[Instant]): F[PubsubMessage] =
F.delay {
val builder = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(serializer.serialize(payload)))
builder.putAllAttributes(metadata.asJava)
waitUntil.foreach(waitUntil => builder.putAttributes(delayAttribute, waitUntil.toString()))
builder.build
}

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
(for {
waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis)))
msg <- makeMessage(message, waitUntil)
msg <- makeMessage(message, metadata, waitUntil)
_ <- wrapFuture(
F.delay(
publisher
Expand All @@ -58,10 +59,10 @@ class PubSubPusher[F[_], T](
} yield ())
.adaptError(makePushQueueException(_, queueName))

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
(for {
waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis)))
msgs <- messages.traverse(makeMessage(_, waitUntil))
msgs <- messages.traverse { case (payload, metadata) => makeMessage(payload, metadata, waitUntil) }
_ <- wrapFuture(
F.delay(publisher
.publishCallable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ class MeasuringQueuePusher[F[_], T](

override def queueName: String = underlying.queueName

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] =
tracer
.span("queue.pushMessage")
.surround {
underlying
.push(message, delay)
.push(message, metadata, delay)
}
.guaranteeCase(metrics.send)

override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] =
override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] =
tracer
.span("queue.pushMessages")
.surround {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ class MeasuringPusherSuite extends CatsEffectSuite {

override def queueName: String = self.queueName

override def push(message: String, delay: Option[FiniteDuration]): IO[Unit] = result
override def push(message: String, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] =
result

override def push(messages: List[String], delay: Option[FiniteDuration]): IO[Unit] = result
override def push(messages: List[(String, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] =
result

}

Expand All @@ -47,7 +49,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isSuccess), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -61,7 +63,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isSuccess), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -78,7 +80,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
new QueueMetrics(queueName, counter),
Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isError), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -95,7 +97,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
new QueueMetrics(queueName, counter),
Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isError), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -109,7 +111,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push("msg", None).start
fiber <- measuringPusher.push("msg", Map.empty, None).start
_ <- assertIO(fiber.join.map(_.isCanceled), true)
_ <- assertIO(
counter.records.get,
Expand All @@ -123,7 +125,7 @@ class MeasuringPusherSuite extends CatsEffectSuite {
val measuringPusher =
new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop)
for {
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start
fiber <- measuringPusher.push(List("msg1", "msg2", "msg3").map(x => (x, Map.empty)), None).start
_ <- assertIO(fiber.join.map(_.isCanceled), true)
_ <- assertIO(
counter.records.get,
Expand Down
Loading

0 comments on commit b6be5e6

Please sign in to comment.