Skip to content

Commit

Permalink
More specs for publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed Sep 20, 2024
1 parent df7eda6 commit 9508d58
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab
def queueName: String

/**
* Returns a way to bush messages into the queue.
* Returns a way to push messages into the queue.
* This is a low-level construct, mainly aiming at integrating existing
* code bases that require to push explicitly.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,47 @@
package com.commercetools.queue.testkit

import munit.CatsEffectSuite
import fs2.Stream
import cats.syntax.all._

import scala.concurrent.duration.DurationInt

/**
* This suite tests that the features of a [[com.commercetools.queue.QueuePublisher QueuePublisher]] are properly
* implemented for a concrete client.
*/
trait QueuePublisherSuite extends CatsEffectSuite {}
trait QueuePublisherSuite extends CatsEffectSuite { self: QueueClientSuite =>

withQueue.test("sink publishes all the messages") { queueName =>
val client = clientFixture()
for {
msgs <- randomMessages(30)
_ <- Stream
.emits(msgs)
.through(client.publish(queueName).sink(batchSize = 10))
.compile
.drain
messagesInQueue <- client.statistics(queueName).fetcher.use(_.fetch).map(_.messages)
_ = assertEquals(messagesInQueue, msgs.size)
} yield ()
}

withQueue.test("sink publishes all the messages with a delay") { queueName =>
assume(delayedMessagesStatsSupported, "The test environment does not support delayed messages stats")
val client = clientFixture()
for {
msgs <- randomMessages(30)
_ <- Stream
.emits(msgs)
.through(client.publish(queueName).sink(batchSize = 10, delay = 1.minute.some))
.compile
.drain
statsFetcher = client.statistics(queueName).fetcher
messagesInQueue <- statsFetcher.use(_.fetch).map(_.messages)
delayedMessages <- statsFetcher.use(_.fetch).map(_.delayed)
_ = assertEquals(delayedMessages, msgs.size.some, "delayed messages are not what we expect")
_ = assertEquals(messagesInQueue, 0, "the queue is not empty")
} yield ()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite =>
} yield ()
}

withQueue.test("process respect the decision from the handler") { queueName =>
withQueue.test("process respects the decision from the handler") { queueName =>
val client = clientFixture()
for {
_ <- Stream
Expand Down

0 comments on commit 9508d58

Please sign in to comment.