Skip to content

Commit

Permalink
Avoid recreating a metrics closure on each call
Browse files Browse the repository at this point in the history
Components having metrics now take an instance of the `QueueMetrics`
class, which allows for instantiating the outcome handling closure only
once when the `QueueSubscriber` or `QueuePublisher` is created.
  • Loading branch information
satabin committed Mar 29, 2024
1 parent 42c73b2 commit 5c6dd23
Show file tree
Hide file tree
Showing 32 changed files with 284 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext
new SQSAdministration(client, getQueueUrl(_))

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new SQSPublisher(client, getQueueUrl(name))
new SQSPublisher(name, client, getQueueUrl(name))

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new SQSSubscriber[F, T](getQueueUrl(name), client)
new SQSSubscriber[F, T](name, client, getQueueUrl(name))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient

class SQSPublisher[F[_], T](
val queueName: String,
client: SqsAsyncClient,
getQueueUrl: F[String]
)(implicit
Expand All @@ -29,6 +30,6 @@ class SQSPublisher[F[_], T](
extends QueuePublisher[F, T] {

override def pusher: Resource[F, QueuePusher[F, T]] =
Resource.eval(getQueueUrl).map(new SQSPusher(client, _))
Resource.eval(getQueueUrl).map(new SQSPusher(queueName, client, _))

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSPuller[F[_], T](
val queueName: String,
client: SqsAsyncClient,
queueUrl: String,
lockTTL: Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendM
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit serializer: Serializer[T], F: Async[F])
class SQSPusher[F[_], T](
val queueName: String,
client: SqsAsyncClient,
queueUrl: String
)(implicit
serializer: Serializer[T],
F: Async[F])
extends QueuePusher[F, T] {

override def push(message: T, delay: Option[FiniteDuration]): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName}

class SQSSubscriber[F[_], T](
getQueueUrl: F[String],
client: SqsAsyncClient
val queueName: String,
client: SqsAsyncClient,
getQueueUrl: F[String]
)(implicit
F: Async[F],
deserializer: Deserializer[T])
Expand All @@ -47,7 +48,7 @@ class SQSSubscriber[F[_], T](
for {
queueUrl <- getQueueUrl
lockTTL <- getLockTTL(queueUrl)
} yield new SQSPuller(client, queueUrl, lockTTL)
} yield new SQSPuller(queueName, client, queueUrl, lockTTL)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ServiceBusClient[F[_]] private (
new ServiceBusAdministration(adminBuilder.buildClient())

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new ServiceBusQueuePublisher[F, T](clientBuilder, name)
new ServiceBusQueuePublisher[F, T](name, clientBuilder)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new ServiceBusQueueSubscriber[F, T](name, clientBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusPuller[F[_], Data](
val queueName: String,
receiver: ServiceBusReceiverClient
)(implicit
F: Async[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import java.time.ZoneOffset
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit serializer: Serializer[Data], F: Async[F])
class ServiceBusPusher[F[_], Data](
val queueName: String,
sender: ServiceBusSenderClient
)(implicit
serializer: Serializer[Data],
F: Async[F])
extends QueuePusher[F, Data] {

override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}

class ServiceBusQueuePublisher[F[_], Data](
clientBuilder: ServiceBusClientBuilder,
queueName: String
val queueName: String,
clientBuilder: ServiceBusClientBuilder
)(implicit
F: Async[F],
serializer: Serializer[Data])
Expand All @@ -35,6 +35,6 @@ class ServiceBusQueuePublisher[F[_], Data](
} { s =>
F.delay(s.close())
}
.map(new ServiceBusPusher(_))
.map(new ServiceBusPusher(queueName, _))

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.azure.messaging.servicebus.models.ServiceBusReceiveMode
import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber}

class ServiceBusQueueSubscriber[F[_], Data](
name: String,
val queueName: String,
builder: ServiceBusClientBuilder
)(implicit
F: Async[F],
Expand All @@ -33,14 +33,14 @@ class ServiceBusQueueSubscriber[F[_], Data](
F.delay {
builder
.receiver()
.queueName(name)
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.buildClient()
}
}
.map { receiver =>
new ServiceBusPuller(receiver)
new ServiceBusPuller(queueName, receiver)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import fs2.Stream
*/
abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {

/** The queue name to which this publisher publishes. */
def queueName: String

/**
* Returns a way to bush messages into the queue.
* This is a low-level construct, mainly aiming at integrating existing
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueuePuller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import scala.concurrent.duration.FiniteDuration
*/
trait QueuePuller[F[_], T] {

/** The queue name from which this puller is pulling. */
def queueName: String

/**
* Pulls one batch of messages from the underlying queue system.
*
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueuePusher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import scala.concurrent.duration.FiniteDuration
*/
trait QueuePusher[F[_], T] {

/** The queue name to which this pusher is pushing. */
def queueName: String

/**
* Publishes a single message to the queue, with an optional delay.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import scala.concurrent.duration.FiniteDuration
*/
abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {

/** The queue name to which this subscriber subscribes. */
def queueName: String

/**
* Returns a way to pull batches from the queue.
* This is a low-level construct mainly aiming at integrating with existing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class SubscriberSuite extends CatsEffectSuite {
AtomicCell[IO]
.of(testing.QueueState[String](Heap.empty, List.empty, Map.empty))
.map { state =>
val queue = new TestQueue[String](state = state, messageTTL = 15.minutes, lockTTL = 1.minute)
val queue =
new TestQueue[String](name = "test-queue", state = state, messageTTL = 15.minutes, lockTTL = 1.minute)
(queue, new TestQueueSubscriber(queue))
}
.toResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration

class TestQueue[T](
val name: String,
state: AtomicCell[IO, QueueState[T]],
val messageTTL: FiniteDuration,
val lockTTL: FiniteDuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import com.commercetools.queue.{QueuePublisher, QueuePusher}

class TestQueuePublisher[T](queue: TestQueue[T]) extends QueuePublisher[IO, T] {

override val queueName = queue.name

override def pusher: Resource[IO, QueuePusher[IO, T]] = Resource.pure(new TestQueuePusher(queue))

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import scala.concurrent.duration.FiniteDuration

class TestQueuePuller[T](queue: TestQueue[T]) extends QueuePuller[IO, T] {

override val queueName: String = queue.name

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, T]]] =
IO.sleep(waitingTime) *> queue.lockMessages(batchSize)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import scala.concurrent.duration.FiniteDuration

class TestQueuePusher[T](queue: TestQueue[T]) extends QueuePusher[IO, T] {

override val queueName: String = queue.name

override def push(message: T, delay: Option[FiniteDuration]): IO[Unit] =
queue.enqeueMessages(message :: Nil, delay)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import com.commercetools.queue.{QueuePuller, QueueSubscriber}

class TestQueueSubscriber[T](queue: TestQueue[T]) extends QueueSubscriber[IO, T] {

override val queueName: String = queue.name

override def puller: Resource[IO, QueuePuller[IO, T]] = Resource.pure(new TestQueuePuller(queue))

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package com.commercetools.queue.otel4s
import cats.effect.Temporal
import cats.effect.syntax.monadCancel._
import com.commercetools.queue.MessageContext
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

import java.time.Instant

class MeasuringMessageContext[F[_], T](
underlying: MessageContext[F, T],
requestCounter: Counter[F, Long],
metrics: QueueMetrics[F],
tracer: Tracer[F]
)(implicit F: Temporal[F])
extends MessageContext[F, T] {
Expand All @@ -45,22 +44,22 @@ class MeasuringMessageContext[F[_], T](
.surround {
underlying.ack()
}
.guaranteeCase(handleOutcome(Attributes.ack, requestCounter))
.guaranteeCase(metrics.ack)

override def nack(): F[Unit] =
tracer
.span("queue.message.nack")
.surround {
underlying.nack()
}
.guaranteeCase(handleOutcome(Attributes.nack, requestCounter))
.guaranteeCase(metrics.nack)

override def extendLock(): F[Unit] =
tracer
.span("queue.message.extendLock")
.surround {
underlying.extendLock()
}
.guaranteeCase(handleOutcome(Attributes.extendLock, requestCounter))
.guaranteeCase(metrics.extendLock)

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.commercetools.queue.otel4s
import cats.effect.MonadCancel
import cats.effect.syntax.monadCancel._
import com.commercetools.queue.QueueAdministration
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

Expand All @@ -37,21 +38,21 @@ class MeasuringQueueAdministration[F[_]](
.surround {
underlying.create(name, messageTTL, lockTTL)
}
.guaranteeCase(handleOutcome(Attributes.create, requestCounter))
.guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.create, requestCounter))

override def delete(name: String): F[Unit] =
tracer
.span("queue.delete")
.surround {
underlying.delete(name)
}
.guaranteeCase(handleOutcome(Attributes.delete, requestCounter))
.guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.delete, requestCounter))

override def exists(name: String): F[Boolean] =
tracer
.span("queue.exists")
.surround {
underlying.exists(name)
}
.guaranteeCase(handleOutcome(Attributes.exist, requestCounter))
.guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.exist, requestCounter))
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ class MeasuringQueuePublisher[F[_], T](
)(implicit F: MonadCancel[F, Throwable])
extends QueuePublisher[F, T] {

override def queueName: String = underlying.queueName

def pusher: Resource[F, QueuePusher[F, T]] =
underlying.pusher.map(new MeasuringQueuePusher(_, requestCounter, tracer))
underlying.pusher.map(new MeasuringQueuePusher(_, new QueueMetrics[F](queueName, requestCounter), tracer))

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,27 @@ import cats.effect.syntax.monadCancel._
import cats.syntax.functor._
import com.commercetools.queue.{MessageContext, QueuePuller}
import fs2.Chunk
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

import scala.concurrent.duration.FiniteDuration

class MeasuringQueuePuller[F[_], T](
underlying: QueuePuller[F, T],
requestCounter: Counter[F, Long],
metrics: QueueMetrics[F],
tracer: Tracer[F]
)(implicit F: Temporal[F])
extends QueuePuller[F, T] {

override def queueName: String = underlying.queueName

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
tracer
.span("queue.pullBatch")
.surround {
underlying
.pullBatch(batchSize, waitingTime)
.map(_.map(new MeasuringMessageContext[F, T](_, requestCounter, tracer)).widen[MessageContext[F, T]])
.map(_.map(new MeasuringMessageContext[F, T](_, metrics, tracer)).widen[MessageContext[F, T]])
}
.guaranteeCase(handleOutcome(Attributes.receive, requestCounter))
.guaranteeCase(metrics.receive)

}
Loading

0 comments on commit 5c6dd23

Please sign in to comment.