diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageBatch.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageBatch.scala new file mode 100644 index 0000000..5f3017d --- /dev/null +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageBatch.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.aws.sqs + +import cats.effect.Async +import cats.implicits.toFunctorOps +import com.commercetools.queue.{Message, UnsealedMessageBatch} +import fs2.Chunk +import software.amazon.awssdk.services.sqs.SqsAsyncClient +import software.amazon.awssdk.services.sqs.model.{ChangeMessageVisibilityBatchRequest, ChangeMessageVisibilityBatchRequestEntry, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry} + +private class SQSMessageBatch[F[_], T]( + payload: Chunk[SQSMessageContext[F, T]], + client: SqsAsyncClient, + queueUrl: String +)(implicit F: Async[F]) + extends UnsealedMessageBatch[F, T] { + + override def messages: Chunk[Message[F, T]] = payload + + override def ackAll: F[Unit] = + F.fromCompletableFuture { + F.delay { + client.deleteMessageBatch( + DeleteMessageBatchRequest + .builder() + .queueUrl(queueUrl) + .entries(payload.map { m => + DeleteMessageBatchRequestEntry + .builder() + .receiptHandle(m.receiptHandle) + .id(m.messageId) + .build() + }.asJava) + .build() + ) + } + }.void + + override def nackAll: F[Unit] = F.fromCompletableFuture { + F.delay { + val req = ChangeMessageVisibilityBatchRequest + .builder() + .queueUrl(queueUrl) + .entries( + payload.map { m => + ChangeMessageVisibilityBatchRequestEntry + .builder() + .id(m.messageId) + .receiptHandle(m.receiptHandle) + .visibilityTimeout(0) + .build() + }.asJava + ) + .build() + client.changeMessageVisibilityBatch( + req + ) + } + }.void +} diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala index 8dc4886..0e5c395 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageContext.scala @@ -30,7 +30,7 @@ private class SQSMessageContext[F[_], T]( val rawPayload: String, val enqueuedAt: Instant, val metadata: Map[String, String], - receiptHandle: String, + val receiptHandle: String, val messageId: String, lockTTL: Int, queueName: String, diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala index c5b90a4..ae803ec 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPuller.scala @@ -21,7 +21,7 @@ import cats.effect.syntax.concurrent._ import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.syntax.monadError._ -import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller} +import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller} import fs2.Chunk import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{MessageSystemAttributeName, ReceiveMessageRequest} @@ -42,6 +42,9 @@ private class SQSPuller[F[_], T]( extends UnsealedQueuePuller[F, T] { override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = + pullInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, T]]] + + private def pullInternal(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[SQSMessageContext[F, T]]] = F.fromCompletableFuture { F.delay { // visibility timeout is at queue creation time @@ -88,6 +91,9 @@ private class SQSPuller[F[_], T]( ) } } - }.widen[Chunk[MessageContext[F, T]]] - .adaptError(makePullQueueException(_, queueName)) + }.adaptError(makePullQueueException(_, queueName)) + + override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]] = + pullInternal(batchSize, waitingTime) + .map(new SQSMessageBatch[F, T](_, client, queueUrl)) } diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusMessageBatch.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusMessageBatch.scala new file mode 100644 index 0000000..4224dc8 --- /dev/null +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusMessageBatch.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.azure.servicebus + +import cats.effect.Async +import com.azure.messaging.servicebus.ServiceBusReceiverClient +import com.commercetools.queue.{Message, UnsealedMessageBatch} +import fs2.Chunk + +private class ServiceBusMessageBatch[F[_], T]( + payload: Chunk[ServiceBusMessageContext[F, T]], + receiver: ServiceBusReceiverClient +)(implicit F: Async[F]) + extends UnsealedMessageBatch[F, T] { + override def messages: Chunk[Message[F, T]] = payload + + override def ackAll: F[Unit] = F.blocking { + payload.foreach(mCtx => receiver.complete(mCtx.underlying)) + } + + override def nackAll: F[Unit] = F.blocking { + payload.foreach(mCtx => receiver.abandon(mCtx.underlying)) + } +} diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala index 9e5a258..f39dfe4 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPuller.scala @@ -22,7 +22,7 @@ import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.syntax.monadError._ import com.azure.messaging.servicebus.ServiceBusReceiverClient -import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller} +import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller} import fs2.Chunk import java.time.Duration @@ -37,7 +37,11 @@ private class ServiceBusPuller[F[_], Data]( deserializer: Deserializer[Data]) extends UnsealedQueuePuller[F, Data] { - override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] = F + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, Data]]] = + pullBatchInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, Data]]] + + private def pullBatchInternal(batchSize: Int, waitingTime: FiniteDuration) + : F[Chunk[ServiceBusMessageContext[F, Data]]] = F .blocking { Chunk .iterator(receiver.receiveMessages(batchSize, Duration.ofMillis(waitingTime.toMillis)).iterator().asScala) @@ -52,7 +56,8 @@ private class ServiceBusPuller[F[_], Data]( } } } - .widen[Chunk[MessageContext[F, Data]]] .adaptError(makePullQueueException(_, queueName)) + override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, Data]] = + pullBatchInternal(batchSize, waitingTime).map(payload => new ServiceBusMessageBatch[F, Data](payload, receiver)) } diff --git a/build.sbt b/build.sbt index 7e78ed5..ef976d2 100644 --- a/build.sbt +++ b/build.sbt @@ -55,7 +55,8 @@ lazy val core: CrossProject = crossProject(JVMPlatform) name := "fs2-queues-core", // TODO: Remove once next version is published mimaBinaryIssueFilters ++= List( - ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink") + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("com.commercetools.queue.QueuePuller.pullMessageBatch") ), libraryDependencies ++= List( "co.fs2" %%% "fs2-core" % Versions.fs2 diff --git a/core/src/main/scala/com/commercetools/queue/MessageBatch.scala b/core/src/main/scala/com/commercetools/queue/MessageBatch.scala new file mode 100644 index 0000000..33b0b99 --- /dev/null +++ b/core/src/main/scala/com/commercetools/queue/MessageBatch.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue + +import fs2.Chunk + +/** + * Interface to interact with the message received from a queue + * as a single batch allowing user to ack/nack all messages in a single + * call if the underlying implementation supports for it. + * + * For implementations that do not support batched acknowledging both + * `ackAll` and `nackAll` methods do not guarantee atomicity and will + * fallback to using per-message calls. + */ +sealed trait MessageBatch[F[_], T] { + def messages: Chunk[Message[F, T]] + + /** + * Acknowledges all the messages in the chunk. + */ + def ackAll: F[Unit] + + /** + * Mark all messages from the chunk as non acknowledged. + */ + def nackAll: F[Unit] +} + +private[queue] trait UnsealedMessageBatch[F[_], T] extends MessageBatch[F, T] diff --git a/core/src/main/scala/com/commercetools/queue/QueuePuller.scala b/core/src/main/scala/com/commercetools/queue/QueuePuller.scala index 42a48b9..b199ac8 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePuller.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePuller.scala @@ -43,6 +43,15 @@ sealed trait QueuePuller[F[_], T] { */ def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] + /** + * Pulls batch of messages with the same semantics as `pullBatch` + * with the difference in message lifecycle control. Messages pulled this + * way can only be managed (ack'ed, nack'ed) in bulk by batching acknowledgements + * if the underlying implementation supports it, otherwise it falls back to + * non-atomic, per-message management. + */ + def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]] + } private[queue] trait UnsealedQueuePuller[F[_], T] extends QueuePuller[F, T] diff --git a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala index 5bfea81..eb4165c 100644 --- a/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala +++ b/core/src/main/scala/com/commercetools/queue/QueueSubscriber.scala @@ -59,6 +59,23 @@ sealed abstract class QueueSubscriber[F[_], T](implicit F: Concurrent[F]) { Stream.repeatEval(puller.pullBatch(batchSize, waitingTime)).unchunks } + /** + * The stream of messages published in the subscribed queue. + * The [[MessageBatch]] gives an interface to interact with the + * batch messages in bulk. + * + * The stream emits chunks of size `batchSize` max, and waits for + * elements during `waitingTime` before emitting a chunk. + * + * '''Note:''' message batches returned by this stream must be + * manually managed (ack'ed, nack'ed) but in contrast to `messages`, + * the entire batch must be acknowledged as a whole.. + */ + final def messageBatches(batchSize: Int, waitingTime: FiniteDuration): Stream[F, MessageBatch[F, T]] = + Stream.resource(puller).flatMap { puller => + Stream.repeatEval(puller.pullMessageBatch(batchSize, waitingTime)) + } + /** * Processes the messages with the provided processing function. * The messages are automatically ack'ed on success and nack'ed on error, diff --git a/docs/getting-started/subscribing.md b/docs/getting-started/subscribing.md index 0ff25f5..d8a9249 100644 --- a/docs/getting-started/subscribing.md +++ b/docs/getting-started/subscribing.md @@ -132,6 +132,30 @@ subscriber } ``` +For high throughput scenarios where acknowledging individual messages wouldn't be optimal, consider using `messageBatches()`. + +Batching method exposes `MessageBatch` giving user control over entire batch as a whole allowing for batched acknowledgement if the implementation supports it. + +Chunked messages can be accessed via `messages`. + +```scala mdoc:compile-only +import cats.effect.Outcome + +subscriber + .messageBatches(batchSize = 10, waitingTime = 20.seconds) + .evalMap { batch => + batch.messages.parTraverse_ { msg => + msg.payload.flatTap { payload => + IO.println(s"Received $payload") + } + }.guaranteeCase { + case Outcome.Succeeded(_) => batch.ackAll + case _ => batch.nackAll + } + } +``` + + ### `MessageContext` control flow There are three different methods that can be used to control the message lifecycle from the subscriber point of view: @@ -140,6 +164,14 @@ There are three different methods that can be used to control the message lifecy 2. `MessageContext.nack()` marks the message as not processed, releasing the lock in the queue system. It will be viewable for other subscribers to receive and process. 3. `MessageContext.extendLock()` extends the currently owned lock by the queue level configured duration. This can be called as many times as you want, as long as you still own the lock. As long as the lock is extended, the message will not be distributed to any other subscriber by the queue system. +### `MessageBatch` control flow + +Methods have the same semantics to `MessageContext` ones with the difference that they act on all messages from the batch at once. Whether the action is atomic across all messages depends on the underlying implementation. + +1. `MessageContext.ackAll()` acknowledges all the messages from the batch. +2. `MessageContext.nackAll()` marks all the messages from the batch as not processed. + + ## Explicit pull If you are integrating this library with an existing code base that performs explicit pulls from the queue, you can access the @:api(QueuePuller) lower level API, which exposes ways to pull batch of messages. @@ -168,7 +200,6 @@ subscriber.puller.use { queuePuller => } } } - } ``` @@ -208,6 +239,29 @@ subscriber.puller.use { queuePuller => ``` @:@ +To pull batches that can be acknowledged in batches, use `pullMessageBatch()` + +```scala mdoc:compile-only +import cats.effect.Outcome + +subscriber.puller.use { queuePuller => + + queuePuller + .pullMessageBatch(batchSize = 10, waitingTime = 20.seconds) + .flatMap { batch => + batch.messages.traverse_ { message => + message.payload.flatMap { payload => + IO.println(s"Received $payload") + }.guaranteeCase { + case Outcome.Succeeded(_) => batch.ackAll + case _ => batch.nackAll + } + } + } + +} +``` + [cats-effect-resource]: https://typelevel.org/cats-effect/docs/std/resource [cats-effect-supervisor]: https://typelevel.org/cats-effect/docs/std/supervisor [doc-deserializer]: serialization.md#data-deserializer diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageBatch.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageBatch.scala new file mode 100644 index 0000000..75586ed --- /dev/null +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageBatch.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.gcp.pubsub + +import cats.effect.Async +import cats.implicits.toFunctorOps +import com.commercetools.queue.{Message, UnsealedMessageBatch} +import com.google.cloud.pubsub.v1.stub.SubscriberStub +import com.google.pubsub.v1.{AcknowledgeRequest, ModifyAckDeadlineRequest, SubscriptionName} +import fs2.Chunk + +private class PubSubMessageBatch[F[_], T]( + payload: Chunk[PubSubMessageContext[F, T]], + subscriptionName: SubscriptionName, + subscriber: SubscriberStub +)(implicit F: Async[F]) + extends UnsealedMessageBatch[F, T] { + override def messages: Chunk[Message[F, T]] = payload + + override def ackAll: F[Unit] = + wrapFuture( + F.delay( + subscriber + .acknowledgeCallable() + .futureCall( + AcknowledgeRequest + .newBuilder() + .setSubscription(subscriptionName.toString) + .addAllAckIds(payload.map(_.underlying.getAckId).asJava) + .build()))).void + + override def nackAll: F[Unit] = wrapFuture( + F.delay( + subscriber + .modifyAckDeadlineCallable() + .futureCall( + ModifyAckDeadlineRequest + .newBuilder() + .setSubscription(subscriptionName.toString) + .setAckDeadlineSeconds(0) + .addAllAckIds(payload.map(_.underlying.getAckId).asJava) + .build()))).void +} diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala index 779e8f0..979a466 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubMessageContext.scala @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._ private class PubSubMessageContext[F[_], T]( subscriber: SubscriberStub, subscriptionName: SubscriptionName, - underlying: ReceivedMessage, + val underlying: ReceivedMessage, lockDurationSeconds: Int, val payload: F[T], queueName: String diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala index 03b9072..dd42393 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala @@ -19,7 +19,7 @@ package com.commercetools.queue.gcp.pubsub import cats.effect.Async import cats.effect.syntax.concurrent._ import cats.syntax.all._ -import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller} +import com.commercetools.queue.{Deserializer, MessageBatch, MessageContext, UnsealedQueuePuller} import com.google.api.gax.grpc.GrpcCallContext import com.google.api.gax.retrying.RetrySettings import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException} @@ -48,6 +48,9 @@ private class PubSubPuller[F[_], T]( .withRetrySettings(RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = + pullBatchInternal(batchSize, waitingTime).widen[Chunk[MessageContext[F, T]]] + + private def pullBatchInternal(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[PubSubMessageContext[F, T]]] = wrapFuture(F.delay { subscriber .pullCallable() @@ -100,7 +103,9 @@ private class PubSubPuller[F[_], T]( .map(new PubSubMessageContext(subscriber, subscriptionName, msg, lockTTLSeconds, _, queueName)) } } - .widen[Chunk[MessageContext[F, T]]] .adaptError(makePullQueueException(_, queueName)) + override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]] = + pullBatchInternal(batchSize, waitingTime).map(payload => + new PubSubMessageBatch[F, T](payload, subscriptionName, subscriber)) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageBatch.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageBatch.scala new file mode 100644 index 0000000..3dca14b --- /dev/null +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringMessageBatch.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.otel4s + +import cats.effect.Temporal +import cats.effect.implicits.monadCancelOps +import com.commercetools.queue.{Message, MessageBatch, UnsealedMessageBatch} +import fs2.Chunk +import org.typelevel.otel4s.trace.Tracer + +private class MeasuringMessageBatch[F[_], T]( + underlying: MessageBatch[F, T], + metrics: QueueMetrics[F], + tracer: Tracer[F] +)(implicit F: Temporal[F]) + extends UnsealedMessageBatch[F, T] { + override def messages: Chunk[Message[F, T]] = underlying.messages + + /** + * Acknowledges all the messages in the chunk. + */ + override def ackAll: F[Unit] = tracer + .span("queue.message.batch.ack") + .surround { + underlying.ackAll + } + .guaranteeCase(metrics.ackAll) + + /** + * Mark all messages from the chunk as non acknowledged. + */ + override def nackAll: F[Unit] = tracer + .span("queue.message.batch.nack") + .surround { + underlying.nackAll + } + .guaranteeCase(metrics.nackAll) +} diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala index 2a00513..3808e95 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePuller.scala @@ -19,7 +19,7 @@ package com.commercetools.queue.otel4s import cats.effect.Temporal import cats.effect.syntax.monadCancel._ import cats.syntax.functor._ -import com.commercetools.queue.{MessageContext, QueuePuller, UnsealedQueuePuller} +import com.commercetools.queue.{MessageBatch, MessageContext, QueuePuller, UnsealedQueuePuller} import fs2.Chunk import org.typelevel.otel4s.trace.Tracer @@ -44,4 +44,14 @@ private class MeasuringQueuePuller[F[_], T]( } .guaranteeCase(metrics.receive) + override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): F[MessageBatch[F, T]] = + tracer + .span("queue.pullBatch") + .surround { + underlying + .pullMessageBatch(batchSize, waitingTime) + .map(new MeasuringMessageBatch[F, T](_, metrics, tracer)) + .widen[MessageBatch[F, T]] + } + .guaranteeCase(metrics.receive) } diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala index 17b1307..c7b2d0d 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/QueueMetrics.scala @@ -28,6 +28,11 @@ private class QueueMetrics[F[_]](queueName: String, requestCounter: Counter[F, L QueueMetrics.increment(queue, QueueMetrics.receive, requestCounter) final val ack: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.ack, requestCounter) final val nack: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.nack, requestCounter) + final val ackAll: Outcome[F, Throwable, _] => F[Unit] = + QueueMetrics.increment(queue, QueueMetrics.ackAll, requestCounter) + final val nackAll: Outcome[F, Throwable, _] => F[Unit] = + QueueMetrics.increment(queue, QueueMetrics.nackAll, requestCounter) + final val extendLock: Outcome[F, Throwable, _] => F[Unit] = QueueMetrics.increment(queue, QueueMetrics.extendLock, requestCounter) final val stats: Outcome[F, Throwable, _] => F[Unit] = @@ -42,6 +47,8 @@ private object QueueMetrics { final val receive = Attribute("method", "receive") final val ack = Attribute("method", "ack") final val nack = Attribute("method", "nack") + final val ackAll = Attribute("method", "ackAll") + final val nackAll = Attribute("method", "nackAll") final val extendLock = Attribute("method", "extendLock") final val stats = Attribute("method", "stats") diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala index 374bd3d..e6fbc6c 100644 --- a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPullerSuite.scala @@ -18,8 +18,9 @@ package com.commercetools.queue.otel4s import cats.data.Chain import cats.effect.IO +import cats.syntax.foldable._ import com.commercetools.queue.testing.TestingMessageContext -import com.commercetools.queue.{MessageContext, UnsealedQueuePuller} +import com.commercetools.queue.{Message, MessageBatch, MessageContext, UnsealedMessageBatch, UnsealedQueuePuller} import fs2.Chunk import munit.CatsEffectSuite import org.typelevel.otel4s.Attribute @@ -40,6 +41,15 @@ class MeasuringPullerSuite extends CatsEffectSuite { override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, String]]] = batch + + override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): IO[MessageBatch[IO, String]] = + pullBatch(batchSize, waitingTime).map { batch => + new UnsealedMessageBatch[IO, String] { + override def messages: Chunk[Message[IO, String]] = batch + override def ackAll: IO[Unit] = batch.traverse_(_.ack()) + override def nackAll: IO[Unit] = batch.traverse_(_.nack()) + } + } } test("Successful pulling results in incrementing the counter") { @@ -66,6 +76,30 @@ class MeasuringPullerSuite extends CatsEffectSuite { } } + test("Successful batch pulling results in incrementing the counter") { + NaiveCounter.create.flatMap { counter => + val measuringPuller = new MeasuringQueuePuller[IO, String]( + puller( + IO.pure( + Chunk.from( + List( + TestingMessageContext("first").noop, + TestingMessageContext("second").noop, + TestingMessageContext("third").noop, + TestingMessageContext("forth").noop)))), + new QueueMetrics(queueName, counter), + Tracer.noop + ) + for { + fiber <- measuringPuller.pullMessageBatch(0, Duration.Zero).start + _ <- assertIO(fiber.join.map(_.isSuccess), true) + _ <- assertIO( + counter.records.get, + Chain.one((1L, List(queueAttribute, QueueMetrics.receive, QueueMetrics.success)))) + } yield () + } + } + test("Failed pulling results in incrementing the counter") { NaiveCounter.create.flatMap { counter => val measuringPuller = diff --git a/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala index b947f65..1bb0cf7 100644 --- a/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala +++ b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala @@ -17,7 +17,8 @@ package com.commercetools.queue.testing import cats.effect.IO -import com.commercetools.queue.{MessageContext, QueuePuller, UnsealedQueuePuller} +import cats.syntax.foldable._ +import com.commercetools.queue.{Message, MessageBatch, MessageContext, QueuePuller, UnsealedMessageBatch, UnsealedQueuePuller} import fs2.Chunk import scala.concurrent.duration.FiniteDuration @@ -34,6 +35,14 @@ final private class TestQueuePuller[T](queue: TestQueue[T]) extends UnsealedQueu override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, T]]] = IO.sleep(waitingTime) *> queue.lockMessages(batchSize) + override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): IO[MessageBatch[IO, T]] = + pullBatch(batchSize, waitingTime).map { batch => + new UnsealedMessageBatch[IO, T] { + override def messages: Chunk[Message[IO, T]] = batch + override def ackAll: IO[Unit] = batch.traverse_(_.ack()) + override def nackAll: IO[Unit] = batch.traverse_(_.nack()) + } + } } /** @@ -54,6 +63,14 @@ object TestQueuePuller { override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, T]]] = onPull(batchSize, waitingTime) + override def pullMessageBatch(batchSize: Int, waitingTime: FiniteDuration): IO[MessageBatch[IO, T]] = + pullBatch(batchSize, waitingTime).map { batch => + new UnsealedMessageBatch[IO, T] { + override def messages: Chunk[Message[IO, T]] = batch + override def ackAll: IO[Unit] = batch.traverse_(_.ack()) + override def nackAll: IO[Unit] = batch.traverse_(_.nack()) + } + } } } diff --git a/testing/src/test/scala/com/commercetools/queue/SubscriberSuite.scala b/testing/src/test/scala/com/commercetools/queue/SubscriberSuite.scala index 29cf013..eab9e2a 100644 --- a/testing/src/test/scala/com/commercetools/queue/SubscriberSuite.scala +++ b/testing/src/test/scala/com/commercetools/queue/SubscriberSuite.scala @@ -35,14 +35,43 @@ class SubscriberSuite extends CatsEffectSuite { (queue, TestQueueSubscriber(queue), TestQueuePublisher(queue)) }) + def produceMessages(queue: TestQueue[String], count: Int): IO[List[TestMessage[String]]] = + List + .range(0, count) + .traverse { i => + IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) + } + .flatTap(queue.setAvailableMessages) + + queueSub.test("Successful message batch must be acked/nacked") { case (queue, subscriber, _) => + TestControl + .executeEmbed(for { + // first populate the queue + _ <- produceMessages(queue, 5) + // ack first batch, nack second + _ <- subscriber + .messageBatches(batchSize = 3, waitingTime = 40.millis) + .zipWithIndex + .evalTap { case (batch, index) => + index match { + case 1 => batch.nackAll + case _ => batch.ackAll + } + } + .take(2) + .compile + .drain + _ <- assertIO(queue.getAvailableMessages.map(_.map(_.payload)), List("3", "4")) + _ <- assertIO(queue.getLockedMessages, Nil) + _ <- assertIO(queue.getDelayedMessages, Nil) + } yield ()) + } + queueSub.test("Successful messages must be acked") { case (queue, subscriber, _) => TestControl .executeEmbed(for { // first populate the queue - messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _)) - } - _ <- queue.setAvailableMessages(messages) + _ <- produceMessages(queue, 100) // then process messages in batches of 5 // processing is (virtually) instantaneous in this case, // so messages are immediately acked, from the mocked time PoV @@ -70,14 +99,11 @@ class SubscriberSuite extends CatsEffectSuite { TestControl .executeEmbed(for { // first populate the queue - messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _)) - } - _ <- queue.setAvailableMessages(messages) + messages <- produceMessages(queue, 100) result <- subscriber // take all messages in one big batch .processWithAutoAck(batchSize = 100, waitingTime = 40.millis)(m => - IO.raiseWhen(m.rawPayload == "message-43")(new Exception("BOOM")).as(m)) + IO.raiseWhen(m.rawPayload == "43")(new Exception("BOOM")).as(m)) .attempt .compile .toList @@ -99,10 +125,7 @@ class SubscriberSuite extends CatsEffectSuite { TestControl .executeEmbed(for { // first populate the queue - messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) - } - _ <- queue.setAvailableMessages(messages) + _ <- produceMessages(queue, 100) result <- subscriber .process[Int](batchSize = 5, waitingTime = 40.millis, publisher)((msg: Message[IO, String]) => if (msg.rawPayload.toInt % 2 == 0) IO.pure(Decision.Drop) @@ -124,10 +147,7 @@ class SubscriberSuite extends CatsEffectSuite { TestControl .executeEmbed(for { // first populate the queue - messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) - } - _ <- queue.setAvailableMessages(messages) + _ <- produceMessages(queue, 100) result <- subscriber .process[Int](batchSize = 5, waitingTime = 40.millis, publisher)((msg: Message[IO, String]) => if (msg.rawPayload.toInt % 2 == 0) IO.pure(Decision.Ok(1)) @@ -147,11 +167,7 @@ class SubscriberSuite extends CatsEffectSuite { queueSub.test("Messages consumed and requeued should follow the decision") { case (queue, subscriber, publisher) => TestControl .executeEmbed(for { - // first populate the queue - messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) - } - _ <- queue.setAvailableMessages(messages) + _ <- produceMessages(queue, 100) opCounter <- AtomicCell[IO].of(0) result <- subscriber .process[Int](batchSize = 5, waitingTime = 40.millis, publisher)((msg: Message[IO, String]) => @@ -179,10 +195,7 @@ class SubscriberSuite extends CatsEffectSuite { TestControl .executeEmbed(for { // first populate the queue - messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) - } - _ <- queue.setAvailableMessages(messages) + _ <- produceMessages(queue, 100) result <- subscriber .process[Int](batchSize = 5, waitingTime = 40.millis, publisher)((msg: Message[IO, String]) => IO.pure(Decision.Fail(new Throwable(s"failed ${msg.rawPayload}"), ack = true))) @@ -204,10 +217,7 @@ class SubscriberSuite extends CatsEffectSuite { TestControl .executeEmbed(for { // first populate the queue - messages <- List.range(0, 100).traverse { i => - IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _)) - } - _ <- queue.setAvailableMessages(messages) + _ <- produceMessages(queue, 100) result <- subscriber .process[Int](batchSize = 5, waitingTime = 40.millis, publisher)((msg: Message[IO, String]) => IO.pure(Decision.Fail(new Throwable(s"failed ${msg.rawPayload}"), ack = false))) diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala index 8f182f4..749961d 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala @@ -156,6 +156,29 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite => } yield () } + withQueue.test("messageBatch ackAll/nackAll marks entire batch") { queueName => + val client = clientFixture() + val totalMessages = 5 + client.subscribe(queueName).puller.use { puller => + for { + _ <- Stream + .emits(List.fill(totalMessages)((s"msg", Map.empty[String, String]))) + .through(client.publish(queueName).sink(batchSize = totalMessages)) + .compile + .drain + msgBatch <- puller.pullMessageBatch(totalMessages, 1.second) + _ = assertEquals(msgBatch.messages.size, totalMessages) + _ <- msgBatch.nackAll + msgBatchNack <- puller.pullMessageBatch(totalMessages, 1.second) + _ = assertEquals(msgBatchNack.messages.size, totalMessages) + _ <- msgBatchNack.ackAll + _ <- assertIOBoolean( + puller.pullMessageBatch(6, 1.second).map(_.messages.isEmpty) + ) + } yield () + } + } + withQueue.test("process respects the decision from the handler") { queueName => val client = clientFixture() for {