diff --git a/build.sbt b/build.sbt index 36dc250..19e478b 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3") ThisBuild / scalaVersion := Scala213 -lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe) +lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s) val commonSettings = List( libraryDependencies ++= Seq( @@ -38,6 +38,20 @@ lazy val core = crossProject(JVMPlatform) name := "cloud-queues-core" ) +lazy val otel4s = crossProject(JVMPlatform) + .crossType(CrossType.Pure) + .in(file("otel4s")) + .enablePlugins(NoPublishPlugin) + .settings(commonSettings) + .settings( + name := "cloud-queues-otel4s", + description := "Support for metrics and tracing using otel4s", + libraryDependencies ++= List( + "org.typelevel" %%% "otel4s-core" % "0.4.0" + ) + ) + .dependsOn(core % "compile->compile;test->test") + lazy val circe = crossProject(JVMPlatform) .crossType(CrossType.Pure) .in(file("circe")) diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala b/core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala new file mode 100644 index 0000000..b30e0f9 --- /dev/null +++ b/core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.testing + +import cats.effect.IO +import com.commercetools.queue.MessageContext + +import java.time.Instant + +case class TestingMessageContext[T]( + payload: T, + enqueuedAt: Instant = Instant.EPOCH, + messageId: String = "", + metadata: Map[String, String] = Map.empty) { + self => + + def noop: MessageContext[IO, T] = new MessageContext[IO, T] { + override def messageId: String = self.messageId + override def payload: T = self.payload + override def enqueuedAt: Instant = self.enqueuedAt + override def metadata: Map[String, String] = self.metadata + override def ack(): IO[Unit] = IO.unit + override def nack(): IO[Unit] = IO.unit + override def extendLock(): IO[Unit] = IO.unit + } + + def failing(t: Exception): MessageContext[IO, T] = new MessageContext[IO, T] { + override def messageId: String = self.messageId + override def payload: T = self.payload + override def enqueuedAt: Instant = self.enqueuedAt + override def metadata: Map[String, String] = self.metadata + override def ack(): IO[Unit] = IO.raiseError(t) + override def nack(): IO[Unit] = IO.raiseError(t) + override def extendLock(): IO[Unit] = IO.raiseError(t) + } + + def canceled: MessageContext[IO, T] = new MessageContext[IO, T] { + override def messageId: String = self.messageId + override def payload: T = self.payload + override def enqueuedAt: Instant = self.enqueuedAt + override def metadata: Map[String, String] = self.metadata + override def ack(): IO[Unit] = IO.canceled + override def nack(): IO[Unit] = IO.canceled + override def extendLock(): IO[Unit] = IO.canceled + } + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala new file mode 100644 index 0000000..0d140f2 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import org.typelevel.otel4s.Attribute + +object Attributes { + final val send = Attribute("method", "send") + final val receive = Attribute("method", "receive") + final val create = Attribute("method", "create") + final val delete = Attribute("method", "delete") + final val exist = Attribute("method", "exist") + final val ack = Attribute("method", "ack") + final val nack = Attribute("method", "nack") + final val extendLock = Attribute("method", "extendLock") + + final val success = Attribute("outcome", "success") + final val failure = Attribute("outcome", "failure") + final val cancelation = Attribute("outcome", "cancelation") +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala new file mode 100644 index 0000000..02f3675 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.Temporal +import cats.effect.syntax.monadCancel._ +import com.commercetools.queue.MessageContext +import org.typelevel.otel4s.metrics.Counter +import org.typelevel.otel4s.trace.Tracer + +import java.time.Instant + +class MeasuringMessageContext[F[_], T]( + underlying: MessageContext[F, T], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: Temporal[F]) + extends MessageContext[F, T] { + + override def messageId: String = underlying.messageId + + override def payload: T = underlying.payload + + override def enqueuedAt: Instant = underlying.enqueuedAt + + override def metadata: Map[String, String] = underlying.metadata + + override def ack(): F[Unit] = + tracer + .span("queue.message.ack") + .surround { + underlying.ack() + } + .guaranteeCase(handleOutcome(Attributes.ack, requestCounter)) + + override def nack(): F[Unit] = + tracer + .span("queue.message.nack") + .surround { + underlying.nack() + } + .guaranteeCase(handleOutcome(Attributes.nack, requestCounter)) + + override def extendLock(): F[Unit] = + tracer + .span("queue.message.extendLock") + .surround { + underlying.extendLock() + } + .guaranteeCase(handleOutcome(Attributes.extendLock, requestCounter)) + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala new file mode 100644 index 0000000..abb01e2 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.MonadCancel +import cats.effect.syntax.monadCancel._ +import com.commercetools.queue.QueueAdministration +import org.typelevel.otel4s.metrics.Counter +import org.typelevel.otel4s.trace.Tracer + +import scala.concurrent.duration.FiniteDuration + +class MeasuringQueueAdministration[F[_]]( + underlying: QueueAdministration[F], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: MonadCancel[F, Throwable]) + extends QueueAdministration[F] { + + override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = + tracer + .span("queue.create") + .surround { + underlying.create(name, messageTTL, lockTTL) + } + .guaranteeCase(handleOutcome(Attributes.create, requestCounter)) + + override def delete(name: String): F[Unit] = + tracer + .span("queue.delete") + .surround { + underlying.delete(name) + } + .guaranteeCase(handleOutcome(Attributes.delete, requestCounter)) + + override def exists(name: String): F[Boolean] = + tracer + .span("queue.exists") + .surround { + underlying.exists(name) + } + .guaranteeCase(handleOutcome(Attributes.exist, requestCounter)) +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala new file mode 100644 index 0000000..5d9c675 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala @@ -0,0 +1,87 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.Temporal +import cats.syntax.functor._ +import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} +import org.typelevel.otel4s.metrics.{Counter, Meter} +import org.typelevel.otel4s.trace.Tracer + +class MeasuringQueueClient[F[_]]( + private val underlying: QueueClient[F], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: Temporal[F]) + extends QueueClient[F] { + + override def administration: QueueAdministration[F] = + new MeasuringQueueAdministration[F](underlying.administration, requestCounter, tracer) + + override def publish[T: Serializer](name: String): QueuePublisher[F, T] = + new MeasuringQueuePublisher[F, T](underlying.publish(name), requestCounter, tracer) + + override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = + new MeasuringQueueSubscriber[F, T](underlying.subscribe(name), requestCounter, tracer) + +} + +/** Wraps a queue client with tracing and/or metrics. */ +object MeasuringQueueClient { + + final val defaultRequestMetricsName = "queue.service.call" + + /** A client tracking only metrics. */ + def metricsOnly[F[_]]( + inner: QueueClient[F], + requestMetricsName: String = defaultRequestMetricsName + )(implicit + F: Temporal[F], + meter: Meter[F] + ): F[QueueClient[F]] = + wrap(inner, requestMetricsName)(F = F, meter = meter, tracer = Tracer.noop) + + /** A client tracking only traces. */ + def tracesOnly[F[_]]( + inner: QueueClient[F] + )(implicit + F: Temporal[F], + tracer: Tracer[F] + ): F[QueueClient[F]] = + wrap(inner)(F = F, meter = Meter.noop, tracer = tracer) + + /** A client tracking metrics and traces according to the provided `meter` and `tracer`. */ + def wrap[F[_]]( + inner: QueueClient[F], + requestMetricsName: String = defaultRequestMetricsName + )(implicit + F: Temporal[F], + meter: Meter[F], + tracer: Tracer[F] + ): F[QueueClient[F]] = + inner match { + case inner: MeasuringQueueClient[F] => wrap(inner.underlying) + case _ => + meter + .counter(requestMetricsName) + .withUnit("call") + .withDescription("Counts the calls to the underlying queue service") + .create + .map(new MeasuringQueueClient(inner, _, tracer)) + } + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala new file mode 100644 index 0000000..2bc4a55 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.{MonadCancel, Resource} +import com.commercetools.queue.{QueuePublisher, QueuePusher} +import org.typelevel.otel4s.metrics.Counter +import org.typelevel.otel4s.trace.Tracer + +class MeasuringQueuePublisher[F[_], T]( + underlying: QueuePublisher[F, T], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: MonadCancel[F, Throwable]) + extends QueuePublisher[F, T] { + + def pusher: Resource[F, QueuePusher[F, T]] = + underlying.pusher.map(new MeasuringQueuePusher(_, requestCounter, tracer)) + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala new file mode 100644 index 0000000..b41cabd --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.Temporal +import cats.effect.syntax.monadCancel._ +import cats.syntax.functor._ +import com.commercetools.queue.{MessageContext, QueuePuller} +import fs2.Chunk +import org.typelevel.otel4s.metrics.Counter +import org.typelevel.otel4s.trace.Tracer + +import scala.concurrent.duration.FiniteDuration + +class MeasuringQueuePuller[F[_], T]( + underlying: QueuePuller[F, T], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: Temporal[F]) + extends QueuePuller[F, T] { + + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = + tracer + .span("queue.pullBatch") + .surround { + underlying + .pullBatch(batchSize, waitingTime) + .map(_.map(new MeasuringMessageContext[F, T](_, requestCounter, tracer)).widen[MessageContext[F, T]]) + } + .guaranteeCase(handleOutcome(Attributes.receive, requestCounter)) + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala new file mode 100644 index 0000000..b888e95 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.MonadCancel +import cats.effect.syntax.monadCancel._ +import com.commercetools.queue.QueuePusher +import org.typelevel.otel4s.metrics.Counter +import org.typelevel.otel4s.trace.Tracer + +import scala.concurrent.duration.FiniteDuration + +class MeasuringQueuePusher[F[_], T]( + underlying: QueuePusher[F, T], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: MonadCancel[F, Throwable]) + extends QueuePusher[F, T] { + + override def push(message: T, delay: Option[FiniteDuration]): F[Unit] = + tracer + .span("queue.pushMessage") + .surround { + underlying + .push(message, delay) + } + .guaranteeCase(handleOutcome(Attributes.send, requestCounter)) + + override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] = + tracer + .span("queue.pushMessages") + .surround { + underlying + .push(messages, delay) + } + .guaranteeCase(handleOutcome(Attributes.send, requestCounter)) + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueSubscriber.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueSubscriber.scala new file mode 100644 index 0000000..714f4c8 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueSubscriber.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.{Resource, Temporal} +import com.commercetools.queue.{QueuePuller, QueueSubscriber} +import org.typelevel.otel4s.metrics.Counter +import org.typelevel.otel4s.trace.Tracer + +class MeasuringQueueSubscriber[F[_], T]( + underlying: QueueSubscriber[F, T], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: Temporal[F]) + extends QueueSubscriber[F, T] { + + override def puller: Resource[F, QueuePuller[F, T]] = + underlying.puller.map(new MeasuringQueuePuller(_, requestCounter, tracer)) + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/package.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/package.scala new file mode 100644 index 0000000..87baa31 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/package.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +import cats.effect.{Outcome, Temporal} +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.metrics.{Counter, Meter} +import org.typelevel.otel4s.trace.Tracer + +package object otel4s { + + private[otel4s] def handleOutcome[F[_], T](method: Attribute[String], counter: Counter[F, Long]) + : Outcome[F, Throwable, T] => F[Unit] = { + case Outcome.Succeeded(_) => + counter.inc(method, Attributes.success) + case Outcome.Errored(_) => + counter.inc(method, Attributes.failure) + case Outcome.Canceled() => + counter.inc(method, Attributes.cancelation) + } + + implicit class ClientOps[F[_]](val client: QueueClient[F]) extends AnyVal { + + /** A client tracking only metrics. */ + def withMetrics( + requestMetricsName: String = MeasuringQueueClient.defaultRequestMetricsName + )(implicit + F: Temporal[F], + meter: Meter[F] + ): F[QueueClient[F]] = + MeasuringQueueClient.metricsOnly(client, requestMetricsName) + + /** A client tracking only traces. */ + def withTraces(implicit F: Temporal[F], tracer: Tracer[F]): F[QueueClient[F]] = + MeasuringQueueClient.tracesOnly(client) + + /** A client tracking metrics and traces according to the provided `meter` and `tracer`. */ + def withMetricsAndTraces( + requestMetricsName: String = MeasuringQueueClient.defaultRequestMetricsName + )(implicit + F: Temporal[F], + meter: Meter[F], + tracer: Tracer[F] + ): F[QueueClient[F]] = + MeasuringQueueClient.wrap(client, requestMetricsName) + + } + +} diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala new file mode 100644 index 0000000..6699873 --- /dev/null +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.data.Chain +import cats.effect.IO +import com.commercetools.queue.testing.TestingMessageContext +import munit.CatsEffectSuite +import org.typelevel.otel4s.trace.Tracer + +class MeasuringMessageContextSuite extends CatsEffectSuite { + + test("Succesfully acking a message should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").noop, counter, Tracer.noop) + for { + fiber <- context.ack().start + _ <- assertIO(fiber.join.map(_.isSuccess), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.ack, Attributes.success)))) + } yield () + } + } + + test("Failing to ack a message should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = + new MeasuringMessageContext[IO, String](TestingMessageContext("").failing(new Exception), counter, Tracer.noop) + for { + fiber <- context.ack().start + _ <- assertIO(fiber.join.map(_.isError), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.ack, Attributes.failure)))) + } yield () + } + } + + test("Cancelling acking a message should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").canceled, counter, Tracer.noop) + for { + fiber <- context.ack().start + _ <- assertIO(fiber.join.map(_.isCanceled), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.ack, Attributes.cancelation)))) + } yield () + } + } + + test("Succesfully nacking a message should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").noop, counter, Tracer.noop) + for { + fiber <- context.nack().start + _ <- assertIO(fiber.join.map(_.isSuccess), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.nack, Attributes.success)))) + } yield () + } + } + + test("Failing to nack a message should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = + new MeasuringMessageContext[IO, String](TestingMessageContext("").failing(new Exception), counter, Tracer.noop) + for { + fiber <- context.nack().start + _ <- assertIO(fiber.join.map(_.isError), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.nack, Attributes.failure)))) + } yield () + } + } + + test("Cancelling nacking a message should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").canceled, counter, Tracer.noop) + for { + fiber <- context.nack().start + _ <- assertIO(fiber.join.map(_.isCanceled), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.nack, Attributes.cancelation)))) + } yield () + } + } + + test("Succesfully extending a message lock should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").noop, counter, Tracer.noop) + for { + fiber <- context.extendLock().start + _ <- assertIO(fiber.join.map(_.isSuccess), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.extendLock, Attributes.success)))) + } yield () + } + } + + test("Failing to extend a message lock should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = + new MeasuringMessageContext[IO, String](TestingMessageContext("").failing(new Exception), counter, Tracer.noop) + for { + fiber <- context.extendLock().start + _ <- assertIO(fiber.join.map(_.isError), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.extendLock, Attributes.failure)))) + } yield () + } + } + + test("Cancelling a message extension should increment the request counter") { + NaiveCounter.create.flatMap { counter => + val context = new MeasuringMessageContext[IO, String](TestingMessageContext("").canceled, counter, Tracer.noop) + for { + fiber <- context.extendLock().start + _ <- assertIO(fiber.join.map(_.isCanceled), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.extendLock, Attributes.cancelation)))) + } yield () + } + } + +} diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala new file mode 100644 index 0000000..6cff97b --- /dev/null +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.data.Chain +import cats.effect.IO +import com.commercetools.queue.testing.TestingMessageContext +import com.commercetools.queue.{MessageContext, QueuePuller} +import fs2.Chunk +import munit.CatsEffectSuite +import org.typelevel.otel4s.trace.Tracer + +import scala.concurrent.duration.{Duration, FiniteDuration} + +class MeasuringPullerSuite extends CatsEffectSuite { + def puller(batch: IO[Chunk[MessageContext[IO, String]]]) = new QueuePuller[IO, String] { + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, String]]] = + batch + } + + test("Successful pulling results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPuller = new MeasuringQueuePuller[IO, String]( + puller( + IO.pure( + Chunk.from( + List( + TestingMessageContext("first").noop, + TestingMessageContext("second").noop, + TestingMessageContext("third").noop, + TestingMessageContext("forth").noop)))), + counter, + Tracer.noop + ) + for { + fiber <- measuringPuller.pullBatch(0, Duration.Zero).start + _ <- assertIO(fiber.join.map(_.isSuccess), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.receive, Attributes.success)))) + } yield () + } + } + + test("Failed pulling results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPuller = + new MeasuringQueuePuller[IO, String](puller(IO.raiseError(new Exception)), counter, Tracer.noop) + for { + fiber <- measuringPuller.pullBatch(0, Duration.Zero).start + _ <- assertIO(fiber.join.map(_.isError), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.receive, Attributes.failure)))) + } yield () + } + } + + test("Cancelled pulling results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPuller = + new MeasuringQueuePuller[IO, String](puller(IO.canceled.as(Chunk.empty)), counter, Tracer.noop) + for { + fiber <- measuringPuller.pullBatch(0, Duration.Zero).start + _ <- assertIO(fiber.join.map(_.isCanceled), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.receive, Attributes.cancelation)))) + } yield () + } + } + +} diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala new file mode 100644 index 0000000..91d90ef --- /dev/null +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.data.Chain +import cats.effect.IO +import com.commercetools.queue.QueuePusher +import munit.CatsEffectSuite +import org.typelevel.otel4s.trace.Tracer + +import scala.concurrent.duration.FiniteDuration + +class MeasuringPusherSuite extends CatsEffectSuite { + + def pusher(result: IO[Unit]) = new QueuePusher[IO, String] { + + override def push(message: String, delay: Option[FiniteDuration]): IO[Unit] = result + + override def push(messages: List[String], delay: Option[FiniteDuration]): IO[Unit] = result + + } + + test("Successfully pushing one message results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.unit), counter, Tracer.noop) + for { + fiber <- measuringPusher.push("msg", None).start + _ <- assertIO(fiber.join.map(_.isSuccess), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.success)))) + } yield () + } + } + + test("Successfully pushing several messages results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.unit), counter, Tracer.noop) + for { + fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start + _ <- assertIO(fiber.join.map(_.isSuccess), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.success)))) + } yield () + } + } + + test("Failing to push one message results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPusher = + new MeasuringQueuePusher[IO, String](pusher(IO.raiseError(new Exception)), counter, Tracer.noop) + for { + fiber <- measuringPusher.push("msg", None).start + _ <- assertIO(fiber.join.map(_.isError), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.failure)))) + } yield () + } + } + + test("Failing to push several messages results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPusher = + new MeasuringQueuePusher[IO, String](pusher(IO.raiseError(new Exception)), counter, Tracer.noop) + for { + fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start + _ <- assertIO(fiber.join.map(_.isError), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.failure)))) + } yield () + } + } + + test("Canceling pushing one message results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.canceled), counter, Tracer.noop) + for { + fiber <- measuringPusher.push("msg", None).start + _ <- assertIO(fiber.join.map(_.isCanceled), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.cancelation)))) + } yield () + } + } + + test("Canceling pushing several messages results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPusher = new MeasuringQueuePusher[IO, String](pusher(IO.canceled), counter, Tracer.noop) + for { + fiber <- measuringPusher.push(List("msg1", "msg2", "msg3"), None).start + _ <- assertIO(fiber.join.map(_.isCanceled), true) + _ <- assertIO(counter.records.get, Chain.one((1L, List(Attributes.send, Attributes.cancelation)))) + } yield () + } + } + +} diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/NaiveCounter.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/NaiveCounter.scala new file mode 100644 index 0000000..15d5969 --- /dev/null +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/NaiveCounter.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.data.Chain +import cats.effect.{IO, Ref} +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.meta.InstrumentMeta +import org.typelevel.otel4s.metrics.Counter + +class NaiveCounter(val records: Ref[IO, Chain[(Long, List[Attribute[_]])]]) extends Counter[IO, Long] { + + override val backend: Counter.Backend[IO, Long] = new Counter.LongBackend[IO] { + + override val meta: InstrumentMeta[IO] = InstrumentMeta.enabled + + override def add(value: Long, attributes: Attribute[_]*): IO[Unit] = + records.update(_.append((value, attributes.toList))) + + } + +} + +object NaiveCounter { + + def create: IO[NaiveCounter] = + Ref[IO] + .of(Chain.empty[(Long, List[Attribute[_]])]) + .map(new NaiveCounter(_)) + +}