Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce MessageBatch to QueuePuller #56

Merged
merged 11 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.implicits.toFunctorOps
import com.commercetools.queue.{Message, UnsealedMessageBatch}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityBatchRequest, ChangeMessageVisibilityBatchRequestEntry, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry}

private class SQSMessageBatch[F[_], T](
payload: Chunk[SQSMessageContext[F, T]],
client: SqsAsyncClient,
queueUrl: String
)(implicit F: Async[F])
extends UnsealedMessageBatch[F, T] {

override def messages: Chunk[Message[F, T]] = payload

override def ackAll: F[Unit] =
F.fromCompletableFuture {
F.delay {
client.deleteMessageBatch(
DeleteMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(payload.map { m =>
DeleteMessageBatchRequestEntry
.builder()
.receiptHandle(m.receiptHandle)
.id(m.messageId)
.build()
}.asJava)
.build()
)
}
}.void

override def nackAll: F[Unit] = F.fromCompletableFuture {
F.delay {
val req = ChangeMessageVisibilityBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(
payload.map { m =>
ChangeMessageVisibilityBatchRequestEntry
.builder()
.id(m.messageId)
.receiptHandle(m.receiptHandle)
.visibilityTimeout(0)
.build()
}.asJava
)
.build()
client.changeMessageVisibilityBatch(
req
)
}
}.void
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private class SQSMessageContext[F[_], T](
val rawPayload: String,
val enqueuedAt: Instant,
val metadata: Map[String, String],
receiptHandle: String,
val receiptHandle: String,
val messageId: String,
lockTTL: Int,
queueName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{MessageSystemAttributeName, ReceiveMessageRequest}
Expand All @@ -42,6 +42,9 @@ private class SQSPuller[F[_], T](
extends UnsealedQueuePuller[F, T] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
pullInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, T]]]

private def pullInternal(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[SQSMessageContext[F, T]]] =
F.fromCompletableFuture {
F.delay {
// visibility timeout is at queue creation time
Expand Down Expand Up @@ -88,6 +91,9 @@ private class SQSPuller[F[_], T](
)
}
}
}.widen[Chunk[MessageContext[F, T]]]
.adaptError(makePullQueueException(_, queueName))
}.adaptError(makePullQueueException(_, queueName))

override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]] =
pullInternal(batchSize, waitingTime)
.map(new SQSMessageBatch[F, T](_, client, queueUrl))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Message, UnsealedMessageBatch}
import fs2.Chunk

private class ServiceBusMessageBatch[F[_], T](
payload: Chunk[ServiceBusMessageContext[F, T]],
receiver: ServiceBusReceiverClient
)(implicit F: Async[F])
extends UnsealedMessageBatch[F, T] {
override def messages: Chunk[Message[F, T]] = payload

override def ackAll: F[Unit] = F.blocking {
payload.foreach(mCtx => receiver.complete(mCtx.underlying))
}

override def nackAll: F[Unit] = F.blocking {
payload.foreach(mCtx => receiver.abandon(mCtx.underlying))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import fs2.Chunk

import java.time.Duration
Expand All @@ -37,7 +37,11 @@ private class ServiceBusPuller[F[_], Data](
deserializer: Deserializer[Data])
extends UnsealedQueuePuller[F, Data] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] = F
override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] =
pullBatchInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, Data]]]

private def pullBatchInternal(batchSize: Int, waitingTime: FiniteDuration)
: F[Chunk[ServiceBusMessageContext[F, Data]]] = F
.blocking {
Chunk
.iterator(receiver.receiveMessages(batchSize, Duration.ofMillis(waitingTime.toMillis)).iterator().asScala)
Expand All @@ -52,7 +56,8 @@ private class ServiceBusPuller[F[_], Data](
}
}
}
.widen[Chunk[MessageContext[F, Data]]]
.adaptError(makePullQueueException(_, queueName))

override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, Data]] =
pullBatchInternal(batchSize, waitingTime).map(payload => new ServiceBusMessageBatch[F, Data](payload, receiver))
}
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ lazy val core: CrossProject = crossProject(JVMPlatform)
name := "fs2-queues-core",
// TODO: Remove once next version is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink")
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePuller.pullMessageBatch")
),
libraryDependencies ++= List(
"co.fs2" %%% "fs2-core" % Versions.fs2
Expand Down
44 changes: 44 additions & 0 deletions core/src/main/scala/com/commercetools/queue/MessageBatch.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

import fs2.Chunk

/**
* Interface to interact with the message received from a queue
* as a single batch allowing user to ack/nack all messages in a single
* call if the underlying implementation supports for it.
*
* For implementations that do not support batched acknowledging both
* `ackAll` and `nackAll` methods do not guarantee atomicity and will
* fallback to using per-message calls.
*/
sealed trait MessageBatch[F[_], T] {
def messages: Chunk[Message[F, T]]

/**
* Acknowledges all the messages in the chunk.
*/
def ackAll: F[Unit]

/**
* Mark all messages from the chunk as non acknowledged.
*/
def nackAll: F[Unit]
}

private[queue] trait UnsealedMessageBatch[F[_], T] extends MessageBatch[F, T]
9 changes: 9 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueuePuller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ sealed trait QueuePuller[F[_], T] {
*/
def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]]

/**
* Pulls batch of messages with the same semantics as `pullBatch`
* with the difference in message lifecycle control. Messages pulled this
* way can only be managed (ack'ed, nack'ed) in bulk by batching acknowledgements
* if the underlying implementation supports it, otherwise it falls back to
* non-atomic, per-message management.
*/
def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]]
mladens marked this conversation as resolved.
Show resolved Hide resolved

}

private[queue] trait UnsealedQueuePuller[F[_], T] extends QueuePuller[F, T]
17 changes: 17 additions & 0 deletions core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ sealed abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
Stream.repeatEval(puller.pullBatch(batchSize, waitingTime)).unchunks
}

/**
* The stream of messages published in the subscribed queue.
* The [[MessageBatch]] gives an interface to interact with the
* batch messages in bulk.
*
* The stream emits chunks of size `batchSize` max, and waits for
* elements during `waitingTime` before emitting a chunk.
*
* '''Note:''' message batches returned by this stream must be
* manually managed (ack'ed, nack'ed) but in contrast to `messages`,
* the entire batch must be acknowledged as a whole..
*/
final def messageBatches(batchSize: Int, waitingTime: FiniteDuration): Stream[F, MessageBatch[F, T]] =
mladens marked this conversation as resolved.
Show resolved Hide resolved
Stream.resource(puller).flatMap { puller =>
Stream.repeatEval(puller.pullMessageBatch(batchSize, waitingTime))
}

/**
* Processes the messages with the provided processing function.
* The messages are automatically ack'ed on success and nack'ed on error,
Expand Down
52 changes: 51 additions & 1 deletion docs/getting-started/subscribing.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,28 @@ subscriber
}
```

For high throughput scenarios where acknowledging individual messages wouldn't be optimal, consider using `messageBatches()`.

Batching method exposes `MessageBatch` giving user control over entire batch as a whole allowing for batched acknowledgement if the implementation supports it.

Chunked messages can be accessed via `messages`.

```scala mdoc:compile-only
subscriber
.messageBatches(batchSize = 10, waitingTime = 20.seconds)
.evalMap { batch =>
batch.messages.parTraverse_ { msg =>
msg.payload.flatTap { payload =>
IO.println(s"Received $payload")
}
}.guaranteeCase {
case Outcome.Succeeded(_) => batch.ackAll()
case _ => batch.nackAll
}
}
```


### `MessageContext` control flow

There are three different methods that can be used to control the message lifecycle from the subscriber point of view:
Expand All @@ -140,6 +162,14 @@ There are three different methods that can be used to control the message lifecy
2. `MessageContext.nack()` marks the message as not processed, releasing the lock in the queue system. It will be viewable for other subscribers to receive and process.
3. `MessageContext.extendLock()` extends the currently owned lock by the queue level configured duration. This can be called as many times as you want, as long as you still own the lock. As long as the lock is extended, the message will not be distributed to any other subscriber by the queue system.

### `MessageBatch` control flow

Methods have the same semantics to `MessageContext` ones with the difference that they act on all messages from the batch at once. Whether the action is atomic across all messages depends on the underlying implementation.

1. `MessageContext.ackAll()` acknowledges all the messages from the batch.
2. `MessageContext.nackAll()` marks all the messages from the batch as not processed.


## Explicit pull

If you are integrating this library with an existing code base that performs explicit pulls from the queue, you can access the @:api(QueuePuller) lower level API, which exposes ways to pull batch of messages.
Expand Down Expand Up @@ -168,7 +198,6 @@ subscriber.puller.use { queuePuller =>
}
}
}

}
```

Expand Down Expand Up @@ -208,6 +237,27 @@ subscriber.puller.use { queuePuller =>
```
@:@

To pull batches that can be acknowledged in batches, use `pullMessageBatch()`

```scala mdoc:compile-only
subscriber.puller.use { queuePuller =>

queuePuller
.pullMessageBatch(batchSize = 10, waitingTime = 20.seconds)
.flatMap { batch =>
batch.messages.traverse_ { message =>
message.payload.flatMap { payload =>
IO.println(s"Received $payload")
}.guaranteeCase {
case Outcome.Succeeded(_) => batch.ackAll()
case _ => batch.nackAll()
}
}
}

}
```

[cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource
[cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor
[doc-deserializer]: serialization.md#data-deserializer
Loading
Loading