Skip to content

Commit

Permalink
Add QueuePublisher documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Apr 29, 2024
1 parent fbe01a8 commit 3d45534
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 4 deletions.
15 changes: 13 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import laika.config.PrettyURLs
import laika.config.LinkConfig
import laika.config.ApiLinks

ThisBuild / tlBaseVersion := "0.0"

Expand All @@ -15,7 +17,7 @@ val Scala213 = "2.13.12"
ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3")
ThisBuild / scalaVersion := Scala213

lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s)
lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s, unidocs)

ThisBuild / tlSitePublishBranch := Some("main")

Expand Down Expand Up @@ -101,6 +103,10 @@ lazy val docs = project
.settings(
tlSiteApiPackage := Some("com.commercetools.queue"),
tlSiteHelium := CTTheme(tlSiteHelium.value),
laikaConfig := tlSiteApiUrl.value.fold(laikaConfig.value) { apiUrl =>
laikaConfig.value.withConfigValue(LinkConfig.empty
.addApiLinks(ApiLinks(baseUri = apiUrl.toString().dropRight("index.html".size))))
},
laikaExtensions += PrettyURLs,
tlFatalWarnings := false,
libraryDependencies ++= List(
Expand All @@ -114,5 +120,10 @@ lazy val unidocs = project
.enablePlugins(TypelevelUnidocPlugin)
.settings(
name := "fs2-queues-docs",
ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(core.jvm, circe.jvm, azureServiceBus.jvm, awsSQS.jvm)
ScalaUnidoc / unidoc / unidocProjectFilter := inProjects(
core.jvm,
circe.jvm,
azureServiceBus.jvm,
awsSQS.jvm,
otel4s.jvm)
)
86 changes: 86 additions & 0 deletions docs/getting-started/publishing.md
Original file line number Diff line number Diff line change
@@ -1 +1,87 @@
{% nav = true %}
# Publishing Data

Publishing data is made using a @:api(com.commercetools.queue.QueuePublisher). You can acquire one through a @:api(com.commercetools.queue.QueueClient) by using the `publish()` method. A `QueuePublisher` is associated to a specific queue, which is provided when creating the publisher.

```scala mdoc
import cats.effect.IO

import com.commercetools.queue.{QueueClient, QueuePublisher}

def client: QueueClient[IO] = ???

// returns a publisher for the queue named `my-queue`,
// which can publish messages of type String
def publisher: QueuePublisher[IO, String] =
client.publish[String]("my-queue")
```

## Pipe a stream through the publisher sink

The @:api(com.commercetools.queue.QueuePublisher) abstraction provides a `sink()` pipe, through which you can make your publishing source stream go.
The pipe takes a parameter allowing for batching publications.

```scala mdoc:compile-only
import fs2.{Pipe, Stream}

val input: Stream[IO, String] = ???

// messages are published in batch of 10
val publicationSink: Pipe[IO, String, Nothing] = publisher.sink(batchSize = 10)

// pipe the message producing stream through the publication sink
input.through(publicationSink)
```

@:callout(info)
Several `Stream`s can safely publish to the same sink concurrently, so you can reuse the `publicationSink` variable.
@:@

## Explicit publish

If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the @:api(com.commercetools.queue.QueuePusher) lower level API, which exposes a way to publish a single message or a single batch.
This abstraction comes in handy when the messages you produces do not come from a `Stream`, otherwise you should prefer the `sink()` pipe presented above.

A `QueuePusher` is accessed as a [`Resource`][cats-effect-resource] as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly.

```scala mdoc:compile-only
publisher.pusher.use { queuePusher =>
val produceMessages: IO[List[String]] = ???

// produce a batch
produceMessages
.flatMap { messages =>
// push the batch
queuePusher.push(messages, None)
}
// repeat forever
.foreverM
}
```

@:callout(warning)
Make sure that `IO`s publishing to the `queuePusher` do not outlive the `use` scope, otherwise you will be using the a closed resource after the `use` block returns.
If you need to spawn background fibers using the `queuePusher`, you can for instance use a [`Supervisor`][cats-effect-supervisor] whose lifetime is nested withing the `queuePusher` one.

```scala mdoc:compile-only
import cats.effect.std.Supervisor

publisher.pusher.use { queuePusher =>
val produceMessages: IO[List[String]] = ???

// create a supervisor that waits for supervised spawn fibers
// to finish before being released
Supervisor[IO](await = true).use { supervisor =>
// produce a batch
produceMessages
.flatMap { messages =>
// push the batch in the background
supervisor.supervise(queuePusher.push(messages, None)).void
}
}
}
```
@:@

[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource
[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor
2 changes: 1 addition & 1 deletion docs/getting-started/queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ There are several views possible on a queue:
- as a `QueueAdministration` when you need to [manage](administration.md) queues (creation, deletion, ...).

The entry point is the `QueueClient` factory for each underlying queue system.
For each supported queue provider, you can get an instance of the `QueueClient`, please refer to the [Providers](../systems/sqs.md) section to see how.
For each supported queue provider, you can get an instance of the `QueueClient`, please refer to the [Providers](../systems/index.md) section to see how.
13 changes: 13 additions & 0 deletions docs/helium/templates/pageNav.template.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
@:if(nav)
<nav id="page-nav"@:if(helium.site.pageNavigation.keepOnSmallScreens) class="all-screens"@:@>
<p class="header"><a href="#">${cursor.currentDocument.title}</a></p>

@:navigationTree {
entries = [
{ target = "#", excludeRoot = true, depth = ${helium.site.pageNavigation.depth} }
]
}

<p class="footer">@:for(helium.site.pageNavigation.sourceEditLinks)<a href="${_.baseURL}${cursor.currentDocument.sourcePath}">${_.icon}${_.text}</a>@:@</p>
</nav>
@:@
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# `fs2-queues`
# Home

Cloud Queues is a library that provides interfaces for working with queue systems.
It integrates with various queue providers, such as [AWS SQS](systems/sqs.md) or [Azure Service Bus](systems/service-bus.md)
6 changes: 6 additions & 0 deletions docs/systems/directory.conf
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
laika.title = "Qeueue Systems"

laika.navigationOrder = [
index.md
sqs.md
service-bus.md
]
5 changes: 5 additions & 0 deletions docs/systems/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Providers

`fs2-queues` comes with several queue system implementations. Each of them implements the @:api(com.commercetools.queue.QueueClient) abstraction with the various interfaces it gives access to.

Each implementations comes with its own way to get access to a client, depending on the underlying SDK. Please have a look at the provider documentation to see the different ways to instantiate the clients.

0 comments on commit 3d45534

Please sign in to comment.