Skip to content

Commit

Permalink
Merge pull request #71 from commercetools/testQueuePuller-fix
Browse files Browse the repository at this point in the history
Fix SubscriberSuite
  • Loading branch information
AL333Z authored Oct 7, 2024
2 parents a10d764 + ecd6aa2 commit a769474
Showing 1 changed file with 26 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,11 @@ class SubscriberSuite extends CatsEffectSuite {
.interruptAfter(3.seconds)
.compile
.foldMonoid
_ <- assertIO(IO.pure(result), 100)
_ <- assertIO(queue.getAvailableMessages, Nil)
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
} yield result)
.flatMap { result =>
for {
_ <- assertIO(IO.pure(result), 100)
_ <- assertIO(queue.getAvailableMessages, Nil)
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
} yield ()
}
}

queueSub.test("Messages must be unack'ed if processing fails and emit everything up to failure") {
Expand All @@ -107,17 +103,13 @@ class SubscriberSuite extends CatsEffectSuite {
.attempt
.compile
.toList
} yield (messages, result))
.flatMap { case (originals, result) =>
for {
// check that all messages were consumed up to message #43
_ <- assertIO(IO.pure(result.init.map(_.map(_.rawPayload))), originals.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM"))
_ <- assertIO(queue.getAvailableMessages, originals.drop(43))
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
} yield ()
}
// check that all messages were consumed up to message #43
_ <- assertIO(IO.pure(result.init.map(_.map(_.rawPayload))), messages.take(43).map(m => Right(m.payload)))
_ <- assertIO(IO.pure(result.last.leftMap(_.getMessage())), Left("BOOM"))
_ <- assertIO(queue.getAvailableMessages, messages.drop(43))
_ <- assertIO(queue.getLockedMessages, Nil)
_ <- assertIO(queue.getDelayedMessages, Nil)
} yield ())
}

queueSub.test("Messages consumed and ok'ed or drop'ed should follow the decision") {
Expand All @@ -133,13 +125,9 @@ class SubscriberSuite extends CatsEffectSuite {
.interruptAfter(3.seconds)
.compile
.foldMonoid
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 50.asRight)
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 50.asRight)
} yield ()
}
}

queueSub.test("Messages consumed and confirmed or dropped should follow the decision") {
Expand All @@ -155,13 +143,9 @@ class SubscriberSuite extends CatsEffectSuite {
.take(50)
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 50.asRight)
} yield ()
}
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 50.asRight)
} yield ())
}

queueSub.test("Messages consumed and requeued should follow the decision") { case (queue, subscriber, publisher) =>
Expand All @@ -180,14 +164,10 @@ class SubscriberSuite extends CatsEffectSuite {
.compile
.foldMonoid
totOpCount <- opCounter.get
} yield (result, totOpCount))
.flatMap { case (result, totOpCount) =>
for {
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(totOpCount, 200)
_ = assertEquals(result, 100.asRight)
} yield ()
}
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(totOpCount, 200)
_ = assertEquals(result, 100.asRight)
} yield ())
}

queueSub.test("Messages that are marked as failed and acked should follow the decision") {
Expand All @@ -203,13 +183,9 @@ class SubscriberSuite extends CatsEffectSuite {
.collect { case Left(_) => 1 }
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 100)
} yield ()
}
_ <- assertIO(queue.getAvailableMessages, Nil)
_ = assertEquals(result, 100)
} yield ())
}

queueSub.test("Messages that are marked as failed and not acked should follow the decision") {
Expand All @@ -225,12 +201,8 @@ class SubscriberSuite extends CatsEffectSuite {
.collect { case Left(_) => 1 }
.compile
.foldMonoid
} yield result)
.flatMap { result =>
for {
_ <- assertIO(queue.getAvailableMessages.map(_.size), 100)
_ = assertEquals(result, 100)
} yield ()
}
_ <- assertIO(queue.getAvailableMessages.map(_.size), 100)
_ = assertEquals(result, 100)
} yield ())
}
}

0 comments on commit a769474

Please sign in to comment.