Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for metrics and tracing #8

Merged
merged 3 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ val Scala213 = "2.13.12"
ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3")
ThisBuild / scalaVersion := Scala213

lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe)
lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s)

val commonSettings = List(
libraryDependencies ++= Seq(
Expand All @@ -38,6 +38,20 @@ lazy val core = crossProject(JVMPlatform)
name := "cloud-queues-core"
)

lazy val otel4s = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("otel4s"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.settings(
name := "cloud-queues-otel4s",
description := "Support for metrics and tracing using otel4s",
libraryDependencies ++= List(
"org.typelevel" %%% "otel4s-core" % "0.4.0"
)
)
.dependsOn(core % "compile->compile;test->test")

lazy val circe = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("circe"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.testing

import cats.effect.IO
import com.commercetools.queue.MessageContext

import java.time.Instant

case class TestingMessageContext[T](
payload: T,
enqueuedAt: Instant = Instant.EPOCH,
messageId: String = "",
metadata: Map[String, String] = Map.empty) {
self =>

def noop: MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.unit
override def nack(): IO[Unit] = IO.unit
override def extendLock(): IO[Unit] = IO.unit
}

def failing(t: Exception): MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.raiseError(t)
override def nack(): IO[Unit] = IO.raiseError(t)
override def extendLock(): IO[Unit] = IO.raiseError(t)
}

def canceled: MessageContext[IO, T] = new MessageContext[IO, T] {
override def messageId: String = self.messageId
override def payload: T = self.payload
override def enqueuedAt: Instant = self.enqueuedAt
override def metadata: Map[String, String] = self.metadata
override def ack(): IO[Unit] = IO.canceled
override def nack(): IO[Unit] = IO.canceled
override def extendLock(): IO[Unit] = IO.canceled
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import org.typelevel.otel4s.Attribute

object Attributes {
final val send = Attribute("method", "send")
final val receive = Attribute("method", "receive")
final val create = Attribute("method", "create")
final val delete = Attribute("method", "delete")
final val exist = Attribute("method", "exist")
final val ack = Attribute("method", "ack")
final val nack = Attribute("method", "nack")
final val extendLock = Attribute("method", "extendLock")

final val success = Attribute("outcome", "success")
final val failure = Attribute("outcome", "failure")
final val cancelation = Attribute("outcome", "cancelation")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.Temporal
import cats.effect.syntax.monadCancel._
import com.commercetools.queue.MessageContext
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

import java.time.Instant

class MeasuringMessageContext[F[_], T](
underlying: MessageContext[F, T],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: Temporal[F])
extends MessageContext[F, T] {

override def messageId: String = underlying.messageId

override def payload: T = underlying.payload

override def enqueuedAt: Instant = underlying.enqueuedAt

override def metadata: Map[String, String] = underlying.metadata

override def ack(): F[Unit] =
tracer
.span("queue.message.ack")
.surround {
underlying.ack()
}
.guaranteeCase(handleOutcome(Attributes.ack, requestCounter))

override def nack(): F[Unit] =
tracer
.span("queue.message.nack")
.surround {
underlying.nack()
}
.guaranteeCase(handleOutcome(Attributes.nack, requestCounter))

override def extendLock(): F[Unit] =
tracer
.span("queue.message.extendLock")
.surround {
underlying.extendLock()
}
.guaranteeCase(handleOutcome(Attributes.extendLock, requestCounter))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.MonadCancel
import cats.effect.syntax.monadCancel._
import com.commercetools.queue.QueueAdministration
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

import scala.concurrent.duration.FiniteDuration

class MeasuringQueueAdministration[F[_]](
underlying: QueueAdministration[F],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: MonadCancel[F, Throwable])
extends QueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
tracer
.span("queue.create")
.surround {
underlying.create(name, messageTTL, lockTTL)
}
.guaranteeCase(handleOutcome(Attributes.create, requestCounter))

override def delete(name: String): F[Unit] =
tracer
.span("queue.delete")
.surround {
underlying.delete(name)
}
.guaranteeCase(handleOutcome(Attributes.delete, requestCounter))

override def exists(name: String): F[Boolean] =
tracer
.span("queue.exists")
.surround {
underlying.exists(name)
}
.guaranteeCase(handleOutcome(Attributes.exist, requestCounter))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.Temporal
import cats.syntax.functor._
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}
import org.typelevel.otel4s.metrics.{Counter, Meter}
import org.typelevel.otel4s.trace.Tracer

class MeasuringQueueClient[F[_]](
private val underlying: QueueClient[F],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: Temporal[F])
extends QueueClient[F] {

override def administration: QueueAdministration[F] =
new MeasuringQueueAdministration[F](underlying.administration, requestCounter, tracer)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new MeasuringQueuePublisher[F, T](underlying.publish(name), requestCounter, tracer)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new MeasuringQueueSubscriber[F, T](underlying.subscribe(name), requestCounter, tracer)

}

/** Wraps a queue client with tracing and/or metrics. */
object MeasuringQueueClient {

final val defaultRequestMetricsName = "queue.service.call"

/** A client tracking only metrics. */
def metricsOnly[F[_]](
inner: QueueClient[F],
requestMetricsName: String = defaultRequestMetricsName
)(implicit
F: Temporal[F],
meter: Meter[F]
): F[QueueClient[F]] =
wrap(inner, requestMetricsName)(F = F, meter = meter, tracer = Tracer.noop)

/** A client tracking only traces. */
def tracesOnly[F[_]](
inner: QueueClient[F]
)(implicit
F: Temporal[F],
tracer: Tracer[F]
): F[QueueClient[F]] =
wrap(inner)(F = F, meter = Meter.noop, tracer = tracer)

/** A client tracking metrics and traces according to the provided `meter` and `tracer`. */
def wrap[F[_]](
inner: QueueClient[F],
requestMetricsName: String = defaultRequestMetricsName
)(implicit
F: Temporal[F],
meter: Meter[F],
tracer: Tracer[F]
): F[QueueClient[F]] =
inner match {
case inner: MeasuringQueueClient[F] => wrap(inner.underlying)
case _ =>
meter
.counter(requestMetricsName)
.withUnit("call")
.withDescription("Counts the calls to the underlying queue service")
.create
.map(new MeasuringQueueClient(inner, _, tracer))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.otel4s

import cats.effect.{MonadCancel, Resource}
import com.commercetools.queue.{QueuePublisher, QueuePusher}
import org.typelevel.otel4s.metrics.Counter
import org.typelevel.otel4s.trace.Tracer

class MeasuringQueuePublisher[F[_], T](
underlying: QueuePublisher[F, T],
requestCounter: Counter[F, Long],
tracer: Tracer[F]
)(implicit F: MonadCancel[F, Throwable])
extends QueuePublisher[F, T] {

def pusher: Resource[F, QueuePusher[F, T]] =
underlying.pusher.map(new MeasuringQueuePusher(_, requestCounter, tracer))

}
Loading