Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce MessageBatch to QueuePuller #56

Merged
merged 11 commits into from
Sep 26, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.implicits.toFunctorOps
import com.commercetools.queue.{Message, MessageBatch}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityBatchRequest, ChangeMessageVisibilityBatchRequestEntry, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry}

private class SQSMessageBatch[F[_], T](
payload: Chunk[SQSMessageContext[F, T]],
client: SqsAsyncClient,
queueUrl: String
)(implicit F: Async[F])
extends MessageBatch[F, T] {

override def messages: Chunk[Message[F, T]] = payload

override def ackAll: F[Unit] =
F.fromCompletableFuture {
F.delay {
client.deleteMessageBatch(
DeleteMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(payload.map { m =>
DeleteMessageBatchRequestEntry
.builder()
.receiptHandle(m.receiptHandle)
.id(m.messageId)
.build()
}.asJava)
.build()
)
}
}.void

override def nackAll: F[Unit] = F.fromCompletableFuture {
F.delay {
val req = ChangeMessageVisibilityBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(
payload.map { m =>
ChangeMessageVisibilityBatchRequestEntry
.builder()
.id(m.messageId)
.receiptHandle(m.receiptHandle)
.visibilityTimeout(0)
.build()
}.asJava
)
.build()
client.changeMessageVisibilityBatch(
req
)
}
}.void
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private class SQSMessageContext[F[_], T](
val rawPayload: String,
val enqueuedAt: Instant,
val metadata: Map[String, String],
receiptHandle: String,
val receiptHandle: String,
val messageId: String,
lockTTL: Int,
queueName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.effect.syntax.concurrent._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import fs2.Chunk
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{MessageSystemAttributeName, ReceiveMessageRequest}
Expand All @@ -42,6 +42,9 @@ private class SQSPuller[F[_], T](
extends UnsealedQueuePuller[F, T] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
pullInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, T]]]

private def pullInternal(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[SQSMessageContext[F, T]]] =
F.fromCompletableFuture {
F.delay {
// visibility timeout is at queue creation time
Expand Down Expand Up @@ -88,6 +91,9 @@ private class SQSPuller[F[_], T](
)
}
}
}.widen[Chunk[MessageContext[F, T]]]
.adaptError(makePullQueueException(_, queueName))
}.adaptError(makePullQueueException(_, queueName))

override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]] =
pullInternal(batchSize, waitingTime)
.map(new SQSMessageBatch[F, T](_, client, queueUrl))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.azure.servicebus

import cats.effect.Async
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Message, MessageBatch}
import fs2.Chunk

private class ServiceBusMessageBatch[F[_], T](
payload: Chunk[ServiceBusMessageContext[F, T]],
receiver: ServiceBusReceiverClient
)(implicit F: Async[F])
extends MessageBatch[F, T] {
override def messages: Chunk[Message[F, T]] = payload

override def ackAll: F[Unit] = F.blocking {
payload.foreach(mCtx => receiver.complete(mCtx.underlying))
}

override def nackAll: F[Unit] = F.blocking {
payload.foreach(mCtx => receiver.abandon(mCtx.underlying))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.ServiceBusReceiverClient
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import fs2.Chunk

import java.time.Duration
Expand All @@ -37,7 +37,11 @@ private class ServiceBusPuller[F[_], Data](
deserializer: Deserializer[Data])
extends UnsealedQueuePuller[F, Data] {

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] = F
override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] =
pullBatchInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, Data]]]

private def pullBatchInternal(batchSize: Int, waitingTime: FiniteDuration)
: F[Chunk[ServiceBusMessageContext[F, Data]]] = F
.blocking {
Chunk
.iterator(receiver.receiveMessages(batchSize, Duration.ofMillis(waitingTime.toMillis)).iterator().asScala)
Expand All @@ -52,7 +56,8 @@ private class ServiceBusPuller[F[_], Data](
}
}
}
.widen[Chunk[MessageContext[F, Data]]]
.adaptError(makePullQueueException(_, queueName))

override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, Data]] =
pullBatchInternal(batchSize, waitingTime).map(payload => new ServiceBusMessageBatch[F, Data](payload, receiver))
}
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ lazy val core: CrossProject = crossProject(JVMPlatform)
name := "fs2-queues-core",
// TODO: Remove once next version is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink")
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePuller.pullMessageBatch")
),
libraryDependencies ++= List(
"co.fs2" %%% "fs2-core" % Versions.fs2
Expand Down
42 changes: 42 additions & 0 deletions core/src/main/scala/com/commercetools/queue/MessageBatch.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

import fs2.Chunk

/**
* Interface to interact with the message received from a queue
* as a single batch allowing user to ack/nack all messages in a single
* call if the underlying implementation supports for it.
*
* For implementations that do not support batched acknowledging both
* `ackAll` and `nackAll` methods do not guarantee atomicity and will
* fallback to using per-message calls.
*/
trait MessageBatch[F[_], T] {
mladens marked this conversation as resolved.
Show resolved Hide resolved
def messages: Chunk[Message[F, T]]

/**
* Acknowledges all the messages in the chunk.
*/
def ackAll: F[Unit]

/**
* Mark all messages from the chunk as non acknowledged.
*/
def nackAll: F[Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ sealed trait QueuePuller[F[_], T] {
* methods.
*/
def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]]
def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]]
mladens marked this conversation as resolved.
Show resolved Hide resolved

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ sealed abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) {
Stream.repeatEval(puller.pullBatch(batchSize, waitingTime)).unchunks
}

final def messageBatches(batchSize: Int, waitingTime: FiniteDuration): Stream[F, MessageBatch[F, T]] =
mladens marked this conversation as resolved.
Show resolved Hide resolved
Stream.resource(puller).flatMap { puller =>
Stream.repeatEval(puller.pullMessageBatch(batchSize, waitingTime))
}

/**
* Processes the messages with the provided processing function.
* The messages are automatically ack'ed on success and nack'ed on error,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.gcp.pubsub

import cats.effect.Async
import cats.implicits.toFunctorOps
import com.commercetools.queue.{Message, MessageBatch}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.{AcknowledgeRequest, ModifyAckDeadlineRequest, SubscriptionName}
import fs2.Chunk

private class PubSubMessageBatch[F[_], T](
payload: Chunk[PubSubMessageContext[F, T]],
subscriptionName: SubscriptionName,
subscriber: SubscriberStub
)(implicit F: Async[F])
extends MessageBatch[F, T] {
override def messages: Chunk[Message[F, T]] = payload

override def ackAll: F[Unit] =
wrapFuture(
F.delay(
subscriber
.acknowledgeCallable()
.futureCall(
AcknowledgeRequest
.newBuilder()
.setSubscription(subscriptionName.toString)
.addAllAckIds(payload.map(_.underlying.getAckId).asJava)
.build()))).void

override def nackAll: F[Unit] = wrapFuture(
F.delay(
subscriber
.modifyAckDeadlineCallable()
.futureCall(
ModifyAckDeadlineRequest
.newBuilder()
.setSubscription(subscriptionName.toString)
.setAckDeadlineSeconds(0)
.addAllAckIds(payload.map(_.underlying.getAckId).asJava)
.build()))).void
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
private class PubSubMessageContext[F[_], T](
subscriber: SubscriberStub,
subscriptionName: SubscriptionName,
underlying: ReceivedMessage,
val underlying: ReceivedMessage,
lockDurationSeconds: Int,
val payload: F[T],
queueName: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.commercetools.queue.gcp.pubsub
import cats.effect.Async
import cats.effect.syntax.concurrent._
import cats.syntax.all._
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller}
import com.google.api.gax.grpc.GrpcCallContext
import com.google.api.gax.retrying.RetrySettings
import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException}
Expand Down Expand Up @@ -48,6 +48,9 @@ private class PubSubPuller[F[_], T](
.withRetrySettings(RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
pullBatchInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, T]]]

private def pullBatchInternal(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[PubSubMessageContext[F, T]]] =
wrapFuture(F.delay {
subscriber
.pullCallable()
Expand Down Expand Up @@ -100,7 +103,9 @@ private class PubSubPuller[F[_], T](
.map(new PubSubMessageContext(subscriber, subscriptionName, msg, lockTTLSeconds, _, queueName))
}
}
.widen[Chunk[MessageContext[F, T]]]
.adaptError(makePullQueueException(_, queueName))

override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]] =
pullBatchInternal(batchSize, waitingTime).map(payload =>
new PubSubMessageBatch[F, T](payload, subscriptionName, subscriber))
}
Loading
Loading