Skip to content

Commit

Permalink
Update specs
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed May 31, 2024
1 parent de8e332 commit f218feb
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 19 deletions.
137 changes: 122 additions & 15 deletions core/src/test/scala/com/commercetools/queue/SubscriberSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import cats.collections.Heap
import cats.effect.IO
import cats.effect.std.AtomicCell
import cats.effect.testkit.TestControl
import cats.implicits.catsSyntaxOptionId
import cats.syntax.either._
import cats.syntax.traverse._
import com.commercetools.queue.testing._
Expand Down Expand Up @@ -98,37 +99,143 @@ class SubscriberSuite extends CatsEffectSuite {
}
}

queueSub.test("Messages consumed with process must follow the decision") { case (queue, subscriber, publisher) =>
queueSub.test("Messages consumed and ok'ed or drop'ed should follow the decision") {
case (queue, subscriber, publisher) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _))
}
_ <- queue.setAvailableMessages(messages)
result <- subscriber
.process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] {
override def handle(msg: Message[IO, String]): IO[Decision[Int]] =
if (msg.rawPayload.toInt % 2 == 0) IO.pure(Decision.Drop)
else IO.pure(Decision.Ok(1))
})
.interruptAfter(3.seconds)
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 50.asRight)
} yield ()
}
}

queueSub.test("Messages consumed and confirmed or dropped should follow the decision") {
case (queue, subscriber, publisher) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _))
}
_ <- queue.setAvailableMessages(messages)
result <- subscriber
.process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] {
override def handle(msg: Message[IO, String]): IO[Decision[Int]] =
if (msg.rawPayload.toInt % 2 == 0) IO.pure(Decision.Ok(1))
else IO.pure(Decision.Drop)
})
.take(50)
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 50.asRight)
} yield ()
}
}

queueSub.test("Messages consumed and requeued should follow the decision") { case (queue, subscriber, publisher) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(s"message-$i", _))
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _))
}
_ <- queue.setAvailableMessages(messages)
// then process messages in batches of 5
// processing is (virtually) instantaneous in this case,
// so messages are immediately acked, from the mocked time PoV
// however, receiving messages waits for the amount of provided `waitingTime`
// in the test queue implementation, event if enough messages are available
// so this step makes time progress in steps of `waitingTime`
opCounter <- AtomicCell[IO].of(0)
result <- subscriber
.process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] {
override def handle(msg: Message[IO, String]): IO[Decision[Int]] =
IO.pure(Decision.Ok(1))
opCounter.update(_ + 1) >> {
// let's reenqueue at the first run, and then confirm
if (msg.metadata.contains("reenqueued")) IO.pure(Decision.Ok(1))
else IO.pure(Decision.Reenqueue(Map("reenqueued" -> "true").some, None))
}
})
.interruptAfter(3.seconds)
.take(100)
.compile
.foldMonoid
} yield result)
.flatMap { result =>
totOpCount <- opCounter.get
} yield (result, totOpCount))
.flatMap { case (result, totOpCount) =>
for {
_ <- assertIO(IO.pure(result), 100.asRight)
_ <- assertIO(queue.getAvailableMessages, Nil)
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
_ = assertEquals(totOpCount, 200)
_ = assertEquals(result, 100.asRight)
} yield ()
}
}

queueSub.test("Messages that are marked as failed and acked should follow the decision") {
case (queue, subscriber, publisher) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _))
}
_ <- queue.setAvailableMessages(messages)
result <- subscriber
.process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] {
override def handle(msg: Message[IO, String]): IO[Decision[Int]] =
IO.pure(Decision.Fail(new Throwable(s"failed ${msg.rawPayload}"), ack = true))
})
.take(100)
.collect { case Left(_) => 1 }
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 100)
} yield ()
}
}

queueSub.test("Messages that are marked as failed and not acked should follow the decision") {
case (queue, subscriber, publisher) =>
TestControl
.executeEmbed(for {
// first populate the queue
messages <- List.range(0, 100).traverse { i =>
IO.sleep(10.millis) *> IO.realTimeInstant.map(TestMessage(i.toString, _))
}
_ <- queue.setAvailableMessages(messages)
result <- subscriber
.process(batchSize = 5, waitingTime = 40.millis, publisher)(new MessageHandler[IO, String, Int] {
override def handle(msg: Message[IO, String]): IO[Decision[Int]] =
IO.pure(Decision.Fail(new Throwable(s"failed ${msg.rawPayload}"), ack = false))
})
.take(100)
.collect { case Left(_) => 1 }
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages.map(_.size), 100)
_ = assertEquals(result, 100)
} yield ()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ final case class LockedTestMessage[T](

override def enqueuedAt: Instant = msg.enqueuedAt

override val metadata: Map[String, String] = Map.empty
override val metadata: Map[String, String] = msg.metadata

override def ack(): IO[Unit] =
// done, just delete it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import cats.Order

import java.time.Instant

final case class TestMessage[T](payload: T, enqueuedAt: Instant)
final case class TestMessage[T](payload: T, enqueuedAt: Instant, metadata: Map[String, String] = Map.empty)

object TestMessage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ class TestQueue[T](
state <- update(state)
} yield delay match {
case None =>
state.copy(available = state.available.addAll(messages.map(x => TestMessage(x._1, now))))
state.copy(available = state.available.addAll(messages.map { case (payload, metadata) =>
TestMessage(payload, now, metadata)
}))
case Some(delay) =>
val delayed = now.plusMillis(delay.toMillis)
state.copy(delayed = messages.map(x => TestMessage(x._1, delayed)) reverse_::: state.delayed)
state.copy(delayed = messages.map { case (payload, metadata) =>
TestMessage(payload, delayed, metadata)
} reverse_::: state.delayed)
}
}

Expand Down

0 comments on commit f218feb

Please sign in to comment.