From 979a68ac3cb5c4018fb6fccc7b0ef82ae34cab75 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Tue, 17 Sep 2024 12:34:55 +0200 Subject: [PATCH] More extensive specs --- .../queue/sqs/SqsClientSuite.scala | 33 ++- azure/service-bus/integration/README.md | 9 + .../servicebus/ServiceBusClientSuite.scala | 37 ++++ build.sbt | 14 +- .../commercetools/queue/QueuePublisher.scala | 2 +- .../queue/pubsub/PubSubClientSuite.scala | 22 +- .../queue/gcp/pubsub/PubSubPublisher.scala | 8 +- .../queue/gcp/pubsub/PubSubStatistics.scala | 2 +- .../testkit/QueueAdministrationSuite.scala | 40 ++++ .../queue/testkit/QueueClientSuite.scala | 174 ++++------------ .../queue/testkit/QueuePublisherSuite.scala | 48 +++++ .../queue/testkit/QueueStatisticsSuite.scala | 82 ++++++++ .../queue/testkit/QueueSubscriberSuite.scala | 197 ++++++++++++++++++ 13 files changed, 517 insertions(+), 151 deletions(-) create mode 100644 azure/service-bus/integration/README.md create mode 100644 azure/service-bus/integration/src/test/scala/com/commercetools/queue/servicebus/ServiceBusClientSuite.scala create mode 100644 testkit/src/main/scala/com/commercetools/queue/testkit/QueueAdministrationSuite.scala create mode 100644 testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala create mode 100644 testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala create mode 100644 testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala diff --git a/aws/sqs/integration/src/test/scala/com/commercetools/queue/sqs/SqsClientSuite.scala b/aws/sqs/integration/src/test/scala/com/commercetools/queue/sqs/SqsClientSuite.scala index 5f8d0a0..d90a9c1 100644 --- a/aws/sqs/integration/src/test/scala/com/commercetools/queue/sqs/SqsClientSuite.scala +++ b/aws/sqs/integration/src/test/scala/com/commercetools/queue/sqs/SqsClientSuite.scala @@ -17,20 +17,43 @@ package com.commercetools.queue.sqs import cats.effect.{IO, Resource} +import cats.syntax.all._ import com.commercetools.queue.QueueClient import com.commercetools.queue.aws.sqs.SQSClient import com.commercetools.queue.testkit.QueueClientSuite -import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider +import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider} import software.amazon.awssdk.regions.Region import java.net.URI +import scala.jdk.CollectionConverters.CollectionHasAsScala class SqsClientSuite extends QueueClientSuite { + private def config = + booleanOrDefault("AWS_SQS_USE_EMULATOR", default = true).ifM( + ifTrue = + IO.pure((Region.EU_WEST_1, AnonymousCredentialsProvider.create(), Some(new URI("http://localhost:4566")))), + ifFalse = for { + awsRegion <- string("AWS_SQS_REGION") + region <- Region + .regions() + .asScala + .find(_.id == awsRegion) + .liftTo[IO](new IllegalArgumentException(s"Cannot find any suitable AWS region from $awsRegion value!")) + credentials <- (string("AWS_SQS_ACCESS_KEY"), string("AWS_SQS_ACCESS_SECRET")).mapN((accessKey, accessSecret) => + StaticCredentialsProvider.create( + AwsBasicCredentials.create(accessKey, accessSecret) + )) + } yield (region, credentials, None) + ) + override def client: Resource[IO, QueueClient[IO]] = - SQSClient[IO]( - Region.EU_WEST_1, - AnonymousCredentialsProvider.create(), - endpoint = Some(new URI("http://localhost:4566"))) + config.toResource.flatMap { case (region, credentials, endpoint) => + SQSClient[IO]( + region, + credentials, + endpoint = endpoint + ) + } } diff --git a/azure/service-bus/integration/README.md b/azure/service-bus/integration/README.md new file mode 100644 index 0000000..1f84413 --- /dev/null +++ b/azure/service-bus/integration/README.md @@ -0,0 +1,9 @@ +# How to run tests + +Tests are using [AzureCliCredential](https://learn.microsoft.com/en-us/java/api/com.azure.identity.azureclicredential?view=azure-java-stable). +Make sure to be assigned with the [Azure Service Bus Data Owner](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/integration#azure-service-bus-data-owner) role. + +Steps: +- `az login` +- `export AZURE_SERVICEBUS_HOSTNAME=` +- `sbt "project azureServiceBusIt" test` diff --git a/azure/service-bus/integration/src/test/scala/com/commercetools/queue/servicebus/ServiceBusClientSuite.scala b/azure/service-bus/integration/src/test/scala/com/commercetools/queue/servicebus/ServiceBusClientSuite.scala new file mode 100644 index 0000000..8a3c888 --- /dev/null +++ b/azure/service-bus/integration/src/test/scala/com/commercetools/queue/servicebus/ServiceBusClientSuite.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.servicebus + +import cats.effect.{IO, Resource} +import com.azure.identity.AzureCliCredentialBuilder +import com.commercetools.queue.QueueClient +import com.commercetools.queue.azure.servicebus.ServiceBusClient +import com.commercetools.queue.testkit.QueueClientSuite + +class ServiceBusClientSuite extends QueueClientSuite { + + private def config = string("AZURE_SERVICEBUS_HOSTNAME") + override val inFlightMessagesStatsSupported: Boolean = false // not supported + + override def client: Resource[IO, QueueClient[IO]] = + config.toResource.flatMap { namespace => + ServiceBusClient[IO]( + namespace = namespace, + credentials = new AzureCliCredentialBuilder().build() + ) + } +} diff --git a/build.sbt b/build.sbt index 4fcd7cf..fde59e9 100644 --- a/build.sbt +++ b/build.sbt @@ -84,7 +84,8 @@ lazy val testkit = crossProject(JVMPlatform) name := "fs2-queues-testkit", libraryDependencies ++= List( "org.scalameta" %%% "munit" % Versions.munit, - "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect + "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect, + "org.slf4j" % "slf4j-simple" % "2.0.16" ) ) .dependsOn(core) @@ -142,6 +143,17 @@ lazy val azureServiceBus = crossProject(JVMPlatform) ) .dependsOn(core, testkit % Test) +lazy val azureServiceBusIt = project + .in(file("azure/service-bus/integration")) + .enablePlugins(NoPublishPlugin) + .settings(commonSettings) + .settings( + libraryDependencies ++= List( + "com.azure" % "azure-identity" % "1.11.1" + ) + ) + .dependsOn(azureServiceBus.jvm % Test, testkit.jvm % Test) + lazy val awsSQS = crossProject(JVMPlatform) .crossType(CrossType.Pure) .in(file("aws/sqs")) diff --git a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala index dae5db1..d6c610a 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala @@ -30,7 +30,7 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab def queueName: String /** - * Returns a way to bush messages into the queue. + * Returns a way to push messages into the queue. * This is a low-level construct, mainly aiming at integrating existing * code bases that require to push explicitly. * diff --git a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala index 6fbff4a..5d40f56 100644 --- a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala +++ b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala @@ -24,9 +24,27 @@ import com.google.api.gax.core.NoCredentialsProvider class PubSubClientSuite extends QueueClientSuite { - override val queueUpdateSupported = false + private def isEmulatorDefault = true + private def isEmulatorEnvVar = "GCP_PUBSUB_USE_EMULATOR" + + override val queueUpdateSupported: Boolean = false // not supported + override val inFlightMessagesStatsSupported: Boolean = false // not supported + override val delayedMessagesStatsSupported: Boolean = false // not supported + override val messagesStatsSupported: Boolean = // // not supported in the emulator + !sys.env.get(isEmulatorEnvVar).map(_.toBoolean).getOrElse(isEmulatorDefault) + + private def config = + booleanOrDefault(isEmulatorEnvVar, default = isEmulatorDefault).ifM( + ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"))), + ifFalse = for { + project <- string("GCP_PUBSUB_PROJECT") + credentials = NoCredentialsProvider.create() // TODO + } yield (project, credentials, None) + ) override def client: Resource[IO, QueueClient[IO]] = - PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("localhost:8042")) + config.toResource.flatMap { case (project, credentials, endpoint) => + PubSubClient(project, credentials, endpoint = endpoint) + } } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala index 5cef2b0..1b17d61 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala @@ -39,10 +39,10 @@ private class PubSubPublisher[F[_], T]( .fromAutoCloseable { F.blocking { val builder = - PublisherStubSettings.newBuilder() - builder - .setCredentialsProvider(credentials) - .setTransportChannelProvider(channelProvider) + PublisherStubSettings + .newBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) GrpcPublisherStub.create(builder.build()) } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatistics.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatistics.scala index 090d71b..00c75c3 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatistics.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubStatistics.scala @@ -40,7 +40,7 @@ private class PubSubStatistics[F[_]]( .newBuilder() .setCredentialsProvider(credentials) .setTransportChannelProvider(channelProvider) - endpoint.foreach(builder.setEndpoint(_)) + endpoint.foreach(builder.setEndpoint) GrpcMetricServiceStub.create(builder.build()) } } diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueAdministrationSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueAdministrationSuite.scala new file mode 100644 index 0000000..0127cca --- /dev/null +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueAdministrationSuite.scala @@ -0,0 +1,40 @@ +package com.commercetools.queue.testkit + +import com.commercetools.queue.QueueConfiguration +import munit.CatsEffectSuite + +import scala.concurrent.duration._ + +/** + * This suite tests that the features of a [[com.commercetools.queue.QueueAdministration QueueAdministration]] are properly + * implemented for a concrete client. + */ +trait QueueAdministrationSuite extends CatsEffectSuite { self: QueueClientSuite => + + withQueue.test("existing queue should be indicated as such") { queueName => + val client = clientFixture() + assertIO(client.administration.exists(queueName), true) + } + + test("non existing queue should be indicated as such") { + val client = clientFixture() + assertIO(client.administration.exists("not-existing"), false) + } + + withQueue.test("get configuration") { queueName => + val admin = clientFixture().administration + assertIO(admin.configuration(queueName), QueueConfiguration(originalMessageTTL, originalLockTTL)) + } + + withQueue.test("configuration can be updated") { queueName => + assume(queueUpdateSupported, "The test environment does not support queue update") + val admin = clientFixture().administration + for { + _ <- admin.update(queueName, Some(originalMessageTTL + 1.minute), Some(originalLockTTL + 10.seconds)) + _ <- assertIO( + admin.configuration(queueName), + QueueConfiguration(originalMessageTTL + 1.minute, originalLockTTL + 10.seconds)) + } yield () + } + +} diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala index 6acef18..ca10029 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueClientSuite.scala @@ -1,11 +1,8 @@ package com.commercetools.queue.testkit -import cats.effect.std.Random -import cats.effect.{IO, Ref, Resource} -import cats.implicits.{catsSyntaxApplicativeId, catsSyntaxOptionId} -import com.commercetools.queue.{Decision, Message, QueueClient, QueueConfiguration} -import fs2.{Chunk, Stream} -import munit.CatsEffectSuite +import cats.effect.std.{Env, Random} +import cats.effect.{IO, Resource, SyncIO} +import com.commercetools.queue.QueueClient import scala.concurrent.duration._ @@ -14,21 +11,36 @@ import scala.concurrent.duration._ * implemented for a concrete client. * This is used in integration tests for the various implemented queue providers. */ -abstract class QueueClientSuite extends CatsEffectSuite { - +abstract class QueueClientSuite + extends QueueAdministrationSuite + with QueueStatisticsSuite + with QueuePublisherSuite + with QueueSubscriberSuite { + def optString(varName: String): IO[Option[String]] = + Env[IO].get(varName) + def string(varName: String): IO[String] = + optString(varName).flatMap(_.map(IO.pure).getOrElse(IO.raiseError(new RuntimeException(s"'$varName' is required")))) + def optBoolean(varName: String): IO[Option[Boolean]] = + optString(varName).map(_.map(_.toBoolean)) + def booleanOrDefault(varName: String, default: Boolean): IO[Boolean] = + optBoolean(varName).map(_.getOrElse(default)) + + override def munitTimeout: Duration = 1.minute + + /** Override these if the given provider is not supporting these features */ val queueUpdateSupported: Boolean = true + val messagesStatsSupported: Boolean = true + val inFlightMessagesStatsSupported: Boolean = true + val delayedMessagesStatsSupported: Boolean = true + + final val originalMessageTTL: FiniteDuration = 10.minutes + final val originalLockTTL: FiniteDuration = 2.minutes /** Provide a way to acquire a queue client for the provider under test. */ def client: Resource[IO, QueueClient[IO]] - val clientFixture = ResourceSuiteLocalFixture("queue-client", client) - - val originalMessageTTL = 10.minutes - val originalLockTTL = 2.minutes - - override def munitFixtures = List(clientFixture) - - val withQueue = + final val clientFixture: Fixture[QueueClient[IO]] = ResourceSuiteLocalFixture("queue-client", client) + final lazy val withQueue: SyncIO[FunFixture[String]] = ResourceFixture( Resource.make( IO.randomUUID @@ -37,128 +49,16 @@ abstract class QueueClientSuite extends CatsEffectSuite { clientFixture().administration .create(queueName, originalMessageTTL, originalLockTTL) })(queueName => clientFixture().administration.delete(queueName))) + final override def munitFixtures: List[Fixture[QueueClient[IO]]] = List(clientFixture) - withQueue.test("published messages are received by a processor") { queueName => - for { - random <- Random.scalaUtilRandom[IO] - size <- random.nextLongBounded(30L) - messages = List - .range(0L, size) - .map(i => (i.toString, Map(s"metadata-$i-key" -> s"$i-value", s"metadata-$i-another-key" -> "another-value"))) - received <- Ref[IO].of(List.empty[(String, Map[String, String])]) - client = clientFixture() - _ <- Stream - .emits(messages) - .through(client.publish(queueName).sink(batchSize = 10)) - .merge( - client - .subscribe(queueName) - .processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => - received.update(_ :+ (msg.rawPayload, msg.metadata))) - .take(size) - ) - .compile - .drain - _ <- assertIO_(received.get.map { receivedMessages => - if (receivedMessages.size != messages.size) - fail(s"expected to receive ${messages.size} messages, got ${receivedMessages.size}") - - messages.zip(receivedMessages).forall { - case ((expectedPayload, expectedMetadata), (actualPayload, actualMetadata)) => - if (expectedPayload != actualPayload) - fail(s"expected payload '$expectedPayload', got '$actualPayload'") - else if (!metadataContains(actualMetadata, expectedMetadata)) - fail(s"expected metadata to contain '$expectedMetadata', got '$actualMetadata'") - else true - } - }.void) - } yield () - } - - withQueue.test("puller returns no messages if none is available during the configured duration") { queueName => - val client = clientFixture() - client.subscribe(queueName).puller.use { puller => - assertIO(puller.pullBatch(10, 2.seconds), Chunk.empty) - } - } - - withQueue.test("existing queue should be indicated as such") { queueName => - val client = clientFixture() - assertIO(client.administration.exists(queueName), true) - } - - test("non existing queue should be indicated as such") { - val client = clientFixture() - assertIO(client.administration.exists("not-existing"), false) - } - - withQueue.test("delayed messages should not be pulled before deadline") { queueName => - val client = clientFixture() - client.publish(queueName).pusher.use { pusher => - pusher.push("delayed message", Map("metadata-key" -> "value"), Some(2.seconds)) - } *> client.subscribe(queueName).puller.use { puller => - for { - _ <- assertIO(puller.pullBatch(1, 1.second), Chunk.empty) - _ <- IO.sleep(2.seconds) - msg <- puller - .pullBatch(1, 1.second) - .map(_.head.getOrElse(fail("expected a message, got nothing."))) - _ = assertEquals(msg.rawPayload, "delayed message") - _ = assert(metadataContains(msg.metadata, Map("metadata-key" -> "value"))) - } yield () - } - } - - withQueue.test("published messages are processed as expected") { queueName => - val client = clientFixture() - - for { - _ <- Stream - .emits(List.range(0, 5).map(i => (i.toString, Map.empty[String, String]))) - .through(client.publish(queueName).sink(batchSize = 10)) - .compile - .drain - shouldAck4 <- Ref.of[IO, Boolean](false) - res <- client - .subscribe(queueName) - .process[Int](5, 1.second, client.publish(queueName))((msg: Message[IO, String]) => - msg.rawPayload.toInt match { - // checking various scenarios, like a message that gets reenqueue'ed once and then ok'ed, - // a message dropped, a message failed and ack'ed, a message failed and initially not ack'ed, then ack'ed - case 0 => Decision.Ok(0).pure[IO] - case 1 if msg.metadata.contains("reenqueued") => Decision.Ok(1).pure[IO] - case 1 => Decision.Reenqueue(Map("reenqueued" -> "true").some, None).pure[IO] - case 2 => Decision.Drop.pure[IO] - case 3 => Decision.Fail(new Throwable("3"), ack = true).pure[IO] - case 4 => shouldAck4.getAndSet(true).map(shouldAck => Decision.Fail(new Throwable("4"), ack = shouldAck)) - }) - .take(5) - .compile - .toList - _ = assertEquals( - res.map { - case Right(i) => i - case Left(t) => t.getMessage.toInt - }.sorted, // not checking the ordering, since reenqueue may influence that slightly - List(0, 1, 3, 4, 4) - ) - } yield () - } - - withQueue.test("configuration can be updated") { queueName => - assume(queueUpdateSupported, "The test environment does not support queue update") - val client = clientFixture() - val admin = client.administration - for { - _ <- assertIO(admin.configuration(queueName), QueueConfiguration(originalMessageTTL, originalLockTTL)) - _ <- admin.update(queueName, Some(originalMessageTTL + 1.minute), Some(originalLockTTL + 10.seconds)) - _ <- assertIO( - admin.configuration(queueName), - QueueConfiguration(originalMessageTTL + 1.minute, originalLockTTL + 10.seconds)) - } yield () - } + final def randomMessages(n: Int): IO[List[(String, Map[String, String])]] = for { + random <- Random.scalaUtilRandom[IO] + size <- random.nextIntBounded(n) + } yield messages(size) - private def metadataContains(actual: Map[String, String], expected: Map[String, String]) = - expected.forall { case (k, v) => actual.get(k).contains(v) } + final def messages(n: Int): List[(String, Map[String, String])] = + List + .range(0, n) + .map(i => (i.toString, Map(s"metadata-$i-key" -> s"$i-value", s"metadata-$i-another-key" -> "another-value"))) } diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala new file mode 100644 index 0000000..68d95c4 --- /dev/null +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala @@ -0,0 +1,48 @@ +package com.commercetools.queue.testkit + +import munit.CatsEffectSuite +import fs2.Stream +import cats.syntax.all._ + +import scala.concurrent.duration.DurationInt + +/** + * This suite tests that the features of a [[com.commercetools.queue.QueuePublisher QueuePublisher]] are properly + * implemented for a concrete client. + */ +trait QueuePublisherSuite extends CatsEffectSuite { self: QueueClientSuite => + + withQueue.test("sink publishes all the messages") { queueName => + assume(messagesStatsSupported) + val client = clientFixture() + for { + msgs <- randomMessages(30) + _ <- Stream + .emits(msgs) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + messagesInQueue <- client.statistics(queueName).fetcher.use(_.fetch).map(_.messages) + _ = assertEquals(messagesInQueue, msgs.size) + } yield () + } + + withQueue.test("sink publishes all the messages with a delay") { queueName => + assume(messagesStatsSupported && delayedMessagesStatsSupported) + val client = clientFixture() + for { + msgs <- randomMessages(30) + _ <- Stream + .emits(msgs) + .through(client.publish(queueName).sink(batchSize = 10, delay = 1.minute.some)) + .compile + .drain + statsFetcher = client.statistics(queueName).fetcher + messagesInQueue <- statsFetcher.use(_.fetch).map(_.messages) + delayedMessages <- statsFetcher.use(_.fetch).map(_.delayed) + _ = assertEquals(delayedMessages, msgs.size.some, "delayed messages are not what we expect") + _ = assertEquals(messagesInQueue, 0, "the queue is not empty") + } yield () + } + +} diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala new file mode 100644 index 0000000..0c62044 --- /dev/null +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueStatisticsSuite.scala @@ -0,0 +1,82 @@ +package com.commercetools.queue.testkit + +import cats.effect.IO +import cats.syntax.all._ +import fs2.Stream +import munit.CatsEffectSuite + +import scala.concurrent.duration.DurationInt + +/** + * This suite tests that the features of a [[com.commercetools.queue.QueueStatistics QueueStatistics]] are properly + * implemented for a concrete client. + */ +trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite => + + withQueue.test("stats should report queued messages") { queueName => + assume(messagesStatsSupported) + for { + messages <- randomMessages(30) + client = clientFixture() + _ <- Stream + .emits(messages) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + statsFetcher = client.statistics(queueName).fetcher + _ <- statsFetcher + .use(_.fetch) + .map(stats => assertEquals(stats.messages, messages.size, "Queue should be full")) + _ <- client + .subscribe(queueName) + .processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(_ => IO.unit) + .take(messages.size.toLong) + .compile + .drain + _ <- statsFetcher + .use(_.fetch) + .map(stats => assertEquals(stats.messages, 0, "Queue should be empty")) + } yield () + } + + withQueue.test("stats should report inflight messages") { queueName => + assume(messagesStatsSupported && inFlightMessagesStatsSupported) + for { + messages <- randomMessages(30) + client = clientFixture() + _ <- Stream + .emits(messages) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + _ <- (client.subscribe(queueName).puller, client.statistics(queueName).fetcher).tupled + .use { case (puller, statsFetcher) => + for { + chunk <- puller.pullBatch(10, 10.seconds) + stats <- statsFetcher.fetch + } yield assertEquals(stats.inflight, chunk.size.some, "Inflight stats doesn't match pulled messages") + } + } yield () + } + + withQueue.test("stats should report delayed messages") { queueName => + assume(messagesStatsSupported && delayedMessagesStatsSupported) + for { + messages <- randomMessages(30) + client = clientFixture() + _ <- Stream + .emits(messages) // putting a really long delay so that the test can pass even in slow envs + .through(client.publish(queueName).sink(batchSize = 10, delay = 1.minute.some)) + .compile + .drain + _ <- client + .statistics(queueName) + .fetcher + .use { statsFetcher => + statsFetcher.fetch.map(stats => + assertEquals(stats.delayed, messages.size.some, "Delayed stats doesn't match pulled messages")) + } + } yield () + } + +} diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala new file mode 100644 index 0000000..8f182f4 --- /dev/null +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala @@ -0,0 +1,197 @@ +package com.commercetools.queue.testkit + +import cats.effect.{IO, Ref} +import cats.syntax.all._ +import com.commercetools.queue.{Decision, Message} +import fs2.{Chunk, Stream} +import munit.CatsEffectSuite + +import scala.concurrent.duration._ + +/** + * This suite tests that the features of a [[com.commercetools.queue.QueueSubscriber QueueSubscriber]] are properly + * implemented for a concrete client. + */ +trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite => + + withQueue.test("puller returns no messages if none is available during the configured duration") { queueName => + val client = clientFixture() + client.subscribe(queueName).puller.use { puller => + assertIO(puller.pullBatch(10, 2.seconds), Chunk.empty) + } + } + + withQueue.test("puller pulls") { queueName => + for { + messages <- randomMessages(30) + client = clientFixture() + _ <- Stream + .emits(messages) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + n <- client + .subscribe(queueName) + .puller + .use( + _.pullBatch(1, 30.seconds) + .as(1) + .replicateA(messages.size) + .map(_.sum)) + } yield assertEquals(n, messages.size, "pulled messages are not as expected") + } + + withQueue.test("puller pulls in batches") { queueName => + val msgNum = 30 + val batchSize = 10 + val expectedBatches = 3 + val client = clientFixture() + for { + _ <- Stream + .emits(messages(msgNum)) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + n <- client + .subscribe(queueName) + .puller + .use( + _.pullBatch(batchSize, 30.seconds) + .map(_.size) + .replicateA(expectedBatches) + .map(_.sum)) + } yield assertEquals(n, msgNum, "pulled batches are not containing all the messages") + } + + withQueue.test("delayed messages should not be pulled before deadline") { queueName => + val client = clientFixture() + client.publish(queueName).pusher.use { pusher => + pusher.push("delayed message", Map("metadata-key" -> "value"), Some(2.seconds)) + } *> client.subscribe(queueName).puller.use { puller => + for { + _ <- assertIO(puller.pullBatch(1, 1.second), Chunk.empty) + _ <- IO.sleep(2.seconds) + msg <- puller + .pullBatch(1, 10.second) // waiting 10 sec, some cloud provider is really slow in non-premium plans + .map(_.head.getOrElse(fail("expected a message, got nothing."))) + _ = assertEquals(msg.rawPayload, "delayed message") + _ = assert(metadataContains(msg.metadata, Map("metadata-key" -> "value"))) + } yield () + } + } + + withQueue.test("processWithAutoAck receives and acks all the messages") { queueName => + for { + messages <- randomMessages(30) + received <- Ref[IO].of(List.empty[(String, Map[String, String])]) + client = clientFixture() + _ <- Stream + .emits(messages) + .through(client.publish(queueName).sink(batchSize = 10)) + .merge( + client + .subscribe(queueName) + .processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => + received.update(_ :+ (msg.rawPayload, msg.metadata))) + .take(messages.size.toLong) + ) + .compile + .drain + _ <- assertIO_(received.get.map { receivedMessages => + if (receivedMessages.size != messages.size) + fail(s"expected to receive ${messages.size} messages, got ${receivedMessages.size}") + + messages.zip(receivedMessages).forall { + case ((expectedPayload, expectedMetadata), (actualPayload, actualMetadata)) => + if (expectedPayload != actualPayload) + fail(s"expected payload '$expectedPayload', got '$actualPayload'") + else if (!metadataContains(actualMetadata, expectedMetadata)) + fail(s"expected metadata to contain '$expectedMetadata', got '$actualMetadata'") + else true + } + }.void) + _ <- + if (messagesStatsSupported) + assertIO( + client.statistics(queueName).fetcher.use(_.fetch).map(_.messages), + 0, + "not all the messages got acked") + else IO.unit + } yield () + } + + withQueue.test("attemptProcessWithAutoAck acks/nacks accordingly") { queueName => + for { + toBeAckedRef <- Ref[IO].of(Set.empty[String]) + toBeNackedRef <- Ref[IO].of(Set.empty[String]) + client = clientFixture() + _ <- Stream + .emits(messages(10)) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + _ <- client + .subscribe(queueName) + .attemptProcessWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => + if (msg.rawPayload.toInt % 2 == 0) toBeAckedRef.update(_ + msg.rawPayload) + else toBeNackedRef.update(_ + msg.rawPayload) >> IO.raiseError(new RuntimeException("failed"))) + .take(10L) + .compile + .drain + toBeAcked <- toBeAckedRef.get + toBeNacked <- toBeNackedRef.get + _ = assertEquals(toBeAcked, Set("0", "2", "4", "6", "8")) + _ = assertEquals(toBeNacked, Set("1", "3", "5", "7", "9")) + _ <- + if (messagesStatsSupported) + assertIOBoolean( + client + .statistics(queueName) + .fetcher + .use(_.fetch) + .map(stats => stats.messages + stats.inflight.getOrElse(0) == 5), + "not all the expected messages got nacked" + ) + else IO.unit + } yield () + } + + withQueue.test("process respects the decision from the handler") { queueName => + val client = clientFixture() + for { + _ <- Stream + .emits(List.range(0, 5).map(i => (i.toString, Map.empty[String, String]))) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + shouldAck4 <- Ref.of[IO, Boolean](false) + res <- client + .subscribe(queueName) + .process[Int](5, 1.second, client.publish(queueName))((msg: Message[IO, String]) => + msg.rawPayload.toInt match { + // checking various scenarios, like a message that gets reenqueue'ed once and then ok'ed, + // a message dropped, a message failed and ack'ed, a message failed and initially not ack'ed, then ack'ed + case 0 => Decision.Ok(0).pure[IO] + case 1 if msg.metadata.contains("reenqueued") => Decision.Ok(1).pure[IO] + case 1 => Decision.Reenqueue(Map("reenqueued" -> "true").some, None).pure[IO] + case 2 => Decision.Drop.pure[IO] + case 3 => Decision.Fail(new Throwable("3"), ack = true).pure[IO] + case 4 => shouldAck4.getAndSet(true).map(shouldAck => Decision.Fail(new Throwable("4"), ack = shouldAck)) + }) + .take(5) + .compile + .toList + _ = assertEquals( + res.map { + case Right(i) => i + case Left(t) => t.getMessage.toInt + }.sorted, // not checking the ordering, since reenqueue may influence that slightly + List(0, 1, 3, 4, 4) + ) + } yield () + } + + private def metadataContains(actual: Map[String, String], expected: Map[String, String]) = + expected.forall { case (k, v) => actual.get(k).contains(v) } + +}