From 5c6dd23e17e864cb4d9032ee5c0848507ea48711 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 29 Mar 2024 10:40:41 +0100 Subject: [PATCH] Avoid recreating a metrics closure on each call Components having metrics now take an instance of the `QueueMetrics` class, which allows for instantiating the outcome handling closure only once when the `QueueSubscriber` or `QueuePublisher` is created. --- .../queue/aws/sqs/SQSClient.scala | 4 +- .../queue/aws/sqs/SQSPublisher.scala | 3 +- .../queue/aws/sqs/SQSPuller.scala | 1 + .../queue/aws/sqs/SQSPusher.scala | 8 +- .../queue/aws/sqs/SQSSubscriber.scala | 7 +- .../azure/servicebus/ServiceBusClient.scala | 2 +- .../azure/servicebus/ServiceBusPuller.scala | 1 + .../azure/servicebus/ServiceBusPusher.scala | 7 +- .../servicebus/ServiceBusQueuePublisher.scala | 6 +- .../ServiceBusQueueSubscriber.scala | 6 +- .../commercetools/queue/QueuePublisher.scala | 3 + .../com/commercetools/queue/QueuePuller.scala | 3 + .../com/commercetools/queue/QueuePusher.scala | 3 + .../commercetools/queue/QueueSubscriber.scala | 3 + .../commercetools/queue/SubscriberSuite.scala | 3 +- .../queue/testing/TestQueue.scala | 1 + .../queue/testing/TestQueuePublisher.scala | 2 + .../queue/testing/TestQueuePuller.scala | 2 + .../queue/testing/TestQueuePusher.scala | 2 + .../queue/testing/TestQueueSubscriber.scala | 2 + .../queue/otel4s/Attributes.scala | 34 -------- .../otel4s/MeasuringMessageContext.scala | 9 +- .../otel4s/MeasuringQueueAdministration.scala | 7 +- .../otel4s/MeasuringQueuePublisher.scala | 4 +- .../queue/otel4s/MeasuringQueuePuller.scala | 9 +- .../queue/otel4s/MeasuringQueuePusher.scala | 9 +- .../otel4s/MeasuringQueueSubscriber.scala | 4 +- .../queue/otel4s/QueueMetrics.scala | 67 +++++++++++++++ .../commercetools/queue/otel4s/package.scala | 15 +--- .../otel4s/MeasuringMessageContext.scala | 86 +++++++++++++++---- .../queue/otel4s/MeasuringPullerSuite.scala | 34 ++++++-- .../queue/otel4s/MeasuringPusherSuite.scala | 54 +++++++++--- 32 files changed, 284 insertions(+), 117 deletions(-) delete mode 100644 otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala create mode 100644 otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala index 7732729..0afe19d 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala @@ -40,10 +40,10 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext new SQSAdministration(client, getQueueUrl(_)) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = - new SQSPublisher(client, getQueueUrl(name)) + new SQSPublisher(name, client, getQueueUrl(name)) override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = - new SQSSubscriber[F, T](getQueueUrl(name), client) + new SQSSubscriber[F, T](name, client, getQueueUrl(name)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala index c70e9c6..0f759bb 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPublisher.scala @@ -21,6 +21,7 @@ import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer} import software.amazon.awssdk.services.sqs.SqsAsyncClient class SQSPublisher[F[_], T]( + val queueName: String, client: SqsAsyncClient, getQueueUrl: F[String] )(implicit @@ -29,6 +30,6 @@ class SQSPublisher[F[_], T]( extends QueuePublisher[F, T] { override def pusher: Resource[F, QueuePusher[F, T]] = - Resource.eval(getQueueUrl).map(new SQSPusher(client, _)) + Resource.eval(getQueueUrl).map(new SQSPusher(queueName, client, _)) } diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala index c85fc95..14279ae 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala @@ -28,6 +28,7 @@ import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ class SQSPuller[F[_], T]( + val queueName: String, client: SqsAsyncClient, queueUrl: String, lockTTL: Int diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala index 4005e1f..26ee52f 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala @@ -25,7 +25,13 @@ import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendM import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ -class SQSPusher[F[_], T](client: SqsAsyncClient, queueUrl: String)(implicit serializer: Serializer[T], F: Async[F]) +class SQSPusher[F[_], T]( + val queueName: String, + client: SqsAsyncClient, + queueUrl: String +)(implicit + serializer: Serializer[T], + F: Async[F]) extends QueuePusher[F, T] { override def push(message: T, delay: Option[FiniteDuration]): F[Unit] = diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala index 3f53ae1..65802cc 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSSubscriber.scala @@ -23,8 +23,9 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName} class SQSSubscriber[F[_], T]( - getQueueUrl: F[String], - client: SqsAsyncClient + val queueName: String, + client: SqsAsyncClient, + getQueueUrl: F[String] )(implicit F: Async[F], deserializer: Deserializer[T]) @@ -47,7 +48,7 @@ class SQSSubscriber[F[_], T]( for { queueUrl <- getQueueUrl lockTTL <- getLockTTL(queueUrl) - } yield new SQSPuller(client, queueUrl, lockTTL) + } yield new SQSPuller(queueName, client, queueUrl, lockTTL) } } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala index 45bc597..56977e0 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala @@ -33,7 +33,7 @@ class ServiceBusClient[F[_]] private ( new ServiceBusAdministration(adminBuilder.buildClient()) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = - new ServiceBusQueuePublisher[F, T](clientBuilder, name) + new ServiceBusQueuePublisher[F, T](name, clientBuilder) override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = new ServiceBusQueueSubscriber[F, T](name, clientBuilder) diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala index 205d19e..64eb8b2 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ class ServiceBusPuller[F[_], Data]( + val queueName: String, receiver: ServiceBusReceiverClient )(implicit F: Async[F], diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala index 24fa039..8122345 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala @@ -25,7 +25,12 @@ import java.time.ZoneOffset import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ -class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit serializer: Serializer[Data], F: Async[F]) +class ServiceBusPusher[F[_], Data]( + val queueName: String, + sender: ServiceBusSenderClient +)(implicit + serializer: Serializer[Data], + F: Async[F]) extends QueuePusher[F, Data] { override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = { diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala index df0ef70..a71a17c 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueuePublisher.scala @@ -21,8 +21,8 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer} class ServiceBusQueuePublisher[F[_], Data]( - clientBuilder: ServiceBusClientBuilder, - queueName: String + val queueName: String, + clientBuilder: ServiceBusClientBuilder )(implicit F: Async[F], serializer: Serializer[Data]) @@ -35,6 +35,6 @@ class ServiceBusQueuePublisher[F[_], Data]( } { s => F.delay(s.close()) } - .map(new ServiceBusPusher(_)) + .map(new ServiceBusPusher(queueName, _)) } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala index 54d37c6..3fd7b3f 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusQueueSubscriber.scala @@ -21,7 +21,7 @@ import com.azure.messaging.servicebus.models.ServiceBusReceiveMode import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber} class ServiceBusQueueSubscriber[F[_], Data]( - name: String, + val queueName: String, builder: ServiceBusClientBuilder )(implicit F: Async[F], @@ -33,14 +33,14 @@ class ServiceBusQueueSubscriber[F[_], Data]( F.delay { builder .receiver() - .queueName(name) + .queueName(queueName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .disableAutoComplete() .buildClient() } } .map { receiver => - new ServiceBusPuller(receiver) + new ServiceBusPuller(queueName, receiver) } } diff --git a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala index 027b830..c00998c 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala @@ -24,6 +24,9 @@ import fs2.Stream */ abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) { + /** The queue name to which this publisher publishes. */ + def queueName: String + /** * Returns a way to bush messages into the queue. * This is a low-level construct, mainly aiming at integrating existing diff --git a/core/src/main/scala/com/commercetools/queue/QueuePuller.scala b/core/src/main/scala/com/commercetools/queue/QueuePuller.scala index 052056b..50300be 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePuller.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePuller.scala @@ -25,6 +25,9 @@ import scala.concurrent.duration.FiniteDuration */ trait QueuePuller[F[_], T] { + /** The queue name from which this puller is pulling. */ + def queueName: String + /** * Pulls one batch of messages from the underlying queue system. * diff --git a/core/src/main/scala/com/commercetools/queue/QueuePusher.scala b/core/src/main/scala/com/commercetools/queue/QueuePusher.scala index 8fd7fb8..af8b877 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePusher.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePusher.scala @@ -24,6 +24,9 @@ import scala.concurrent.duration.FiniteDuration */ trait QueuePusher[F[_], T] { + /** The queue name to which this pusher is pushing. */ + def queueName: String + /** * Publishes a single message to the queue, with an optional delay. */ diff --git a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala index 493aaec..974bff7 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala @@ -28,6 +28,9 @@ import scala.concurrent.duration.FiniteDuration */ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) { + /** The queue name to which this subscriber subscribes. */ + def queueName: String + /** * Returns a way to pull batches from the queue. * This is a low-level construct mainly aiming at integrating with existing diff --git a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala b/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala index 3642622..5b973f8 100644 --- a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala +++ b/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala @@ -33,7 +33,8 @@ class SubscriberSuite extends CatsEffectSuite { AtomicCell[IO] .of(testing.QueueState[String](Heap.empty, List.empty, Map.empty)) .map { state => - val queue = new TestQueue[String](state = state, messageTTL = 15.minutes, lockTTL = 1.minute) + val queue = + new TestQueue[String](name = "test-queue", state = state, messageTTL = 15.minutes, lockTTL = 1.minute) (queue, new TestQueueSubscriber(queue)) } .toResource) diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala index 18444ac..3d50fa8 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala @@ -27,6 +27,7 @@ import scala.annotation.tailrec import scala.concurrent.duration.FiniteDuration class TestQueue[T]( + val name: String, state: AtomicCell[IO, QueueState[T]], val messageTTL: FiniteDuration, val lockTTL: FiniteDuration) { diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala index 558db07..7aa5b4b 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala @@ -21,6 +21,8 @@ import com.commercetools.queue.{QueuePublisher, QueuePusher} class TestQueuePublisher[T](queue: TestQueue[T]) extends QueuePublisher[IO, T] { + override val queueName = queue.name + override def pusher: Resource[IO, QueuePusher[IO, T]] = Resource.pure(new TestQueuePusher(queue)) } diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala index f44fd19..dcb860c 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration.FiniteDuration class TestQueuePuller[T](queue: TestQueue[T]) extends QueuePuller[IO, T] { + override val queueName: String = queue.name + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, T]]] = IO.sleep(waitingTime) *> queue.lockMessages(batchSize) diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala index 6c4d5b5..57c77df 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala @@ -23,6 +23,8 @@ import scala.concurrent.duration.FiniteDuration class TestQueuePusher[T](queue: TestQueue[T]) extends QueuePusher[IO, T] { + override val queueName: String = queue.name + override def push(message: T, delay: Option[FiniteDuration]): IO[Unit] = queue.enqeueMessages(message :: Nil, delay) diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala index 48daf41..72c6abc 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala +++ b/core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala @@ -21,6 +21,8 @@ import com.commercetools.queue.{QueuePuller, QueueSubscriber} class TestQueueSubscriber[T](queue: TestQueue[T]) extends QueueSubscriber[IO, T] { + override val queueName: String = queue.name + override def puller: Resource[IO, QueuePuller[IO, T]] = Resource.pure(new TestQueuePuller(queue)) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala deleted file mode 100644 index 0d140f2..0000000 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2024 Commercetools GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.commercetools.queue.otel4s - -import org.typelevel.otel4s.Attribute - -object Attributes { - final val send = Attribute("method", "send") - final val receive = Attribute("method", "receive") - final val create = Attribute("method", "create") - final val delete = Attribute("method", "delete") - final val exist = Attribute("method", "exist") - final val ack = Attribute("method", "ack") - final val nack = Attribute("method", "nack") - final val extendLock = Attribute("method", "extendLock") - - final val success = Attribute("outcome", "success") - final val failure = Attribute("outcome", "failure") - final val cancelation = Attribute("outcome", "cancelation") -} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala index 02f3675..70291cb 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala @@ -19,14 +19,13 @@ package com.commercetools.queue.otel4s import cats.effect.Temporal import cats.effect.syntax.monadCancel._ import com.commercetools.queue.MessageContext -import org.typelevel.otel4s.metrics.Counter import org.typelevel.otel4s.trace.Tracer import java.time.Instant class MeasuringMessageContext[F[_], T]( underlying: MessageContext[F, T], - requestCounter: Counter[F, Long], + metrics: QueueMetrics[F], tracer: Tracer[F] )(implicit F: Temporal[F]) extends MessageContext[F, T] { @@ -45,7 +44,7 @@ class MeasuringMessageContext[F[_], T]( .surround { underlying.ack() } - .guaranteeCase(handleOutcome(Attributes.ack, requestCounter)) + .guaranteeCase(metrics.ack) override def nack(): F[Unit] = tracer @@ -53,7 +52,7 @@ class MeasuringMessageContext[F[_], T]( .surround { underlying.nack() } - .guaranteeCase(handleOutcome(Attributes.nack, requestCounter)) + .guaranteeCase(metrics.nack) override def extendLock(): F[Unit] = tracer @@ -61,6 +60,6 @@ class MeasuringMessageContext[F[_], T]( .surround { underlying.extendLock() } - .guaranteeCase(handleOutcome(Attributes.extendLock, requestCounter)) + .guaranteeCase(metrics.extendLock) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala index abb01e2..d8402bb 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala @@ -19,6 +19,7 @@ package com.commercetools.queue.otel4s import cats.effect.MonadCancel import cats.effect.syntax.monadCancel._ import com.commercetools.queue.QueueAdministration +import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.metrics.Counter import org.typelevel.otel4s.trace.Tracer @@ -37,7 +38,7 @@ class MeasuringQueueAdministration[F[_]]( .surround { underlying.create(name, messageTTL, lockTTL) } - .guaranteeCase(handleOutcome(Attributes.create, requestCounter)) + .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.create, requestCounter)) override def delete(name: String): F[Unit] = tracer @@ -45,7 +46,7 @@ class MeasuringQueueAdministration[F[_]]( .surround { underlying.delete(name) } - .guaranteeCase(handleOutcome(Attributes.delete, requestCounter)) + .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.delete, requestCounter)) override def exists(name: String): F[Boolean] = tracer @@ -53,5 +54,5 @@ class MeasuringQueueAdministration[F[_]]( .surround { underlying.exists(name) } - .guaranteeCase(handleOutcome(Attributes.exist, requestCounter)) + .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.exist, requestCounter)) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala index 2bc4a55..ba5f416 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala @@ -28,7 +28,9 @@ class MeasuringQueuePublisher[F[_], T]( )(implicit F: MonadCancel[F, Throwable]) extends QueuePublisher[F, T] { + override def queueName: String = underlying.queueName + def pusher: Resource[F, QueuePusher[F, T]] = - underlying.pusher.map(new MeasuringQueuePusher(_, requestCounter, tracer)) + underlying.pusher.map(new MeasuringQueuePusher(_, new QueueMetrics[F](queueName, requestCounter), tracer)) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala index b41cabd..f4a2e92 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala @@ -21,26 +21,27 @@ import cats.effect.syntax.monadCancel._ import cats.syntax.functor._ import com.commercetools.queue.{MessageContext, QueuePuller} import fs2.Chunk -import org.typelevel.otel4s.metrics.Counter import org.typelevel.otel4s.trace.Tracer import scala.concurrent.duration.FiniteDuration class MeasuringQueuePuller[F[_], T]( underlying: QueuePuller[F, T], - requestCounter: Counter[F, Long], + metrics: QueueMetrics[F], tracer: Tracer[F] )(implicit F: Temporal[F]) extends QueuePuller[F, T] { + override def queueName: String = underlying.queueName + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = tracer .span("queue.pullBatch") .surround { underlying .pullBatch(batchSize, waitingTime) - .map(_.map(new MeasuringMessageContext[F, T](_, requestCounter, tracer)).widen[MessageContext[F, T]]) + .map(_.map(new MeasuringMessageContext[F, T](_, metrics, tracer)).widen[MessageContext[F, T]]) } - .guaranteeCase(handleOutcome(Attributes.receive, requestCounter)) + .guaranteeCase(metrics.receive) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala index b888e95..963cf49 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala @@ -19,18 +19,19 @@ package com.commercetools.queue.otel4s import cats.effect.MonadCancel import cats.effect.syntax.monadCancel._ import com.commercetools.queue.QueuePusher -import org.typelevel.otel4s.metrics.Counter import org.typelevel.otel4s.trace.Tracer import scala.concurrent.duration.FiniteDuration class MeasuringQueuePusher[F[_], T]( underlying: QueuePusher[F, T], - requestCounter: Counter[F, Long], + metrics: QueueMetrics[F], tracer: Tracer[F] )(implicit F: MonadCancel[F, Throwable]) extends QueuePusher[F, T] { + override def queueName: String = underlying.queueName + override def push(message: T, delay: Option[FiniteDuration]): F[Unit] = tracer .span("queue.pushMessage") @@ -38,7 +39,7 @@ class MeasuringQueuePusher[F[_], T]( underlying .push(message, delay) } - .guaranteeCase(handleOutcome(Attributes.send, requestCounter)) + .guaranteeCase(metrics.send) override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] = tracer @@ -47,6 +48,6 @@ class MeasuringQueuePusher[F[_], T]( underlying .push(messages, delay) } - .guaranteeCase(handleOutcome(Attributes.send, requestCounter)) + .guaranteeCase(metrics.send) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueSubscriber.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueSubscriber.scala index 714f4c8..c50418c 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueSubscriber.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueSubscriber.scala @@ -28,7 +28,9 @@ class MeasuringQueueSubscriber[F[_], T]( )(implicit F: Temporal[F]) extends QueueSubscriber[F, T] { + override def queueName: String = underlying.queueName + override def puller: Resource[F, QueuePuller[F, T]] = - underlying.puller.map(new MeasuringQueuePuller(_, requestCounter, tracer)) + underlying.puller.map(new MeasuringQueuePuller(_, new QueueMetrics[F](queueName, requestCounter), tracer)) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala new file mode 100644 index 0000000..c20df06 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.Outcome +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.metrics.Counter + +private class QueueMetrics[F[_]](queueName: String, requestCounter: Counter[F, Long]) { + final private[this] val queue = Attribute("queue", queueName) + + final val send: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.send, requestCounter) + final val receive: Outcome[F, Throwable, _] => F[Unit] = + QueueMetrics.increment(queue, QueueMetrics.receive, requestCounter) + final val ack: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.ack, requestCounter) + final val nack: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.nack, requestCounter) + final val extendLock: Outcome[F, Throwable, _] => F[Unit] = + QueueMetrics.increment(queue, QueueMetrics.extendLock, requestCounter) + +} + +private object QueueMetrics { + + // queue instance attributes + final val send = Attribute("method", "send") + final val receive = Attribute("method", "receive") + final val ack = Attribute("method", "ack") + final val nack = Attribute("method", """|nack""".stripMargin) + final val extendLock = Attribute("method", "extendLock") + + // queueue management attributes + final val create = Attribute("method", "create") + final val delete = Attribute("method", "delete") + final val exist = Attribute("method", "exist") + + final val success = Attribute("outcome", "success") + final val failure = Attribute("outcome", "failure") + final val cancelation = Attribute("outcome", "cancelation") + + def increment[F[_]]( + queue: Attribute[String], + method: Attribute[String], + counter: Counter[F, Long] + ): Outcome[F, Throwable, _] => F[Unit] = { + case Outcome.Succeeded(_) => + counter.inc(queue, method, success) + case Outcome.Errored(_) => + counter.inc(queue, method, failure) + case Outcome.Canceled() => + counter.inc(queue, method, cancelation) + } + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/package.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/package.scala index 87baa31..853bf87 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/package.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/package.scala @@ -16,23 +16,12 @@ package com.commercetools.queue -import cats.effect.{Outcome, Temporal} -import org.typelevel.otel4s.Attribute -import org.typelevel.otel4s.metrics.{Counter, Meter} +import cats.effect.Temporal +import org.typelevel.otel4s.metrics.Meter import org.typelevel.otel4s.trace.Tracer package object otel4s { - private[otel4s] def handleOutcome[F[_], T](method: Attribute[String], counter: Counter[F, Long]) - : Outcome[F, Throwable, T] => F[Unit] = { - case Outcome.Succeeded(_) => - counter.inc(method, Attributes.success) - case Outcome.Errored(_) => - counter.inc(method, Attributes.failure) - case Outcome.Canceled() => - counter.inc(method, Attributes.cancelation) - } - implicit class ClientOps[F[_]](val client: QueueClient[F]) extends AnyVal { /** A client tracking only metrics. */ diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala index 6699873..067a36d 100644 --- a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala @@ -20,17 +20,27 @@ import cats.data.Chain import cats.effect.IO import com.commercetools.queue.testing.TestingMessageContext import munit.CatsEffectSuite +import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.Tracer class MeasuringMessageContextSuite extends CatsEffectSuite { + val queueName = "test-queue" + + val queueAttribute = Attribute("queue", queueName) + test("Succesfully acking a message should increment the request counter") { NaiveCounter.create.flatMap { counter => - val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").noop, counter, Tracer.noop) + val context = new MeasuringMessageContext[IO, String]( + TestingMessageContext("").noop, + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.ack().start _ <- assertIO(fiber.join.map(_.isSuccess), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.ack, Attributes.success)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.ack, QueueMetrics.success)))) } yield () } } @@ -38,33 +48,48 @@ class MeasuringMessageContextSuite extends CatsEffectSuite { test("Failing to ack a message should increment the request counter") { NaiveCounter.create.flatMap { counter => val context = - new MeasuringMessageContext[IO, String](TestingMessageContext("").failing(new Exception), counter, Tracer.noop) + new MeasuringMessageContext[IO, String]( + TestingMessageContext("").failing(new Exception), + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.ack().start _ <- assertIO(fiber.join.map(_.isError), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.ack, Attributes.failure)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.ack, QueueMetrics.failure)))) } yield () } } test("Cancelling acking a message should increment the request counter") { NaiveCounter.create.flatMap { counter => - val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").canceled, counter, Tracer.noop) + val context = new MeasuringMessageContext[IO, String]( + TestingMessageContext("").canceled, + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.ack().start _ <- assertIO(fiber.join.map(_.isCanceled), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.ack, Attributes.cancelation)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.ack, QueueMetrics.cancelation)))) } yield () } } test("Succesfully nacking a message should increment the request counter") { NaiveCounter.create.flatMap { counter => - val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").noop, counter, Tracer.noop) + val context = new MeasuringMessageContext[IO, String]( + TestingMessageContext("").noop, + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.nack().start _ <- assertIO(fiber.join.map(_.isSuccess), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.nack, Attributes.success)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.nack, QueueMetrics.success)))) } yield () } } @@ -72,33 +97,48 @@ class MeasuringMessageContextSuite extends CatsEffectSuite { test("Failing to nack a message should increment the request counter") { NaiveCounter.create.flatMap { counter => val context = - new MeasuringMessageContext[IO, String](TestingMessageContext("").failing(new Exception), counter, Tracer.noop) + new MeasuringMessageContext[IO, String]( + TestingMessageContext("").failing(new Exception), + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.nack().start _ <- assertIO(fiber.join.map(_.isError), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.nack, Attributes.failure)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.nack, QueueMetrics.failure)))) } yield () } } test("Cancelling nacking a message should increment the request counter") { NaiveCounter.create.flatMap { counter => - val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").canceled, counter, Tracer.noop) + val context = new MeasuringMessageContext[IO, String]( + TestingMessageContext("").canceled, + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.nack().start _ <- assertIO(fiber.join.map(_.isCanceled), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.nack, Attributes.cancelation)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.nack, QueueMetrics.cancelation)))) } yield () } } test("Succesfully extending a message lock should increment the request counter") { NaiveCounter.create.flatMap { counter => - val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").noop, counter, Tracer.noop) + val context = new MeasuringMessageContext[IO, String]( + TestingMessageContext("").noop, + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.extendLock().start _ <- assertIO(fiber.join.map(_.isSuccess), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.extendLock, Attributes.success)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.extendLock, QueueMetrics.success)))) } yield () } } @@ -106,22 +146,32 @@ class MeasuringMessageContextSuite extends CatsEffectSuite { test("Failing to extend a message lock should increment the request counter") { NaiveCounter.create.flatMap { counter => val context = - new MeasuringMessageContext[IO, String](TestingMessageContext("").failing(new Exception), counter, Tracer.noop) + new MeasuringMessageContext[IO, String]( + TestingMessageContext("").failing(new Exception), + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.extendLock().start _ <- assertIO(fiber.join.map(_.isError), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.extendLock, Attributes.failure)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.extendLock, QueueMetrics.failure)))) } yield () } } test("Cancelling a message extension should increment the request counter") { NaiveCounter.create.flatMap { counter => - val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").canceled, counter, Tracer.noop) + val context = new MeasuringMessageContext[IO, String]( + TestingMessageContext("").canceled, + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- context.extendLock().start _ <- assertIO(fiber.join.map(_.isCanceled), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.extendLock, Attributes.cancelation)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.extendLock, QueueMetrics.cancelation)))) } yield () } } diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala index 6cff97b..51ee2ac 100644 --- a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala @@ -22,12 +22,22 @@ import com.commercetools.queue.testing.TestingMessageContext import com.commercetools.queue.{MessageContext, QueuePuller} import fs2.Chunk import munit.CatsEffectSuite +import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.Tracer import scala.concurrent.duration.{Duration, FiniteDuration} class MeasuringPullerSuite extends CatsEffectSuite { + self => + + val queueName = "test-queue" + + val queueAttribute = Attribute("queue", queueName) + def puller(batch: IO[Chunk[MessageContext[IO, String]]]) = new QueuePuller[IO, String] { + + override def queueName: String = self.queueName + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, String]]] = batch } @@ -43,13 +53,15 @@ class MeasuringPullerSuite extends CatsEffectSuite { TestingMessageContext("second").noop, TestingMessageContext("third").noop, TestingMessageContext("forth").noop)))), - counter, + new QueueMetrics(queueName, counter), Tracer.noop ) for { fiber <- measuringPuller.pullBatch(0, Duration.Zero).start _ <- assertIO(fiber.join.map(_.isSuccess), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.receive, Attributes.success)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.receive, QueueMetrics.success)))) } yield () } } @@ -57,11 +69,16 @@ class MeasuringPullerSuite extends CatsEffectSuite { test("Failed pulling results in incrementing the counter") { NaiveCounter.create.flatMap { counter => val measuringPuller = - new MeasuringQueuePuller[IO, String](puller(IO.raiseError(new Exception)), counter, Tracer.noop) + new MeasuringQueuePuller[IO, String]( + puller(IO.raiseError(new Exception)), + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- measuringPuller.pullBatch(0, Duration.Zero).start _ <- assertIO(fiber.join.map(_.isError), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.receive, Attributes.failure)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.receive, QueueMetrics.failure)))) } yield () } } @@ -69,11 +86,16 @@ class MeasuringPullerSuite extends CatsEffectSuite { test("Cancelled pulling results in incrementing the counter") { NaiveCounter.create.flatMap { counter => val measuringPuller = - new MeasuringQueuePuller[IO, String](puller(IO.canceled.as(Chunk.empty)), counter, Tracer.noop) + new MeasuringQueuePuller[IO, String]( + puller(IO.canceled.as(Chunk.empty)), + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- measuringPuller.pullBatch(0, Duration.Zero).start _ <- assertIO(fiber.join.map(_.isCanceled), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.receive, Attributes.cancelation)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.receive, QueueMetrics.cancelation)))) } yield () } } diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala index 91d90ef..5d3bc82 100644 --- a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala @@ -20,14 +20,22 @@ import cats.data.Chain import cats.effect.IO import com.commercetools.queue.QueuePusher import munit.CatsEffectSuite +import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.Tracer import scala.concurrent.duration.FiniteDuration class MeasuringPusherSuite extends CatsEffectSuite { + self => + + val queueName = "test-queue" + + val queueAttribute = Attribute("queue", queueName) def pusher(result: IO[Unit]) = new QueuePusher[IO, String] { + override def queueName: String = self.queueName + override def push(message: String, delay: Option[FiniteDuration]): IO[Unit] = result override def push(messages: List[String], delay: Option[FiniteDuration]): IO[Unit] = result @@ -36,22 +44,28 @@ class MeasuringPusherSuite extends CatsEffectSuite { test("Successfully pushing one message results in incrementing the counter") { NaiveCounter.create.flatMap { counter => - val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.unit), counter, Tracer.noop) + val measuringPusher = + new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop) for { fiber <- measuringPusher.push("msg", None).start _ <- assertIO(fiber.join.map(_.isSuccess), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.success)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.send, QueueMetrics.success)))) } yield () } } test("Successfully pushing several messages results in incrementing the counter") { NaiveCounter.create.flatMap { counter => - val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.unit), counter, Tracer.noop) + val measuringPusher = + new MeasuringQueuePusher[IO, String](pusher(IO.unit), new QueueMetrics(queueName, counter), Tracer.noop) for { fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start _ <- assertIO(fiber.join.map(_.isSuccess), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.success)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.send, QueueMetrics.success)))) } yield () } } @@ -59,11 +73,16 @@ class MeasuringPusherSuite extends CatsEffectSuite { test("Failing to push one message results in incrementing the counter") { NaiveCounter.create.flatMap { counter => val measuringPusher = - new MeasuringQueuePusher[IO, String](pusher(IO.raiseError(new Exception)), counter, Tracer.noop) + new MeasuringQueuePusher[IO, String]( + pusher(IO.raiseError(new Exception)), + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- measuringPusher.push("msg", None).start _ <- assertIO(fiber.join.map(_.isError), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.failure)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.send, QueueMetrics.failure)))) } yield () } } @@ -71,33 +90,44 @@ class MeasuringPusherSuite extends CatsEffectSuite { test("Failing to push several messages results in incrementing the counter") { NaiveCounter.create.flatMap { counter => val measuringPusher = - new MeasuringQueuePusher[IO, String](pusher(IO.raiseError(new Exception)), counter, Tracer.noop) + new MeasuringQueuePusher[IO, String]( + pusher(IO.raiseError(new Exception)), + new QueueMetrics(queueName, counter), + Tracer.noop) for { fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start _ <- assertIO(fiber.join.map(_.isError), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.failure)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.send, QueueMetrics.failure)))) } yield () } } test("Canceling pushing one message results in incrementing the counter") { NaiveCounter.create.flatMap { counter => - val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.canceled), counter, Tracer.noop) + val measuringPusher = + new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop) for { fiber <- measuringPusher.push("msg", None).start _ <- assertIO(fiber.join.map(_.isCanceled), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.cancelation)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.send, QueueMetrics.cancelation)))) } yield () } } test("Canceling pushing several messages results in incrementing the counter") { NaiveCounter.create.flatMap { counter => - val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.canceled), counter, Tracer.noop) + val measuringPusher = + new MeasuringQueuePusher[IO, String](pusher(IO.canceled), new QueueMetrics(queueName, counter), Tracer.noop) for { fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start _ <- assertIO(fiber.join.map(_.isCanceled), true) - _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.cancelation)))) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.send, QueueMetrics.cancelation)))) } yield () } }