Skip to content

Commit

Permalink
Seal the API traits
Browse files Browse the repository at this point in the history
Sealing the traits gives us more freedom regarding binary compatibility.
We can add new methods later on without taking the risk to break some
extra implementation done in some using code.
It gives us the flexibility to have several modules implementing our
traits, from within the module, without opening it to the outside.

This prevents easily adding new queue providers from outside the
library, which is fine as the library makes some asumption on the
underlying systems and the way they are used to make the higher level
features work properly.

This approach is borrowed from fs2, e.g. for the `Files` and `Processes`
API.
  • Loading branch information
satabin committed Jul 16, 2024
1 parent e0a8850 commit 2f8fc89
Show file tree
Hide file tree
Showing 41 changed files with 87 additions and 110 deletions.
1 change: 1 addition & 0 deletions .scalafix.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ OrganizeImports {
rules = [
OrganizeImports
]
OrganizeImports.targetDialect = Scala2
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import cats.syntax.functorFilter._
import cats.syntax.monadError._
import cats.syntax.option._
import com.commercetools.queue.aws.sqs.makeQueueException
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueDoesNotExistException}
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueConfiguration, QueueDoesNotExistException, UnsealedQueueAdministration}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, GetQueueAttributesRequest, QueueAttributeName, SetQueueAttributesRequest}

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F])
extends QueueAdministration[F] {
extends UnsealedQueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.commercetools.queue.aws.sqs
import cats.effect.{Async, Resource}
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.regions.Region
Expand All @@ -28,7 +28,7 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest

import java.net.URI

class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) extends QueueClient[F] {
class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) extends UnsealedQueueClient[F] {

private def getQueueUrl(name: String): F[String] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}
import com.commercetools.queue.{QueuePusher, Serializer, UnsealedQueuePublisher}
import software.amazon.awssdk.services.sqs.SqsAsyncClient

class SQSPublisher[F[_], T](
Expand All @@ -27,7 +27,7 @@ class SQSPublisher[F[_], T](
)(implicit
F: Async[F],
serializer: Serializer[T])
extends QueuePublisher[F, T] {
extends UnsealedQueuePublisher[F, T] {

override def pusher: Resource[F, QueuePusher[F, T]] =
Resource.eval(getQueueUrl).map(new SQSPusher(queueName, client, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller}
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{MessageSystemAttributeName, ReceiveMessageRequest}
Expand All @@ -39,7 +39,7 @@ class SQSPuller[F[_], T](
)(implicit
F: Async[F],
deserializer: Deserializer[T])
extends QueuePuller[F, T] {
extends UnsealedQueuePuller[F, T] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import cats.syntax.functor._
import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher}
import com.commercetools.queue.{QueueStatsFetcher, UnsealedQueueStatistics}
import software.amazon.awssdk.services.sqs.SqsAsyncClient

class SQSStatistics[F[_]](val queueName: String, client: SqsAsyncClient, getQueueUrl: F[String])(implicit F: Async[F])
extends QueueStatistics[F] {
extends UnsealedQueueStatistics[F] {

override def fetcher: Resource[F, QueueStatsFetcher[F]] =
Resource.eval(getQueueUrl.map(new SQSStatisticsFetcher[F](queueName, client, _)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import cats.syntax.functor._
import cats.syntax.monadError._
import cats.syntax.option._
import cats.syntax.traverse._
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueStats, QueueStatsFetcher}
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueStats, UnsealedQueueStatsFetcher}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName}

import scala.collection.mutable
import scala.jdk.CollectionConverters._

class SQSStatisticsFetcher[F[_]](val queueName: String, client: SqsAsyncClient, queueUrl: String)(implicit F: Async[F])
extends QueueStatsFetcher[F] {
extends UnsealedQueueStatsFetcher[F] {

override def fetch: F[QueueStats] =
(for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import cats.syntax.all._
import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber}
import com.commercetools.queue.{Deserializer, QueuePuller, UnsealedQueueSubscriber}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName}

Expand All @@ -29,7 +29,7 @@ class SQSSubscriber[F[_], T](
)(implicit
F: Async[F],
deserializer: Deserializer[T])
extends QueueSubscriber[F, T] {
extends UnsealedQueueSubscriber[F, T] {

private def getLockTTL(queueUrl: String): F[Int] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions
import com.commercetools.queue.{QueueAdministration, QueueConfiguration}
import com.commercetools.queue.{QueueConfiguration, UnsealedQueueAdministration}

import java.time.Duration
import scala.concurrent.duration._
Expand All @@ -30,7 +30,7 @@ class ServiceBusAdministration[F[_]](
client: ServiceBusAdministrationClient,
newQueueSettings: NewQueueSettings
)(implicit F: Async[F])
extends QueueAdministration[F] {
extends UnsealedQueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
F.blocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import com.azure.core.credential.TokenCredential
import com.azure.core.util.ClientOptions
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient}

class ServiceBusClient[F[_]] private (
clientBuilder: ServiceBusClientBuilder,
adminBuilder: ServiceBusAdministrationClientBuilder,
newQueueSettings: NewQueueSettings
)(implicit F: Async[F])
extends QueueClient[F] {
extends UnsealedQueueClient[F] {

override def administration: QueueAdministration[F] =
new ServiceBusAdministration(adminBuilder.buildClient(), newQueueSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller}
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import fs2.Chunk

import java.time.Duration
Expand All @@ -35,7 +35,7 @@ class ServiceBusPuller[F[_], Data](
)(implicit
F: Async[F],
deserializer: Deserializer[Data])
extends QueuePuller[F, Data] {
extends UnsealedQueuePuller[F, Data] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] = F
.blocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package com.commercetools.queue.azure.servicebus

import cats.effect.{Async, Resource}
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}
import com.commercetools.queue.{QueuePusher, Serializer, UnsealedQueuePublisher}

class ServiceBusQueuePublisher[F[_], Data](
val queueName: String,
clientBuilder: ServiceBusClientBuilder
)(implicit
F: Async[F],
serializer: Serializer[Data])
extends QueuePublisher[F, Data] {
extends UnsealedQueuePublisher[F, Data] {

override def pusher: Resource[F, QueuePusher[F, Data]] =
Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package com.commercetools.queue.azure.servicebus
import cats.effect.{Async, Resource}
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode
import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber}
import com.commercetools.queue.{Deserializer, QueuePuller, UnsealedQueueSubscriber}

class ServiceBusQueueSubscriber[F[_], Data](
val queueName: String,
builder: ServiceBusClientBuilder
)(implicit
F: Async[F],
deserializer: Deserializer[Data])
extends QueueSubscriber[F, Data] {
extends UnsealedQueueSubscriber[F, Data] {

override def puller: Resource[F, QueuePuller[F, Data]] = Resource
.fromAutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package com.commercetools.queue.azure.servicebus

import cats.effect.{Async, Resource}
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher}
import com.commercetools.queue.{QueueStatsFetcher, UnsealedQueueStatistics}

class ServiceBusStatistics[F[_]](
val queueName: String,
builder: ServiceBusAdministrationClientBuilder
)(implicit F: Async[F])
extends QueueStatistics[F] {
extends UnsealedQueueStatistics[F] {

override def fetcher: Resource[F, QueueStatsFetcher[F]] =
Resource.eval {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import cats.effect.Async
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.commercetools.queue.{QueueStats, QueueStatsFetcher}
import com.commercetools.queue.{QueueStats, UnsealedQueueStatsFetcher}

class ServiceBusStatsFetcher[F[_]](val queueName: String, client: ServiceBusAdministrationClient)(implicit F: Async[F])
extends QueueStatsFetcher[F] {
extends UnsealedQueueStatsFetcher[F] {

override def fetch: F[QueueStats] =
F.blocking(client.getQueueRuntimeProperties(queueName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.concurrent.duration.FiniteDuration
/**
* Interface that gives access to the queue administration capabilities.
*/
trait QueueAdministration[F[_]] {
sealed trait QueueAdministration[F[_]] {

/** Creates a queue with the given name, message TTL and lock TTL. */
def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit]
Expand All @@ -42,3 +42,5 @@ trait QueueAdministration[F[_]] {
def exists(name: String): F[Boolean]

}

private[queue] trait UnsealedQueueAdministration[F[_]] extends QueueAdministration[F]
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package com.commercetools.queue
* A client will manage connection pools and has knowledge of the underlying queue system.
* A client should be managed as a resource to cleanup connections when not need anymore.
*/
trait QueueClient[F[_]] {
sealed trait QueueClient[F[_]] {

/**
* Gives access to adminsitrative API.
Expand All @@ -44,3 +44,5 @@ trait QueueClient[F[_]] {
def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T]

}

private[queue] trait UnsealedQueueClient[F[_]] extends QueueClient[F]
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import fs2.Stream
/**
* The interface to publish to a queue.
*/
abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {
sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {

/** The queue name to which this publisher publishes. */
def queueName: String
Expand Down Expand Up @@ -50,6 +50,9 @@ abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable]) {

}

abstract private[queue] class UnsealedQueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwable])
extends QueuePublisher[F, T]

object QueuePublisher {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.concurrent.duration.FiniteDuration
/**
* A queue puller allows for pulling batches of elements from a queue individually.
*/
trait QueuePuller[F[_], T] {
sealed trait QueuePuller[F[_], T] {

/** The queue name from which this puller is pulling. */
def queueName: String
Expand All @@ -44,3 +44,5 @@ trait QueuePuller[F[_], T] {
def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]]

}

private[queue] trait UnsealedQueuePuller[F[_], T] extends QueuePuller[F, T]
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.concurrent.duration.FiniteDuration
/**
* The base interface to fetch statistics from a queue.
*/
abstract class QueueStatistics[F[_]](implicit F: Temporal[F]) {
sealed abstract class QueueStatistics[F[_]](implicit F: Temporal[F]) {

/** The queue name from which to pull statistics. */
def queueName: String
Expand Down Expand Up @@ -62,3 +62,5 @@ abstract class QueueStatistics[F[_]](implicit F: Temporal[F]) {
stream(interval).rethrow

}

abstract private[queue] class UnsealedQueueStatistics[F[_]](implicit F: Temporal[F]) extends QueueStatistics[F]
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package com.commercetools.queue
/**
* A queue statistics fetcher allows for fetching statistics a queue individually.
*/
trait QueueStatsFetcher[F[_]] {
sealed trait QueueStatsFetcher[F[_]] {

/**
* Fetches the current statistics for the queue.
*/
def fetch: F[QueueStats]

}

private[queue] trait UnsealedQueueStatsFetcher[F[_]] extends QueueStatsFetcher[F]
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.concurrent.duration.FiniteDuration
/**
* The base interface to subscribe to a queue.
*/
abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
sealed abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {

/** The queue name to which this subscriber subscribes. */
def queueName: String
Expand Down Expand Up @@ -175,3 +175,5 @@ abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
process[Res](batchSize, waitingTime, QueuePublisher.noop)((msg: Message[F, T]) =>
handler.handle(msg).widen[Decision[Res]])
}

abstract private[queue] class UnsealedQueueSubscriber[F[_], T](implicit F: Concurrent[F]) extends QueueSubscriber[F, T]
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.commercetools.queue.testing

import cats.effect.{IO, Resource}
import com.commercetools.queue.{QueuePublisher, QueuePusher}
import com.commercetools.queue.{QueuePusher, UnsealedQueuePublisher}

class TestQueuePublisher[T](queue: TestQueue[T]) extends QueuePublisher[IO, T] {
class TestQueuePublisher[T](queue: TestQueue[T]) extends UnsealedQueuePublisher[IO, T] {

override val queueName = queue.name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package com.commercetools.queue.testing

import cats.effect.IO
import com.commercetools.queue.{MessageContext, QueuePuller}
import com.commercetools.queue.{MessageContext, UnsealedQueuePuller}
import fs2.Chunk

import scala.concurrent.duration.FiniteDuration

class TestQueuePuller[T](queue: TestQueue[T]) extends QueuePuller[IO, T] {
class TestQueuePuller[T](queue: TestQueue[T]) extends UnsealedQueuePuller[IO, T] {

override val queueName: String = queue.name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.commercetools.queue.testing

import cats.effect.{IO, Resource}
import com.commercetools.queue.{QueuePuller, QueueSubscriber}
import com.commercetools.queue.{QueuePuller, UnsealedQueueSubscriber}

class TestQueueSubscriber[T](queue: TestQueue[T]) extends QueueSubscriber[IO, T] {
class TestQueueSubscriber[T](queue: TestQueue[T]) extends UnsealedQueueSubscriber[IO, T] {

override val queueName: String = queue.name

Expand Down
Loading

0 comments on commit 2f8fc89

Please sign in to comment.