Skip to content

Commit

Permalink
Merge pull request #25 from commercetools/queue-stats
Browse files Browse the repository at this point in the history
Add a way to gather queue statistics
  • Loading branch information
satabin authored Jul 4, 2024
2 parents b6be5e6 + f75a72a commit c5e8c28
Show file tree
Hide file tree
Showing 25 changed files with 626 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.commercetools.queue.aws.sqs
import cats.effect.{Async, Resource}
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.regions.Region
Expand All @@ -41,6 +41,9 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext
override def administration: QueueAdministration[F] =
new SQSAdministration(client, getQueueUrl(_))

override def statistics(name: String): QueueStatistics[F] =
new SQSStatistics(name, client, getQueueUrl(name))

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new SQSPublisher(name, client, getQueueUrl(name))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import cats.syntax.functor._
import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher}
import software.amazon.awssdk.services.sqs.SqsAsyncClient

class SQSStatistics[F[_]](val queueName: String, client: SqsAsyncClient, getQueueUrl: F[String])(implicit F: Async[F])
extends QueueStatistics[F] {

override def fetcher: Resource[F, QueueStatsFetcher[F]] =
Resource.eval(getQueueUrl.map(new SQSStatisticsFetcher[F](queueName, client, _)))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import cats.syntax.option._
import cats.syntax.traverse._
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueStats, QueueStatsFetcher}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName}

import scala.collection.mutable
import scala.jdk.CollectionConverters._

class SQSStatisticsFetcher[F[_]](val queueName: String, client: SqsAsyncClient, queueUrl: String)(implicit F: Async[F])
extends QueueStatsFetcher[F] {

override def fetch: F[QueueStats] =
(for {
response <- F.fromCompletableFuture {
F.delay {
client.getQueueAttributes(
GetQueueAttributesRequest
.builder()
.queueUrl(queueUrl)
.attributeNames(
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES,
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED,
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE
)
.build())
}
}
attributes = response.attributes().asScala
messages <- attributeAsInt(attributes, QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)(queueName, "messages")
inflight <- attributeAsIntOpt(attributes, QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE)(
queueName,
"inflight")
delayed <- attributeAsIntOpt(attributes, QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED)(
queueName,
"delayed")
} yield QueueStats(messages, inflight, delayed)).adaptError(makeQueueException(_, queueName))

private def attributeAsIntOpt(
attributes: mutable.Map[QueueAttributeName, String],
attribute: QueueAttributeName
)(queueName: String,
attributeName: String
): F[Option[Int]] =
attributes
.get(attribute)
.traverse(raw => raw.toIntOption.liftTo[F](MalformedQueueConfigurationException(queueName, attributeName, raw)))

private def attributeAsInt(
attributes: mutable.Map[QueueAttributeName, String],
attribute: QueueAttributeName
)(queueName: String,
attributeName: String
): F[Int] =
attributeAsIntOpt(attributes, attribute)(queueName, attributeName).flatMap(
F.fromOption(_, MalformedQueueConfigurationException(queueName, attribute.toString(), "<missing>")))

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.azure.core.credential.TokenCredential
import com.azure.core.util.ClientOptions
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer}

class ServiceBusClient[F[_]] private (
clientBuilder: ServiceBusClientBuilder,
Expand All @@ -32,6 +32,9 @@ class ServiceBusClient[F[_]] private (
override def administration: QueueAdministration[F] =
new ServiceBusAdministration(adminBuilder.buildClient())

override def statistics(name: String): QueueStatistics[F] =
new ServiceBusStatistics(name, adminBuilder)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new ServiceBusQueuePublisher[F, T](name, clientBuilder)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.azure.servicebus

import cats.effect.{Async, Resource}
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher}

class ServiceBusStatistics[F[_]](
val queueName: String,
builder: ServiceBusAdministrationClientBuilder
)(implicit F: Async[F])
extends QueueStatistics[F] {

override def fetcher: Resource[F, QueueStatsFetcher[F]] =
Resource.eval {
F.delay {
new ServiceBusStatsFetcher(queueName, builder.buildClient())
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.commercetools.queue.{QueueStats, QueueStatsFetcher}

class ServiceBusStatsFetcher[F[_]](val queueName: String, client: ServiceBusAdministrationClient)(implicit F: Async[F])
extends QueueStatsFetcher[F] {

override def fetch: F[QueueStats] =
F.blocking(client.getQueueRuntimeProperties(queueName))
.map(props => QueueStats(props.getActiveMessageCount(), None, Some(props.getScheduledMessageCount())))
.adaptError(makeQueueException(_, queueName))

}
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ lazy val core = crossProject(JVMPlatform)
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueAdministration.configuration"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePusher.push"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePusher.push")
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePusher.push"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueClient.statistics")
)
)

Expand Down Expand Up @@ -159,7 +160,8 @@ lazy val gcpPubSub = crossProject(JVMPlatform)
.settings(
name := "fs2-queues-gcp-pubsub",
libraryDependencies ++= List(
"com.google.cloud" % "google-cloud-pubsub" % "1.129.3"
"com.google.cloud" % "google-cloud-pubsub" % "1.129.3",
"com.google.cloud" % "google-cloud-monitoring" % "3.47.0"
),
tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0")
)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueueClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ trait QueueClient[F[_]] {
*/
def administration: QueueAdministration[F]

/**
* Gives access to queue statistics API.
*/
def statistics(name: String): QueueStatistics[F]

/**
* Gives access to the publication API.
*/
Expand Down
64 changes: 64 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueueStatistics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

import cats.effect.{Resource, Temporal}
import cats.syntax.applicativeError._
import fs2.Stream

import scala.concurrent.duration.FiniteDuration

/**
* The base interface to fetch statistics from a queue.
*/
abstract class QueueStatistics[F[_]](implicit F: Temporal[F]) {

/** The queue name from which to pull statistics. */
def queueName: String

/**
* Returns a way to fetch statistics for a queue.
* This is a low-level construct mainly aiming at integrating with existing
* code bases that require to fetch statistics explicitly.
*
* '''Note:''' Prefer using the `stream` below when possible.
*/
def fetcher: Resource[F, QueueStatsFetcher[F]]

/**
* Stream emitting statistics every configured interval.
* The stream will not fail if an attempt to fetch statistics fails.
* This gives the freedom to the caller to implement its own error handling mechanism.
*
* @param interval the emission interval
*/
def stream(interval: FiniteDuration): Stream[F, Either[Throwable, QueueStats]] =
Stream.resource(fetcher).flatMap { fetcher =>
Stream
.repeatEval(fetcher.fetch.attempt)
.meteredStartImmediately(interval)
}

/**
* Strict version of `stream`, that fails upon the first fetch failure.
*
* @param interval the emission interval
*/
def strictStream(interval: FiniteDuration): Stream[F, QueueStats] =
stream(interval).rethrow

}
26 changes: 26 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueueStats.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

/**
* Basic statistics for the
*
* @param messages The *approximate* number of available messages currently in the queue
* @param inflight The *approximate* number of messages in the queue that are in-flight (messages delivered to a subscriber but not ack'ed yet) if available
* @param delayed The *approximate* number of delayed messages in the queue if available
*/
final case class QueueStats(messages: Int, inflight: Option[Int], delayed: Option[Int])
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

/**
* A queue statistics fetcher allows for fetching statistics a queue individually.
*/
trait QueueStatsFetcher[F[_]] {

/**
* Fetches the current statistics for the queue.
*/
def fetch: F[QueueStats]

}
1 change: 1 addition & 0 deletions docs/getting-started/directory.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ laika.navigationOrder = [
queues.md
publishing.md
subscribing.md
stats.md
administration.md
serialization.md
]
2 changes: 2 additions & 0 deletions docs/getting-started/queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ There are several views possible on a queue:

- as a `QueuePublisher` when you only need to [publish messages](publishing.md) to an existing queue.
- as a `QueueSubscriber` when you only need to [subscribe](subscribing.md) to an existing queue.
- as a `QueueStatistics` when you only need to [gather queue statistics](stats.md) from an existing queue.
- as a `QueueSubscriber` when you only need to [subscribe](subscribing.md) to an existing queue.
- as a `QueueAdministration` when you need to [manage](administration.md) queues (creation, deletion, ...).

The entry point is the `QueueClient` factory for each underlying queue system.
Expand Down
Loading

0 comments on commit c5e8c28

Please sign in to comment.