Skip to content

Commit

Permalink
Add support for metrics and tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Mar 21, 2024
1 parent 5c6f507 commit 3ea0674
Show file tree
Hide file tree
Showing 14 changed files with 804 additions and 1 deletion.
16 changes: 15 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ val Scala213 = "2.13.12"
ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3")
ThisBuild / scalaVersion := Scala213

lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe)
lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s)

val commonSettings = List(
libraryDependencies ++= Seq(
Expand All @@ -38,6 +38,20 @@ lazy val core = crossProject(JVMPlatform)
name := "cloud-queues-core"
)

lazy val otel4s = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("otel4s"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.settings(
name := "cloud-queues-otel4s",
description := "Support for metrics and tracing using otel4s",
libraryDependencies ++= List(
"org.typelevel" %%% "otel4s-core" % "0.4.0"
)
)
.dependsOn(core % "compile->compile;test->test")

lazy val circe = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("circe"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.testing

import cats.effect.IO
import com.commercetools.queue.MessageContext

import java.time.Instant

case class TestingMessageContext[T](
payload: T,
enqueuedAt: Instant = Instant.EPOCH,
messageId: String = "",
metadata: Map[String, String] = Map.empty) {
self =>

def noop: MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.unit
override def nack(): IO[Unit] = IO.unit
override def extendLock(): IO[Unit] = IO.unit
}

def failing(t: Exception): MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.raiseError(t)
override def nack(): IO[Unit] = IO.raiseError(t)
override def extendLock(): IO[Unit] = IO.raiseError(t)
}

def canceled: MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.canceled
override def nack(): IO[Unit] = IO.canceled
override def extendLock(): IO[Unit] = IO.canceled
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import org.typelevel.otel4s.Attribute

object Attributes {
final val send = Attribute("method", "send")
final val receive = Attribute("method", "receive")
final val create = Attribute("method", "create")
final val delete = Attribute("method", "delete")
final val exist = Attribute("method", "exist")
final val ack = Attribute("method", "ack")
final val nack = Attribute("method", "nack")
final val extendLock = Attribute("method", "extendLock")

final val success = Attribute("outcome", "success")
final val failure = Attribute("outcome", "failure")
final val cancelation = Attribute("outcome", "cancelation")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.Temporal
import cats.effect.syntax.monadCancel._
import com.commercetools.queue.MessageContext
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

import java.time.Instant

class MeasuringMessageContext[F[_], T](
underlying: MessageContext[F, T],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: Temporal[F])
extends MessageContext[F, T] {

override def messageId: String = underlying.messageId

override def payload: T = underlying.payload

override def enqueuedAt: Instant = underlying.enqueuedAt

override def metadata: Map[String, String] = underlying.metadata

override def ack(): F[Unit] =
tracer
.span("queue.message.ack")
.surround {
underlying.ack()
}
.guaranteeCase(handleOutcome(Attributes.ack, requestCounter))

override def nack(): F[Unit] =
tracer
.span("queue.message.nack")
.surround {
underlying.nack()
}
.guaranteeCase(handleOutcome(Attributes.nack, requestCounter))

override def extendLock(): F[Unit] =
tracer
.span("queue.message.extendLock")
.surround {
underlying.extendLock()
}
.guaranteeCase(handleOutcome(Attributes.extendLock, requestCounter))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.MonadCancel
import cats.effect.syntax.monadCancel._
import com.commercetools.queue.QueueAdministration
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

import scala.concurrent.duration.FiniteDuration

class MeasuringQueueAdministration[F[_]](
underlying: QueueAdministration[F],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: MonadCancel[F, Throwable])
extends QueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
tracer
.span("queue.create")
.surround {
underlying.create(name, messageTTL, lockTTL)
}
.guaranteeCase(handleOutcome(Attributes.create, requestCounter))

override def delete(name: String): F[Unit] =
tracer
.span("queue.delete")
.surround {
underlying.delete(name)
}
.guaranteeCase(handleOutcome(Attributes.delete, requestCounter))

override def exists(name: String): F[Boolean] =
tracer
.span("queue.exists")
.surround {
underlying.exists(name)
}
.guaranteeCase(handleOutcome(Attributes.exist, requestCounter))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.Temporal
import cats.syntax.functor._
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}
import org.typelevel.otel4s.metrics.{Counter, Meter}
import org.typelevel.otel4s.trace.Tracer

class MeasuringQueueClient[F[_]](
private val underlying: QueueClient[F],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: Temporal[F])
extends QueueClient[F] {

override def administration: QueueAdministration[F] =
new MeasuringQueueAdministration[F](underlying.administration, requestCounter, tracer)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new MeasuringQueuePublisher[F, T](underlying.publish(name), requestCounter, tracer)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new MeasuringQueueSubscriber[F, T](underlying.subscribe(name), requestCounter, tracer)

}

/** Wraps a queue client with tracing and/or metrics. */
object MeasuringQueueClient {

final val defaultRequestMetricsName = "queue.service.call"

/** A client tracking only metrics. */
def metricsOnly[F[_]](
inner: QueueClient[F],
requestMetricsName: String = defaultRequestMetricsName
)(implicit
F: Temporal[F],
meter: Meter[F]
): F[QueueClient[F]] =
wrap(inner, requestMetricsName)(F = F, meter = meter, tracer = Tracer.noop)

/** A client tracking only traces. */
def tracesOnly[F[_]](
inner: QueueClient[F]
)(implicit
F: Temporal[F],
tracer: Tracer[F]
): F[QueueClient[F]] =
wrap(inner)(F = F, meter = Meter.noop, tracer = tracer)

/** A client tracking metrics and traces according to the provided `meter` and `tracer`. */
def wrap[F[_]](
inner: QueueClient[F],
requestMetricsName: String = defaultRequestMetricsName
)(implicit
F: Temporal[F],
meter: Meter[F],
tracer: Tracer[F]
): F[QueueClient[F]] =
inner match {
case inner: MeasuringQueueClient[F] => wrap(inner.underlying)
case _ =>
meter
.counter(requestMetricsName)
.withUnit("call")
.withDescription("Counts the calls to the underlying queue service")
.create
.map(new MeasuringQueueClient(inner, _, tracer))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.{MonadCancel, Resource}
import com.commercetools.queue.{QueuePublisher, QueuePusher}
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

class MeasuringQueuePublisher[F[_], T](
underlying: QueuePublisher[F, T],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: MonadCancel[F, Throwable])
extends QueuePublisher[F, T] {

def pusher: Resource[F, QueuePusher[F, T]] =
underlying.pusher.map(new MeasuringQueuePusher(_, requestCounter, tracer))

}
Loading

0 comments on commit 3ea0674

Please sign in to comment.