diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageBatch.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageBatch.scala index 5f3017d..372a092 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageBatch.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSMessageBatch.scala @@ -33,43 +33,47 @@ private class SQSMessageBatch[F[_], T]( override def messages: Chunk[Message[F, T]] = payload override def ackAll: F[Unit] = + F.unlessA(payload.isEmpty) { + F.fromCompletableFuture { + F.delay { + client.deleteMessageBatch( + DeleteMessageBatchRequest + .builder() + .queueUrl(queueUrl) + .entries(payload.map { m => + DeleteMessageBatchRequestEntry + .builder() + .receiptHandle(m.receiptHandle) + .id(m.messageId) + .build() + }.asJava) + .build() + ) + } + } + }.void + + override def nackAll: F[Unit] = F.unlessA(payload.isEmpty) { F.fromCompletableFuture { F.delay { - client.deleteMessageBatch( - DeleteMessageBatchRequest - .builder() - .queueUrl(queueUrl) - .entries(payload.map { m => - DeleteMessageBatchRequestEntry + val req = ChangeMessageVisibilityBatchRequest + .builder() + .queueUrl(queueUrl) + .entries( + payload.map { m => + ChangeMessageVisibilityBatchRequestEntry .builder() - .receiptHandle(m.receiptHandle) .id(m.messageId) + .receiptHandle(m.receiptHandle) + .visibilityTimeout(0) .build() - }.asJava) - .build() + }.asJava + ) + .build() + client.changeMessageVisibilityBatch( + req ) } }.void - - override def nackAll: F[Unit] = F.fromCompletableFuture { - F.delay { - val req = ChangeMessageVisibilityBatchRequest - .builder() - .queueUrl(queueUrl) - .entries( - payload.map { m => - ChangeMessageVisibilityBatchRequestEntry - .builder() - .id(m.messageId) - .receiptHandle(m.receiptHandle) - .visibilityTimeout(0) - .build() - }.asJava - ) - .build() - client.changeMessageVisibilityBatch( - req - ) - } - }.void + } } diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala index 2a6a1ef..06808d4 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala @@ -168,6 +168,21 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite => } } + withQueue.test("messageBatch noop ack/nack on empty batches") { queueName => + val client = clientFixture() + client.subscribe(queueName).puller.use { puller => + for { + msgBatch <- puller.pullMessageBatch(10, waitingTime) + _ = assertEquals(msgBatch.messages.size, 0) + _ <- msgBatch.nackAll + _ <- msgBatch.ackAll + _ <- assertIOBoolean( + puller.pullMessageBatch(10, waitingTime).map(_.messages.isEmpty) + ) + } yield () + } + } + withQueue.test("process respects the decision from the handler") { queueName => val client = clientFixture() for {