From 6a4e65d63cd678c754af4755b6249be8c6697825 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 21 May 2024 16:58:21 +0200 Subject: [PATCH 01/10] Add support for GCP Pub/Sub --- build.sbt | 24 +++- docker-compose.yml | 11 +- gcp/pubsub/emulator/start.sh | 1 + .../queue/pubsub/PubSubClientSuite.scala | 14 +++ .../gcp/pubsub/PubSubAdministration.scala | 109 ++++++++++++++++++ .../queue/gcp/pubsub/PubSubClient.scala | 52 +++++++++ .../gcp/pubsub/PubSubMessageContext.scala | 76 ++++++++++++ .../queue/gcp/pubsub/PubSubPublisher.scala | 37 ++++++ .../queue/gcp/pubsub/PubSubPuller.scala | 79 +++++++++++++ .../queue/gcp/pubsub/PubSubPusher.scala | 56 +++++++++ .../queue/gcp/pubsub/PubSubSubscriber.scala | 46 ++++++++ .../queue/gcp/pubsub/package.scala | 61 ++++++++++ .../queue/testkit/QueueClientSuite.scala | 33 +++++- 13 files changed, 595 insertions(+), 4 deletions(-) create mode 100644 gcp/pubsub/emulator/start.sh create mode 100644 gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala create mode 100644 gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala diff --git a/build.sbt b/build.sbt index aa5157e..47b2dce 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,8 @@ ThisBuild / scalaVersion := Scala213 ThisBuild / tlSonatypeUseLegacyHost := true -lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, circe, otel4s, unidocs) +lazy val root = + tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, gcpPubSub, gcpPubSubIt, circe, otel4s, unidocs) ThisBuild / tlSitePublishBranch := Some("main") @@ -129,6 +130,24 @@ lazy val awsSqsIt = project .settings(commonSettings) .dependsOn(awsSQS.jvm % Test, testkit.jvm % Test) +lazy val gcpPubSub = crossProject(JVMPlatform) + .crossType(CrossType.Pure) + .in(file("gcp/pubsub")) + .settings(commonSettings) + .settings( + name := "fs2-queues-gcp-pubsub", + libraryDependencies ++= List( + "com.google.cloud" % "google-cloud-pubsub" % "1.129.3" + ) + ) + .dependsOn(core) + +lazy val gcpPubSubIt = project + .in(file("gcp/pubsub/integration")) + .enablePlugins(NoPublishPlugin) + .settings(commonSettings) + .dependsOn(gcpPubSub.jvm % Test, testkit.jvm % Test) + lazy val docs = project .in(file("site")) .enablePlugins(TypelevelSitePlugin) @@ -150,7 +169,7 @@ lazy val docs = project "com.azure" % "azure-identity" % "1.11.1" ) ) - .dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, otel4s.jvm) + .dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, gcpPubSub.jvm, otel4s.jvm) lazy val unidocs = project .in(file("unidocs")) @@ -162,5 +181,6 @@ lazy val unidocs = project circe.jvm, azureServiceBus.jvm, awsSQS.jvm, + gcpPubSub.jvm, otel4s.jvm) ) diff --git a/docker-compose.yml b/docker-compose.yml index f04417b..9b4d660 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,9 +2,18 @@ version: "3.8" services: localstack: - container_name: sqs + container_name: fs2-queues-sqs image: localstack/localstack:latest ports: - "127.0.0.1:4566:4566" environment: - SERVICES=sqs + pubsub: + container_name: fs2-queues-pubsub + # https://console.cloud.google.com/gcr/images/google.com:cloudsdktool/global/google-cloud-cli + image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators + command: "bash /root/scripts/start.sh" + ports: + - "8042:8042" + volumes: + - ./gcp/pubsub/emulator:/root/scripts diff --git a/gcp/pubsub/emulator/start.sh b/gcp/pubsub/emulator/start.sh new file mode 100644 index 0000000..a5a947e --- /dev/null +++ b/gcp/pubsub/emulator/start.sh @@ -0,0 +1 @@ +gcloud beta emulators pubsub start --host-port 0.0.0.0:8042 --project=test-project diff --git a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala new file mode 100644 index 0000000..88451f6 --- /dev/null +++ b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala @@ -0,0 +1,14 @@ +package com.commercetools.queue.pubsub + +import cats.effect.{IO, Resource} +import com.commercetools.queue.QueueClient +import com.commercetools.queue.gcp.pubsub.PubSubClient +import com.commercetools.queue.testkit.QueueClientSuite +import com.google.api.gax.core.NoCredentialsProvider + +class PubSubClientSuite extends QueueClientSuite { + + override def client: Resource[IO, QueueClient[IO]] = + PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("http://localhost:8042")) + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala new file mode 100644 index 0000000..a9fcf9b --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -0,0 +1,109 @@ +package com.commercetools.queue.gcp.pubsub + +import cats.effect.{Async, Resource} +import cats.syntax.all._ +import com.commercetools.queue.QueueAdministration +import com.google.api.gax.core.CredentialsProvider +import com.google.api.gax.rpc.TransportChannelProvider +import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings, TopicAdminClient, TopicAdminSettings} +import com.google.protobuf.Duration +import com.google.pubsub.v1.{ExpirationPolicy, Subscription, SubscriptionName, Topic, TopicName} + +import scala.concurrent.duration.FiniteDuration +import com.google.pubsub.v1.DeleteTopicRequest +import com.google.pubsub.v1.DeleteSubscriptionRequest +import com.google.pubsub.v1.GetTopicRequest +import com.google.api.gax.rpc.NotFoundException + +class PubSubAdministration[F[_]]( + project: String, + channelProvider: TransportChannelProvider, + credentials: CredentialsProvider, + endpoint: Option[String] +)(implicit F: Async[F]) + extends QueueAdministration[F] { + + private val adminClient = Resource.fromAutoCloseable(F.delay { + val builder = TopicAdminSettings + .newHttpJsonBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) + endpoint.foreach(builder.setEndpoint(_)) + TopicAdminClient.create(builder.build()) + }) + + private val subscriptionClient = Resource.fromAutoCloseable(F.delay { + val builder = SubscriptionAdminSettings + .newBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) + endpoint.foreach(builder.setEndpoint(_)) + SubscriptionAdminClient.create(builder.build()) + }) + + override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] = { + val topicName = TopicName.of(project, name) + val ttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() + adminClient.use { client => + wrapFuture(F.delay { + client + .createTopicCallable() + .futureCall(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(ttl).build()) + }) + } *> subscriptionClient.use { client => + wrapFuture(F.delay { + client + .createSubscriptionCallable() + .futureCall( + Subscription + .newBuilder() + .setTopic(topicName.toString()) + .setName(SubscriptionName.of(project, s"fs2-queue-$name").toString()) + .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) + .setMessageRetentionDuration(ttl) + // An empty expiration policy (no TTL set) ensures the subscription is never deleted + .setExpirationPolicy(ExpirationPolicy.newBuilder().build()) + .build()) + }) + }.void + } + .adaptError(makeQueueException(_, name)) + + override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = ??? + + override def delete(name: String): F[Unit] = { + adminClient.use { client => + wrapFuture(F.delay { + client + .deleteTopicCallable() + .futureCall(DeleteTopicRequest.newBuilder().setTopic(TopicName.of(project, name).toString()).build()) + }) + } *> subscriptionClient.use { client => + wrapFuture(F.delay { + client + .deleteSubscriptionCallable() + .futureCall( + DeleteSubscriptionRequest + .newBuilder() + .setSubscription(SubscriptionName.of(project, s"fs2-queue-$name").toString()) + .build()) + }) + }.void + }.adaptError(makeQueueException(_, name)) + + override def exists(name: String): F[Boolean] = + adminClient + .use { client => + wrapFuture(F.delay { + client + .getTopicCallable() + .futureCall(GetTopicRequest.newBuilder().setTopic(TopicName.of(project, name).toString()).build()) + }) + .as(true) + .recover { case _: NotFoundException => + false + } + } + .adaptError(makeQueueException(_, name)) + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala new file mode 100644 index 0000000..9b2b707 --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -0,0 +1,52 @@ +package com.commercetools.queue.gcp.pubsub + +import cats.effect.{Async, Resource} +import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} +import com.google.api.gax.core.CredentialsProvider +import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel} +import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannel, TransportChannelProvider} +import com.google.pubsub.v1.{SubscriptionName, TopicName} + +class PubSubClient[F[_]: Async] private ( + project: String, + channelProvider: TransportChannelProvider, + credentials: CredentialsProvider, + endpoint: Option[String]) + extends QueueClient[F] { + + override def administration: QueueAdministration[F] = + new PubSubAdministration[F](project, channelProvider, credentials, endpoint) + + override def publish[T: Serializer](name: String): QueuePublisher[F, T] = + new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint) + + override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = + new PubSubSubscriber[F, T]( + name, + SubscriptionName.of(project, s"fs2-queue-$name"), + channelProvider, + credentials, + endpoint) + +} + +object PubSubClient { + + private def makeDefaultTransportChannel(endpoint: Option[String]): TransportChannel = + HttpJsonTransportChannel.create( + ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build()) + + def apply[F[_]]( + project: String, + credentials: CredentialsProvider, + endpoint: Option[String] = None, + mkTransportChannel: Option[String] => TransportChannel = makeDefaultTransportChannel _ + )(implicit F: Async[F] + ): Resource[F, PubSubClient[F]] = + Resource + .fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))) + .map { channel => + new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint) + } + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala new file mode 100644 index 0000000..6d0b5ca --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala @@ -0,0 +1,76 @@ +package com.commercetools.queue.gcp.pubsub + +import cats.effect.Async +import cats.syntax.functor._ +import cats.syntax.monadError._ +import com.commercetools.queue.{Action, MessageContext} +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.{AcknowledgeRequest, ModifyAckDeadlineRequest, ReceivedMessage, SubscriptionName} + +import java.time.Instant +import scala.jdk.CollectionConverters._ + +class PubSubMessageContext[F[_], T]( + subscriber: SubscriberStub, + subscriptionName: SubscriptionName, + underlying: ReceivedMessage, + lockDurationSeconds: Int, + val payload: F[T], + queueName: String +)(implicit F: Async[F]) + extends MessageContext[F, T] { + + override def messageId: String = underlying.getAckId() + + override def rawPayload: String = underlying.getMessage().getData().toStringUtf8() + + override lazy val enqueuedAt: Instant = + Instant.ofEpochSecond( + underlying.getMessage().getPublishTime().getSeconds(), + underlying.getMessage.getPublishTime().getNanos().toLong) + + override lazy val metadata: Map[String, String] = + underlying.getMessage().getAttributesMap().asScala.toMap + + override def ack(): F[Unit] = + wrapFuture( + F.delay( + subscriber + .acknowledgeCallable() + .futureCall( + AcknowledgeRequest + .newBuilder() + .setSubscription(subscriptionName.toString()) + .addAckIds(underlying.getAckId()) + .build()))).void + .adaptError(makeMessageException(_, queueName, messageId, Action.Ack)) + + override def nack(): F[Unit] = + wrapFuture( + F.delay( + subscriber + .modifyAckDeadlineCallable() + .futureCall( + ModifyAckDeadlineRequest + .newBuilder() + .setSubscription(subscriptionName.toString()) + .setAckDeadlineSeconds(0) + .addAckIds(underlying.getAckId()) + .build()))).void + .adaptError(makeMessageException(_, queueName, messageId, Action.Ack)) + + override def extendLock(): F[Unit] = + wrapFuture( + F.delay( + subscriber + .modifyAckDeadlineCallable() + .futureCall( + ModifyAckDeadlineRequest + .newBuilder() + .setSubscription(subscriptionName.toString()) + .setAckDeadlineSeconds(lockDurationSeconds) + .addAckIds(underlying.getAckId()) + .build()))).void + .adaptError(makeMessageException(_, queueName, messageId, Action.Ack)) + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala new file mode 100644 index 0000000..270c5d2 --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala @@ -0,0 +1,37 @@ +package com.commercetools.queue.gcp.pubsub + +import cats.effect.{Async, Resource} +import com.commercetools.queue.{QueuePublisher, QueuePusher, Serializer} +import com.google.api.gax.core.CredentialsProvider +import com.google.api.gax.rpc.TransportChannelProvider +import com.google.cloud.pubsub.v1.stub.{HttpJsonPublisherStub, PublisherStubSettings} +import com.google.pubsub.v1.TopicName + +class PubSubPublisher[F[_], T]( + val queueName: String, + topicName: TopicName, + channelProvider: TransportChannelProvider, + credentials: CredentialsProvider, + endpoint: Option[String] +)(implicit + F: Async[F], + serializer: Serializer[T]) + extends QueuePublisher[F, T] { + + override def pusher: Resource[F, QueuePusher[F, T]] = + Resource + .fromAutoCloseable { + F.blocking { + val builder = + PublisherStubSettings + .newHttpJsonBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) + endpoint.foreach(builder.setEndpoint(_)) + HttpJsonPublisherStub.create(builder.build()) + + } + } + .map(new PubSubPusher[F, T](queueName, topicName, _)) + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala new file mode 100644 index 0000000..a0a1555 --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala @@ -0,0 +1,79 @@ +package com.commercetools.queue.gcp.pubsub + +import cats.effect.Async +import cats.effect.syntax.concurrent._ +import cats.syntax.all._ +import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller} +import com.google.api.gax.httpjson.HttpJsonCallContext +import com.google.api.gax.retrying.RetrySettings +import com.google.api.gax.rpc.DeadlineExceededException +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.{ModifyAckDeadlineRequest, PullRequest, ReceivedMessage, SubscriptionName} +import fs2.Chunk +import org.threeten.bp.Duration + +import java.time +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters._ + +class PubSubPuller[F[_], T]( + val queueName: String, + subscriptionName: SubscriptionName, + subscriber: SubscriberStub, + lockTTLSeconds: Int +)(implicit + F: Async[F], + deserializer: Deserializer[T]) + extends QueuePuller[F, T] { + + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = + wrapFuture(F.delay { + subscriber + .pullCallable() + .withDefaultCallContext( + HttpJsonCallContext + .createDefault() + .withRetrySettings( + RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())) + .futureCall( + PullRequest.newBuilder().setMaxMessages(batchSize).setSubscription(subscriptionName.toString()).build()) + }).map(response => Chunk.from(response.getReceivedMessagesList().asScala)) + .recover { case _: DeadlineExceededException => + // no messages were available during the configured waiting time + Chunk.empty + } + .flatMap { (msgs: Chunk[ReceivedMessage]) => + msgs.traverseFilter[F, ReceivedMessage] { msg => + val attrs = msg.getMessage().getAttributesMap().asScala + F.realTimeInstant.flatMap { now => + attrs.get(delayAttribute) match { + case Some(ToInstant(until)) if until.isAfter(now) => + wrapFuture( + F.delay( + subscriber + .modifyAckDeadlineCallable() + .futureCall( + ModifyAckDeadlineRequest + .newBuilder() + .addAckIds(msg.getAckId()) + .setSubscription(subscriptionName.toString()) + .setAckDeadlineSeconds(time.Duration.between(now, until).getSeconds().toInt) + .build()))).as(None) + case _ => F.pure(Some(msg)) + } + } + } + } + .flatMap { (msgs: Chunk[ReceivedMessage]) => + msgs + .traverse { msg => + deserializer + .deserializeF[F](msg.getMessage().getData().toStringUtf8()) + .memoize + .map(new PubSubMessageContext(subscriber, subscriptionName, msg, lockTTLSeconds, _, queueName)) + } + } + .widen[Chunk[MessageContext[F, T]]] + .adaptError(makePullQueueException(_, queueName)) + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala new file mode 100644 index 0000000..3f8ae12 --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala @@ -0,0 +1,56 @@ +package com.commercetools.queue.gcp.pubsub + +import cats.effect.Async +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import cats.syntax.monadError._ +import cats.syntax.traverse._ +import com.commercetools.queue.{QueuePusher, Serializer} +import com.google.cloud.pubsub.v1.stub.PublisherStub +import com.google.protobuf.ByteString +import com.google.pubsub.v1.{PublishRequest, PubsubMessage, TopicName} + +import java.time.Instant +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters._ + +class PubSubPusher[F[_], T]( + val queueName: String, + topicName: TopicName, + publisher: PublisherStub +)(implicit + F: Async[F], + serializer: Serializer[T]) + extends QueuePusher[F, T] { + + private def makeMessage(payload: T, waitUntil: Option[Instant]): F[PubsubMessage] = + F.delay { + val builder = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(serializer.serialize(payload))) + waitUntil.foreach(waitUntil => builder.putAttributes(delayAttribute, waitUntil.toString())) + builder.build + } + + override def push(message: T, delay: Option[FiniteDuration]): F[Unit] = + (for { + waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis))) + msg <- makeMessage(message, waitUntil) + _ <- wrapFuture( + F.delay( + publisher + .publishCallable() + .futureCall(PublishRequest.newBuilder().addMessages(msg).setTopic(topicName.toString()).build()))) + } yield ()) + .adaptError(makePushQueueException(_, queueName)) + + override def push(messages: List[T], delay: Option[FiniteDuration]): F[Unit] = + (for { + waitUntil <- delay.traverse(delay => F.realTimeInstant.map(_.plusMillis(delay.toMillis))) + msgs <- messages.traverse(makeMessage(_, waitUntil)) + _ <- wrapFuture( + F.delay(publisher + .publishCallable() + .futureCall(PublishRequest.newBuilder().addAllMessages(msgs.asJava).setTopic(topicName.toString()).build()))) + } yield ()) + .adaptError(makePushQueueException(_, queueName)) + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala new file mode 100644 index 0000000..ba51303 --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala @@ -0,0 +1,46 @@ +package com.commercetools.queue.gcp.pubsub + +import cats.effect.{Async, Resource} +import cats.syntax.functor._ +import com.commercetools.queue.{Deserializer, QueuePuller, QueueSubscriber} +import com.google.api.gax.core.CredentialsProvider +import com.google.api.gax.rpc.TransportChannelProvider +import com.google.cloud.pubsub.v1.stub.{HttpJsonSubscriberStub, SubscriberStubSettings} +import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName} + +class PubSubSubscriber[F[_], T]( + val queueName: String, + subscriptionName: SubscriptionName, + channelProvider: TransportChannelProvider, + credentials: CredentialsProvider, + endpoint: Option[String] +)(implicit + F: Async[F], + deserializer: Deserializer[T]) + extends QueueSubscriber[F, T] { + + override def puller: Resource[F, QueuePuller[F, T]] = + Resource + .fromAutoCloseable { + F.blocking { + val builder = + SubscriberStubSettings + .newHttpJsonBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) + endpoint.foreach(builder.setEndpoint(_)) + HttpJsonSubscriberStub.create(builder.build()) + } + } + .evalMap { subscriber => + wrapFuture( + F.delay(subscriber + .getSubscriptionCallable() + .futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build()))).map( + sub => (subscriber, sub)) + } + .map { case (subscriber, subscription) => + new PubSubPuller[F, T](queueName, subscriptionName, subscriber, subscription.getAckDeadlineSeconds()) + } + +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala new file mode 100644 index 0000000..6db1653 --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala @@ -0,0 +1,61 @@ +package com.commercetools.queue.gcp + +import cats.effect.Async +import cats.syntax.functor._ +import com.commercetools.queue.{Action, CannotPullException, CannotPushException, MessageException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, UnknownQueueException} +import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures} +import com.google.api.gax.rpc.{AlreadyExistsException, NotFoundException} +import com.google.common.util.concurrent.MoreExecutors +import java.time.Instant +import cats.syntax.either._ + +package object pubsub { + + private[pubsub] object ToInstant { + def unapply(s: String): Option[Instant] = + Either.catchNonFatal(Instant.parse(s)).toOption + } + + final private[pubsub] val delayAttribute = "com.commercetools.queue.delay" + + private[pubsub] def wrapFuture[F[_], T](future: F[ApiFuture[T]])(implicit F: Async[F]): F[T] = + F.async { cb => + future.map { future => + ApiFutures.addCallback( + future, + new ApiFutureCallback[T] { + + override def onFailure(t: Throwable): Unit = cb(Left(t)) + + override def onSuccess(result: T): Unit = cb(Right(result)) + + }, + MoreExecutors.directExecutor() + ) + Some(F.delay(future.cancel(false)).void) + } + } + + def makeQueueException(t: Throwable, queueName: String): QueueException = t match { + case _: NotFoundException => QueueDoesNotExistException(queueName, t) + case _: AlreadyExistsException => QueueAlreadyExistException(queueName, t) + case t: QueueException => t + case _ => UnknownQueueException(queueName, t) + } + + def makePushQueueException(t: Throwable, queueName: String): QueueException = + new CannotPushException(queueName, makeQueueException(t, queueName)) + + def makePullQueueException(t: Throwable, queueName: String): QueueException = + t match { + case t: QueueException => t + case _ => new CannotPullException(queueName, makeQueueException(t, queueName)) + } + + def makeMessageException(t: Throwable, queueName: String, msgId: String, action: Action): QueueException = + t match { + case t: QueueException => t + case _ => new MessageException(msgId = msgId, action = action, inner = makeQueueException(t, queueName)) + } + +} diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala index fb908dd..cb74d06 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala @@ -3,7 +3,7 @@ package com.commercetools.queue.testkit import cats.effect.std.Random import cats.effect.{IO, Ref, Resource} import com.commercetools.queue.QueueClient -import fs2.Stream +import fs2.{Chunk, Stream} import munit.CatsEffectSuite import scala.concurrent.duration._ @@ -54,4 +54,35 @@ abstract class QueueClientSuite extends CatsEffectSuite { } yield () } + withQueue.test("puller returns no messages if none is available during the configured duration") { queueName => + val client = clientFixture() + client.subscribe(queueName).puller.use { puller => + assertIO(puller.pullBatch(10, 2.seconds), Chunk.empty) + } + } + + withQueue.test("existing queue should be indicated as such") { queueName => + val client = clientFixture() + assertIO(client.administration.exists(queueName), true) + } + + test("non existing queue should be indicated as such") { + val client = clientFixture() + assertIO(client.administration.exists("not-existing"), false) + } + + withQueue.test("delayed messages should not be pulled before deadline") { queueName => + val client = clientFixture() + client.publish(queueName).pusher.use { pusher => + pusher.push("delayed message", Some(2.seconds)) + } *> client.subscribe(queueName).puller.use { puller => + for { + _ <- assertIO(puller.pullBatch(1, 1.second), Chunk.empty) + _ <- IO.sleep(2.seconds) + _ <- assertIO(puller.pullBatch(1, 1.second).map(_.map(_.rawPayload)), Chunk("delayed message")) + } yield () + + } + } + } From 9e44990b8f33dba7ffd775d1ce1558983133cc69 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 21 May 2024 16:58:42 +0200 Subject: [PATCH 02/10] Fix existence check for SQS --- .../com/commercetools/queue/aws/sqs/SQSAdministration.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala index 584618d..e2a3ad7 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala @@ -21,10 +21,11 @@ import cats.syntax.all._ import com.commercetools.queue.QueueAdministration import com.commercetools.queue.aws.sqs.makeQueueException import software.amazon.awssdk.services.sqs.SqsAsyncClient -import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, QueueAttributeName, QueueDoesNotExistException, SetQueueAttributesRequest} +import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, QueueAttributeName, SetQueueAttributesRequest} import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ +import com.commercetools.queue.QueueDoesNotExistException class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F]) extends QueueAdministration[F] { From 6f08a386e11829302f5b326b2aa6419ec459dbe9 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 08:41:14 +0200 Subject: [PATCH 03/10] Make IT test work in CI --- .github/workflows/ci.yml | 15 ++++++++++++--- build.sbt | 9 ++++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f75df03..a03653a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,12 +52,21 @@ jobs: if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' run: sbt +update - - env: + - name: Install localstack + env: SERVICES: sqs uses: LocalStack/setup-localstack@main with: image-tag: latest + - name: Install gcloud + uses: google-github-actions/setup-gcloud@v2 + with: + install_components: beta,pubsub-emulator + + - name: Run PubSub emulator + run: ./gcp/pubsub/emulator/start.sh & + - name: Check that workflows are up to date run: sbt githubWorkflowCheck @@ -82,11 +91,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target + run: mkdir -p circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target + run: tar cf targets.tar circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/build.sbt b/build.sbt index 47b2dce..ac39c83 100644 --- a/build.sbt +++ b/build.sbt @@ -67,9 +67,16 @@ lazy val testkit = crossProject(JVMPlatform) ThisBuild / githubWorkflowBuildPreamble := List( WorkflowStep.Use( UseRef.Public(owner = "LocalStack", repo = "setup-localstack", ref = "main"), + name = Some("Install localstack"), params = Map("image-tag" -> "latest"), env = Map("SERVICES" -> "sqs") - ) + ), + WorkflowStep.Use( + UseRef.Public(owner = "google-github-actions", repo = "setup-gcloud", ref = "v2"), + name = Some("Install gcloud"), + params = Map("install_components" -> "beta,pubsub-emulator") + ), + WorkflowStep.Run(commands = List("./gcp/pubsub/emulator/start.sh &"), name = Some("Run PubSub emulator")) ) lazy val otel4s = crossProject(JVMPlatform) From 9fd5aef8386545f44a8775936ab144efd61b9e3c Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 10:21:30 +0200 Subject: [PATCH 04/10] Prepare PR --- .../queue/aws/sqs/SQSAdministration.scala | 3 +-- .../queue/pubsub/PubSubClientSuite.scala | 16 +++++++++++++ .../gcp/pubsub/PubSubAdministration.scala | 24 ++++++++++++++----- .../queue/gcp/pubsub/PubSubClient.scala | 16 +++++++++++++ .../gcp/pubsub/PubSubMessageContext.scala | 16 +++++++++++++ .../queue/gcp/pubsub/PubSubPublisher.scala | 16 +++++++++++++ .../queue/gcp/pubsub/PubSubPuller.scala | 16 +++++++++++++ .../queue/gcp/pubsub/PubSubPusher.scala | 16 +++++++++++++ .../queue/gcp/pubsub/PubSubSubscriber.scala | 16 +++++++++++++ .../queue/gcp/pubsub/package.scala | 19 ++++++++++++++- 10 files changed, 149 insertions(+), 9 deletions(-) diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala index e2a3ad7..f7227e1 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala @@ -18,14 +18,13 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async import cats.syntax.all._ -import com.commercetools.queue.QueueAdministration import com.commercetools.queue.aws.sqs.makeQueueException +import com.commercetools.queue.{QueueAdministration, QueueDoesNotExistException} import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, QueueAttributeName, SetQueueAttributesRequest} import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ -import com.commercetools.queue.QueueDoesNotExistException class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F]) extends QueueAdministration[F] { diff --git a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala index 88451f6..b53691a 100644 --- a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala +++ b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.pubsub import cats.effect.{IO, Resource} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala index a9fcf9b..4017c20 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -1,19 +1,31 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} import cats.syntax.all._ import com.commercetools.queue.QueueAdministration import com.google.api.gax.core.CredentialsProvider -import com.google.api.gax.rpc.TransportChannelProvider +import com.google.api.gax.rpc.{NotFoundException, TransportChannelProvider} import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings, TopicAdminClient, TopicAdminSettings} import com.google.protobuf.Duration -import com.google.pubsub.v1.{ExpirationPolicy, Subscription, SubscriptionName, Topic, TopicName} +import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, ExpirationPolicy, GetTopicRequest, Subscription, SubscriptionName, Topic, TopicName} import scala.concurrent.duration.FiniteDuration -import com.google.pubsub.v1.DeleteTopicRequest -import com.google.pubsub.v1.DeleteSubscriptionRequest -import com.google.pubsub.v1.GetTopicRequest -import com.google.api.gax.rpc.NotFoundException class PubSubAdministration[F[_]]( project: String, diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index 9b2b707..b4eac2e 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala index 6d0b5ca..ed945a5 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp.pubsub import cats.effect.Async diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala index 270c5d2..91a435b 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala index a0a1555..f16c899 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp.pubsub import cats.effect.Async diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala index 3f8ae12..288629c 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp.pubsub import cats.effect.Async diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala index ba51303..6e665c0 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala index 6db1653..99cfa3d 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/package.scala @@ -1,13 +1,30 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.commercetools.queue.gcp import cats.effect.Async +import cats.syntax.either._ import cats.syntax.functor._ import com.commercetools.queue.{Action, CannotPullException, CannotPushException, MessageException, QueueAlreadyExistException, QueueDoesNotExistException, QueueException, UnknownQueueException} import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures} import com.google.api.gax.rpc.{AlreadyExistsException, NotFoundException} import com.google.common.util.concurrent.MoreExecutors + import java.time.Instant -import cats.syntax.either._ package object pubsub { From 83424ab23fa061a2bee7e1e5250415406f45edb5 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 10:26:35 +0200 Subject: [PATCH 05/10] Make pubsub start script executable --- gcp/pubsub/emulator/start.sh | 2 ++ 1 file changed, 2 insertions(+) mode change 100644 => 100755 gcp/pubsub/emulator/start.sh diff --git a/gcp/pubsub/emulator/start.sh b/gcp/pubsub/emulator/start.sh old mode 100644 new mode 100755 index a5a947e..a759817 --- a/gcp/pubsub/emulator/start.sh +++ b/gcp/pubsub/emulator/start.sh @@ -1 +1,3 @@ +#!/bin/sh + gcloud beta emulators pubsub start --host-port 0.0.0.0:8042 --project=test-project From 41d11c0e3a6a31c1666eb31fb58d92b02b6476b7 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 11:13:04 +0200 Subject: [PATCH 06/10] Mark PubSub module as newly introduced --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index ac39c83..97bbe46 100644 --- a/build.sbt +++ b/build.sbt @@ -145,7 +145,8 @@ lazy val gcpPubSub = crossProject(JVMPlatform) name := "fs2-queues-gcp-pubsub", libraryDependencies ++= List( "com.google.cloud" % "google-cloud-pubsub" % "1.129.3" - ) + ), + tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0") ) .dependsOn(core) From c3b129af57d3e6728fdc2fe84f60a2e9cae6c2fd Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 11:13:27 +0200 Subject: [PATCH 07/10] Implement queue update for PubSub --- .../gcp/pubsub/PubSubAdministration.scala | 93 ++++++++++++++++++- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala index 4017c20..81590ea 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -22,8 +22,8 @@ import com.commercetools.queue.QueueAdministration import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.rpc.{NotFoundException, TransportChannelProvider} import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings, TopicAdminClient, TopicAdminSettings} -import com.google.protobuf.Duration -import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, ExpirationPolicy, GetTopicRequest, Subscription, SubscriptionName, Topic, TopicName} +import com.google.protobuf.{Duration, FieldMask} +import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, ExpirationPolicy, GetTopicRequest, Subscription, SubscriptionName, Topic, TopicName, UpdateSubscriptionRequest, UpdateTopicRequest} import scala.concurrent.duration.FiniteDuration @@ -81,7 +81,94 @@ class PubSubAdministration[F[_]]( } .adaptError(makeQueueException(_, name)) - override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = ??? + override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = { + val topicName = TopicName.of(project, name) + val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name") + val (updateTopicRequest, updateSubscriptionRequest) = + (messageTTL, lockTTL) match { + case (Some(messageTTL), Some(lockTTL)) => + val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() + ( + Some( + UpdateTopicRequest + .newBuilder() + .setTopic(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(mttl).build()) + .setUpdateMask(FieldMask.newBuilder().addPaths("message_retention_duration").build()) + .build()), + Some( + UpdateSubscriptionRequest + .newBuilder() + .setSubscription( + Subscription + .newBuilder() + .setTopic(topicName.toString()) + .setName(subscriptionName.toString()) + .setMessageRetentionDuration(mttl) + .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) + .build()) + .setUpdateMask( + FieldMask + .newBuilder() + .addPaths("message_retention_duration") + .addPaths("ack_deadline_seconds") + .build()) + .build())) + case (Some(messageTTL), None) => + val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() + ( + Some( + UpdateTopicRequest + .newBuilder() + .setTopic(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(mttl).build()) + .setUpdateMask(FieldMask.newBuilder().addPaths("message_retention_duration").build()) + .build()), + Some( + UpdateSubscriptionRequest + .newBuilder() + .setSubscription( + Subscription + .newBuilder() + .setTopic(topicName.toString()) + .setName(subscriptionName.toString()) + .setMessageRetentionDuration(mttl) + .build()) + .setUpdateMask(FieldMask + .newBuilder() + .addPaths("message_retention_duration") + .build()) + .build())) + case (None, Some(lockTTL)) => + ( + None, + Some( + UpdateSubscriptionRequest + .newBuilder() + .setSubscription( + Subscription + .newBuilder() + .setTopic(topicName.toString()) + .setName(subscriptionName.toString()) + .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) + .build()) + .setUpdateMask(FieldMask + .newBuilder() + .addPaths("ack_deadline_seconds") + .build()) + .build())) + case (None, None) => + (None, None) + } + updateTopicRequest.traverse_ { req => + adminClient.use { client => + wrapFuture(F.delay(client.updateTopicCallable().futureCall(req))) + } + } *> + updateSubscriptionRequest.traverse_ { req => + subscriptionClient.use { client => + wrapFuture(F.delay(client.updateSubscriptionCallable().futureCall(req))) + } + } + } override def delete(name: String): F[Unit] = { adminClient.use { client => From 778f8eca160edfb3ff0ebbd5a2f67e93450db951 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 12:51:16 +0200 Subject: [PATCH 08/10] Support queue update (with tests) --- .../queue/aws/sqs/SQSAdministration.scala | 50 +++++- .../servicebus/ServiceBusAdministration.scala | 12 +- .../queue/QueueAdministration.scala | 3 + .../queue/QueueConfiguration.scala | 21 +++ .../com/commercetools/queue/errors.scala | 3 + .../queue/pubsub/PubSubClientSuite.scala | 2 + .../gcp/pubsub/PubSubAdministration.scala | 160 +++++++++--------- .../otel4s/MeasuringQueueAdministration.scala | 10 +- .../queue/otel4s/QueueMetrics.scala | 1 + .../queue/testkit/QueueClientSuite.scala | 22 ++- 10 files changed, 191 insertions(+), 93 deletions(-) create mode 100644 core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala index f7227e1..a4c7295 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala @@ -17,13 +17,18 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async -import cats.syntax.all._ +import cats.syntax.applicativeError._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import cats.syntax.functorFilter._ +import cats.syntax.monadError._ +import cats.syntax.option._ import com.commercetools.queue.aws.sqs.makeQueueException -import com.commercetools.queue.{QueueAdministration, QueueDoesNotExistException} +import com.commercetools.queue.{MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueDoesNotExistException} import software.amazon.awssdk.services.sqs.SqsAsyncClient -import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, QueueAttributeName, SetQueueAttributesRequest} +import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, GetQueueAttributesRequest, QueueAttributeName, SetQueueAttributesRequest} -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F]) @@ -63,6 +68,43 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S } .adaptError(makeQueueException(_, name)) + override def configuration(name: String): F[QueueConfiguration] = + getQueueUrl(name) + .flatMap { queueUrl => + F.fromCompletableFuture { + F.delay { + client.getQueueAttributes( + GetQueueAttributesRequest + .builder() + .queueUrl(queueUrl) + .attributeNames(QueueAttributeName.MESSAGE_RETENTION_PERIOD, QueueAttributeName.VISIBILITY_TIMEOUT) + .build()) + } + } + } + .flatMap { response => + val attributes = response.attributes().asScala + for { + messageTTL <- + attributes + .get(QueueAttributeName.MESSAGE_RETENTION_PERIOD) + .liftTo[F](MalformedQueueConfigurationException(name, "messageTTL", "")) + .flatMap(ttl => + ttl.toIntOption + .map(_.seconds) + .liftTo[F](MalformedQueueConfigurationException(name, "messageTTL", ttl))) + lockTTL <- + attributes + .get(QueueAttributeName.VISIBILITY_TIMEOUT) + .liftTo[F](MalformedQueueConfigurationException(name, "lockTTL", "")) + .flatMap(ttl => + ttl.toIntOption + .map(_.seconds) + .liftTo[F](MalformedQueueConfigurationException(name, "lockTTL", ttl))) + } yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL) + } + .adaptError(makeQueueException(_, name)) + override def delete(name: String): F[Unit] = getQueueUrl(name) .flatMap { queueUrl => diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala index 7241e1d..1372eac 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala @@ -21,10 +21,10 @@ import cats.syntax.functor._ import cats.syntax.monadError._ import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient import com.azure.messaging.servicebus.administration.models.CreateQueueOptions -import com.commercetools.queue.QueueAdministration +import com.commercetools.queue.{QueueAdministration, QueueConfiguration} import java.time.Duration -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(implicit F: Async[F]) extends QueueAdministration[F] { @@ -47,6 +47,14 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp val _ = client.updateQueue(properties) } + override def configuration(name: String): F[QueueConfiguration] = + F.blocking { + val properties = client.getQueue(name) + val messageTTL = properties.getDefaultMessageTimeToLive().toMillis.millis + val lockTTL = properties.getLockDuration().toMillis.millis + QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL) + } + override def delete(name: String): F[Unit] = F.blocking(client.deleteQueue(name)) .void diff --git a/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala b/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala index ac6c601..947bf54 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala @@ -32,6 +32,9 @@ trait QueueAdministration[F[_]] { */ def update(name: String, messageTTL: Option[FiniteDuration] = None, lockTTL: Option[FiniteDuration] = None): F[Unit] + /** Returns the current configuration settings for the queue. */ + def configuration(name: String): F[QueueConfiguration] + /** Deletes the queue with the given name. */ def delete(name: String): F[Unit] diff --git a/core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala b/core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala new file mode 100644 index 0000000..f46a0ee --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/QueueConfiguration.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +import scala.concurrent.duration.FiniteDuration + +final case class QueueConfiguration(messageTTL: FiniteDuration, lockTTL: FiniteDuration) diff --git a/core/src/main/scala/com/commercetools/queue/errors.scala b/core/src/main/scala/com/commercetools/queue/errors.scala index bd1fa8c..a529967 100644 --- a/core/src/main/scala/com/commercetools/queue/errors.scala +++ b/core/src/main/scala/com/commercetools/queue/errors.scala @@ -45,3 +45,6 @@ case class MessageException(msgId: String, action: Action, inner: Throwable) case class UnknownQueueException(name: String, inner: Throwable) extends QueueException(show"Something went wrong when interacting with queue $name", inner) + +case class MalformedQueueConfigurationException(name: String, attribute: String, raw: String, inner: Throwable = null) + extends QueueException(show"Attribute $attribute of queue $name is malformed: $raw", inner) diff --git a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala index b53691a..2df944d 100644 --- a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala +++ b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala @@ -24,6 +24,8 @@ import com.google.api.gax.core.NoCredentialsProvider class PubSubClientSuite extends QueueClientSuite { + override val queueUpdateSupported = false + override def client: Resource[IO, QueueClient[IO]] = PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("http://localhost:8042")) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala index 81590ea..234c24a 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -18,14 +18,14 @@ package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} import cats.syntax.all._ -import com.commercetools.queue.QueueAdministration +import com.commercetools.queue.{QueueAdministration, QueueConfiguration} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.rpc.{NotFoundException, TransportChannelProvider} import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings, TopicAdminClient, TopicAdminSettings} import com.google.protobuf.{Duration, FieldMask} -import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, ExpirationPolicy, GetTopicRequest, Subscription, SubscriptionName, Topic, TopicName, UpdateSubscriptionRequest, UpdateTopicRequest} +import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, ExpirationPolicy, GetSubscriptionRequest, GetTopicRequest, Subscription, SubscriptionName, Topic, TopicName, UpdateSubscriptionRequest} -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ class PubSubAdministration[F[_]]( project: String, @@ -60,7 +60,7 @@ class PubSubAdministration[F[_]]( wrapFuture(F.delay { client .createTopicCallable() - .futureCall(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(ttl).build()) + .futureCall(Topic.newBuilder().setName(topicName.toString()).build()) }) } *> subscriptionClient.use { client => wrapFuture(F.delay { @@ -82,94 +82,86 @@ class PubSubAdministration[F[_]]( .adaptError(makeQueueException(_, name)) override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = { - val topicName = TopicName.of(project, name) val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name") - val (updateTopicRequest, updateSubscriptionRequest) = - (messageTTL, lockTTL) match { - case (Some(messageTTL), Some(lockTTL)) => - val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() - ( - Some( - UpdateTopicRequest - .newBuilder() - .setTopic(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(mttl).build()) - .setUpdateMask(FieldMask.newBuilder().addPaths("message_retention_duration").build()) - .build()), - Some( - UpdateSubscriptionRequest + val updateSubscriptionRequest = (messageTTL, lockTTL) match { + case (Some(messageTTL), Some(lockTTL)) => + val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() + + Some( + UpdateSubscriptionRequest + .newBuilder() + .setSubscription( + Subscription .newBuilder() - .setSubscription( - Subscription - .newBuilder() - .setTopic(topicName.toString()) - .setName(subscriptionName.toString()) - .setMessageRetentionDuration(mttl) - .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) - .build()) - .setUpdateMask( - FieldMask - .newBuilder() - .addPaths("message_retention_duration") - .addPaths("ack_deadline_seconds") - .build()) - .build())) - case (Some(messageTTL), None) => - val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() - ( - Some( - UpdateTopicRequest + .setName(subscriptionName.toString()) + .setMessageRetentionDuration(mttl) + .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) + .build()) + .setUpdateMask( + FieldMask .newBuilder() - .setTopic(Topic.newBuilder().setName(topicName.toString()).setMessageRetentionDuration(mttl).build()) - .setUpdateMask(FieldMask.newBuilder().addPaths("message_retention_duration").build()) - .build()), - Some( - UpdateSubscriptionRequest + .addPaths("message_retention_duration") + .addPaths("ack_deadline_seconds") + .build()) + .build()) + case (Some(messageTTL), None) => + val mttl = Duration.newBuilder().setSeconds(messageTTL.toSeconds).build() + Some( + UpdateSubscriptionRequest + .newBuilder() + .setSubscription( + Subscription .newBuilder() - .setSubscription( - Subscription - .newBuilder() - .setTopic(topicName.toString()) - .setName(subscriptionName.toString()) - .setMessageRetentionDuration(mttl) - .build()) - .setUpdateMask(FieldMask - .newBuilder() - .addPaths("message_retention_duration") - .build()) - .build())) - case (None, Some(lockTTL)) => - ( - None, - Some( - UpdateSubscriptionRequest + .setName(subscriptionName.toString()) + .setMessageRetentionDuration(mttl) + .build()) + .setUpdateMask(FieldMask + .newBuilder() + .addPaths("message_retention_duration") + .build()) + .build()) + case (None, Some(lockTTL)) => + Some( + UpdateSubscriptionRequest + .newBuilder() + .setSubscription( + Subscription .newBuilder() - .setSubscription( - Subscription - .newBuilder() - .setTopic(topicName.toString()) - .setName(subscriptionName.toString()) - .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) - .build()) - .setUpdateMask(FieldMask - .newBuilder() - .addPaths("ack_deadline_seconds") - .build()) - .build())) - case (None, None) => - (None, None) - } - updateTopicRequest.traverse_ { req => - adminClient.use { client => - wrapFuture(F.delay(client.updateTopicCallable().futureCall(req))) - } - } *> - updateSubscriptionRequest.traverse_ { req => - subscriptionClient.use { client => - wrapFuture(F.delay(client.updateSubscriptionCallable().futureCall(req))) - } + .setName(subscriptionName.toString()) + .setAckDeadlineSeconds(lockTTL.toSeconds.toInt) + .build()) + .setUpdateMask(FieldMask + .newBuilder() + .addPaths("ack_deadline_seconds") + .build()) + .build()) + case (None, None) => + None + } + updateSubscriptionRequest.traverse_ { req => + subscriptionClient.use { client => + wrapFuture(F.delay(client.updateSubscriptionCallable().futureCall(req))) } + } } + override def configuration(name: String): F[QueueConfiguration] = + subscriptionClient.use { client => + wrapFuture[F, Subscription](F.delay { + val subscriptionName = SubscriptionName.of(project, s"fs2-queue-$name") + client + .getSubscriptionCallable() + .futureCall(GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName.toString()).build()) + }).map { (sub: Subscription) => + val messageTTL = + sub.getMessageRetentionDuration().getSeconds.seconds + + sub.getMessageRetentionDuration().getNanos().nanos + val lockTTL = + sub.getAckDeadlineSeconds().seconds + QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL) + } + } + override def delete(name: String): F[Unit] = { adminClient.use { client => wrapFuture(F.delay { diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala index 8543a02..4713e94 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala @@ -18,7 +18,7 @@ package com.commercetools.queue.otel4s import cats.effect.MonadCancel import cats.effect.syntax.monadCancel._ -import com.commercetools.queue.QueueAdministration +import com.commercetools.queue.{QueueAdministration, QueueConfiguration} import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.metrics.Counter import org.typelevel.otel4s.trace.Tracer @@ -48,6 +48,14 @@ class MeasuringQueueAdministration[F[_]]( } .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.update, requestCounter)) + override def configuration(name: String): F[QueueConfiguration] = + tracer + .span("queue.configuration") + .surround { + underlying.configuration(name) + } + .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.configuration, requestCounter)) + override def delete(name: String): F[Unit] = tracer .span("queue.delete") diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala index 6163576..3fdb68b 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala @@ -45,6 +45,7 @@ private object QueueMetrics { // queue management attributes final val create = Attribute("method", "create") final val update = Attribute("method", "update") + final val configuration = Attribute("method", "configuration") final val delete = Attribute("method", "delete") final val exist = Attribute("method", "exist") diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala index cb74d06..5781a3b 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala @@ -2,7 +2,7 @@ package com.commercetools.queue.testkit import cats.effect.std.Random import cats.effect.{IO, Ref, Resource} -import com.commercetools.queue.QueueClient +import com.commercetools.queue.{QueueClient, QueueConfiguration} import fs2.{Chunk, Stream} import munit.CatsEffectSuite @@ -15,11 +15,16 @@ import scala.concurrent.duration._ */ abstract class QueueClientSuite extends CatsEffectSuite { + val queueUpdateSupported: Boolean = true + /** Provide a way to acquire a queue client for the provider under test. */ def client: Resource[IO, QueueClient[IO]] val clientFixture = ResourceSuiteLocalFixture("queue-client", client) + val originalMessageTTL = 10.minutes + val originalLockTTL = 2.minutes + override def munitFixtures = List(clientFixture) val withQueue = @@ -29,7 +34,7 @@ abstract class QueueClientSuite extends CatsEffectSuite { .map(uuid => s"queue-$uuid") .flatTap { queueName => clientFixture().administration - .create(queueName, 10.minutes, 2.minutes) + .create(queueName, originalMessageTTL, originalLockTTL) })(queueName => clientFixture().administration.delete(queueName))) withQueue.test("published messages are received by a processor") { queueName => @@ -85,4 +90,17 @@ abstract class QueueClientSuite extends CatsEffectSuite { } } + withQueue.test("configuration can be updated") { queueName => + assume(queueUpdateSupported, "The test environment does not support queue update") + val client = clientFixture() + val admin = client.administration + for { + _ <- assertIO(admin.configuration(queueName), QueueConfiguration(originalMessageTTL, originalLockTTL)) + _ <- admin.update(queueName, Some(originalMessageTTL + 1.minute), Some(originalLockTTL + 10.seconds)) + _ <- assertIO( + admin.configuration(queueName), + QueueConfiguration(originalMessageTTL + 1.minute, originalLockTTL + 10.seconds)) + } yield () + } + } From fb2489a972e34f9e9b1fb2e9dd5317b697906632 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 13:27:28 +0200 Subject: [PATCH 09/10] Allow for unmanaged channel provider The custom channel provider might be HTTP or GRPC based, it is up to the user to indicate which is used. --- .../queue/gcp/pubsub/PubSubClient.scala | 20 +++++++++--- .../queue/gcp/pubsub/PubSubPuller.scala | 32 +++++++++++++++---- .../queue/gcp/pubsub/PubSubSubscriber.scala | 3 +- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index b4eac2e..252d586 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -20,12 +20,13 @@ import cats.effect.{Async, Resource} import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel} -import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannel, TransportChannelProvider} +import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider} import com.google.pubsub.v1.{SubscriptionName, TopicName} class PubSubClient[F[_]: Async] private ( project: String, channelProvider: TransportChannelProvider, + useGrpc: Boolean, credentials: CredentialsProvider, endpoint: Option[String]) extends QueueClient[F] { @@ -39,6 +40,7 @@ class PubSubClient[F[_]: Async] private ( override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = new PubSubSubscriber[F, T]( name, + useGrpc, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, @@ -48,7 +50,7 @@ class PubSubClient[F[_]: Async] private ( object PubSubClient { - private def makeDefaultTransportChannel(endpoint: Option[String]): TransportChannel = + private def makeDefaultTransportChannel(endpoint: Option[String]): HttpJsonTransportChannel = HttpJsonTransportChannel.create( ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build()) @@ -56,13 +58,23 @@ object PubSubClient { project: String, credentials: CredentialsProvider, endpoint: Option[String] = None, - mkTransportChannel: Option[String] => TransportChannel = makeDefaultTransportChannel _ + mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _ )(implicit F: Async[F] ): Resource[F, PubSubClient[F]] = Resource .fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))) .map { channel => - new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint) + new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint) } + def unmanaged[F[_]]( + project: String, + credentials: CredentialsProvider, + channelProvider: TransportChannelProvider, + useGrpc: Boolean, + endpoint: Option[String] = None + )(implicit F: Async[F] + ): PubSubClient[F] = + new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint) + } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala index f16c899..c4b035c 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala @@ -20,9 +20,10 @@ import cats.effect.Async import cats.effect.syntax.concurrent._ import cats.syntax.all._ import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller} +import com.google.api.gax.grpc.GrpcCallContext import com.google.api.gax.httpjson.HttpJsonCallContext import com.google.api.gax.retrying.RetrySettings -import com.google.api.gax.rpc.DeadlineExceededException +import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException} import com.google.cloud.pubsub.v1.stub.SubscriberStub import com.google.pubsub.v1.{ModifyAckDeadlineRequest, PullRequest, ReceivedMessage, SubscriptionName} import fs2.Chunk @@ -34,6 +35,7 @@ import scala.jdk.CollectionConverters._ class PubSubPuller[F[_], T]( val queueName: String, + useGrpc: Boolean, subscriptionName: SubscriptionName, subscriber: SubscriberStub, lockTTLSeconds: Int @@ -42,15 +44,23 @@ class PubSubPuller[F[_], T]( deserializer: Deserializer[T]) extends QueuePuller[F, T] { + private def callContext(waitingTime: FiniteDuration): ApiCallContext = + if (useGrpc) + GrpcCallContext + .createDefault() + .withRetrySettings( + RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) + else + HttpJsonCallContext + .createDefault() + .withRetrySettings( + RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = wrapFuture(F.delay { subscriber .pullCallable() - .withDefaultCallContext( - HttpJsonCallContext - .createDefault() - .withRetrySettings( - RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())) + .withDefaultCallContext(callContext(waitingTime)) .futureCall( PullRequest.newBuilder().setMaxMessages(batchSize).setSubscription(subscriptionName.toString()).build()) }).map(response => Chunk.from(response.getReceivedMessagesList().asScala)) @@ -59,6 +69,16 @@ class PubSubPuller[F[_], T]( Chunk.empty } .flatMap { (msgs: Chunk[ReceivedMessage]) => + // PubSub does not support delayed messages + // Instead in this case, we set a custom attribute when publishing + // with a delay. The attribute contains the earliest delivery date + // according to the delay. + // If the `Puller` gets a message with this attribute, we check wether + // it is in the future. + // If it is the case, the ack deadline is modified to be the amount of + // remaining seconds until this date. + // This way, the message will not be delivered until this expires. + // The message is ignored (filtered out of the batch) msgs.traverseFilter[F, ReceivedMessage] { msg => val attrs = msg.getMessage().getAttributesMap().asScala F.realTimeInstant.flatMap { now => diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala index 6e665c0..e33842c 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala @@ -26,6 +26,7 @@ import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName} class PubSubSubscriber[F[_], T]( val queueName: String, + useGrpc: Boolean, subscriptionName: SubscriptionName, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, @@ -56,7 +57,7 @@ class PubSubSubscriber[F[_], T]( sub => (subscriber, sub)) } .map { case (subscriber, subscription) => - new PubSubPuller[F, T](queueName, subscriptionName, subscriber, subscription.getAckDeadlineSeconds()) + new PubSubPuller[F, T](queueName, useGrpc, subscriptionName, subscriber, subscription.getAckDeadlineSeconds()) } } From 568306875b7cca617f93471ae7c5071724af6b3d Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 23 May 2024 13:29:20 +0200 Subject: [PATCH 10/10] Add PubSub documentation --- .github/workflows/ci.yml | 4 ++-- build.sbt | 11 ++++++---- docs/getting-started/queues.md | 2 +- docs/integrations/circe.md | 2 +- docs/integrations/otel4s.md | 2 +- docs/systems/directory.conf | 1 + docs/systems/index.md | 37 ++++++++++++++++++++++++++++++++++ docs/systems/pubsub.md | 29 ++++++++++++++++++++++++++ docs/systems/service-bus.md | 2 +- docs/systems/sqs.md | 2 +- 10 files changed, 81 insertions(+), 11 deletions(-) create mode 100644 docs/systems/pubsub.md diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a03653a..081aeda 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,11 +91,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target + run: mkdir -p circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target testkit/.jvm/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target + run: tar cf targets.tar circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target testkit/.jvm/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/build.sbt b/build.sbt index 97bbe46..6209c2f 100644 --- a/build.sbt +++ b/build.sbt @@ -44,22 +44,24 @@ lazy val core = crossProject(JVMPlatform) .settings(commonSettings) .settings( name := "fs2-queues-core", + // TODO: Remove once 0.2 is published mimaBinaryIssueFilters ++= List( - ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload") + ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueAdministration.configuration") ) ) lazy val testkit = crossProject(JVMPlatform) .crossType(CrossType.Pure) .in(file("testkit")) - .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings( name := "fs2-queues-testkit", libraryDependencies ++= List( "org.scalameta" %%% "munit" % Versions.munit, "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect - ) + ), + tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0") ) .dependsOn(core) @@ -125,6 +127,7 @@ lazy val awsSQS = crossProject(JVMPlatform) libraryDependencies ++= List( "software.amazon.awssdk" % "sqs" % "2.25.50" ), + // TODO: Remove once 0.2 is published mimaBinaryIssueFilters ++= List( ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSMessageContext.this") ) @@ -177,7 +180,7 @@ lazy val docs = project "com.azure" % "azure-identity" % "1.11.1" ) ) - .dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, gcpPubSub.jvm, otel4s.jvm) + .dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, gcpPubSub.jvm, otel4s.jvm, testkit.jvm) lazy val unidocs = project .in(file("unidocs")) diff --git a/docs/getting-started/queues.md b/docs/getting-started/queues.md index 6fe5b67..570b3aa 100644 --- a/docs/getting-started/queues.md +++ b/docs/getting-started/queues.md @@ -3,7 +3,7 @@ The common abstractions are defined in the core module. To use it, add the following to your build. ```scala -libraryDependencies += "com.commercetools" %% "fs2-queues-core" % "@VERSION@" +libraryDependencies += "com.commercetools" %% "fs2-queues-core" % "@SNAPSHOT_VERSION@" ``` The library provides both low and high level APIs, making it possible to have fine grained control over queue pulling, or just focusing on processing, delegating message management to the library. diff --git a/docs/integrations/circe.md b/docs/integrations/circe.md index 8b623b1..a0f1fc4 100644 --- a/docs/integrations/circe.md +++ b/docs/integrations/circe.md @@ -3,7 +3,7 @@ The circe module provides integration with the [circe][circe] library. ```scala -libraryDependencies += "com.commercetools" %% "fs2-queues-circe" % "@VERSION@" +libraryDependencies += "com.commercetools" %% "fs2-queues-circe" % "@SNAPSHOT_VERSION@" ``` It provides: diff --git a/docs/integrations/otel4s.md b/docs/integrations/otel4s.md index f4240a2..4e8807c 100644 --- a/docs/integrations/otel4s.md +++ b/docs/integrations/otel4s.md @@ -3,7 +3,7 @@ The otel4s provides an integration with the [otel4s][otel4s] library. ```scala -libraryDependencies += "com.commercetools" %% "fs2-queues-otel4s" % "@VERSION@" +libraryDependencies += "com.commercetools" %% "fs2-queues-otel4s" % "@SNAPSHOT_VERSION@" ``` It allows you to wrap an existing @:api(com.commercetools.queue.QueueClient) into a @:api(com.commercetools.queue.otel4s.MeasuringQueueClient), which adds [tracing][otel4s-tracing] and [metrics][otel4s-metrics] on every call to the underlying queue system. diff --git a/docs/systems/directory.conf b/docs/systems/directory.conf index 97fb442..247a617 100644 --- a/docs/systems/directory.conf +++ b/docs/systems/directory.conf @@ -4,4 +4,5 @@ laika.navigationOrder = [ index.md sqs.md service-bus.md + pubsub.md ] diff --git a/docs/systems/index.md b/docs/systems/index.md index fefcfdf..5b68aab 100644 --- a/docs/systems/index.md +++ b/docs/systems/index.md @@ -3,3 +3,40 @@ `fs2-queues` comes with several queue system implementations. Each of them implements the @:api(com.commercetools.queue.QueueClient) abstraction with the various interfaces it gives access to. Each implementations comes with its own way to get access to a client, depending on the underlying SDK. Please have a look at the provider documentation to see the different ways to instantiate the clients. + +## Add your own provider + +To add a new queue system, you need to implement the @:api(com.commercetools.queue.QueueClient) abstraction and all abstractions it gives access to. +To validate your implementation, we provide a testkit, which runs a series of tests that need to pass to ensure the abstraction behavior is working. All what is needed to implement the integration tests in the testkit is to implement the @:api(com.commercetools.queue.testkit.QueueClientSuite) class and provide a way to instantiate your client as a `Resource`. + +```scala mdoc:compile-only +import cats.effect.{IO, Resource} +import com.commercetools.queue.{ + Deserializer, + QueueClient, + QueueAdministration, + QueuePublisher, + QueueSubscriber, + Serializer +} +import com.commercetools.queue.testkit.QueueClientSuite + +class MyQueueClient[F[_]] extends QueueClient[F] { + def administration: QueueAdministration[F] = ??? + def publish[T: Serializer](name: String): QueuePublisher[F,T] = ??? + def subscribe[T: Deserializer](name: String): QueueSubscriber[F,T] = ??? + +} + +object MyQueueClient { + def apply[F[_]](): Resource[F, MyQueueClient[F]] = ??? +} + +// Running this test suite will run all the testkit test +class MyQueueClientSuite extends QueueClientSuite { + + override def client: Resource[IO, QueueClient[IO]] = + MyQueueClient[IO]() + +} +``` diff --git a/docs/systems/pubsub.md b/docs/systems/pubsub.md new file mode 100644 index 0000000..1f3e1d2 --- /dev/null +++ b/docs/systems/pubsub.md @@ -0,0 +1,29 @@ +# GCP PubSub + +You can create a client to service bus queues by using the [GCP PubSub][pubsub] module. + +```scala +libraryDependencies += "com.commercetools" %% "fs2-queues-gcp-pubsub" % "@SNAPSHOT_VERSION@" +``` + +For instance you can create a managed client via a region and credentials as follows. + +```scala mdoc:compile-only +import cats.effect.IO +import com.commercetools.queue.gcp.pubsub._ +import com.google.api.gax.core.GoogleCredentialsProvider + +val project = "my-project" // your project +val credentials = GoogleCredentialsProvider.newBuilder().build() // however you want to authenticate + +PubSubClient[IO](project, credentials).use { client => + ??? +} +``` + +The client is managed, meaning that it uses a dedicated HTTP connection pool that will get shut down upon resource release. + +If integrating with an existing code base where you already have an instance of `TransportChannelProvider` that you would like to share, you can use the `unmanaged` construtor. +In this case, it is up to you to manage the channel provider life cycle. + +[pubsub]: https://cloud.google.com/pubsub/ diff --git a/docs/systems/service-bus.md b/docs/systems/service-bus.md index 9f80310..2d8c3df 100644 --- a/docs/systems/service-bus.md +++ b/docs/systems/service-bus.md @@ -3,7 +3,7 @@ You can create a client to service bus queues by using the [Azure Service Bus][service-bus] module. ```scala -libraryDependencies += "com.commercetools" %% "fs2-queues-azure-service-bus" % "@VERSION@" +libraryDependencies += "com.commercetools" %% "fs2-queues-azure-service-bus" % "@SNAPSHOT_VERSION@" ``` For instance, you can create a managed client via a namespace and credentials as follows. diff --git a/docs/systems/sqs.md b/docs/systems/sqs.md index 13eec50..0cdc060 100644 --- a/docs/systems/sqs.md +++ b/docs/systems/sqs.md @@ -3,7 +3,7 @@ You can create a client to service bus queues by using the [AWS SQS][sqs] module. ```scala -libraryDependencies += "com.commercetools" %% "fs2-queues-aws-sqs" % "@VERSION@" +libraryDependencies += "com.commercetools" %% "fs2-queues-aws-sqs" % "@SNAPSHOT_VERSION@" ``` For instance you can create a managed client via a region and credentials as follows.