-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
673 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
45 changes: 45 additions & 0 deletions
45
core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package com.commercetools.queue.testing | ||
|
||
import cats.effect.IO | ||
import com.commercetools.queue.MessageContext | ||
|
||
import java.time.Instant | ||
|
||
case class TestingMessageContext[T]( | ||
payload: T, | ||
enqueuedAt: Instant = Instant.EPOCH, | ||
messageId: String = "", | ||
metadata: Map[String, String] = Map.empty) { | ||
self => | ||
|
||
def noop: MessageContext[IO, T] = new MessageContext[IO, T] { | ||
override def messageId: String = self.messageId | ||
override def payload: T = self.payload | ||
override def enqueuedAt: Instant = self.enqueuedAt | ||
override def metadata: Map[String, String] = self.metadata | ||
override def ack(): IO[Unit] = IO.unit | ||
override def nack(): IO[Unit] = IO.unit | ||
override def extendLock(): IO[Unit] = IO.unit | ||
} | ||
|
||
def failing(t: Exception): MessageContext[IO, T] = new MessageContext[IO, T] { | ||
override def messageId: String = self.messageId | ||
override def payload: T = self.payload | ||
override def enqueuedAt: Instant = self.enqueuedAt | ||
override def metadata: Map[String, String] = self.metadata | ||
override def ack(): IO[Unit] = IO.raiseError(t) | ||
override def nack(): IO[Unit] = IO.raiseError(t) | ||
override def extendLock(): IO[Unit] = IO.raiseError(t) | ||
} | ||
|
||
def canceled: MessageContext[IO, T] = new MessageContext[IO, T] { | ||
override def messageId: String = self.messageId | ||
override def payload: T = self.payload | ||
override def enqueuedAt: Instant = self.enqueuedAt | ||
override def metadata: Map[String, String] = self.metadata | ||
override def ack(): IO[Unit] = IO.canceled | ||
override def nack(): IO[Unit] = IO.canceled | ||
override def extendLock(): IO[Unit] = IO.canceled | ||
} | ||
|
||
} |
34 changes: 34 additions & 0 deletions
34
otel4s/src/main/scala/com/commercetools/queue/otel4s/Attributes.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright 2024 Commercetools GmbH | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.commercetools.queue.otel4s | ||
|
||
import org.typelevel.otel4s.Attribute | ||
|
||
object Attributes { | ||
final val send = Attribute("method", "send") | ||
final val receive = Attribute("method", "receive") | ||
final val create = Attribute("method", "create") | ||
final val delete = Attribute("method", "delete") | ||
final val exist = Attribute("method", "exist") | ||
final val ack = Attribute("method", "ack") | ||
final val nack = Attribute("method", "nack") | ||
final val extendLock = Attribute("method", "extendLock") | ||
|
||
final val success = Attribute("outcome", "success") | ||
final val failure = Attribute("outcome", "failure") | ||
final val cancelation = Attribute("outcome", "cancelation") | ||
} |
49 changes: 49 additions & 0 deletions
49
otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageContext.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package com.commercetools.queue.otel4s | ||
|
||
import com.commercetools.queue.MessageContext | ||
import org.typelevel.otel4s.metrics.Counter | ||
import org.typelevel.otel4s.trace.Tracer | ||
import cats.effect.Temporal | ||
import java.time.Instant | ||
import cats.effect.syntax.monadCancel._ | ||
|
||
class MeasuringMessageContext[F[_], T]( | ||
underlying: MessageContext[F, T], | ||
requestCounter: Counter[F, Long], | ||
tracer: Tracer[F] | ||
)(implicit F: Temporal[F]) | ||
extends MessageContext[F, T] { | ||
|
||
override def messageId: String = underlying.messageId | ||
|
||
override def payload: T = underlying.payload | ||
|
||
override def enqueuedAt: Instant = underlying.enqueuedAt | ||
|
||
override def metadata: Map[String, String] = underlying.metadata | ||
|
||
override def ack(): F[Unit] = | ||
tracer | ||
.span("queue.message.ack") | ||
.surround { | ||
underlying.ack() | ||
} | ||
.guaranteeCase(handleOutcome(Attributes.ack, requestCounter)) | ||
|
||
override def nack(): F[Unit] = | ||
tracer | ||
.span("queue.message.nack") | ||
.surround { | ||
underlying.nack() | ||
} | ||
.guaranteeCase(handleOutcome(Attributes.nack, requestCounter)) | ||
|
||
override def extendLock(): F[Unit] = | ||
tracer | ||
.span("queue.message.extendLock") | ||
.surround { | ||
underlying.extendLock() | ||
} | ||
.guaranteeCase(handleOutcome(Attributes.extendLock, requestCounter)) | ||
|
||
} |
57 changes: 57 additions & 0 deletions
57
otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright 2024 Commercetools GmbH | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.commercetools.queue.otel4s | ||
|
||
import cats.effect.MonadCancel | ||
import cats.effect.syntax.monadCancel._ | ||
import com.commercetools.queue.QueueAdministration | ||
import org.typelevel.otel4s.metrics.Counter | ||
import org.typelevel.otel4s.trace.Tracer | ||
|
||
import scala.concurrent.duration.FiniteDuration | ||
|
||
class MeasuringQueueAdministration[F[_]]( | ||
underlying: QueueAdministration[F], | ||
requestCounter: Counter[F, Long], | ||
tracer: Tracer[F] | ||
)(implicit F: MonadCancel[F, Throwable]) | ||
extends QueueAdministration[F] { | ||
|
||
override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = | ||
tracer | ||
.span("queue.create") | ||
.surround { | ||
underlying.create(name, messageTTL, lockTTL) | ||
} | ||
.guaranteeCase(handleOutcome(Attributes.create, requestCounter)) | ||
|
||
override def delete(name: String): F[Unit] = | ||
tracer | ||
.span("queue.delete") | ||
.surround { | ||
underlying.delete(name) | ||
} | ||
.guaranteeCase(handleOutcome(Attributes.delete, requestCounter)) | ||
|
||
override def exists(name: String): F[Boolean] = | ||
tracer | ||
.span("queue.exists") | ||
.surround { | ||
underlying.exists(name) | ||
} | ||
.guaranteeCase(handleOutcome(Attributes.exist, requestCounter)) | ||
} |
87 changes: 87 additions & 0 deletions
87
otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Copyright 2024 Commercetools GmbH | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.commercetools.queue.otel4s | ||
|
||
import cats.effect.Temporal | ||
import cats.syntax.functor._ | ||
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} | ||
import org.typelevel.otel4s.metrics.{Counter, Meter} | ||
import org.typelevel.otel4s.trace.Tracer | ||
|
||
class MeasuringQueueClient[F[_]]( | ||
private val underlying: QueueClient[F], | ||
requestCounter: Counter[F, Long], | ||
tracer: Tracer[F] | ||
)(implicit F: Temporal[F]) | ||
extends QueueClient[F] { | ||
|
||
override def administration: QueueAdministration[F] = | ||
new MeasuringQueueAdministration[F](underlying.administration, requestCounter, tracer) | ||
|
||
override def publish[T: Serializer](name: String): QueuePublisher[F, T] = | ||
new MeasuringQueuePublisher[F, T](underlying.publish(name), requestCounter, tracer) | ||
|
||
override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = | ||
new MeasuringQueueSubscriber[F, T](underlying.subscribe(name), requestCounter, tracer) | ||
|
||
} | ||
|
||
/** Wraps a queue client with tracing and/or metrics. */ | ||
object MeasuringQueueClient { | ||
|
||
final val defaultRequestMetricsName = "queue.service.call" | ||
|
||
/** A client tracking only metrics. */ | ||
def metricsOnly[F[_]]( | ||
inner: QueueClient[F], | ||
requestMetricsName: String = defaultRequestMetricsName | ||
)(implicit | ||
F: Temporal[F], | ||
meter: Meter[F] | ||
): F[QueueClient[F]] = | ||
wrap(inner, requestMetricsName)(F = F, meter = meter, tracer = Tracer.noop) | ||
|
||
/** A client tracking only traces. */ | ||
def tracesOnly[F[_]]( | ||
inner: QueueClient[F] | ||
)(implicit | ||
F: Temporal[F], | ||
tracer: Tracer[F] | ||
): F[QueueClient[F]] = | ||
wrap(inner)(F = F, meter = Meter.noop, tracer = tracer) | ||
|
||
/** A client tracking metrics and traces according to the provided `meter` and `tracer`. */ | ||
def wrap[F[_]]( | ||
inner: QueueClient[F], | ||
requestMetricsName: String = defaultRequestMetricsName | ||
)(implicit | ||
F: Temporal[F], | ||
meter: Meter[F], | ||
tracer: Tracer[F] | ||
): F[QueueClient[F]] = | ||
inner match { | ||
case inner: MeasuringQueueClient[F] => wrap(inner.underlying) | ||
case _ => | ||
meter | ||
.counter(requestMetricsName) | ||
.withUnit("call") | ||
.withDescription("Counts the calls to the underlying queue service") | ||
.create | ||
.map(new MeasuringQueueClient(inner, _, tracer)) | ||
} | ||
|
||
} |
34 changes: 34 additions & 0 deletions
34
otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePublisher.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright 2024 Commercetools GmbH | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.commercetools.queue.otel4s | ||
|
||
import cats.effect.{MonadCancel, Resource} | ||
import com.commercetools.queue.{QueuePublisher, QueuePusher} | ||
import org.typelevel.otel4s.metrics.Counter | ||
import org.typelevel.otel4s.trace.Tracer | ||
|
||
class MeasuringQueuePublisher[F[_], T]( | ||
underlying: QueuePublisher[F, T], | ||
requestCounter: Counter[F, Long], | ||
tracer: Tracer[F] | ||
)(implicit F: MonadCancel[F, Throwable]) | ||
extends QueuePublisher[F, T] { | ||
|
||
def pusher: Resource[F, QueuePusher[F, T]] = | ||
underlying.pusher.map(new MeasuringQueuePusher(_, requestCounter, tracer)) | ||
|
||
} |
46 changes: 46 additions & 0 deletions
46
otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright 2024 Commercetools GmbH | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.commercetools.queue.otel4s | ||
|
||
import cats.effect.Temporal | ||
import cats.effect.syntax.monadCancel._ | ||
import cats.syntax.functor._ | ||
import com.commercetools.queue.{MessageContext, QueuePuller} | ||
import fs2.Chunk | ||
import org.typelevel.otel4s.metrics.Counter | ||
import org.typelevel.otel4s.trace.Tracer | ||
|
||
import scala.concurrent.duration.FiniteDuration | ||
|
||
class MeasuringQueuePuller[F[_], T]( | ||
underlying: QueuePuller[F, T], | ||
requestCounter: Counter[F, Long], | ||
tracer: Tracer[F] | ||
)(implicit F: Temporal[F]) | ||
extends QueuePuller[F, T] { | ||
|
||
override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = | ||
tracer | ||
.span("queue.pullBatch") | ||
.surround { | ||
underlying | ||
.pullBatch(batchSize, waitingTime) | ||
.map(_.map(new MeasuringMessageContext[F, T](_, requestCounter, tracer)).widen[MessageContext[F, T]]) | ||
} | ||
.guaranteeCase(handleOutcome(Attributes.receive, requestCounter)) | ||
|
||
} |
Oops, something went wrong.