diff --git a/build.sbt b/build.sbt index 3224025..44299fc 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,6 @@ import laika.config.PrettyURLs +import laika.config.LinkConfig +import laika.config.ApiLinks ThisBuild / tlBaseVersion := "0.0" @@ -15,7 +17,7 @@ val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3") ThisBuild / scalaVersion := Scala213 -lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s) +lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s, unidocs) ThisBuild / tlSitePublishBranch := Some("main") @@ -101,6 +103,10 @@ lazy val docs = project .settings( tlSiteApiPackage := Some("com.commercetools.queue"), tlSiteHelium := CTTheme(tlSiteHelium.value), + laikaConfig := tlSiteApiUrl.value.fold(laikaConfig.value) { apiUrl => + laikaConfig.value.withConfigValue(LinkConfig.empty + .addApiLinks(ApiLinks(baseUri = apiUrl.toString().dropRight("index.html".size)))) + }, laikaExtensions += PrettyURLs, tlFatalWarnings := false, libraryDependencies ++= List( @@ -114,5 +120,10 @@ lazy val unidocs = project .enablePlugins(TypelevelUnidocPlugin) .settings( name := "fs2-queues-docs", - ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(core.jvm, circe.jvm, azureServiceBus.jvm, awsSQS.jvm) + ScalaUnidoc / unidoc / unidocProjectFilter := inProjects( + core.jvm, + circe.jvm, + azureServiceBus.jvm, + awsSQS.jvm, + otel4s.jvm) ) diff --git a/docs/getting-started/publishing.md b/docs/getting-started/publishing.md index 3b4e052..8d29854 100644 --- a/docs/getting-started/publishing.md +++ b/docs/getting-started/publishing.md @@ -1 +1,87 @@ +{% nav = true %} # Publishing Data + +Publishing data is made using a @:api(com.commercetools.queue.QueuePublisher). You can acquire one through a @:api(com.commercetools.queue.QueueClient) by using the `publish()` method. A `QueuePublisher` is associated to a specific queue, which is provided when creating the publisher. + +```scala mdoc +import cats.effect.IO + +import com.commercetools.queue.{QueueClient, QueuePublisher} + +def client: QueueClient[IO] = ??? + +// returns a publisher for the queue named `my-queue`, +// which can publish messages of type String +def publisher: QueuePublisher[IO, String] = + client.publish[String]("my-queue") +``` + +## Pipe a stream through the publisher sink + +The @:api(com.commercetools.queue.QueuePublisher) abstraction provides a `sink()` pipe, through which you can make your publishing source stream go. +The pipe takes a parameter allowing for batching publications. + +```scala mdoc:compile-only +import fs2.{Pipe, Stream} + +val input: Stream[IO, String] = ??? + +// messages are published in batch of 10 +val publicationSink: Pipe[IO, String, Nothing] = publisher.sink(batchSize = 10) + +// pipe the message producing stream through the publication sink +input.through(publicationSink) +``` + +@:callout(info) +Several `Stream`s can safely publish to the same sink concurrently, so you can reuse the `publicationSink` variable. +@:@ + +## Explicit publish + +If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the @:api(com.commercetools.queue.QueuePusher) lower level API, which exposes a way to publish a single message or a single batch. +This abstraction comes in handy when the messages you produces do not come from a `Stream`, otherwise you should prefer the `sink()` pipe presented above. + +A `QueuePusher` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly. + +```scala mdoc:compile-only +publisher.pusher.use { queuePusher => + val produceMessages: IO[List[String]] = ??? + + // produce a batch + produceMessages + .flatMap { messages => + // push the batch + queuePusher.push(messages, None) + } + // repeat forever + .foreverM +} +``` + +@:callout(warning) +Make sure that `IO`s publishing to the `queuePusher` do not outlive the `use` scope, otherwise you will be using the a closed resource after the `use` block returns. +If you need to spawn background fibers using the `queuePusher`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested withing the `queuePusher` one. + +```scala mdoc:compile-only +import cats.effect.std.Supervisor + +publisher.pusher.use { queuePusher => + val produceMessages: IO[List[String]] = ??? + + // create a supervisor that waits for supervised spawn fibers + // to finish before being released + Supervisor[IO](await = true).use { supervisor => + // produce a batch + produceMessages + .flatMap { messages => + // push the batch in the background + supervisor.supervise(queuePusher.push(messages, None)).void + } + } +} +``` +@:@ + +[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource +[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor diff --git a/docs/getting-started/queues.md b/docs/getting-started/queues.md index 04597bc..8eb5112 100644 --- a/docs/getting-started/queues.md +++ b/docs/getting-started/queues.md @@ -10,4 +10,4 @@ There are several views possible on a queue: - as a `QueueAdministration` when you need to [manage](administration.md) queues (creation, deletion, ...). The entry point is the `QueueClient` factory for each underlying queue system. -For each supported queue provider, you can get an instance of the `QueueClient`, please refer to the [Providers](../systems/sqs.md) section to see how. +For each supported queue provider, you can get an instance of the `QueueClient`, please refer to the [Providers](../systems/index.md) section to see how. diff --git a/docs/helium/templates/pageNav.template.html b/docs/helium/templates/pageNav.template.html new file mode 100644 index 0000000..c3f4737 --- /dev/null +++ b/docs/helium/templates/pageNav.template.html @@ -0,0 +1,13 @@ +@:if(nav) + +@:@ diff --git a/docs/index.md b/docs/index.md index 6dd013e..3c9f369 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,4 +1,4 @@ -# `fs2-queues` +# Home Cloud Queues is a library that provides interfaces for working with queue systems. It integrates with various queue providers, such as [AWS SQS](systems/sqs.md) or [Azure Service Bus](systems/service-bus.md) diff --git a/docs/systems/directory.conf b/docs/systems/directory.conf index 6154fe6..97fb442 100644 --- a/docs/systems/directory.conf +++ b/docs/systems/directory.conf @@ -1 +1,7 @@ laika.title = "Qeueue Systems" + +laika.navigationOrder = [ + index.md + sqs.md + service-bus.md +] diff --git a/docs/systems/index.md b/docs/systems/index.md new file mode 100644 index 0000000..fefcfdf --- /dev/null +++ b/docs/systems/index.md @@ -0,0 +1,5 @@ +# Providers + +`fs2-queues` comes with several queue system implementations. Each of them implements the @:api(com.commercetools.queue.QueueClient) abstraction with the various interfaces it gives access to. + +Each implementations comes with its own way to get access to a client, depending on the underlying SDK. Please have a look at the provider documentation to see the different ways to instantiate the clients.