From 54b1212dcfe8ad804c7509773afdf80cc67ee9bb Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Wed, 18 Sep 2024 11:42:35 +0200 Subject: [PATCH] Add testing module The module comes in handy to write unit tests for code bases using the library. It provides different tools to test various aspects of logic using it. --- .github/workflows/ci.yml | 4 +- .../queue/aws/sqs/SQSPusher.scala | 4 +- .../azure/servicebus/ServiceBusPusher.scala | 4 +- build.sbt | 52 +++++++++---- .../com/commercetools/queue/QueuePusher.scala | 4 +- .../queue/testing/TestQueuePublisher.scala | 28 ------- .../queue/testing/TestQueuePuller.scala | 32 -------- .../queue/testing/TestQueuePusher.scala | 34 -------- .../queue/testing/TestQueueSubscriber.scala | 28 ------- .../queue/testing/TestingMessageContext.scala | 64 --------------- docs/getting-started/directory.conf | 1 + docs/getting-started/testing.md | 78 +++++++++++++++++++ .../queue/gcp/pubsub/PubSubPusher.scala | 4 +- .../queue/otel4s/MeasuringQueuePusher.scala | 4 +- .../queue/otel4s/MeasuringPusherSuite.scala | 16 +--- .../queue/testing/LockedTestMessage.scala | 0 .../queue/testing/QueueState.scala | 0 .../queue/testing/TestMessage.scala | 0 .../queue/testing/TestQueue.scala | 23 ++++++ .../queue/testing/TestQueuePublisher.scala | 50 ++++++++++++ .../queue/testing/TestQueuePuller.scala | 59 ++++++++++++++ .../queue/testing/TestQueuePusher.scala | 61 +++++++++++++++ .../queue/testing/TestQueueSubscriber.scala | 56 +++++++++++++ .../queue/testing/TestingMessageContext.scala | 59 ++++++++++++++ .../commercetools/queue/SubscriberSuite.scala | 13 +--- 25 files changed, 446 insertions(+), 232 deletions(-) delete mode 100644 core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala delete mode 100644 core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala delete mode 100644 core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala delete mode 100644 core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala delete mode 100644 core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala create mode 100644 docs/getting-started/testing.md rename {core/src/test => testing/src/main}/scala/com/commercetools/queue/testing/LockedTestMessage.scala (100%) rename {core/src/test => testing/src/main}/scala/com/commercetools/queue/testing/QueueState.scala (100%) rename {core/src/test => testing/src/main}/scala/com/commercetools/queue/testing/TestMessage.scala (100%) rename {core/src/test => testing/src/main}/scala/com/commercetools/queue/testing/TestQueue.scala (88%) create mode 100644 testing/src/main/scala/com/commercetools/queue/testing/TestQueuePublisher.scala create mode 100644 testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala create mode 100644 testing/src/main/scala/com/commercetools/queue/testing/TestQueuePusher.scala create mode 100644 testing/src/main/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala create mode 100644 testing/src/main/scala/com/commercetools/queue/testing/TestingMessageContext.scala rename {core => testing}/src/test/scala/com/commercetools/queue/SubscriberSuite.scala (95%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 081aeda..c169aeb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,11 +91,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target testkit/.jvm/target project/target + run: mkdir -p testing/.jvm/target circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target testkit/.jvm/target project/target + run: tar cf targets.tar testing/.jvm/target circe/.jvm/target unidocs/target otel4s/.jvm/target aws/sqs/.jvm/target gcp/pubsub/.jvm/target core/.jvm/target azure/service-bus/.jvm/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala index 80ce3ee..dbaae3d 100644 --- a/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala +++ b/aws/sqs/src/main/scala/com/commercetools/queue/aws/sqs/SQSPusher.scala @@ -20,7 +20,7 @@ import cats.effect.Async import cats.syntax.functor._ import cats.syntax.monadError._ import cats.syntax.traverse._ -import com.commercetools.queue.{QueuePusher, Serializer} +import com.commercetools.queue.{Serializer, UnsealedQueuePusher} import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.{MessageAttributeValue, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest} @@ -34,7 +34,7 @@ private class SQSPusher[F[_], T]( )(implicit serializer: Serializer[T], F: Async[F]) - extends QueuePusher[F, T] { + extends UnsealedQueuePusher[F, T] { override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] = F.fromCompletableFuture { diff --git a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala index e33baaa..9beb677 100644 --- a/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala +++ b/azure/service-bus/src/main/scala/com/commercetools/queue/azure/servicebus/ServiceBusPusher.scala @@ -19,7 +19,7 @@ package com.commercetools.queue.azure.servicebus import cats.effect.Async import cats.syntax.all._ import com.azure.messaging.servicebus.{ServiceBusMessage, ServiceBusSenderClient} -import com.commercetools.queue.{QueuePusher, Serializer} +import com.commercetools.queue.{Serializer, UnsealedQueuePusher} import java.time.ZoneOffset import scala.concurrent.duration.FiniteDuration @@ -31,7 +31,7 @@ private class ServiceBusPusher[F[_], Data]( )(implicit serializer: Serializer[Data], F: Async[F]) - extends QueuePusher[F, Data] { + extends UnsealedQueuePusher[F, Data] { override def push(message: Data, metadata: Map[String, String], delay: Option[FiniteDuration]): F[Unit] = { val sbMessage = new ServiceBusMessage(serializer.serialize(message)) diff --git a/build.sbt b/build.sbt index f440036..4f87874 100644 --- a/build.sbt +++ b/build.sbt @@ -1,3 +1,4 @@ +import sbtcrossproject.CrossProject import com.typesafe.tools.mima.core._ import laika.config.PrettyURLs @@ -20,16 +21,24 @@ ThisBuild / scalaVersion := Scala213 ThisBuild / tlSonatypeUseLegacyHost := true lazy val root = - tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, gcpPubSub, gcpPubSubIt, circe, otel4s, unidocs) + tlCrossRootProject.aggregate( + core, + testing, + azureServiceBus, + awsSQS, + awsSqsIt, + gcpPubSub, + gcpPubSubIt, + circe, + otel4s, + unidocs) ThisBuild / tlSitePublishBranch := Some("main") val commonSettings = List( libraryDependencies ++= Seq( - "co.fs2" %%% "fs2-core" % Versions.fs2, "org.scalameta" %%% "munit" % Versions.munit % Test, "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect % Test, - "org.typelevel" %%% "cats-collections-core" % "0.9.8" % Test, "org.typelevel" %%% "cats-effect-testkit" % "3.5.3" % Test ), scalacOptions += (scalaVersion.value match { @@ -38,25 +47,41 @@ val commonSettings = List( }) ) -lazy val core = crossProject(JVMPlatform) +lazy val core: CrossProject = crossProject(JVMPlatform) .crossType(CrossType.Pure) .in(file("core")) .settings(commonSettings) .settings( - name := "fs2-queues-core" + name := "fs2-queues-core", + libraryDependencies ++= List( + "co.fs2" %%% "fs2-core" % Versions.fs2 + ) ) +lazy val testing = crossProject(JVMPlatform) + .crossType(CrossType.Pure) + .in(file("testing")) + .settings(commonSettings) + .settings( + name := "fs2-queues-testing", + libraryDependencies ++= List( + "org.typelevel" %%% "cats-collections-core" % "0.9.8" + ), + tlVersionIntroduced := Map("3" -> "0.4.0", "2.13" -> "0.4.0") + ) + .dependsOn(core) + lazy val testkit = crossProject(JVMPlatform) .crossType(CrossType.Pure) .in(file("testkit")) + .enablePlugins(NoPublishPlugin) .settings(commonSettings) .settings( name := "fs2-queues-testkit", libraryDependencies ++= List( "org.scalameta" %%% "munit" % Versions.munit, "org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect - ), - tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0") + ) ) .dependsOn(core) @@ -87,7 +112,7 @@ lazy val otel4s = crossProject(JVMPlatform) "org.typelevel" %%% "otel4s-core" % "0.9.0" ) ) - .dependsOn(core % "compile->compile;test->test") + .dependsOn(core, testing % Test) lazy val circe = crossProject(JVMPlatform) .crossType(CrossType.Pure) @@ -140,8 +165,7 @@ lazy val gcpPubSub = crossProject(JVMPlatform) libraryDependencies ++= List( "com.google.cloud" % "google-cloud-pubsub" % "1.129.3", "com.google.cloud" % "google-cloud-monitoring" % "3.47.0" - ), - tlVersionIntroduced := Map("3" -> "0.2.0", "2.13" -> "0.2.0") + ) ) .dependsOn(core) @@ -169,10 +193,11 @@ lazy val docs = project laikaExtensions += PrettyURLs, tlFatalWarnings := false, libraryDependencies ++= List( - "com.azure" % "azure-identity" % "1.11.1" + "com.azure" % "azure-identity" % "1.11.1", + "org.typelevel" %% "cats-effect-testkit" % "3.5.4" ) ) - .dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, gcpPubSub.jvm, otel4s.jvm, testkit.jvm) + .dependsOn(circe.jvm, azureServiceBus.jvm, awsSQS.jvm, gcpPubSub.jvm, otel4s.jvm, testing.jvm) lazy val unidocs = project .in(file("unidocs")) @@ -185,5 +210,6 @@ lazy val unidocs = project azureServiceBus.jvm, awsSQS.jvm, gcpPubSub.jvm, - otel4s.jvm) + otel4s.jvm, + testing.jvm) ) diff --git a/core/src/main/scala/com/commercetools/queue/QueuePusher.scala b/core/src/main/scala/com/commercetools/queue/QueuePusher.scala index ad74dc8..1a0d455 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePusher.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePusher.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration * A queue pusher allows for pushing elements into a queue either on at a time * or in batch. */ -trait QueuePusher[F[_], T] { +sealed trait QueuePusher[F[_], T] { /** The queue name to which this pusher is pushing. */ def queueName: String @@ -52,3 +52,5 @@ object QueuePusher { override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): F[Unit] = F.unit } } + +private[queue] trait UnsealedQueuePusher[F[_], T] extends QueuePusher[F, T] diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala deleted file mode 100644 index 7fca981..0000000 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePublisher.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2024 Commercetools GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.commercetools.queue.testing - -import cats.effect.{IO, Resource} -import com.commercetools.queue.{QueuePusher, UnsealedQueuePublisher} - -class TestQueuePublisher[T](queue: TestQueue[T]) extends UnsealedQueuePublisher[IO, T] { - - override val queueName = queue.name - - override def pusher: Resource[IO, QueuePusher[IO, T]] = Resource.pure(new TestQueuePusher(queue)) - -} diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala deleted file mode 100644 index 74b2e52..0000000 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePuller.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2024 Commercetools GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.commercetools.queue.testing - -import cats.effect.IO -import com.commercetools.queue.{MessageContext, UnsealedQueuePuller} -import fs2.Chunk - -import scala.concurrent.duration.FiniteDuration - -class TestQueuePuller[T](queue: TestQueue[T]) extends UnsealedQueuePuller[IO, T] { - - override val queueName: String = queue.name - - override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, T]]] = - IO.sleep(waitingTime) *> queue.lockMessages(batchSize) - -} diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala deleted file mode 100644 index fb257f1..0000000 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueuePusher.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2024 Commercetools GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.commercetools.queue.testing - -import cats.effect.IO -import com.commercetools.queue.QueuePusher - -import scala.concurrent.duration.FiniteDuration - -class TestQueuePusher[T](queue: TestQueue[T]) extends QueuePusher[IO, T] { - - override val queueName: String = queue.name - - override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] = - queue.enqeueMessages((message, metadata) :: Nil, delay) - - override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] = - queue.enqeueMessages(messages, delay) - -} diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala b/core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala deleted file mode 100644 index 04c3409..0000000 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2024 Commercetools GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.commercetools.queue.testing - -import cats.effect.{IO, Resource} -import com.commercetools.queue.{QueuePuller, UnsealedQueueSubscriber} - -class TestQueueSubscriber[T](queue: TestQueue[T]) extends UnsealedQueueSubscriber[IO, T] { - - override val queueName: String = queue.name - - override def puller: Resource[IO, QueuePuller[IO, T]] = Resource.pure(new TestQueuePuller(queue)) - -} diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala b/core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala deleted file mode 100644 index 0ed7468..0000000 --- a/core/src/test/scala/com/commercetools/queue/testing/TestingMessageContext.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2024 Commercetools GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.commercetools.queue.testing - -import cats.effect.IO -import com.commercetools.queue.{MessageContext, UnsealedMessageContext} - -import java.time.Instant - -case class TestingMessageContext[T]( - payload: T, - enqueuedAt: Instant = Instant.EPOCH, - messageId: String = "", - metadata: Map[String, String] = Map.empty) { - self => - - def noop: MessageContext[IO, T] = new UnsealedMessageContext[IO, T] { - override def messageId: String = self.messageId - override def payload: IO[T] = IO.pure(self.payload) - override def rawPayload: String = self.payload.toString() - override def enqueuedAt: Instant = self.enqueuedAt - override def metadata: Map[String, String] = self.metadata - override def ack(): IO[Unit] = IO.unit - override def nack(): IO[Unit] = IO.unit - override def extendLock(): IO[Unit] = IO.unit - } - - def failing(t: Exception): MessageContext[IO, T] = new UnsealedMessageContext[IO, T] { - override def messageId: String = self.messageId - override def payload: IO[T] = IO.pure(self.payload) - override def rawPayload: String = self.payload.toString() - override def enqueuedAt: Instant = self.enqueuedAt - override def metadata: Map[String, String] = self.metadata - override def ack(): IO[Unit] = IO.raiseError(t) - override def nack(): IO[Unit] = IO.raiseError(t) - override def extendLock(): IO[Unit] = IO.raiseError(t) - } - - def canceled: MessageContext[IO, T] = new UnsealedMessageContext[IO, T] { - override def messageId: String = self.messageId - override def payload: IO[T] = IO.pure(self.payload) - override def rawPayload: String = self.payload.toString() - override def enqueuedAt: Instant = self.enqueuedAt - override def metadata: Map[String, String] = self.metadata - override def ack(): IO[Unit] = IO.canceled - override def nack(): IO[Unit] = IO.canceled - override def extendLock(): IO[Unit] = IO.canceled - } - -} diff --git a/docs/getting-started/directory.conf b/docs/getting-started/directory.conf index 3f0d235..acd7870 100644 --- a/docs/getting-started/directory.conf +++ b/docs/getting-started/directory.conf @@ -7,4 +7,5 @@ laika.navigationOrder = [ stats.md administration.md serialization.md + testing.md ] diff --git a/docs/getting-started/testing.md b/docs/getting-started/testing.md new file mode 100644 index 0000000..cad1d34 --- /dev/null +++ b/docs/getting-started/testing.md @@ -0,0 +1,78 @@ +{% nav = true %} +# Testing + +The testing module provides tools to write unit tests for code base using the `fs2-queues` library. + +```scala +libraryDependencies += "com.commercetools" %% "fs2-queues-testing" % "@VERSION@" +``` + +## Using `TestQueue` + +The @:api(com.commercetools.queue.testing.TestQueue) class implements an in-memory queue system. A `TestQueue` can be wrapped to create a: + + - puller via @:api(com.commercetools.queue.testing.TestQueuePuller) `apply` method + - pusher via @:api(com.commercetools.queue.testing.TestQueuePusher) `apply` method + - subscriber via @:api(com.commercetools.queue.testing.TestQueueSubscriber) `apply` method + - publisher via @:api(com.commercetools.queue.testing.TestQueuePublisher) `apply` method + +The `TestQueue` and the various test tools are designed to work well when used with the [cats-effect test runtime][test-runtime] + +For instance, if you want to test code that needs to publish with delays, you can use the following approach: + +```scala mdoc +import com.commercetools.queue.testing._ + +import scala.concurrent.duration._ + +import cats.effect._ +import cats.effect.testkit._ +import cats.effect.unsafe.implicits.global + +TestQueue[String](name = "test-queue", messageTTL = 10.minutes, lockTTL = 1.minute) + .flatMap { testQueue => + val puller = TestQueuePuller(testQueue) + val pusher = TestQueuePusher(testQueue) + + val program = + for { + _ <- pusher.push("one", Map.empty, Some(10.seconds)) + immediatelyPulled <- puller.pullBatch(10, Duration.Zero) + _ <- IO.sleep(11.seconds) + laterPulled <- puller.pullBatch(10, Duration.Zero) + } yield (immediatelyPulled.size, laterPulled.size) + + TestControl.executeEmbed(program) + } + .unsafeRunSync() +``` + +## Using custom effects + +You might want to use a custom effect in your unit test, instead of a full blown queue implementation. For instance, if you want to test the behavior of a failing push, you can use the following approach: + +```scala mdoc:crash +val pusher = TestQueuePusher.fromPush[String]((_, _, _) => IO.raiseError(new Exception("BOOM!"))) + +pusher.push("test message", Map.empty, None).unsafeRunSync() +``` + +The provided function takes the same parameters as the `QueuePusher.push` method for a single message and can return any effect. + +These variants are available on test entities. + +## Testing message contexts + +If you need to unit test different behavior on message contexts, you can use the @:api(com.commercetools.queue.testing.TestingMessageContext) class. +It allows you to create a message context with different behaviors. + +For instance, if you want to check the behavior of a failing ack on a message, you can use this approach: + + +```scala mdoc:crash +val ctx = TestingMessageContext("payload").failing(new Exception("BOOM!")) + +ctx.ack().unsafeRunSync() +``` + +[test-runtime]: https://typelevel.org/cats-effect/docs/core/test-runtime diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala index b62f557..4c03906 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPusher.scala @@ -21,7 +21,7 @@ import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.syntax.monadError._ import cats.syntax.traverse._ -import com.commercetools.queue.{QueuePusher, Serializer} +import com.commercetools.queue.{Serializer, UnsealedQueuePusher} import com.google.cloud.pubsub.v1.stub.PublisherStub import com.google.protobuf.ByteString import com.google.pubsub.v1.{PublishRequest, PubsubMessage, TopicName} @@ -37,7 +37,7 @@ private class PubSubPusher[F[_], T]( )(implicit F: Async[F], serializer: Serializer[T]) - extends QueuePusher[F, T] { + extends UnsealedQueuePusher[F, T] { private def makeMessage(payload: T, metadata: Map[String, String], waitUntil: Option[Instant]): F[PubsubMessage] = F.delay { diff --git a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala index f6899f8..0413d73 100644 --- a/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala +++ b/otel4s/src/main/scala/com/commercetools/queue/otel4s/MeasuringQueuePusher.scala @@ -18,7 +18,7 @@ package com.commercetools.queue.otel4s import cats.effect.MonadCancel import cats.effect.syntax.monadCancel._ -import com.commercetools.queue.QueuePusher +import com.commercetools.queue.{QueuePusher, UnsealedQueuePusher} import org.typelevel.otel4s.trace.Tracer import scala.concurrent.duration.FiniteDuration @@ -28,7 +28,7 @@ private class MeasuringQueuePusher[F[_], T]( metrics: QueueMetrics[F], tracer: Tracer[F] )(implicit F: MonadCancel[F, Throwable]) - extends QueuePusher[F, T] { + extends UnsealedQueuePusher[F, T] { override def queueName: String = underlying.queueName diff --git a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala index 8bc7cba..afb0463 100644 --- a/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala +++ b/otel4s/src/test/scala/com/commercetools/queue/otel4s/MeasuringPusherSuite.scala @@ -19,12 +19,11 @@ package com.commercetools.queue.otel4s import cats.data.Chain import cats.effect.IO import com.commercetools.queue.QueuePusher +import com.commercetools.queue.testing.TestQueuePusher import munit.CatsEffectSuite import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.Tracer -import scala.concurrent.duration.FiniteDuration - class MeasuringPusherSuite extends CatsEffectSuite { self => @@ -32,17 +31,8 @@ class MeasuringPusherSuite extends CatsEffectSuite { val queueAttribute = Attribute("queue", queueName) - def pusher(result: IO[Unit]) = new QueuePusher[IO, String] { - - override def queueName: String = self.queueName - - override def push(message: String, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] = - result - - override def push(messages: List[(String, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] = - result - - } + def pusher(result: IO[Unit]): QueuePusher[IO, String] = + TestQueuePusher.fromPush[String]((_, _, _) => result) test("Successfully pushing one message results in incrementing the counter") { NaiveCounter.create.flatMap { counter => diff --git a/core/src/test/scala/com/commercetools/queue/testing/LockedTestMessage.scala b/testing/src/main/scala/com/commercetools/queue/testing/LockedTestMessage.scala similarity index 100% rename from core/src/test/scala/com/commercetools/queue/testing/LockedTestMessage.scala rename to testing/src/main/scala/com/commercetools/queue/testing/LockedTestMessage.scala diff --git a/core/src/test/scala/com/commercetools/queue/testing/QueueState.scala b/testing/src/main/scala/com/commercetools/queue/testing/QueueState.scala similarity index 100% rename from core/src/test/scala/com/commercetools/queue/testing/QueueState.scala rename to testing/src/main/scala/com/commercetools/queue/testing/QueueState.scala diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestMessage.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestMessage.scala similarity index 100% rename from core/src/test/scala/com/commercetools/queue/testing/TestMessage.scala rename to testing/src/main/scala/com/commercetools/queue/testing/TestMessage.scala diff --git a/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestQueue.scala similarity index 88% rename from core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala rename to testing/src/main/scala/com/commercetools/queue/testing/TestQueue.scala index b836a98..ec859cc 100644 --- a/core/src/test/scala/com/commercetools/queue/testing/TestQueue.scala +++ b/testing/src/main/scala/com/commercetools/queue/testing/TestQueue.scala @@ -26,6 +26,16 @@ import fs2.Chunk import scala.annotation.tailrec import scala.concurrent.duration.FiniteDuration +/** + * In-memory queue for testing purpose only. + * Use this as a queue implementation in unit tests. This queue can then be used to create: + * - a [[TestQueuePuller]] + * - a [[TestQueueSubscriber]] + * - a [[TestQueuePusher]] + * - a [[TestQueuePublisher]] + * + * It provides accessors to its internal state to check it during tests. It also provides way to set the state to desired values directly. + */ class TestQueue[T]( val name: String, state: AtomicCell[IO, QueueState[T]], @@ -138,3 +148,16 @@ class TestQueue[T]( } } + +object TestQueue { + + def apply[T]( + name: String, + messageTTL: FiniteDuration, + lockTTL: FiniteDuration + ): IO[TestQueue[T]] = + AtomicCell[IO] + .of(QueueState[T](Heap.empty, List.empty, Map.empty)) + .map(new TestQueue[T](name, _, messageTTL, lockTTL)) + +} diff --git a/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePublisher.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePublisher.scala new file mode 100644 index 0000000..d3f1365 --- /dev/null +++ b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePublisher.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.testing + +import cats.effect.{IO, Resource} +import com.commercetools.queue.{QueuePublisher, QueuePusher, UnsealedQueuePublisher} + +import scala.concurrent.duration.FiniteDuration + +final private class TestQueuePublisher[T](queue: TestQueue[T]) extends UnsealedQueuePublisher[IO, T] { + + override val queueName = queue.name + + override def pusher: Resource[IO, QueuePusher[IO, T]] = Resource.pure(new TestQueuePusher(queue)) + +} + +/** + * Utilities to create [[QueuePublisher]]s for testing purpose. + */ +object TestQueuePublisher { + + /** Creates a test publisher based on the provided test queue. */ + def apply[T](queue: TestQueue[T]): QueuePublisher[IO, T] = new TestQueuePublisher[T](queue) + + /** Creates a testing publisher based on a function to execute for each pushed message. */ + def fromPusher[T](onPush: (T, Map[String, String], Option[FiniteDuration]) => IO[Unit]): QueuePublisher[IO, T] = + new UnsealedQueuePublisher[IO, T] { + + override def queueName: String = "mock-queue" + + override def pusher: Resource[IO, QueuePusher[IO, T]] = Resource.pure(TestQueuePusher.fromPush(onPush)) + + } + +} diff --git a/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala new file mode 100644 index 0000000..b947f65 --- /dev/null +++ b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePuller.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.testing + +import cats.effect.IO +import com.commercetools.queue.{MessageContext, QueuePuller, UnsealedQueuePuller} +import fs2.Chunk + +import scala.concurrent.duration.FiniteDuration + +/** + * A queue puller for testing purpose only. + * + * It always wait for the provided waiting time before returning messages, even if the messages are already available. + */ +final private class TestQueuePuller[T](queue: TestQueue[T]) extends UnsealedQueuePuller[IO, T] { + + override val queueName: String = queue.name + + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, T]]] = + IO.sleep(waitingTime) *> queue.lockMessages(batchSize) + +} + +/** + * Utilities to create [[QueuePuller]]s for testing purpose. + */ +object TestQueuePuller { + + /** Creates a test puller based on the provided test queue. */ + def apply[T](queue: TestQueue[T]): QueuePuller[IO, T] = + new TestQueuePuller[T](queue) + + /** Creates a testing puller based on a function to execute for each pull. */ + def fromPull[T](onPull: (Int, FiniteDuration) => IO[Chunk[MessageContext[IO, T]]]): QueuePuller[IO, T] = + new UnsealedQueuePuller[IO, T] { + + override def queueName: String = "mock-queue" + + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): IO[Chunk[MessageContext[IO, T]]] = + onPull(batchSize, waitingTime) + + } + +} diff --git a/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePusher.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePusher.scala new file mode 100644 index 0000000..8856d2d --- /dev/null +++ b/testing/src/main/scala/com/commercetools/queue/testing/TestQueuePusher.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.testing + +import cats.effect.IO +import cats.syntax.foldable._ +import com.commercetools.queue.{QueuePusher, UnsealedQueuePusher} + +import scala.concurrent.duration.FiniteDuration + +final private class TestQueuePusher[T](queue: TestQueue[T]) extends UnsealedQueuePusher[IO, T] { + + override val queueName: String = queue.name + + override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] = + queue.enqeueMessages((message, metadata) :: Nil, delay) + + override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] = + queue.enqeueMessages(messages, delay) + +} + +/** + * Utilities to create [[QueuePusher]]s for testing purpose. + */ +object TestQueuePusher { + + /** Creates a test pusher based on the provided test queue. */ + def apply[T](queue: TestQueue[T]): QueuePusher[IO, T] = new TestQueuePusher[T](queue) + + /** Creates a testing pusher based on a function to execute for each pushed message. */ + def fromPush[T](onPush: (T, Map[String, String], Option[FiniteDuration]) => IO[Unit]): QueuePusher[IO, T] = + new UnsealedQueuePusher[IO, T] { + + override def queueName: String = "mock-queue" + + override def push(message: T, metadata: Map[String, String], delay: Option[FiniteDuration]): IO[Unit] = + onPush(message, metadata, delay) + + override def push(messages: List[(T, Map[String, String])], delay: Option[FiniteDuration]): IO[Unit] = + messages.traverse_ { case (message, metadata) => + onPush(message, metadata, delay) + } + + } + +} diff --git a/testing/src/main/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala new file mode 100644 index 0000000..9f8f158 --- /dev/null +++ b/testing/src/main/scala/com/commercetools/queue/testing/TestQueueSubscriber.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.testing + +import cats.effect.{IO, Resource} +import com.commercetools.queue.{MessageContext, QueuePuller, QueueSubscriber, UnsealedQueueSubscriber} +import fs2.Chunk + +import scala.concurrent.duration.FiniteDuration + +/** + * A queue subscriber for testing purpose only. + */ +final private class TestQueueSubscriber[T](queue: TestQueue[T]) extends UnsealedQueueSubscriber[IO, T] { + + override val queueName: String = queue.name + + override def puller: Resource[IO, QueuePuller[IO, T]] = Resource.pure(new TestQueuePuller(queue)) + +} + +/** + * Utilities to create [[QueueSubscriber]]s for testing purpose. + */ +object TestQueueSubscriber { + + /** Creates a test subscriber based on the provided test queue. */ + def apply[T](queue: TestQueue[T]): QueueSubscriber[IO, T] = + new TestQueueSubscriber[T](queue) + + /** Creates a testing subscriber based on a function to execute for each pull. */ + def fromPuller[T](onPull: (Int, FiniteDuration) => IO[Chunk[MessageContext[IO, T]]]): QueueSubscriber[IO, T] = + new UnsealedQueueSubscriber[IO, T] { + + override def queueName: String = "mock-queue" + + override def puller: Resource[IO, QueuePuller[IO, T]] = + Resource.pure(TestQueuePuller.fromPull(onPull)) + + } + +} diff --git a/testing/src/main/scala/com/commercetools/queue/testing/TestingMessageContext.scala b/testing/src/main/scala/com/commercetools/queue/testing/TestingMessageContext.scala new file mode 100644 index 0000000..d5f2872 --- /dev/null +++ b/testing/src/main/scala/com/commercetools/queue/testing/TestingMessageContext.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Commercetools GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.commercetools.queue.testing + +import cats.effect.IO +import com.commercetools.queue.{MessageContext, UnsealedMessageContext} + +import java.time.Instant + +/** + * A message context for testing purpose. + */ +case class TestingMessageContext[T]( + payload: T, + enqueuedAt: Instant = Instant.EPOCH, + messageId: String = "", + metadata: Map[String, String] = Map.empty) { + self => + + /** A message context that performs the provided effects on every action. */ + def forEffects(onAck: IO[Unit], onNack: IO[Unit], onExtendLock: IO[Unit]): MessageContext[IO, T] = + new UnsealedMessageContext[IO, T] { + override def messageId: String = self.messageId + override def payload: IO[T] = IO.pure(self.payload) + override def rawPayload: String = self.payload.toString() + override def enqueuedAt: Instant = self.enqueuedAt + override def metadata: Map[String, String] = self.metadata + override def ack(): IO[Unit] = onAck + override def nack(): IO[Unit] = onNack + override def extendLock(): IO[Unit] = onExtendLock + } + + /** A message context that does not perform anything on any action. */ + def noop: MessageContext[IO, T] = + forEffects(IO.unit, IO.unit, IO.unit) + + /** A message context that raises the provided exception on every action. */ + def failing(t: Exception): MessageContext[IO, T] = + forEffects(IO.raiseError(t), IO.raiseError(t), IO.raiseError(t)) + + /** A message context that returns a canceled `IO` on every action. */ + def canceled: MessageContext[IO, T] = + forEffects(IO.canceled, IO.canceled, IO.canceled) + +} diff --git a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala b/testing/src/test/scala/com/commercetools/queue/SubscriberSuite.scala similarity index 95% rename from core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala rename to testing/src/test/scala/com/commercetools/queue/SubscriberSuite.scala index de8f50f..382fea2 100644 --- a/core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala +++ b/testing/src/test/scala/com/commercetools/queue/SubscriberSuite.scala @@ -16,7 +16,6 @@ package com.commercetools.queue -import cats.collections.Heap import cats.effect.IO import cats.effect.std.AtomicCell import cats.effect.testkit.TestControl @@ -31,14 +30,10 @@ import scala.concurrent.duration._ class SubscriberSuite extends CatsEffectSuite { val queueSub = ResourceFixture( - AtomicCell[IO] - .of(testing.QueueState[String](Heap.empty, List.empty, Map.empty)) - .map { state => - val queue = - new TestQueue[String](name = "test-queue", state = state, messageTTL = 15.minutes, lockTTL = 1.minute) - (queue, new TestQueueSubscriber(queue), new TestQueuePublisher(queue)) - } - .toResource) + TestQueue[String](name = "test-queue", messageTTL = 15.minutes, lockTTL = 1.minute).toResource + .map { queue => + (queue, TestQueueSubscriber(queue), TestQueuePublisher(queue)) + }) queueSub.test("Successful messages must be acked") { case (queue, subscriber, _) => TestControl