diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala index 4f82290..506f3bc 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSClient.scala @@ -19,7 +19,7 @@ package com.commercetools.queue.aws.sqs import cats.effect.{Async, Resource} import cats.syntax.functor._ import cats.syntax.monadError._ -import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} +import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer} import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider import software.amazon.awssdk.http.async.SdkAsyncHttpClient import software.amazon.awssdk.regions.Region @@ -41,6 +41,9 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext override def administration: QueueAdministration[F] = new SQSAdministration(client, getQueueUrl(_)) + override def statistics(name: String): QueueStatistics[F] = + new SQSStatistics(name, client, getQueueUrl(name)) + override def publish[T: Serializer](name: String): QueuePublisher[F, T] = new SQSPublisher(name, client, getQueueUrl(name)) diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSStatistics.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSStatistics.scala new file mode 100644 index 0000000..42c425c --- /dev/null +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSStatistics.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.aws.sqs + +import cats.effect.{Async, Resource} +import cats.syntax.functor._ +import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher} +import software.amazon.awssdk.services.sqs.SqsAsyncClient + +class SQSStatistics[F[_]](val queueName: String, client: SqsAsyncClient, getQueueUrl: F[String])(implicit F: Async[F]) + extends QueueStatistics[F] { + + override def fetcher: Resource[F, QueueStatsFetcher[F]] = + Resource.eval(getQueueUrl.map(new SQSStatisticsFetcher[F](queueName, client, _))) + +} diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSStatisticsFetcher.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSStatisticsFetcher.scala new file mode 100644 index 0000000..0129e93 --- /dev/null +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSStatisticsFetcher.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.aws.sqs + +import cats.effect.Async +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import cats.syntax.monadError._ +import cats.syntax.option._ +import cats.syntax.traverse._ +import com.commercetools.queue.{MalformedQueueConfigurationException, QueueStats, QueueStatsFetcher} +import software.amazon.awssdk.services.sqs.SqsAsyncClient +import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName} + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +class SQSStatisticsFetcher[F[_]](val queueName: String, client: SqsAsyncClient, queueUrl: String)(implicit F: Async[F]) + extends QueueStatsFetcher[F] { + + override def fetch: F[QueueStats] = + (for { + response <- F.fromCompletableFuture { + F.delay { + client.getQueueAttributes( + GetQueueAttributesRequest + .builder() + .queueUrl(queueUrl) + .attributeNames( + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE + ) + .build()) + } + } + attributes = response.attributes().asScala + messages <- attributeAsInt(attributes, QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)(queueName, "messages") + inflight <- attributeAsIntOpt(attributes, QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE)( + queueName, + "inflight") + delayed <- attributeAsIntOpt(attributes, QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED)( + queueName, + "delayed") + } yield QueueStats(messages, inflight, delayed)).adaptError(makeQueueException(_, queueName)) + + private def attributeAsIntOpt( + attributes: mutable.Map[QueueAttributeName, String], + attribute: QueueAttributeName + )(queueName: String, + attributeName: String + ): F[Option[Int]] = + attributes + .get(attribute) + .traverse(raw => raw.toIntOption.liftTo[F](MalformedQueueConfigurationException(queueName, attributeName, raw))) + + private def attributeAsInt( + attributes: mutable.Map[QueueAttributeName, String], + attribute: QueueAttributeName + )(queueName: String, + attributeName: String + ): F[Int] = + attributeAsIntOpt(attributes, attribute)(queueName, attributeName).flatMap( + F.fromOption(_, MalformedQueueConfigurationException(queueName, attribute.toString(), ""))) + +} diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala index 3358ee4..ecd9ea9 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala @@ -21,7 +21,7 @@ import com.azure.core.credential.TokenCredential import com.azure.core.util.ClientOptions import com.azure.messaging.servicebus.ServiceBusClientBuilder import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder -import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} +import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer} class ServiceBusClient[F[_]] private ( clientBuilder: ServiceBusClientBuilder, @@ -32,6 +32,9 @@ class ServiceBusClient[F[_]] private ( override def administration: QueueAdministration[F] = new ServiceBusAdministration(adminBuilder.buildClient()) + override def statistics(name: String): QueueStatistics[F] = + new ServiceBusStatistics(name, adminBuilder) + override def publish[T: Serializer](name: String): QueuePublisher[F, T] = new ServiceBusQueuePublisher[F, T](name, clientBuilder) diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusStatistics.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusStatistics.scala new file mode 100644 index 0000000..363759a --- /dev/null +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusStatistics.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.azure.servicebus + +import cats.effect.{Async, Resource} +import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder +import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher} + +class ServiceBusStatistics[F[_]]( + val queueName: String, + builder: ServiceBusAdministrationClientBuilder +)(implicit F: Async[F]) + extends QueueStatistics[F] { + + override def fetcher: Resource[F, QueueStatsFetcher[F]] = + Resource.eval { + F.delay { + new ServiceBusStatsFetcher(queueName, builder.buildClient()) + } + } + +} diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusStatsFetcher.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusStatsFetcher.scala new file mode 100644 index 0000000..4fc8b97 --- /dev/null +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusStatsFetcher.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.azure.servicebus + +import cats.effect.Async +import cats.syntax.functor._ +import cats.syntax.monadError._ +import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient +import com.commercetools.queue.{QueueStats, QueueStatsFetcher} + +class ServiceBusStatsFetcher[F[_]](val queueName: String, client: ServiceBusAdministrationClient)(implicit F: Async[F]) + extends QueueStatsFetcher[F] { + + override def fetch: F[QueueStats] = + F.blocking(client.getQueueRuntimeProperties(queueName)) + .map(props => QueueStats(props.getActiveMessageCount(), None, Some(props.getScheduledMessageCount()))) + .adaptError(makeQueueException(_, queueName)) + +} diff --git a/build.sbt b/build.sbt index 80920d8..60cd14b 100644 --- a/build.sbt +++ b/build.sbt @@ -49,7 +49,8 @@ lazy val core = crossProject(JVMPlatform) ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload"), ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueAdministration.configuration"), ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePusher.push"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePusher.push") + ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePusher.push"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueClient.statistics") ) ) @@ -159,7 +160,8 @@ lazy val gcpPubSub = crossProject(JVMPlatform) .settings( name := "fs2-queues-gcp-pubsub", libraryDependencies ++= List( - "com.google.cloud" % "google-cloud-pubsub" % "1.129.3" + "com.google.cloud" % "google-cloud-pubsub" % "1.129.3", + "com.google.cloud" % "google-cloud-monitoring" % "3.47.0" ), tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0") ) diff --git a/core/src/main/scala/com/commercetools/queue/QueueClient.scala b/core/src/main/scala/com/commercetools/queue/QueueClient.scala index e2ed53f..240cebc 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueClient.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueClient.scala @@ -28,6 +28,11 @@ trait QueueClient[F[_]] { */ def administration: QueueAdministration[F] + /** + * Gives access to queue statistics API. + */ + def statistics(name: String): QueueStatistics[F] + /** * Gives access to the publication API. */ diff --git a/core/src/main/scala/com/commercetools/queue/QueueStatistics.scala b/core/src/main/scala/com/commercetools/queue/QueueStatistics.scala new file mode 100644 index 0000000..2de7d3b --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/QueueStatistics.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +import cats.effect.{Resource, Temporal} +import cats.syntax.applicativeError._ +import fs2.Stream + +import scala.concurrent.duration.FiniteDuration + +/** + * The base interface to fetch statistics from a queue. + */ +abstract class QueueStatistics[F[_]](implicit F: Temporal[F]) { + + /** The queue name from which to pull statistics. */ + def queueName: String + + /** + * Returns a way to fetch statistics for a queue. + * This is a low-level construct mainly aiming at integrating with existing + * code bases that require to fetch statistics explicitly. + * + * '''Note:''' Prefer using the `stream` below when possible. + */ + def fetcher: Resource[F, QueueStatsFetcher[F]] + + /** + * Stream emitting statistics every configured interval. + * The stream will not fail if an attempt to fetch statistics fails. + * This gives the freedom to the caller to implement its own error handling mechanism. + * + * @param interval the emission interval + */ + def stream(interval: FiniteDuration): Stream[F, Either[Throwable, QueueStats]] = + Stream.resource(fetcher).flatMap { fetcher => + Stream + .repeatEval(fetcher.fetch.attempt) + .meteredStartImmediately(interval) + } + + /** + * Strict version of `stream`, that fails upon the first fetch failure. + * + * @param interval the emission interval + */ + def strictStream(interval: FiniteDuration): Stream[F, QueueStats] = + stream(interval).rethrow + +} diff --git a/core/src/main/scala/com/commercetools/queue/QueueStats.scala b/core/src/main/scala/com/commercetools/queue/QueueStats.scala new file mode 100644 index 0000000..b049d7c --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/QueueStats.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +/** + * Basic statistics for the + * + * @param messages The *approximate* number of available messages currently in the queue + * @param inflight The *approximate* number of messages in the queue that are in-flight (messages delivered to a subscriber but not ack'ed yet) if available + * @param delayed The *approximate* number of delayed messages in the queue if available + */ +final case class QueueStats(messages: Int, inflight: Option[Int], delayed: Option[Int]) diff --git a/core/src/main/scala/com/commercetools/queue/QueueStatsFetcher.scala b/core/src/main/scala/com/commercetools/queue/QueueStatsFetcher.scala new file mode 100644 index 0000000..1cded61 --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/QueueStatsFetcher.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +/** + * A queue statistics fetcher allows for fetching statistics a queue individually. + */ +trait QueueStatsFetcher[F[_]] { + + /** + * Fetches the current statistics for the queue. + */ + def fetch: F[QueueStats] + +} diff --git a/docs/getting-started/directory.conf b/docs/getting-started/directory.conf index d3c9e98..3f0d235 100644 --- a/docs/getting-started/directory.conf +++ b/docs/getting-started/directory.conf @@ -4,6 +4,7 @@ laika.navigationOrder = [ queues.md publishing.md subscribing.md + stats.md administration.md serialization.md ] diff --git a/docs/getting-started/queues.md b/docs/getting-started/queues.md index 570b3aa..6766c2b 100644 --- a/docs/getting-started/queues.md +++ b/docs/getting-started/queues.md @@ -13,6 +13,8 @@ There are several views possible on a queue: - as a `QueuePublisher` when you only need to [publish messages](publishing.md) to an existing queue. - as a `QueueSubscriber` when you only need to [subscribe](subscribing.md) to an existing queue. + - as a `QueueStatistics` when you only need to [gather queue statistics](stats.md) from an existing queue. + - as a `QueueSubscriber` when you only need to [subscribe](subscribing.md) to an existing queue. - as a `QueueAdministration` when you need to [manage](administration.md) queues (creation, deletion, ...). The entry point is the `QueueClient` factory for each underlying queue system. diff --git a/docs/getting-started/stats.md b/docs/getting-started/stats.md new file mode 100644 index 0000000..d1f27a6 --- /dev/null +++ b/docs/getting-started/stats.md @@ -0,0 +1,61 @@ +{% nav = true %} +# Queue Statistics + +The library exposes a simple interface to access basic queue statistics through a @:api(com.commercetools.queue.QueueStatistics). + +@:callout(warning) +The numbers reported by the statistics are approximate numbers. Depending on the underlying system, there might be some delay in data availability. + +When in doubt, refer to the queue provider documentation. +@:@ + +```scala mdoc +import cats.effect.IO + +import com.commercetools.queue.{QueueClient, QueueStatistics} + +def client: QueueClient[IO] = ??? + +// returns a statistics accessor for the queue named `my-queue` +def stats: QueueStatistics[IO] = + client.statistics("my-queue") +``` + +## Building a stream of statistics + +This interface expoes a way to build a stream of statistics, polling them at some configured interval. For instance, this code will print the statistics polled every 20 seconds. + +```scala mdoc:compile-only +import scala.concurrent.duration._ + +stats + .stream(interval = 20.seconds) + .evalMap { + case Right(stats) => IO.println(stats) + case Left(t) => IO.println(s"Statistics failed to be retrieved: ${t.getMessage()}") + } + .drain +``` + +If you want the stream to fail upon the first fetching error, you can use the `strictStream` variant. + +## Explicit fetch + +If you are integrating this library with an existing code base that performs explicit fetches for queue statistics, you can access the @:api(com.commercetools.queue.QueueStatsFetcher) lower level API, which exposes a way to fetch statistics explicitly. + +A `QueueStatsFetcher` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly. + +The explicitly fetch and report statistics every 20 seconds, one can use this approach (errors are not properly handled, and the stream approach above should be preferred): + +```scala mdoc:compile-only +import scala.concurrent.duration._ + +stats.fetcher.use { statsFetcher => + (statsFetcher + .fetch + .flatMap(IO.println(_)) *> + IO.sleep(20.seconds)).foreverM +} +``` + +[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource diff --git a/docs/systems/index.md b/docs/systems/index.md index 5b68aab..ee76f88 100644 --- a/docs/systems/index.md +++ b/docs/systems/index.md @@ -16,6 +16,7 @@ import com.commercetools.queue.{ QueueClient, QueueAdministration, QueuePublisher, + QueueStatistics, QueueSubscriber, Serializer } @@ -23,6 +24,7 @@ import com.commercetools.queue.testkit.QueueClientSuite class MyQueueClient[F[_]] extends QueueClient[F] { def administration: QueueAdministration[F] = ??? + def statistics(name: String): QueueStatistics[F] = ??? def publish[T: Serializer](name: String): QueuePublisher[F,T] = ??? def subscribe[T: Deserializer](name: String): QueueSubscriber[F,T] = ??? diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala index 234c24a..cde0e80 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -28,6 +28,7 @@ import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, Expi import scala.concurrent.duration._ class PubSubAdministration[F[_]]( + useGrpc: Boolean, project: String, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, @@ -36,8 +37,12 @@ class PubSubAdministration[F[_]]( extends QueueAdministration[F] { private val adminClient = Resource.fromAutoCloseable(F.delay { - val builder = TopicAdminSettings - .newHttpJsonBuilder() + val builder = + if (useGrpc) + TopicAdminSettings.newBuilder() + else + TopicAdminSettings.newHttpJsonBuilder() + builder .setCredentialsProvider(credentials) .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) @@ -45,8 +50,12 @@ class PubSubAdministration[F[_]]( }) private val subscriptionClient = Resource.fromAutoCloseable(F.delay { - val builder = SubscriptionAdminSettings - .newBuilder() + val builder = + if (useGrpc) + SubscriptionAdminSettings.newBuilder() + else + SubscriptionAdminSettings.newHttpJsonBuilder() + builder .setCredentialsProvider(credentials) .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index 252d586..8d8a798 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -17,7 +17,7 @@ package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} -import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} +import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel} import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider} @@ -32,10 +32,13 @@ class PubSubClient[F[_]: Async] private ( extends QueueClient[F] { override def administration: QueueAdministration[F] = - new PubSubAdministration[F](project, channelProvider, credentials, endpoint) + new PubSubAdministration[F](useGrpc, project, channelProvider, credentials, endpoint) + + override def statistics(name: String): QueueStatistics[F] = + new PubSubStatistics(name, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, endpoint) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = - new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint) + new PubSubPublisher[F, T](name, useGrpc, TopicName.of(project, name), channelProvider, credentials, endpoint) override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = new PubSubSubscriber[F, T]( diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala index 91a435b..5f30cf8 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala @@ -20,11 +20,12 @@ import cats.effect.{Async, Resource} import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.rpc.TransportChannelProvider -import com.google.cloud.pubsub.v1.stub.{HttpJsonPublisherStub, PublisherStubSettings} +import com.google.cloud.pubsub.v1.stub.{GrpcPublisherStub, HttpJsonPublisherStub, PublisherStubSettings} import com.google.pubsub.v1.TopicName class PubSubPublisher[F[_], T]( val queueName: String, + useGrpc: Boolean, topicName: TopicName, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, @@ -39,12 +40,18 @@ class PubSubPublisher[F[_], T]( .fromAutoCloseable { F.blocking { val builder = - PublisherStubSettings - .newHttpJsonBuilder() - .setCredentialsProvider(credentials) - .setTransportChannelProvider(channelProvider) + if (useGrpc) + PublisherStubSettings.newBuilder() + else + PublisherStubSettings.newHttpJsonBuilder() + builder + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) - HttpJsonPublisherStub.create(builder.build()) + if (useGrpc) + GrpcPublisherStub.create(builder.build()) + else + HttpJsonPublisherStub.create(builder.build()) } } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatistics.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatistics.scala new file mode 100644 index 0000000..a445297 --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatistics.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.gcp.pubsub + +import cats.effect.{Async, Resource} +import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher} +import com.google.api.gax.core.CredentialsProvider +import com.google.api.gax.rpc.TransportChannelProvider +import com.google.cloud.monitoring.v3.stub.{GrpcMetricServiceStub, MetricServiceStubSettings} +import com.google.pubsub.v1.SubscriptionName + +class PubSubStatistics[F[_]]( + val queueName: String, + subscriptionName: SubscriptionName, + channelProvider: TransportChannelProvider, + credentials: CredentialsProvider, + endpoint: Option[String] +)(implicit F: Async[F]) + extends QueueStatistics[F] { + + override def fetcher: Resource[F, QueueStatsFetcher[F]] = + Resource + .fromAutoCloseable { + F.blocking { + val builder = MetricServiceStubSettings + .newBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) + endpoint.foreach(builder.setEndpoint(_)) + GrpcMetricServiceStub.create(builder.build()) + } + } + .map(new PubSubStatsFetcher(queueName, subscriptionName, _)) + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatsFetcher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatsFetcher.scala new file mode 100644 index 0000000..0394dfe --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatsFetcher.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.gcp.pubsub + +import cats.effect.Async +import cats.syntax.flatMap._ +import com.commercetools.queue.{MalformedQueueConfigurationException, QueueStats, QueueStatsFetcher} +import com.google.cloud.monitoring.v3.stub.MetricServiceStub +import com.google.monitoring.v3.{ListTimeSeriesRequest, Point, TimeInterval} +import com.google.protobuf.Timestamp +import com.google.pubsub.v1.SubscriptionName + +import scala.jdk.CollectionConverters._ + +class PubSubStatsFetcher[F[_]]( + val queueName: String, + subscriptionName: SubscriptionName, + client: MetricServiceStub +)(implicit F: Async[F]) + extends QueueStatsFetcher[F] { + + override def fetch: F[QueueStats] = + F.realTime + .flatMap { now => + wrapFuture(F.delay { + client + .listTimeSeriesCallable() + .futureCall( + ListTimeSeriesRequest + .newBuilder() + .setName(s"projects/${subscriptionName.getProject()}") + // we need to query at least one minute + // https://stackoverflow.com/questions/68546947/google-cloud-metrics-get-pub-sub-metric-filtered-by-subscription-id + .setInterval( + TimeInterval + .newBuilder() + .setStartTime(Timestamp.newBuilder().setSeconds(now.toSeconds - 61).build()) + .setEndTime(Timestamp.newBuilder().setSeconds(now.toSeconds).build()) + .build()) + .setFilter( + s"""metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages" AND resource.label.subscription_id = "${subscriptionName + .getSubscription()}"""") + .build()) + + }) + } + .flatMap { response => + val datapoints: List[Point] = response.getTimeSeries(0).getPointsList().asScala.toList + datapoints.sortBy(-_.getInterval().getEndTime().getSeconds()).headOption match { + case Some(value) => + F.pure(QueueStats(value.getValue().getInt64Value().toInt, None, None)) + case None => + F.raiseError(MalformedQueueConfigurationException(queueName, "messages", "")) + } + } + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala index e33842c..09fa912 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala @@ -21,7 +21,7 @@ import cats.syntax.functor._ import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.rpc.TransportChannelProvider -import com.google.cloud.pubsub.v1.stub.{HttpJsonSubscriberStub, SubscriberStubSettings} +import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, HttpJsonSubscriberStub, SubscriberStubSettings} import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName} class PubSubSubscriber[F[_], T]( @@ -41,12 +41,18 @@ class PubSubSubscriber[F[_], T]( .fromAutoCloseable { F.blocking { val builder = - SubscriberStubSettings - .newHttpJsonBuilder() - .setCredentialsProvider(credentials) - .setTransportChannelProvider(channelProvider) + if (useGrpc) + SubscriberStubSettings.newBuilder() + else + SubscriberStubSettings.newHttpJsonBuilder() + builder + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) - HttpJsonSubscriberStub.create(builder.build()) + if (useGrpc) + GrpcSubscriberStub.create(builder.build()) + else + HttpJsonSubscriberStub.create(builder.build()) } } .evalMap { subscriber => diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala index c3f9ed8..f95a4aa 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueClient.scala @@ -18,7 +18,7 @@ package com.commercetools.queue.otel4s import cats.effect.Temporal import cats.syntax.functor._ -import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} +import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer} import org.typelevel.otel4s.metrics.{Counter, Meter} import org.typelevel.otel4s.trace.Tracer @@ -32,6 +32,9 @@ class MeasuringQueueClient[F[_]]( override def administration: QueueAdministration[F] = new MeasuringQueueAdministration[F](underlying.administration, requestCounter, tracer) + override def statistics(name: String): QueueStatistics[F] = + new MeasuringQueueStatistics[F](underlying.statistics(name), requestCounter, tracer) + override def publish[T: Serializer](name: String): QueuePublisher[F, T] = new MeasuringQueuePublisher[F, T](underlying.publish(name), requestCounter, tracer) diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueStatistics.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueStatistics.scala new file mode 100644 index 0000000..b130f62 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueStatistics.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.{Resource, Temporal} +import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher} +import org.typelevel.otel4s.metrics.Counter +import org.typelevel.otel4s.trace.Tracer + +class MeasuringQueueStatistics[F[_]]( + underlying: QueueStatistics[F], + requestCounter: Counter[F, Long], + tracer: Tracer[F] +)(implicit F: Temporal[F]) + extends QueueStatistics[F] { + + override def queueName: String = underlying.queueName + + override def fetcher: Resource[F, QueueStatsFetcher[F]] = + underlying.fetcher.map(new MeasuringQueueStatsFetcher(_, new QueueMetrics[F](queueName, requestCounter), tracer)) +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueStatsFetcher.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueStatsFetcher.scala new file mode 100644 index 0000000..e681b65 --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueStatsFetcher.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.MonadCancel +import cats.effect.syntax.monadCancel._ +import com.commercetools.queue.{QueueStats, QueueStatsFetcher} +import org.typelevel.otel4s.trace.Tracer + +class MeasuringQueueStatsFetcher[F[_]]( + underlying: QueueStatsFetcher[F], + metrics: QueueMetrics[F], + tracer: Tracer[F] +)(implicit F: MonadCancel[F, Throwable]) + extends QueueStatsFetcher[F] { + + override def fetch: F[QueueStats] = + tracer + .span("queue.fetchStats") + .surround { + underlying.fetch + } + .guaranteeCase(metrics.stats) + +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala index 3fdb68b..17b1307 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala @@ -30,6 +30,8 @@ private class QueueMetrics[F[_]](queueName: String, requestCounter: Counter[F, L final val nack: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.nack, requestCounter) final val extendLock: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.extendLock, requestCounter) + final val stats: Outcome[F, Throwable, _] => F[Unit] = + QueueMetrics.increment(queue, QueueMetrics.stats, requestCounter) } @@ -41,6 +43,7 @@ private object QueueMetrics { final val ack = Attribute("method", "ack") final val nack = Attribute("method", "nack") final val extendLock = Attribute("method", "extendLock") + final val stats = Attribute("method", "stats") // queue management attributes final val create = Attribute("method", "create")