From 9508d584e96d5cfa80c339c5271dfa6ab4654cbe Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Fri, 20 Sep 2024 10:01:48 +0200 Subject: [PATCH] More specs for publisher --- .../commercetools/queue/QueuePublisher.scala | 2 +- .../queue/testkit/QueuePublisherSuite.scala | 40 ++++++++++++++++++- .../queue/testkit/QueueSubscriberSuite.scala | 2 +- 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala index dae5db1..d6c610a 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala @@ -30,7 +30,7 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab def queueName: String /** - * Returns a way to bush messages into the queue. + * Returns a way to push messages into the queue. * This is a low-level construct, mainly aiming at integrating existing * code bases that require to push explicitly. * diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala index 1c05cf0..1ebfdb7 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueuePublisherSuite.scala @@ -1,9 +1,47 @@ package com.commercetools.queue.testkit import munit.CatsEffectSuite +import fs2.Stream +import cats.syntax.all._ + +import scala.concurrent.duration.DurationInt /** * This suite tests that the features of a [[com.commercetools.queue.QueuePublisher QueuePublisher]] are properly * implemented for a concrete client. */ -trait QueuePublisherSuite extends CatsEffectSuite {} +trait QueuePublisherSuite extends CatsEffectSuite { self: QueueClientSuite => + + withQueue.test("sink publishes all the messages") { queueName => + val client = clientFixture() + for { + msgs <- randomMessages(30) + _ <- Stream + .emits(msgs) + .through(client.publish(queueName).sink(batchSize = 10)) + .compile + .drain + messagesInQueue <- client.statistics(queueName).fetcher.use(_.fetch).map(_.messages) + _ = assertEquals(messagesInQueue, msgs.size) + } yield () + } + + withQueue.test("sink publishes all the messages with a delay") { queueName => + assume(delayedMessagesStatsSupported, "The test environment does not support delayed messages stats") + val client = clientFixture() + for { + msgs <- randomMessages(30) + _ <- Stream + .emits(msgs) + .through(client.publish(queueName).sink(batchSize = 10, delay = 1.minute.some)) + .compile + .drain + statsFetcher = client.statistics(queueName).fetcher + messagesInQueue <- statsFetcher.use(_.fetch).map(_.messages) + delayedMessages <- statsFetcher.use(_.fetch).map(_.delayed) + _ = assertEquals(delayedMessages, msgs.size.some, "delayed messages are not what we expect") + _ = assertEquals(messagesInQueue, 0, "the queue is not empty") + } yield () + } + +} diff --git a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala index 448c488..2abe296 100644 --- a/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala +++ b/testkit/src/main/scala/com/commercetools/queue/testkit/QueueSubscriberSuite.scala @@ -142,7 +142,7 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite => } yield () } - withQueue.test("process respect the decision from the handler") { queueName => + withQueue.test("process respects the decision from the handler") { queueName => val client = clientFixture() for { _ <- Stream