Skip to content

Commit

Permalink
Merge pull request #20 from commercetools/pubsub/bootstrap
Browse files Browse the repository at this point in the history
Add support for GCP Pub/Sub
  • Loading branch information
satabin authored May 23, 2024
2 parents 40a1abf + 5683068 commit 46f5792
Show file tree
Hide file tree
Showing 29 changed files with 1,064 additions and 25 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,21 @@ jobs:
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
run: sbt +update

- env:
- name: Install localstack
env:
SERVICES: sqs
uses: LocalStack/setup-localstack@main
with:
image-tag: latest

- name: Install gcloud
uses: google-github-actions/setup-gcloud@v2
with:
install_components: beta,pubsub-emulator

- name: Run PubSub emulator
run: ./gcp/pubsub/emulator/start.sh &

- name: Check that workflows are up to date
run: sbt githubWorkflowCheck

Expand All @@ -82,11 +91,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: mkdir -p circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target
run: mkdir -p circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target testkit/.jvm/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: tar cf targets.tar circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target
run: tar cf targets.tar circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target testkit/.jvm/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.all._
import com.commercetools.queue.QueueAdministration
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.functorFilter._
import cats.syntax.monadError._
import cats.syntax.option._
import com.commercetools.queue.aws.sqs.makeQueueException
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueDoesNotExistException}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, QueueAttributeName, QueueDoesNotExistException, SetQueueAttributesRequest}
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, GetQueueAttributesRequest, QueueAttributeName, SetQueueAttributesRequest}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F])
Expand Down Expand Up @@ -63,6 +68,43 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S
}
.adaptError(makeQueueException(_, name))

override def configuration(name: String): F[QueueConfiguration] =
getQueueUrl(name)
.flatMap { queueUrl =>
F.fromCompletableFuture {
F.delay {
client.getQueueAttributes(
GetQueueAttributesRequest
.builder()
.queueUrl(queueUrl)
.attributeNames(QueueAttributeName.MESSAGE_RETENTION_PERIOD, QueueAttributeName.VISIBILITY_TIMEOUT)
.build())
}
}
}
.flatMap { response =>
val attributes = response.attributes().asScala
for {
messageTTL <-
attributes
.get(QueueAttributeName.MESSAGE_RETENTION_PERIOD)
.liftTo[F](MalformedQueueConfigurationException(name, "messageTTL", "<missing>"))
.flatMap(ttl =>
ttl.toIntOption
.map(_.seconds)
.liftTo[F](MalformedQueueConfigurationException(name, "messageTTL", ttl)))
lockTTL <-
attributes
.get(QueueAttributeName.VISIBILITY_TIMEOUT)
.liftTo[F](MalformedQueueConfigurationException(name, "lockTTL", "<missing>"))
.flatMap(ttl =>
ttl.toIntOption
.map(_.seconds)
.liftTo[F](MalformedQueueConfigurationException(name, "lockTTL", ttl)))
} yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL)
}
.adaptError(makeQueueException(_, name))

override def delete(name: String): F[Unit] =
getQueueUrl(name)
.flatMap { queueUrl =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions
import com.commercetools.queue.QueueAdministration
import com.commercetools.queue.{QueueAdministration, QueueConfiguration}

import java.time.Duration
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(implicit F: Async[F])
extends QueueAdministration[F] {
Expand All @@ -47,6 +47,14 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp
val _ = client.updateQueue(properties)
}

override def configuration(name: String): F[QueueConfiguration] =
F.blocking {
val properties = client.getQueue(name)
val messageTTL = properties.getDefaultMessageTimeToLive().toMillis.millis
val lockTTL = properties.getLockDuration().toMillis.millis
QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL)
}

override def delete(name: String): F[Unit] =
F.blocking(client.deleteQueue(name))
.void
Expand Down
43 changes: 37 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ ThisBuild / scalaVersion := Scala213

ThisBuild / tlSonatypeUseLegacyHost := true

lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, circe, otel4s, unidocs)
lazy val root =
tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, gcpPubSub, gcpPubSubIt, circe, otel4s, unidocs)

ThisBuild / tlSitePublishBranch := Some("main")

Expand All @@ -43,32 +44,41 @@ lazy val core = crossProject(JVMPlatform)
.settings(commonSettings)
.settings(
name := "fs2-queues-core",
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload")
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.Message.rawPayload"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueueAdministration.configuration")
)
)

lazy val testkit = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("testkit"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.settings(
name := "fs2-queues-testkit",
libraryDependencies ++= List(
"org.scalameta" %%% "munit" % Versions.munit,
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect
)
),
tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0")
)
.dependsOn(core)

// for sqs integration test, start a localstack with sqs
ThisBuild / githubWorkflowBuildPreamble := List(
WorkflowStep.Use(
UseRef.Public(owner = "LocalStack", repo = "setup-localstack", ref = "main"),
name = Some("Install localstack"),
params = Map("image-tag" -> "latest"),
env = Map("SERVICES" -> "sqs")
)
),
WorkflowStep.Use(
UseRef.Public(owner = "google-github-actions", repo = "setup-gcloud", ref = "v2"),
name = Some("Install gcloud"),
params = Map("install_components" -> "beta,pubsub-emulator")
),
WorkflowStep.Run(commands = List("./gcp/pubsub/emulator/start.sh &"), name = Some("Run PubSub emulator"))
)

lazy val otel4s = crossProject(JVMPlatform)
Expand Down Expand Up @@ -117,6 +127,7 @@ lazy val awsSQS = crossProject(JVMPlatform)
libraryDependencies ++= List(
"software.amazon.awssdk" % "sqs" % "2.25.50"
),
// TODO: Remove once 0.2 is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.aws.sqs.SQSMessageContext.this")
)
Expand All @@ -129,6 +140,25 @@ lazy val awsSqsIt = project
.settings(commonSettings)
.dependsOn(awsSQS.jvm % Test, testkit.jvm % Test)

lazy val gcpPubSub = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("gcp/pubsub"))
.settings(commonSettings)
.settings(
name := "fs2-queues-gcp-pubsub",
libraryDependencies ++= List(
"com.google.cloud" % "google-cloud-pubsub" % "1.129.3"
),
tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0")
)
.dependsOn(core)

lazy val gcpPubSubIt = project
.in(file("gcp/pubsub/integration"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.dependsOn(gcpPubSub.jvm % Test, testkit.jvm % Test)

lazy val docs = project
.in(file("site"))
.enablePlugins(TypelevelSitePlugin)
Expand All @@ -150,7 +180,7 @@ lazy val docs = project
"com.azure" % "azure-identity" % "1.11.1"
)
)
.dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, otel4s.jvm)
.dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, gcpPubSub.jvm, otel4s.jvm, testkit.jvm)

lazy val unidocs = project
.in(file("unidocs"))
Expand All @@ -162,5 +192,6 @@ lazy val unidocs = project
circe.jvm,
azureServiceBus.jvm,
awsSQS.jvm,
gcpPubSub.jvm,
otel4s.jvm)
)
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ trait QueueAdministration[F[_]] {
*/
def update(name: String, messageTTL: Option[FiniteDuration] = None, lockTTL: Option[FiniteDuration] = None): F[Unit]

/** Returns the current configuration settings for the queue. */
def configuration(name: String): F[QueueConfiguration]

/** Deletes the queue with the given name. */
def delete(name: String): F[Unit]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

import scala.concurrent.duration.FiniteDuration

final case class QueueConfiguration(messageTTL: FiniteDuration, lockTTL: FiniteDuration)
3 changes: 3 additions & 0 deletions core/src/main/scala/com/commercetools/queue/errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ case class MessageException(msgId: String, action: Action, inner: Throwable)

case class UnknownQueueException(name: String, inner: Throwable)
extends QueueException(show"Something went wrong when interacting with queue $name", inner)

case class MalformedQueueConfigurationException(name: String, attribute: String, raw: String, inner: Throwable = null)
extends QueueException(show"Attribute $attribute of queue $name is malformed: $raw", inner)
11 changes: 10 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ version: "3.8"

services:
localstack:
container_name: sqs
container_name: fs2-queues-sqs
image: localstack/localstack:latest
ports:
- "127.0.0.1:4566:4566"
environment:
- SERVICES=sqs
pubsub:
container_name: fs2-queues-pubsub
# https://console.cloud.google.com/gcr/images/google.com:cloudsdktool/global/google-cloud-cli
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
command: "bash /root/scripts/start.sh"
ports:
- "8042:8042"
volumes:
- ./gcp/pubsub/emulator:/root/scripts
2 changes: 1 addition & 1 deletion docs/getting-started/queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
The common abstractions are defined in the core module. To use it, add the following to your build.

```scala
libraryDependencies += "com.commercetools" %% "fs2-queues-core" % "@VERSION@"
libraryDependencies += "com.commercetools" %% "fs2-queues-core" % "@SNAPSHOT_VERSION@"
```

The library provides both low and high level APIs, making it possible to have fine grained control over queue pulling, or just focusing on processing, delegating message management to the library.
Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/circe.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
The circe module provides integration with the [circe][circe] library.

```scala
libraryDependencies += "com.commercetools" %% "fs2-queues-circe" % "@VERSION@"
libraryDependencies += "com.commercetools" %% "fs2-queues-circe" % "@SNAPSHOT_VERSION@"
```

It provides:
Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/otel4s.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
The otel4s provides an integration with the [otel4s][otel4s] library.

```scala
libraryDependencies += "com.commercetools" %% "fs2-queues-otel4s" % "@VERSION@"
libraryDependencies += "com.commercetools" %% "fs2-queues-otel4s" % "@SNAPSHOT_VERSION@"
```

It allows you to wrap an existing @:api(com.commercetools.queue.QueueClient) into a @:api(com.commercetools.queue.otel4s.MeasuringQueueClient), which adds [tracing][otel4s-tracing] and [metrics][otel4s-metrics] on every call to the underlying queue system.
Expand Down
1 change: 1 addition & 0 deletions docs/systems/directory.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ laika.navigationOrder = [
index.md
sqs.md
service-bus.md
pubsub.md
]
37 changes: 37 additions & 0 deletions docs/systems/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,40 @@
`fs2-queues` comes with several queue system implementations. Each of them implements the @:api(com.commercetools.queue.QueueClient) abstraction with the various interfaces it gives access to.

Each implementations comes with its own way to get access to a client, depending on the underlying SDK. Please have a look at the provider documentation to see the different ways to instantiate the clients.

## Add your own provider

To add a new queue system, you need to implement the @:api(com.commercetools.queue.QueueClient) abstraction and all abstractions it gives access to.
To validate your implementation, we provide a testkit, which runs a series of tests that need to pass to ensure the abstraction behavior is working. All what is needed to implement the integration tests in the testkit is to implement the @:api(com.commercetools.queue.testkit.QueueClientSuite) class and provide a way to instantiate your client as a `Resource`.

```scala mdoc:compile-only
import cats.effect.{IO, Resource}
import com.commercetools.queue.{
Deserializer,
QueueClient,
QueueAdministration,
QueuePublisher,
QueueSubscriber,
Serializer
}
import com.commercetools.queue.testkit.QueueClientSuite

class MyQueueClient[F[_]] extends QueueClient[F] {
def administration: QueueAdministration[F] = ???
def publish[T: Serializer](name: String): QueuePublisher[F,T] = ???
def subscribe[T: Deserializer](name: String): QueueSubscriber[F,T] = ???

}

object MyQueueClient {
def apply[F[_]](): Resource[F, MyQueueClient[F]] = ???
}

// Running this test suite will run all the testkit test
class MyQueueClientSuite extends QueueClientSuite {

override def client: Resource[IO, QueueClient[IO]] =
MyQueueClient[IO]()

}
```
Loading

0 comments on commit 46f5792

Please sign in to comment.