Skip to content

Commit

Permalink
Merge pull request #28 from commercetools/sealed-interfaces
Browse files Browse the repository at this point in the history
Seal the API traits
  • Loading branch information
satabin authored Jul 16, 2024
2 parents e0a8850 + f726d40 commit 7174f94
Show file tree
Hide file tree
Showing 56 changed files with 210 additions and 181 deletions.
1 change: 1 addition & 0 deletions .scalafix.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ OrganizeImports {
rules = [
OrganizeImports
]
OrganizeImports.targetDialect = Scala2
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import cats.syntax.functorFilter._
import cats.syntax.monadError._
import cats.syntax.option._
import com.commercetools.queue.aws.sqs.makeQueueException
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueDoesNotExistException}
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueConfiguration, QueueDoesNotExistException, UnsealedQueueAdministration}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, GetQueueAttributesRequest, QueueAttributeName, SetQueueAttributesRequest}

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F])
extends QueueAdministration[F] {
private class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F])
extends UnsealedQueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.commercetools.queue.aws.sqs
import cats.effect.{Async, Resource}
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.regions.Region
Expand All @@ -28,7 +28,7 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest

import java.net.URI

class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) extends QueueClient[F] {
private class SQSClient[F[_]](client: SqsAsyncClient)(implicit F: Async[F]) extends UnsealedQueueClient[F] {

private def getQueueUrl(name: String): F[String] =
F.fromCompletableFuture {
Expand All @@ -54,13 +54,20 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext

object SQSClient {

/**
* Creates an SQS client for the given `region` and `credentials`.
*
* If `httpClient` is provided, it is not shut down when the resulting resource is released.
* It is up to the caller to ensure that the client is properly managed.
* This is useful if you integrate the library in an existing code base which already uses a client.
*/
def apply[F[_]](
region: Region,
credentials: AwsCredentialsProvider,
endpoint: Option[URI] = None,
httpClient: Option[SdkAsyncHttpClient] = None
)(implicit F: Async[F]
): Resource[F, SQSClient[F]] =
): Resource[F, QueueClient[F]] =
Resource
.fromAutoCloseable {
F.delay {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package com.commercetools.queue.aws.sqs
import cats.effect.Async
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Action, MessageContext}
import com.commercetools.queue.{Action, UnsealedMessageContext}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityRequest, DeleteMessageRequest}

import java.time.Instant

class SQSMessageContext[F[_], T](
private class SQSMessageContext[F[_], T](
val payload: F[T],
val rawPayload: String,
val enqueuedAt: Instant,
Expand All @@ -37,7 +37,7 @@ class SQSMessageContext[F[_], T](
queueUrl: String,
client: SqsAsyncClient
)(implicit F: Async[F])
extends MessageContext[F, T] {
extends UnsealedMessageContext[F, T] {

override def ack(): F[Unit] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}
import com.commercetools.queue.{QueuePusher, Serializer, UnsealedQueuePublisher}
import software.amazon.awssdk.services.sqs.SqsAsyncClient

class SQSPublisher[F[_], T](
private class SQSPublisher[F[_], T](
val queueName: String,
client: SqsAsyncClient,
getQueueUrl: F[String]
)(implicit
F: Async[F],
serializer: Serializer[T])
extends QueuePublisher[F, T] {
extends UnsealedQueuePublisher[F, T] {

override def pusher: Resource[F, QueuePusher[F, T]] =
Resource.eval(getQueueUrl).map(new SQSPusher(queueName, client, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller}
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{MessageSystemAttributeName, ReceiveMessageRequest}
Expand All @@ -31,15 +31,15 @@ import scala.annotation.nowarn
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSPuller[F[_], T](
private class SQSPuller[F[_], T](
val queueName: String,
client: SqsAsyncClient,
queueUrl: String,
lockTTL: Int
)(implicit
F: Async[F],
deserializer: Deserializer[T])
extends QueuePuller[F, T] {
extends UnsealedQueuePuller[F, T] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import software.amazon.awssdk.services.sqs.model.{MessageAttributeValue, SendMes
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class SQSPusher[F[_], T](
private class SQSPusher[F[_], T](
val queueName: String,
client: SqsAsyncClient,
queueUrl: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import cats.syntax.functor._
import com.commercetools.queue.{QueueStatistics, QueueStatsFetcher}
import com.commercetools.queue.{QueueStatsFetcher, UnsealedQueueStatistics}
import software.amazon.awssdk.services.sqs.SqsAsyncClient

class SQSStatistics[F[_]](val queueName: String, client: SqsAsyncClient, getQueueUrl: F[String])(implicit F: Async[F])
extends QueueStatistics[F] {
private class SQSStatistics[F[_]](
val queueName: String,
client: SqsAsyncClient,
getQueueUrl: F[String]
)(implicit F: Async[F])
extends UnsealedQueueStatistics[F] {

override def fetcher: Resource[F, QueueStatsFetcher[F]] =
Resource.eval(getQueueUrl.map(new SQSStatisticsFetcher[F](queueName, client, _)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import cats.syntax.functor._
import cats.syntax.monadError._
import cats.syntax.option._
import cats.syntax.traverse._
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueStats, QueueStatsFetcher}
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueStats, UnsealedQueueStatsFetcher}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName}

import scala.collection.mutable
import scala.jdk.CollectionConverters._

class SQSStatisticsFetcher[F[_]](val queueName: String, client: SqsAsyncClient, queueUrl: String)(implicit F: Async[F])
extends QueueStatsFetcher[F] {
private class SQSStatisticsFetcher[F[_]](
val queueName: String,
client: SqsAsyncClient,
queueUrl: String
)(implicit F: Async[F])
extends UnsealedQueueStatsFetcher[F] {

override def fetch: F[QueueStats] =
(for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ package com.commercetools.queue.aws.sqs

import cats.effect.{Async, Resource}
import cats.syntax.all._
import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber}
import com.commercetools.queue.{Deserializer, QueuePuller, UnsealedQueueSubscriber}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{GetQueueAttributesRequest, QueueAttributeName}

class SQSSubscriber[F[_], T](
private class SQSSubscriber[F[_], T](
val queueName: String,
client: SqsAsyncClient,
getQueueUrl: F[String]
)(implicit
F: Async[F],
deserializer: Deserializer[T])
extends QueueSubscriber[F, T] {
extends UnsealedQueueSubscriber[F, T] {

private def getLockTTL(queueUrl: String): F[Int] =
F.fromCompletableFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@ import software.amazon.awssdk.services.sqs.model.{QueueDoesNotExistException =>

package object sqs {

def makeQueueException(t: Throwable, queueName: String): QueueException = t match {
private[sqs] def makeQueueException(t: Throwable, queueName: String): QueueException = t match {
case _: AwsQueueDoesNotExistException => QueueDoesNotExistException(queueName, t)
case _: QueueNameExistsException => QueueAlreadyExistException(queueName, t)
case t: QueueException => t
case _ => UnknownQueueException(queueName, t)
}

def makePushQueueException(t: Throwable, queueName: String): QueueException =
private[sqs] def makePushQueueException(t: Throwable, queueName: String): QueueException =
new CannotPushException(queueName, makeQueueException(t, queueName))

def makePullQueueException(t: Throwable, queueName: String): QueueException =
private[sqs] def makePullQueueException(t: Throwable, queueName: String): QueueException =
t match {
case t: QueueException => t
case _ => new CannotPullException(queueName, makeQueueException(t, queueName))
}

def makeMessageException(t: Throwable, queueName: String, msgId: String, action: Action): QueueException =
private[sqs] def makeMessageException(t: Throwable, queueName: String, msgId: String, action: Action)
: QueueException =
t match {
case t: QueueException => t
case _ => new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions
import com.commercetools.queue.{QueueAdministration, QueueConfiguration}
import com.commercetools.queue.{QueueConfiguration, UnsealedQueueAdministration}

import java.time.Duration
import scala.concurrent.duration._

class ServiceBusAdministration[F[_]](
private class ServiceBusAdministration[F[_]](
client: ServiceBusAdministrationClient,
newQueueSettings: NewQueueSettings
)(implicit F: Async[F])
extends QueueAdministration[F] {
extends UnsealedQueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
F.blocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import com.azure.core.credential.TokenCredential
import com.azure.core.util.ClientOptions
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient}

class ServiceBusClient[F[_]] private (
private class ServiceBusClient[F[_]] private (
clientBuilder: ServiceBusClientBuilder,
adminBuilder: ServiceBusAdministrationClientBuilder,
newQueueSettings: NewQueueSettings
)(implicit F: Async[F])
extends QueueClient[F] {
extends UnsealedQueueClient[F] {

override def administration: QueueAdministration[F] =
new ServiceBusAdministration(adminBuilder.buildClient(), newQueueSettings)
Expand All @@ -54,7 +54,7 @@ object ServiceBusClient {
connectionString: String,
newQueueSettings: NewQueueSettings = NewQueueSettings.default
)(implicit F: Async[F]
): Resource[F, ServiceBusClient[F]] =
): Resource[F, QueueClient[F]] =
for {
clientBuilder <- Resource.eval {
F.delay {
Expand All @@ -79,7 +79,7 @@ object ServiceBusClient {
newQueueSettings: NewQueueSettings = NewQueueSettings.default,
options: Option[ClientOptions] = None
)(implicit F: Async[F]
): Resource[F, ServiceBusClient[F]] =
): Resource[F, QueueClient[F]] =
for {
clientBuilder <- Resource.eval {
F.delay {
Expand Down Expand Up @@ -107,7 +107,7 @@ object ServiceBusClient {
adminBuilder: ServiceBusAdministrationClientBuilder,
newQueueSettings: NewQueueSettings = NewQueueSettings.default
)(implicit F: Async[F]
): ServiceBusClient[F] =
): QueueClient[F] =
new ServiceBusClient(clientBuilder, adminBuilder, newQueueSettings)

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ package com.commercetools.queue.azure.servicebus
import cats.effect.Async
import cats.syntax.functor._
import com.azure.messaging.servicebus.{ServiceBusReceivedMessage, ServiceBusReceiverClient}
import com.commercetools.queue.MessageContext
import com.commercetools.queue.UnsealedMessageContext

import java.time.Instant
import scala.jdk.CollectionConverters.MapHasAsScala

class ServiceBusMessageContext[F[_], T](
private class ServiceBusMessageContext[F[_], T](
val payload: F[T],
val underlying: ServiceBusReceivedMessage,
receiver: ServiceBusReceiverClient
)(implicit F: Async[F])
extends MessageContext[F, T] {
extends UnsealedMessageContext[F, T] {

override def rawPayload: String = underlying.getBody().toString()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller}
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import fs2.Chunk

import java.time.Duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusPuller[F[_], Data](
private class ServiceBusPuller[F[_], Data](
val queueName: String,
receiver: ServiceBusReceiverClient
)(implicit
F: Async[F],
deserializer: Deserializer[Data])
extends QueuePuller[F, Data] {
extends UnsealedQueuePuller[F, Data] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] = F
.blocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.time.ZoneOffset
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusPusher[F[_], Data](
private class ServiceBusPusher[F[_], Data](
val queueName: String,
sender: ServiceBusSenderClient
)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package com.commercetools.queue.azure.servicebus

import cats.effect.{Async, Resource}
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer}
import com.commercetools.queue.{QueuePusher, Serializer, UnsealedQueuePublisher}

class ServiceBusQueuePublisher[F[_], Data](
private class ServiceBusQueuePublisher[F[_], Data](
val queueName: String,
clientBuilder: ServiceBusClientBuilder
)(implicit
F: Async[F],
serializer: Serializer[Data])
extends QueuePublisher[F, Data] {
extends UnsealedQueuePublisher[F, Data] {

override def pusher: Resource[F, QueuePusher[F, Data]] =
Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
*/

package com.commercetools.queue.azure.servicebus

import cats.effect.{Async, Resource}
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode
import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber}
import com.commercetools.queue.{Deserializer, QueuePuller, UnsealedQueueSubscriber}

class ServiceBusQueueSubscriber[F[_], Data](
private class ServiceBusQueueSubscriber[F[_], Data](
val queueName: String,
builder: ServiceBusClientBuilder
)(implicit
F: Async[F],
deserializer: Deserializer[Data])
extends QueueSubscriber[F, Data] {
extends UnsealedQueueSubscriber[F, Data] {

override def puller: Resource[F, QueuePuller[F, Data]] = Resource
.fromAutoCloseable {
Expand Down
Loading

0 comments on commit 7174f94

Please sign in to comment.