From fbe01a851b7967309446bb4fa3080253b124a16d Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 29 Mar 2024 09:41:53 +0100 Subject: [PATCH 01/11] Bootstrap website --- .github/workflows/ci.yml | 54 ++++++++++- CONTRIBUTING.md | 12 +-- NOTICE | 4 +- .../azure/servicebus/ServiceBusClient.scala | 7 ++ build.sbt | 38 +++++--- docs/README.md | 84 ------------------ docs/directory.conf | 6 ++ docs/favicon-32x32.png | Bin 0 -> 1269 bytes docs/getting-started/administration.md | 1 + docs/getting-started/directory.conf | 8 ++ docs/getting-started/publishing.md | 1 + docs/getting-started/queues.md | 13 +++ docs/getting-started/subscribing.md | 1 + docs/index.md | 4 + docs/integrations/circe.md | 1 + docs/integrations/directory.conf | 1 + docs/integrations/otel4s.md | 1 + docs/systems/directory.conf | 1 + docs/systems/service-bus.md | 22 +++++ docs/systems/sqs.md | 23 +++++ project/Website.scala | 28 ++++++ project/plugins.sbt | 6 +- 22 files changed, 208 insertions(+), 108 deletions(-) delete mode 100644 docs/README.md create mode 100644 docs/directory.conf create mode 100644 docs/favicon-32x32.png create mode 100644 docs/getting-started/administration.md create mode 100644 docs/getting-started/directory.conf create mode 100644 docs/getting-started/publishing.md create mode 100644 docs/getting-started/queues.md create mode 100644 docs/getting-started/subscribing.md create mode 100644 docs/index.md create mode 100644 docs/integrations/circe.md create mode 100644 docs/integrations/directory.conf create mode 100644 docs/integrations/otel4s.md create mode 100644 docs/systems/directory.conf create mode 100644 docs/systems/service-bus.md create mode 100644 docs/systems/sqs.md create mode 100644 project/Website.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a862516..b0a0c7e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -76,11 +76,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p project/target + run: mkdir -p unidocs/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar project/target + run: tar cf targets.tar unidocs/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') @@ -160,3 +160,53 @@ jobs: SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} SONATYPE_CREDENTIAL_HOST: ${{ secrets.SONATYPE_CREDENTIAL_HOST }} run: sbt tlCiRelease + + site: + name: Generate Site + strategy: + matrix: + os: [ubuntu-latest] + java: [temurin@11] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout current branch (full) + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Setup Java (temurin@8) + id: setup-java-temurin-8 + if: matrix.java == 'temurin@8' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 8 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' + run: sbt +update + + - name: Setup Java (temurin@11) + id: setup-java-temurin-11 + if: matrix.java == 'temurin@11' + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 11 + cache: sbt + + - name: sbt update + if: matrix.java == 'temurin@11' && steps.setup-java-temurin-11.outputs.cache-hit == 'false' + run: sbt +update + + - name: Generate site + run: sbt docs/tlSite + + - name: Publish site + if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main' + uses: peaceiris/actions-gh-pages@v3.9.3 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: site/target/docs/site + keep_files: true diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 657b11b..93cff04 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,6 +1,6 @@ -# Contributing to `cloud-queues` +# Contributing to `fs2-queues` -There a several ways you can contribute to `cloud-queues`: +There a several ways you can contribute to `fs2-queues`: - You found a bug? You can [open an issue][open-issue]. - If you have an idea, found something missing, or just a question, you can also [open an issue][open-issue]. - Code contributions are also welcome, you can [open a pull request][open-pr]. @@ -28,7 +28,7 @@ $ sbt readme/mdoc ## Code formatting -`cloud-queues` uses [scalafmt][scalafmt] to format its code and defines some [scalafix][scalafix] rules. Before submitting code contribution, be sure to have proper formatting by running +`fs2-queues` uses [scalafmt][scalafmt] to format its code and defines some [scalafix][scalafix] rules. Before submitting code contribution, be sure to have proper formatting by running ```shell $ sbt prePR @@ -38,10 +38,10 @@ and check the result. ## Licensing -`cloud-queues` is licensed under the Apache Software License 2.0. Opening a pull request is to be considered affirmative consent to incorporate your changes into the project, granting an unrestricted license to the Commercetools GmbH to distribute and derive new work from your changes, as per the contribution terms of ASL 2.0. You also affirm that you own the rights to the code you are contributing. All contributors retain the copyright to their own work. +`fs2-queues` is licensed under the Apache Software License 2.0. Opening a pull request is to be considered affirmative consent to incorporate your changes into the project, granting an unrestricted license to the Commercetools GmbH to distribute and derive new work from your changes, as per the contribution terms of ASL 2.0. You also affirm that you own the rights to the code you are contributing. All contributors retain the copyright to their own work. -[open-issue]: https://github.com/commercetools/cloud-queues/issues/new/choose -[open-pr]: https://github.com/commercetools/cloud-queues/pull/new/main +[open-issue]: https://github.com/commercetools/fs2-queues/issues/new/choose +[open-pr]: https://github.com/commercetools/fs2-queues/pull/new/main [scalafmt]: https://scalameta.org/scalafmt/ [scalafix]: https://scalacenter.github.io/scalafix/ [mdoc]: https://scalameta.org/mdoc/ diff --git a/NOTICE b/NOTICE index 2875805..ac0d6f9 100644 --- a/NOTICE +++ b/NOTICE @@ -1,3 +1,3 @@ -cloud-queues -Copyright 2023 Commercetools GmbH +fs2-queues +Copyright 2024 Commercetools GmbH Licensed under Apache License 2.0 (see LICENSE) diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala index 56977e0..3358ee4 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusClient.scala @@ -78,4 +78,11 @@ object ServiceBusClient { } } yield new ServiceBusClient(clientBuilder, adminBuilder) + def unmanaged[F[_]]( + clientBuilder: ServiceBusClientBuilder, + adminBuilder: ServiceBusAdministrationClientBuilder + )(implicit F: Async[F] + ): ServiceBusClient[F] = + new ServiceBusClient(clientBuilder, adminBuilder) + } diff --git a/build.sbt b/build.sbt index 19e478b..3224025 100644 --- a/build.sbt +++ b/build.sbt @@ -1,3 +1,5 @@ +import laika.config.PrettyURLs + ThisBuild / tlBaseVersion := "0.0" ThisBuild / organization := "com.commercetools" @@ -15,6 +17,8 @@ ThisBuild / scalaVersion := Scala213 lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s) +ThisBuild / tlSitePublishBranch := Some("main") + val commonSettings = List( libraryDependencies ++= Seq( "co.fs2" %%% "fs2-core" % Versions.fs2, @@ -35,7 +39,7 @@ lazy val core = crossProject(JVMPlatform) .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings( - name := "cloud-queues-core" + name := "fs2-queues-core" ) lazy val otel4s = crossProject(JVMPlatform) @@ -44,7 +48,7 @@ lazy val otel4s = crossProject(JVMPlatform) .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings( - name := "cloud-queues-otel4s", + name := "fs2-queues-otel4s", description := "Support for metrics and tracing using otel4s", libraryDependencies ++= List( "org.typelevel" %%% "otel4s-core" % "0.4.0" @@ -58,7 +62,7 @@ lazy val circe = crossProject(JVMPlatform) .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings( - name := "cloud-queues-circe", + name := "fs2-queues-circe", libraryDependencies ++= List( "io.circe" %%% "circe-parser" % Versions.circe ) @@ -71,7 +75,7 @@ lazy val azureServiceBus = crossProject(JVMPlatform) .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings( - name := "cloud-queues-azure-service-bus", + name := "fs2-queues-azure-service-bus", libraryDependencies ++= List( "com.azure" % "azure-messaging-servicebus" % "7.15.1" ) @@ -84,19 +88,31 @@ lazy val awsSQS = crossProject(JVMPlatform) .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings( - name := "cloud-queues-aws-sqs", + name := "fs2-queues-aws-sqs", libraryDependencies ++= List( "software.amazon.awssdk" % "sqs" % "2.18.35" ) ) .dependsOn(core) -lazy val readme = project - .in(file("readme")) - .enablePlugins(MdocPlugin, NoPublishPlugin) +lazy val docs = project + .in(file("site")) + .enablePlugins(TypelevelSitePlugin) .settings( - mdocOut := file("."), + tlSiteApiPackage := Some("com.commercetools.queue"), + tlSiteHelium := CTTheme(tlSiteHelium.value), + laikaExtensions += PrettyURLs, + tlFatalWarnings := false, libraryDependencies ++= List( "com.azure" % "azure-identity" % "1.11.1" - )) - .dependsOn(azureServiceBus.jvm, awsSQS.jvm) + ) + ) + .dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, otel4s.jvm) + +lazy val unidocs = project + .in(file("unidocs")) + .enablePlugins(TypelevelUnidocPlugin) + .settings( + name := "fs2-queues-docs", + ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(core.jvm, circe.jvm, azureServiceBus.jvm, awsSQS.jvm) + ) diff --git a/docs/README.md b/docs/README.md deleted file mode 100644 index dc6c90d..0000000 --- a/docs/README.md +++ /dev/null @@ -1,84 +0,0 @@ -# Common Cloud Client Tools - -Aims at providing a unified way of working with cloud queues (SQS, PubSub, Service Bus, ...) across all CT scala services. - -## Common queue interface - -The library offers both low and high level possibilities, making it possible to have fine grained control over queue pulling, or just focusing on processing, delegating message management to the library. - -The design of the API is the result of the common usage patterns at CT and how the various client SDKs are designed. -There are several views possible on a queue: - - as a `QueuePublisher` when you only need to publish messages to an existing queue. - - as a `QueueSubscriber` when you only need to subscribe to an existing queue. - - as a `QueueAdministration` when you need to manage queues (creation, deletion, ...). - -The entry point is the `QueueClient` factory for each underlying queue system. - -In the examples below, we will use the following publisher and subscriber streams: - -```scala mdoc -import fs2.Stream -import cats.effect.IO -import cats.effect.std.Random -import scala.concurrent.duration._ - -import com.commercetools.queue._ - -def publishStream(publisher: QueuePublisher[IO, String]): Stream[IO, Nothing] = - Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random => - Stream - // repeatedly emit a random string of length 10 - .repeatEval(random.nextString(10)) - // every 100 milliseconds - .metered(100.millis) - // and publish in batch of 10 - .through(publisher.sink(batchSize = 10)) - } - -def subscribeStream(subscriber: QueueSubscriber[IO, String]): Stream[IO, Nothing] = - subscriber - // receives messages in batches of 5, - // waiting max for 20 seconds - // print every received message, - // and ack automatically - .processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload)) - // results are non important - .drain - -def program(client: QueueClient[IO]): IO[Unit] = { - val queueName = "my-queue" - // subscribe and publish concurrently - subscribeStream(client.subscribe[String](queueName)) - // concurrently publish messages - .concurrently(publishStream(client.publish[String](queueName))) - .compile - // runs forever - .drain -} -``` - -## Working with Azure Service Bus queues - -```scala mdoc:compile-only -import com.commercetools.queue.azure.servicebus._ -import com.azure.identity.DefaultAzureCredentialBuilder - -val namespace = "{namespace}.servicebus.windows.net" // your namespace -val credentials = new DefaultAzureCredentialBuilder().build() // however you want to authenticate - -ServiceBusClient[IO](namespace, credentials).use(program(_)) -``` - -## Working with AWS SQS - - -```scala mdoc:compile-only -import com.commercetools.queue.aws.sqs._ -import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider - -val region = Region.US_EAST_1 // your region -val credentials = DefaultCredentialsProvider.create() // however you want to authenticate - -SQSClient[IO](region, credentials).use(program(_)) -``` diff --git a/docs/directory.conf b/docs/directory.conf new file mode 100644 index 0000000..e2c16e5 --- /dev/null +++ b/docs/directory.conf @@ -0,0 +1,6 @@ +laika.navigationOrder = [ + index.md + getting-started + systems + integrations +] diff --git a/docs/favicon-32x32.png b/docs/favicon-32x32.png new file mode 100644 index 0000000000000000000000000000000000000000..1beabe5878a2c1d7e80313923d41d762940dd8ea GIT binary patch literal 1269 zcmZ`$drVtZ7(bo-3!UQ^b!p;>R_n`DOpcDq6vF|p%7vkXj4PzeKI7Zpr=QFh|r)+_Q;6M_6*~=P6yHa zh(cZlj+?XuHt67!b`Zk&?i&L1R~js;oP&VhGGnAL%5M6GWPVCVXMD=Hi;djy+a8R5 z!=6sUFUEeb5c_pyOIGG+Ym~3&W!m+Dw8h4d&{OVW@(*PJ3Ej8q0*7Rm<9I>>jcj_oQY_olQ)Eik{BBq<&3C{Pye!2(kI8o7SXPC=mM=D2cq+X*@@ z(Y+dp1|pRxvLp?>#kW_`gKm~-ssRYtBsS@fRU-)%sTO&*l)6d^3=HzuhqZE$CrK{J z$a}@pa0V+Ni#XDcK385!Z#uu*QM0ksHMp-K&@nnV>*!%lBLFdWYA`+!dKS zFY2q5$VXy7RJ9#bPs?2|p4TgymrcLt%{-Xp+&z#Ly6id;k?-mo?3R^x>|D_?yII!D zaUr=B$4^&dyEcDOJ0A&p`OWIHBSUYr1EsED{WxdI8TVcmIQFQ&f52xS9R2d#hF^Ni zwVxNcpVs~T_^%x{k?^_w1+UEs0E-PnoxB$q!X$CZevy2^Z+xNu-t)Dj(%CC^Ve0zf z!epM`X~hIqO{&CdzAs8Ix^tS(FE5vJ%=-qX>JK&824{Bqf}#ZVQ?`_Qvvn^8=j7NG z*V8)9?^l|x++S-8+e*C#BXH)g_a~=f&OH2cYcqaiESR?*eev*}rHyE}xWbm-*wKVO zXsktAXff(7nAlK-p$jAlladpJq6ATLo+w!?+A9_&?-2^cLZLQP9{wLgS%uM5TKoTo zJ&K22$Pi9+I9O3?vFfWZV6|EkP37-Y8}w!@v7+i|`-8LyBq}(WS7=e`t$cG;MXBjH z#<$d(F}}%cF#z!Hg}a+9T4laK;C{>Fl`pFRIfj{NU~*%)igO`kE?;6xo-&eSzRTl# nZZFJEd#59(sUDl`(wm83EiBlwnYp(awE%Kyj^vxnBd7iW(Q+!n literal 0 HcmV?d00001 diff --git a/docs/getting-started/administration.md b/docs/getting-started/administration.md new file mode 100644 index 0000000..eac9611 --- /dev/null +++ b/docs/getting-started/administration.md @@ -0,0 +1 @@ +# Managing Queues diff --git a/docs/getting-started/directory.conf b/docs/getting-started/directory.conf new file mode 100644 index 0000000..2d87703 --- /dev/null +++ b/docs/getting-started/directory.conf @@ -0,0 +1,8 @@ +laika.title = "Getting Started" + +laika.navigationOrder = [ + queues.md + publishing.md + subscribing.md + administration.md +] diff --git a/docs/getting-started/publishing.md b/docs/getting-started/publishing.md new file mode 100644 index 0000000..3b4e052 --- /dev/null +++ b/docs/getting-started/publishing.md @@ -0,0 +1 @@ +# Publishing Data diff --git a/docs/getting-started/queues.md b/docs/getting-started/queues.md new file mode 100644 index 0000000..04597bc --- /dev/null +++ b/docs/getting-started/queues.md @@ -0,0 +1,13 @@ +# Common Queue Interface + +The library offers both low and high level possibilities, making it possible to have fine grained control over queue pulling, or just focusing on processing, delegating message management to the library. + +The design of the API is the result of the common usage patterns and how the various client SDKs are designed. +There are several views possible on a queue: + + - as a `QueuePublisher` when you only need to [publish messages](publishing.md) to an existing queue. + - as a `QueueSubscriber` when you only need to [subscribe](subscribing.md) to an existing queue. + - as a `QueueAdministration` when you need to [manage](administration.md) queues (creation, deletion, ...). + +The entry point is the `QueueClient` factory for each underlying queue system. +For each supported queue provider, you can get an instance of the `QueueClient`, please refer to the [Providers](../systems/sqs.md) section to see how. diff --git a/docs/getting-started/subscribing.md b/docs/getting-started/subscribing.md new file mode 100644 index 0000000..64294b8 --- /dev/null +++ b/docs/getting-started/subscribing.md @@ -0,0 +1 @@ +# Receiving Data diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..6dd013e --- /dev/null +++ b/docs/index.md @@ -0,0 +1,4 @@ +# `fs2-queues` + +Cloud Queues is a library that provides interfaces for working with queue systems. +It integrates with various queue providers, such as [AWS SQS](systems/sqs.md) or [Azure Service Bus](systems/service-bus.md) diff --git a/docs/integrations/circe.md b/docs/integrations/circe.md new file mode 100644 index 0000000..2b60567 --- /dev/null +++ b/docs/integrations/circe.md @@ -0,0 +1 @@ +# Circe diff --git a/docs/integrations/directory.conf b/docs/integrations/directory.conf new file mode 100644 index 0000000..4c169cb --- /dev/null +++ b/docs/integrations/directory.conf @@ -0,0 +1 @@ +laika.title = "Library Integrations" diff --git a/docs/integrations/otel4s.md b/docs/integrations/otel4s.md new file mode 100644 index 0000000..a950740 --- /dev/null +++ b/docs/integrations/otel4s.md @@ -0,0 +1 @@ +# Otel4s diff --git a/docs/systems/directory.conf b/docs/systems/directory.conf new file mode 100644 index 0000000..6154fe6 --- /dev/null +++ b/docs/systems/directory.conf @@ -0,0 +1 @@ +laika.title = "Qeueue Systems" diff --git a/docs/systems/service-bus.md b/docs/systems/service-bus.md new file mode 100644 index 0000000..cdd66be --- /dev/null +++ b/docs/systems/service-bus.md @@ -0,0 +1,22 @@ +# Azure Service Bus Queues + +You can create a client to service bus queues by using the `fs2-queues-azure-service-bus` module. + +For instance, you can create a managed client via a namespace and credentials as follows. + +```scala mdoc:compile-only +import cats.effect.IO +import com.commercetools.queue.azure.servicebus._ +import com.azure.identity.DefaultAzureCredentialBuilder + +val namespace = "{namespace}.servicebus.windows.net" // your namespace +val credentials = new DefaultAzureCredentialBuilder().build() // however you want to authenticate + +ServiceBusClient[IO](namespace, credentials).use { client => + ??? +} +``` + +The client is managed, meaning that it uses a conection pool that will get shut down upon resource release. + +If integrating with an existing code base where you already have builders that you would like to share, you can use the `unmanaged` variant. diff --git a/docs/systems/sqs.md b/docs/systems/sqs.md new file mode 100644 index 0000000..6f4b3b8 --- /dev/null +++ b/docs/systems/sqs.md @@ -0,0 +1,23 @@ +# AWS SQS + +You can create a client to service bus queues by using the `fs2-queues-aws-sqs` module. + +For instance you can create a managed client via a region and credentials as follows. + +```scala mdoc:compile-only +import cats.effect.IO +import com.commercetools.queue.aws.sqs._ +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider + +val region = Region.US_EAST_1 // your region +val credentials = DefaultCredentialsProvider.create() // however you want to authenticate + +SQSClient[IO](region, credentials).use { client => + ??? +} +``` + +The client is managed, meaning that it uses a dedicated HTTP connection pool that will get shut down upon resource release. + +If integrating with an existing code base where you already have an instance of `SdkAsyncHttpClient` that you would like to share, you can pass the optional `httpClient` parameter. If passed explicitly, the client is not closed when the resource is released, and it is up to the caller to manage it. diff --git a/project/Website.scala b/project/Website.scala new file mode 100644 index 0000000..5cd6d91 --- /dev/null +++ b/project/Website.scala @@ -0,0 +1,28 @@ +import laika.ast.{Path, SpanLink, TemplateString} +import laika.helium.Helium +import laika.helium.config.{Favicon, TextLink} +import laika.theme.config.Color + +object CTTheme { + def apply(base: Helium) = + base.site + .themeColors( + primary = Color.hex("007c99"), + secondary = Color.hex("6359ff"), + // primaryMedium = Color.hex("0bbfbf"), + primaryMedium = Color.hex("ffc806"), + primaryLight = Color.hex("f7f2ea"), + text = Color.hex("1a1a1a"), + background = Color.hex("ffffff"), + bgGradient = (Color.hex("ffc806"), Color.hex("f7f2ea")) + ) + .site + .darkMode + .disabled + .site + .topNavigationBar(homeLink = TextLink.internal(Path.Root / "index.md", "fs2-queues")) + .site + .favIcons(Favicon.internal(Path.Root / "favicon-32x32.png", sizes = "32x32")) + .site + .footer(TemplateString("Copyright © "), SpanLink.external("https://commercetools.com/")("commercetools")) +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 5da95be..85d429c 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.6.5") -addSbtPlugin("org.typelevel" % "sbt-typelevel-scalafix" % "0.6.5") +addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.6.7") +addSbtPlugin("org.typelevel" % "sbt-typelevel-site" % "0.6.7") +addSbtPlugin("org.typelevel" % "sbt-typelevel-scalafix" % "0.6.7") addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.2") -addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1") From 3d45534c50daebb18ef8b41392caf9024337573a Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 10:46:07 +0200 Subject: [PATCH 02/11] Add `QueuePublisher` documentation --- build.sbt | 15 +++- docs/getting-started/publishing.md | 86 +++++++++++++++++++++ docs/getting-started/queues.md | 2 +- docs/helium/templates/pageNav.template.html | 13 ++++ docs/index.md | 2 +- docs/systems/directory.conf | 6 ++ docs/systems/index.md | 5 ++ 7 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 docs/helium/templates/pageNav.template.html create mode 100644 docs/systems/index.md diff --git a/build.sbt b/build.sbt index 3224025..44299fc 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,6 @@ import laika.config.PrettyURLs +import laika.config.LinkConfig +import laika.config.ApiLinks ThisBuild / tlBaseVersion := "0.0" @@ -15,7 +17,7 @@ val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3") ThisBuild / scalaVersion := Scala213 -lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s) +lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s, unidocs) ThisBuild / tlSitePublishBranch := Some("main") @@ -101,6 +103,10 @@ lazy val docs = project .settings( tlSiteApiPackage := Some("com.commercetools.queue"), tlSiteHelium := CTTheme(tlSiteHelium.value), + laikaConfig := tlSiteApiUrl.value.fold(laikaConfig.value) { apiUrl => + laikaConfig.value.withConfigValue(LinkConfig.empty + .addApiLinks(ApiLinks(baseUri = apiUrl.toString().dropRight("index.html".size)))) + }, laikaExtensions += PrettyURLs, tlFatalWarnings := false, libraryDependencies ++= List( @@ -114,5 +120,10 @@ lazy val unidocs = project .enablePlugins(TypelevelUnidocPlugin) .settings( name := "fs2-queues-docs", - ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(core.jvm, circe.jvm, azureServiceBus.jvm, awsSQS.jvm) + ScalaUnidoc / unidoc / unidocProjectFilter := inProjects( + core.jvm, + circe.jvm, + azureServiceBus.jvm, + awsSQS.jvm, + otel4s.jvm) ) diff --git a/docs/getting-started/publishing.md b/docs/getting-started/publishing.md index 3b4e052..8d29854 100644 --- a/docs/getting-started/publishing.md +++ b/docs/getting-started/publishing.md @@ -1 +1,87 @@ +{% nav = true %} # Publishing Data + +Publishing data is made using a @:api(com.commercetools.queue.QueuePublisher). You can acquire one through a @:api(com.commercetools.queue.QueueClient) by using the `publish()` method. A `QueuePublisher` is associated to a specific queue, which is provided when creating the publisher. + +```scala mdoc +import cats.effect.IO + +import com.commercetools.queue.{QueueClient, QueuePublisher} + +def client: QueueClient[IO] = ??? + +// returns a publisher for the queue named `my-queue`, +// which can publish messages of type String +def publisher: QueuePublisher[IO, String] = + client.publish[String]("my-queue") +``` + +## Pipe a stream through the publisher sink + +The @:api(com.commercetools.queue.QueuePublisher) abstraction provides a `sink()` pipe, through which you can make your publishing source stream go. +The pipe takes a parameter allowing for batching publications. + +```scala mdoc:compile-only +import fs2.{Pipe, Stream} + +val input: Stream[IO, String] = ??? + +// messages are published in batch of 10 +val publicationSink: Pipe[IO, String, Nothing] = publisher.sink(batchSize = 10) + +// pipe the message producing stream through the publication sink +input.through(publicationSink) +``` + +@:callout(info) +Several `Stream`s can safely publish to the same sink concurrently, so you can reuse the `publicationSink` variable. +@:@ + +## Explicit publish + +If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the @:api(com.commercetools.queue.QueuePusher) lower level API, which exposes a way to publish a single message or a single batch. +This abstraction comes in handy when the messages you produces do not come from a `Stream`, otherwise you should prefer the `sink()` pipe presented above. + +A `QueuePusher` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly. + +```scala mdoc:compile-only +publisher.pusher.use { queuePusher => + val produceMessages: IO[List[String]] = ??? + + // produce a batch + produceMessages + .flatMap { messages => + // push the batch + queuePusher.push(messages, None) + } + // repeat forever + .foreverM +} +``` + +@:callout(warning) +Make sure that `IO`s publishing to the `queuePusher` do not outlive the `use` scope, otherwise you will be using the a closed resource after the `use` block returns. +If you need to spawn background fibers using the `queuePusher`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested withing the `queuePusher` one. + +```scala mdoc:compile-only +import cats.effect.std.Supervisor + +publisher.pusher.use { queuePusher => + val produceMessages: IO[List[String]] = ??? + + // create a supervisor that waits for supervised spawn fibers + // to finish before being released + Supervisor[IO](await = true).use { supervisor => + // produce a batch + produceMessages + .flatMap { messages => + // push the batch in the background + supervisor.supervise(queuePusher.push(messages, None)).void + } + } +} +``` +@:@ + +[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource +[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor diff --git a/docs/getting-started/queues.md b/docs/getting-started/queues.md index 04597bc..8eb5112 100644 --- a/docs/getting-started/queues.md +++ b/docs/getting-started/queues.md @@ -10,4 +10,4 @@ There are several views possible on a queue: - as a `QueueAdministration` when you need to [manage](administration.md) queues (creation, deletion, ...). The entry point is the `QueueClient` factory for each underlying queue system. -For each supported queue provider, you can get an instance of the `QueueClient`, please refer to the [Providers](../systems/sqs.md) section to see how. +For each supported queue provider, you can get an instance of the `QueueClient`, please refer to the [Providers](../systems/index.md) section to see how. diff --git a/docs/helium/templates/pageNav.template.html b/docs/helium/templates/pageNav.template.html new file mode 100644 index 0000000..c3f4737 --- /dev/null +++ b/docs/helium/templates/pageNav.template.html @@ -0,0 +1,13 @@ +@:if(nav) + +@:@ diff --git a/docs/index.md b/docs/index.md index 6dd013e..3c9f369 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,4 +1,4 @@ -# `fs2-queues` +# Home Cloud Queues is a library that provides interfaces for working with queue systems. It integrates with various queue providers, such as [AWS SQS](systems/sqs.md) or [Azure Service Bus](systems/service-bus.md) diff --git a/docs/systems/directory.conf b/docs/systems/directory.conf index 6154fe6..97fb442 100644 --- a/docs/systems/directory.conf +++ b/docs/systems/directory.conf @@ -1 +1,7 @@ laika.title = "Qeueue Systems" + +laika.navigationOrder = [ + index.md + sqs.md + service-bus.md +] diff --git a/docs/systems/index.md b/docs/systems/index.md new file mode 100644 index 0000000..fefcfdf --- /dev/null +++ b/docs/systems/index.md @@ -0,0 +1,5 @@ +# Providers + +`fs2-queues` comes with several queue system implementations. Each of them implements the @:api(com.commercetools.queue.QueueClient) abstraction with the various interfaces it gives access to. + +Each implementations comes with its own way to get access to a client, depending on the underlying SDK. Please have a look at the provider documentation to see the different ways to instantiate the clients. From e7457e7104b5d8b4b61b0ed304a8b1cde8fe0dcb Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 14:10:28 +0200 Subject: [PATCH 03/11] Add documentation about receiving data --- docs/getting-started/directory.conf | 1 + docs/getting-started/publishing.md | 11 +- docs/getting-started/serialization.md | 6 + docs/getting-started/subscribing.md | 156 ++++++++++++++++++++++++++ 4 files changed, 170 insertions(+), 4 deletions(-) create mode 100644 docs/getting-started/serialization.md diff --git a/docs/getting-started/directory.conf b/docs/getting-started/directory.conf index 2d87703..d3c9e98 100644 --- a/docs/getting-started/directory.conf +++ b/docs/getting-started/directory.conf @@ -5,4 +5,5 @@ laika.navigationOrder = [ publishing.md subscribing.md administration.md + serialization.md ] diff --git a/docs/getting-started/publishing.md b/docs/getting-started/publishing.md index 8d29854..f2b19d3 100644 --- a/docs/getting-started/publishing.md +++ b/docs/getting-started/publishing.md @@ -3,6 +3,8 @@ Publishing data is made using a @:api(com.commercetools.queue.QueuePublisher). You can acquire one through a @:api(com.commercetools.queue.QueueClient) by using the `publish()` method. A `QueuePublisher` is associated to a specific queue, which is provided when creating the publisher. +The publisher also requires a [data serializer][doc-serializer] upon creation for the type of data you want to publish to it. + ```scala mdoc import cats.effect.IO @@ -39,8 +41,8 @@ Several `Stream`s can safely publish to the same sink concurrently, so you can r ## Explicit publish -If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the @:api(com.commercetools.queue.QueuePusher) lower level API, which exposes a way to publish a single message or a single batch. -This abstraction comes in handy when the messages you produces do not come from a `Stream`, otherwise you should prefer the `sink()` pipe presented above. +If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the @:api(com.commercetools.queue.QueuePusher) lower level API, which exposes ways to publish a single message or a single batch. +This abstraction comes in handy when the messages you produce do not come from a `Stream`, otherwise you should prefer the `sink()` pipe presented above. A `QueuePusher` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly. @@ -60,8 +62,8 @@ publisher.pusher.use { queuePusher => ``` @:callout(warning) -Make sure that `IO`s publishing to the `queuePusher` do not outlive the `use` scope, otherwise you will be using the a closed resource after the `use` block returns. -If you need to spawn background fibers using the `queuePusher`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested withing the `queuePusher` one. +Make sure that `IO`s publishing to the `queuePusher` do not outlive the `use` scope, otherwise you will be using a closed resource after the `use` block returns. +If you need to spawn background fibers using the `queuePusher`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested within the `queuePusher` one. ```scala mdoc:compile-only import cats.effect.std.Supervisor @@ -85,3 +87,4 @@ publisher.pusher.use { queuePusher => [cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource [cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor +[doc-serializer]: serialization.md#data-serializer diff --git a/docs/getting-started/serialization.md b/docs/getting-started/serialization.md new file mode 100644 index 0000000..bf8fb7c --- /dev/null +++ b/docs/getting-started/serialization.md @@ -0,0 +1,6 @@ +{% nav = true %} +# Data Serialization + +## Data `Serializer` + +## Data `Deserializer` diff --git a/docs/getting-started/subscribing.md b/docs/getting-started/subscribing.md index 64294b8..9426818 100644 --- a/docs/getting-started/subscribing.md +++ b/docs/getting-started/subscribing.md @@ -1 +1,157 @@ +{% nav = true %} # Receiving Data + +Receiving data is achieved through a @:api(com.commercetools.queue.QueueSubscriber). You can acquire one throuh a @:api(com.commercetools.queue.QueueClient) by using the `subscribe()` method. A `QueueSubscriber` is associated with a specific queue, which is provided when creating the subscriber. + +The subscriber also requires a [data deserializer][doc-deserializer] upon creation for the of data you want to receive from the queue. + +```scala mdoc +import cats.effect.IO + +import com.commercetools.queue.{QueueClient, QueueSubscriber} + +def client: QueueClient[IO] = ??? + +// returns a subscriber for the queue named `my-queue`, +// which can receive messages of type String +def subscriber: QueueSubscriber[IO, String] = + client.subscribe[String]("my-queue") +``` + +When a message is received by a subscriber, it is _locked_ (or _leased_) for a queue level configured amount of time (see more on the [Managing Queues] page). This means that only one subscriber receives (and can process) a given message in this amount of time. A message needs to be settled once processed (or if processing fails) to ensure it is either removed from the queue or made available again. This is part of the message control flow. + +In the following, we explain what kind of control flow handling is provided by the library. + +## Processors + +The @:api(com.commercetools.queue.QueueSubscriber) abstraction provides a `processWithAutoAck()` method, which automatically handles the control flow part for you. You only need to provide the processing function, allowing you to focus on your business logic. + +```scala mdoc:compile-only +import scala.concurrent.duration._ + +subscriber.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds) { message => + IO.println(s"Received ${message.payload}").as(message.messageId) +} +``` + +The processing function receives a @:api(com.commercetools.queue.Message), which gives access to the content and some metadata of the received messages. + +The result is a `Stream` of the processing results, emitted in the order the messages where received. Only the successfully processed messages are emitted down-stream. The stream is failed upon the first failed processing. + +The `processWithAutoAck` method performs automatic acking/nacking for you depending on the processing outcome. It comes in handy to implement at-most once delivery startegies, releasing the message to be reprocessed by another subscriber in case of error. + +If you wish to implement a stream that does not fail upon error, you can use the `attemptProcessWithAutoAck()` methods, which emits the results of the processing as an `Either[Throwable, T]`. The resulting stream does not fail if some processing fails. Otherwise it has the same behavior as the stream above. + +## Raw message stream + +If you want more fine-grained tuning of the control flow part, you can resort to the `messages()` stream available via the `QueueSubscriber`. +The stream emits a @:api(com.commercetools.queue.MessageContext) for each received message, giving access to the message content and metadata, as well as to message control methods. + +Using this stream, you can implement your processing strategy. + +@:callout(warning) +Using this stream, you need to explicitly settle the message (either by acking or nacking it) once processed. Failing to do so, results in the message being made available to other subscribers once the lock expires. + +It is up to you to ensure that all control flow paths lead to explicit settlement of the messages you received. + +The recommendation is to only resort to this stream if you need to implement complex custom control flow for messages. +@:@ + +A simplified `processWithAutoAck` method described above could be implemented this way. + +@:callout(info) +The real implementation is more complex to ensure that all successfully processed messages are actually emitted down-stream and is more efficient. +@:@ + +```scala mdoc:compile-only +import cats.effect.Outcome + +import scala.concurrent.duration._ + +subscriber + .messages(batchSize = 10, waitingTime = 20.seconds) + .evalMap { context => + IO.println(s"Received ${context.payload}") + .as(context.messageId) + .guaranteeCase { + case Outcome.Succeeded(_) => context.ack() + case _ => context.nack() + } + } +``` + +### `MessageContext` control flow + +There are three different methods that can be used to control the message lifecycle from the subscriber point of view: + + 1. `MessageContext.ack()` acknowledges the message, and marks it as successfully processed in the queue system. It will be permanently removed and no other subscriber will ever receive it. + 2. `MessageContext.nack()` marks the message as not processed, releasing the lock in the queue system. It will be viewable for other subscribers to receive and process. + 3. `MessageContext.extendLock()` extends the currently owned lock by the queue level configured duration. This can be called as many times as you want, as long as you still own the lock. As long as the lock is extended, the message will not be distributed to any other subscriber by the queue system. + +## Explicit pull + +If you are integrated the library with an existing code base that performs explicit pulls from the queue, you can access the @:api(com.commercetools.queue.QueuePuller) lower level API, which exposes ways to pull batch of messages. +This abstraction comes in handy when your processing code is based on a callback approach and is not implemented as a `Stream`, otherwise you should prefer the streams presented above. + +A `QueuePuller` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly. + +```scala mdoc:compile-only +import cats.effect.Outcome + +import scala.concurrent.duration._ + +import cats.syntax.foldable._ + +subscriber.puller.use { queuePuller => + + queuePuller + .pullBatch(batchSize = 10, waitingTime = 20.seconds) + .flatMap { chunk => + chunk.traverse_ { context => + IO.println(s"Received ${context.payload}").guaranteeCase { + case Outcome.Succeeded(_) => context.ack() + case _ => context.nack() + } + } + } + +} +``` + +@:callout(warning) +Make sure that `IO`s pulling from the `queuePuller` do not outlive the `use` scope, otherwise you will be using a closed resource after the `use` block returns. +If you need to spawn background fibers using the `queuePuller`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested within the `queuePuller` one. + +```scala mdoc:compile-only +import cats.effect.Outcome +import cats.effect.std.Supervisor + +import scala.concurrent.duration._ + +import cats.syntax.foldable._ + +subscriber.puller.use { queuePuller => + // create a supervisor that waits for supervised spawn fibers + // to finish before being released + Supervisor[IO](await = true).use { supervisor => + queuePuller + .pullBatch(batchSize = 10, waitingTime = 20.seconds) + .flatMap { chunk => + chunk.traverse_ { context => + supervisor.supervise { + IO.println(s"Received ${context.payload}") + .guaranteeCase { + case Outcome.Succeeded(_) => context.ack() + case _ => context.nack() + } + } + } + } + } +} +``` +@:@ + +[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource +[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor +[doc-deserializer]: serialization.md#data-deserializer From 51052934681eeb9ff0c384775653f920354a5198 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 16:12:55 +0200 Subject: [PATCH 04/11] Add queue update method --- .../queue/aws/sqs/SQSAdministration.scala | 22 ++++++++++++++++++- .../servicebus/ServiceBusAdministration.scala | 8 +++++++ .../queue/QueueAdministration.scala | 12 ++++++++++ .../otel4s/MeasuringQueueAdministration.scala | 8 +++++++ .../queue/otel4s/QueueMetrics.scala | 5 +++-- 5 files changed, 52 insertions(+), 3 deletions(-) diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala index f5fbf64..584618d 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSAdministration.scala @@ -19,8 +19,9 @@ package com.commercetools.queue.aws.sqs import cats.effect.Async import cats.syntax.all._ import com.commercetools.queue.QueueAdministration +import com.commercetools.queue.aws.sqs.makeQueueException import software.amazon.awssdk.services.sqs.SqsAsyncClient -import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, QueueAttributeName, QueueDoesNotExistException} +import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, QueueAttributeName, QueueDoesNotExistException, SetQueueAttributesRequest} import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ @@ -43,6 +44,25 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S }.void .adaptError(makeQueueException(_, name)) + override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = + getQueueUrl(name) + .flatMap { queueUrl => + F.fromCompletableFuture { + F.delay { + client.setQueueAttributes( + SetQueueAttributesRequest + .builder() + .queueUrl(queueUrl) + .attributes(Map( + QueueAttributeName.MESSAGE_RETENTION_PERIOD -> messageTTL.map(_.toSeconds.toString()), + QueueAttributeName.VISIBILITY_TIMEOUT -> lockTTL.map(_.toSeconds.toString()) + ).flattenOption.asJava) + .build()) + } + }.void + } + .adaptError(makeQueueException(_, name)) + override def delete(name: String): F[Unit] = getQueueUrl(name) .flatMap { queueUrl => diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala index 00707e8..7241e1d 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusAdministration.scala @@ -39,6 +39,14 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp .void .adaptError(makeQueueException(_, name)) + override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = + F.blocking { + val properties = client.getQueue(name) + messageTTL.foreach(ttl => properties.setDefaultMessageTimeToLive(Duration.ofMillis(ttl.toMillis))) + lockTTL.foreach(ttl => properties.setLockDuration(Duration.ofMillis(ttl.toMillis))) + val _ = client.updateQueue(properties) + } + override def delete(name: String): F[Unit] = F.blocking(client.deleteQueue(name)) .void diff --git a/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala b/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala index 22bc59c..ac6c601 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueAdministration.scala @@ -18,12 +18,24 @@ package com.commercetools.queue import scala.concurrent.duration.FiniteDuration +/** + * Interface that gives access to the queue administration capabilities. + */ trait QueueAdministration[F[_]] { + /** Creates a queue with the given name, message TTL and lock TTL. */ def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] + /** + * Updates the queue with the given name, with provided message TTL and/or lock TTL. + * Only the provided elements are updated, if a value is not provided, the previous value is kept. + */ + def update(name: String, messageTTL: Option[FiniteDuration] = None, lockTTL: Option[FiniteDuration] = None): F[Unit] + + /** Deletes the queue with the given name. */ def delete(name: String): F[Unit] + /** Indicates whether the queue with the given name exists. */ def exists(name: String): F[Boolean] } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala index d8402bb..8543a02 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueueAdministration.scala @@ -40,6 +40,14 @@ class MeasuringQueueAdministration[F[_]]( } .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.create, requestCounter)) + override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] = + tracer + .span("queue.update") + .surround { + underlying.update(name, messageTTL, lockTTL) + } + .guaranteeCase(QueueMetrics.increment(Attribute("queue", name), QueueMetrics.update, requestCounter)) + override def delete(name: String): F[Unit] = tracer .span("queue.delete") diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala index c20df06..6163576 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala @@ -39,11 +39,12 @@ private object QueueMetrics { final val send = Attribute("method", "send") final val receive = Attribute("method", "receive") final val ack = Attribute("method", "ack") - final val nack = Attribute("method", """|nack""".stripMargin) + final val nack = Attribute("method", "nack") final val extendLock = Attribute("method", "extendLock") - // queueue management attributes + // queue management attributes final val create = Attribute("method", "create") + final val update = Attribute("method", "update") final val delete = Attribute("method", "delete") final val exist = Attribute("method", "exist") From df27418bb698ac3497713ba48d3ad1f825851c35 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 16:13:13 +0200 Subject: [PATCH 05/11] Add queue administration documentation --- docs/getting-started/administration.md | 62 ++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/docs/getting-started/administration.md b/docs/getting-started/administration.md index eac9611..2e53999 100644 --- a/docs/getting-started/administration.md +++ b/docs/getting-started/administration.md @@ -1 +1,63 @@ +{% nav = true %} # Managing Queues + +Managing queues is made using the @:api(com.commercetools.queue.QueueAdministration). You can acquire an instance through a @:api(com.commercetools.queue.QueueClient) by using the `administration()` method. A `QueueAdministration` instance is **not** associated to any specific queue. The queue to operate on will be provided in each administration method. + +```scala mdoc +import cats.effect.IO + +import com.commercetools.queue.{QueueAdministration, QueueClient} + +def client: QueueClient[IO] = ??? + +def admin: QueueAdministration[IO] = + client.administration +``` + +## Create a queue + +A queue can be created by using the `create()` method. +You need to provide the message TTL and lock TTL together with the new queue name. +The different TTLs are defined at queue level and will affect messages published and subscribers. + +```scala mdoc:compile-only +import scala.concurrent.duration._ + +admin.create("my-queue", messageTTL = 14.days, lockTTL = 2.minutes) +``` + +@:callout(info) +The `lockTTL` parameter has an influence on the behavior of the [subscriber][doc-subscribing] methods and streams. If you change it at runtime, you should restart the subscribing streams to ensure they are taken into account. +@:@ + +## Check if queue exists + +You can check the existence of a queue in the queue system by calling the `exists()` method. + +```scala mdoc:compile-only +admin.exists("my-queue") +``` + +## Update queue properties + +The queue TTLs can be updated for an existing queue by using the `update()` method. The method accepts both TTLs as `Option[FiniteDuration]` with a default value of `None`. Only the defined TTLs will be updated. Values that are not provided are left unchanged. + +```scala mdoc:compile-only +import scala.concurrent.duration._ + +// only updates the lock TTL +admin.update("my-queue", lockTTL = Some(5.minutes)) + +// updates both the message and lock TTLs +admin.update("my-queue", messageTTL = Some(1.day), lockTTL = Some(5.minutes)) +``` + +## Delete a queue + +You can delete an existing queue by calling the `delete()` method. + +```scala mdoc:compile-only +admin.delete("my-queue") +``` + +[doc-subscribing]: subscribing.md From fc0518e518fdd8602c5581554d78b9e63d66a5a7 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 16:41:16 +0200 Subject: [PATCH 06/11] Add serialization documentation --- build.sbt | 14 ++++++++---- docs/getting-started/serialization.md | 33 +++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 44299fc..336c856 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,7 @@ import laika.config.PrettyURLs import laika.config.LinkConfig import laika.config.ApiLinks +import laika.config.SourceLinks ThisBuild / tlBaseVersion := "0.0" @@ -103,10 +104,15 @@ lazy val docs = project .settings( tlSiteApiPackage := Some("com.commercetools.queue"), tlSiteHelium := CTTheme(tlSiteHelium.value), - laikaConfig := tlSiteApiUrl.value.fold(laikaConfig.value) { apiUrl => - laikaConfig.value.withConfigValue(LinkConfig.empty - .addApiLinks(ApiLinks(baseUri = apiUrl.toString().dropRight("index.html".size)))) - }, + laikaConfig := tlSiteApiUrl.value + .fold(laikaConfig.value) { apiUrl => + laikaConfig.value.withConfigValue( + LinkConfig.empty + .addApiLinks(ApiLinks(baseUri = apiUrl.toString().dropRight("index.html".size))) + .addSourceLinks( + SourceLinks(baseUri = "https://github.com/commercetools/fs2-queues", suffix = "scala") + )) + }, laikaExtensions += PrettyURLs, tlFatalWarnings := false, libraryDependencies ++= List( diff --git a/docs/getting-started/serialization.md b/docs/getting-started/serialization.md index bf8fb7c..3fbb098 100644 --- a/docs/getting-started/serialization.md +++ b/docs/getting-started/serialization.md @@ -1,6 +1,39 @@ {% nav = true %} # Data Serialization +The library uses a string encoding for the message payload that is published or received from a queue. +The proper string encoding/decoding is performed by the underlying SDK, allowing you to only focus on the string serialization part. + ## Data `Serializer` +A @:api(com.commercetools.queue.Serializer) is defined as a _SAM interface_ that is basically a `T => String`. Defining a new one can be done easily by providing an implicit conversion function from the type `T` to serialize to a `String`. + +For instance, adding a serializer for `Int`s can be done as follows. + +```scala mdoc +import com.commercetools.queue.Serializer + +implicit val serializer: Serializer[Int] = _.toString +``` + +The library provides natively a _no-op_ serializer for `String`s. + ## Data `Deserializer` + +A @:api(com.commercetools.queue.Deserializer) is defined as a _SAM interface_ that is basically a `String => Either[Throwable, T]`. Defining a new one can be done easily by providing an implicit conversion function from a `String` to serialize to either a value of the type `T` or an exception. + +For instance, adding a deserializer for `Int`s can be done as follows. + +```scala mdoc +import cats.syntax.either._ + +import com.commercetools.queue.Deserializer + +implicit val deserializer: Deserializer[Int] = s => Either.catchNonFatal(s.toInt) +``` + +The library provides natively a _no-op_ deserializer for `String`s. + +## Library integration + +We provide integration with some well-established serialization libraries (e.g. to perform JSON serialization). Have a look at the _Library Integration_ section for your favorite library. From b71829f4765f46fccca8c5891de72bce3a710951 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 16:47:49 +0200 Subject: [PATCH 07/11] Add circe integration documentation --- docs/integrations/circe.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/integrations/circe.md b/docs/integrations/circe.md index 2b60567..e710291 100644 --- a/docs/integrations/circe.md +++ b/docs/integrations/circe.md @@ -1 +1,16 @@ # Circe + +The `fs2-queues-circe` provides integration with the [circe][circe] library. + +It provides: + + - a @:api(com.commercetools.queue.Serializer) for each type `T` that has an implicit `io.circe.Encoder[T]` in scope. + - a @:api(com.commercetools.queue.Deserializer) for each type `T` that has an implicit `io.circe.Decoder[T]` in scope. + +To get this feature in your code base, import the following: + +```scala mdoc +import com.commercetools.queue.circe._ +``` + +[circe]: https://circe.github.io/circe/ From 4d511cd8d0b62b3f6be6f74d9a275ecbc5eafd2e Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 16:58:53 +0200 Subject: [PATCH 08/11] Add otel4s integration documentation --- docs/integrations/otel4s.md | 39 +++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/docs/integrations/otel4s.md b/docs/integrations/otel4s.md index a950740..bb542b6 100644 --- a/docs/integrations/otel4s.md +++ b/docs/integrations/otel4s.md @@ -1 +1,40 @@ # Otel4s + +The `fs2-queues-otel4s` provides an integration with [otel4s][otel4s]. + +It allows you to wrap an existing @:api(com.commercetools.queue.QueueClient) into a @:api(com.commercetools.queue.otel4s.MeasuringQueueClient), which adds [tracing][otel4s-tracing] and [metrics][otel4s-metrics] on every call to the underlying queue system. + +You can opt-in for either one of them or both. + +```scala mdoc:compile-only +import cats.effect.IO + +import com.commercetools.queue.QueueClient +import com.commercetools.queue.otel4s._ + +import org.typelevel.otel4s.metrics.Meter +import org.typelevel.otel4s.trace.Tracer + +val rawClient: QueueClient[IO] = ??? + +implicit val meter: Meter[IO] = ??? +implicit val tracer: Tracer[IO] = ??? + +val withMetrics = rawClient.withMetrics() + +val withTracing = rawClient.withTraces + +val withBoth = rawClient.withMetricsAndTraces() +``` + +The extension methods adding metrics take an optional parameter indicating the name of the counter that counts the calls to the queue system. The default is defined in `MeasuringQueueClient.defaultRequestMetricsName`. + +```scala mdoc +import com.commercetools.queue.otel4s.MeasuringQueueClient + +MeasuringQueueClient.defaultRequestMetricsName +``` + +[otel4s]: https://typelevel.org/otel4s/ +[otel4s-tracing]: https://typelevel.org/otel4s/instrumentation/tracing.html +[otel4s-metrics]: https://typelevel.org/otel4s/instrumentation/metrics.html From 1c2d383a9f5bbdc8e05d79cf432a08a96f66d864 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Mon, 29 Apr 2024 18:02:51 +0200 Subject: [PATCH 09/11] Make the README a more developer centric asset --- README.md | 88 +++++++------------------------------------------------ 1 file changed, 10 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index 5bd9b7d..e561ab6 100644 --- a/README.md +++ b/README.md @@ -1,84 +1,16 @@ # Common Cloud Client Tools +[![Continuous Integration](https://github.com/commercetools/fs2-queues/actions/workflows/ci.yml/badge.svg)](https://github.com/commercetools/fs2-queues/actions/workflows/ci.yml) -Aims at providing a unified way of working with cloud queues (SQS, PubSub, Service Bus, ...) across all CT scala services. +Aims at providing a unified way of working with cloud queues (SQS, PubSub, Service Bus, ...). -## Common queue interface +All the abstractions are defined in the [`core`](core/) module. Other modules implement the abstraction for various queue systems and integration with other libraries. -The library offers both low and high level possibilities, making it possible to have fine grained control over queue pulling, or just focusing on processing, delegating message management to the library. +For more documentation, head over to the [documentation website](https://commercetools.github.io/fs2-queues). -The design of the API is the result of the common usage patterns at CT and how the various client SDKs are designed. -There are several views possible on a queue: - - as a `QueuePublisher` when you only need to publish messages to an existing queue. - - as a `QueueSubscriber` when you only need to subscribe to an existing queue. - - as a `QueueAdministration` when you need to manage queues (creation, deletion, ...). +## Development -The entry point is the `QueueClient` factory for each underlying queue system. - -In the examples below, we will use the following publisher and subscriber streams: - -```scala -import fs2.Stream -import cats.effect.IO -import cats.effect.std.Random -import scala.concurrent.duration._ - -import com.commercetools.queue._ - -def publishStream(publisher: QueuePublisher[IO, String]): Stream[IO, Nothing] = - Stream.eval(Random.scalaUtilRandom[IO]).flatMap { random => - Stream - // repeatedly emit a random string of length 10 - .repeatEval(random.nextString(10)) - // every 100 milliseconds - .metered(100.millis) - // and publish in batch of 10 - .through(publisher.sink(batchSize = 10)) - } - -def subscribeStream(subscriber: QueueSubscriber[IO, String]): Stream[IO, Nothing] = - subscriber - // receives messages in batches of 5, - // waiting max for 20 seconds - // print every received message, - // and ack automatically - .processWithAutoAck(5, 20.seconds)(msg => IO.println(msg.payload)) - // results are non important - .drain - -def program(client: QueueClient[IO]): IO[Unit] = { - val queueName = "my-queue" - // subscribe and publish concurrently - subscribeStream(client.subscribe[String](queueName)) - // concurrently publish messages - .concurrently(publishStream(client.publish[String](queueName))) - .compile - // runs forever - .drain -} -``` - -## Working with Azure Service Bus queues - -```scala -import com.commercetools.queue.azure.servicebus._ -import com.azure.identity.DefaultAzureCredentialBuilder - -val namespace = "{namespace}.servicebus.windows.net" // your namespace -val credentials = new DefaultAzureCredentialBuilder().build() // however you want to authenticate - -ServiceBusClient[IO](namespace, credentials).use(program(_)) -``` - -## Working with AWS SQS - - -```scala -import com.commercetools.queue.aws.sqs._ -import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider - -val region = Region.US_EAST_1 // your region -val credentials = DefaultCredentialsProvider.create() // however you want to authenticate - -SQSClient[IO](region, credentials).use(program(_)) -``` +Following commands are useful when developing: + - `sbt compile` compiles all the modules. + - `sbt test` runs all the tests. + - `sbt prePR` prepares the current branch before pushing and opening a PR. It ensures various static checks will pass. + - `sbt docs/tlSitePreview` starts a local server with the built documentation site. From 1d018ef6437bc73a3c150e2b05fbc61e01ce4ebe Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 30 Apr 2024 08:53:39 +0200 Subject: [PATCH 10/11] Add a bit more details in home --- docs/index.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/index.md b/docs/index.md index 3c9f369..6add4ba 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,4 +1,5 @@ # Home -Cloud Queues is a library that provides interfaces for working with queue systems. -It integrates with various queue providers, such as [AWS SQS](systems/sqs.md) or [Azure Service Bus](systems/service-bus.md) +Cloud Queues is a library that provides interfaces for working with queue systems. It is an opinionated library unifying the ways of working with queues independently of the underlying system. + +It integrates with various queue providers, such as [AWS SQS](systems/sqs.md) or [Azure Service Bus](systems/service-bus.md). From c5242a1c6906b8cfe9fce80dbb4cde1c83159d96 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 30 Apr 2024 08:53:51 +0200 Subject: [PATCH 11/11] Use the generic email address for contact --- CODE_OF_CONDUCT.md | 2 +- build.sbt | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 443b51f..3dadfe6 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -9,6 +9,6 @@ Everyone is expected to follow the [Scala Code of Conduct] when discussing the p If you have any questions, concerns, or moderation requests, please contact a member of the project. -- [Lucas Satabin](mailto:lucas.satabin@commercetools.com) +- [Commercetools Open-Source](mailto:opensource@commercetools.com) [Scala Code of Conduct]: https://scala-lang.org/conduct/ diff --git a/build.sbt b/build.sbt index 336c856..e3105e6 100644 --- a/build.sbt +++ b/build.sbt @@ -10,9 +10,6 @@ ThisBuild / organizationName := "Commercetools GmbH" ThisBuild / startYear := Some(2024) ThisBuild / licenses := Seq(License.Apache2) ThisBuild / tlCiDependencyGraphJob := false -ThisBuild / developers := List( - tlGitHubDev("satabin", "Lucas Satabin") -) val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3")