Skip to content

Commit

Permalink
fix flaky JdbcProjectionTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam authored Mar 15, 2024
1 parent 5130669 commit 3eeae34
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class JdbcProjectionTest extends JUnitSuite {

Expand Down Expand Up @@ -209,13 +211,14 @@ private void expectNextUntilErrorMessage(TestSubscriber.Probe<Done> probe, Strin
}

private JdbcHandler<Envelope, PureJdbcSession> concatHandler(StringBuffer str) {
return concatHandler(str, __ -> false);
return concatHandler(str, new CountDownLatch(0), __ -> false);
}

private JdbcHandler<Envelope, PureJdbcSession> concatHandler(
StringBuffer buffer, Predicate<Long> failPredicate) {
StringBuffer buffer, CountDownLatch latch, Predicate<Long> failPredicate) {
return JdbcHandler.fromFunction(
(PureJdbcSession session, Envelope envelope) -> {
latch.countDown();
if (failPredicate.test(envelope.offset)) {
throw new RuntimeException(failMessage(envelope.offset));
} else {
Expand Down Expand Up @@ -275,21 +278,23 @@ public void exactlyOnceShouldRestartFromPreviousOffset() {
ProjectionId projectionId = genRandomProjectionId();

StringBuffer str = new StringBuffer();
CountDownLatch latch = new CountDownLatch(3);

Projection<Envelope> projection =
JdbcProjection.exactlyOnce(
projectionId,
sourceProvider(entityId),
jdbcSessionCreator,
// fail on fourth offset
() -> concatHandler(str, offset -> offset == 4),
() -> concatHandler(str, latch, offset -> offset == 4),
testKit.system());

projectionTestKit.runWithTestSink(
projection,
(probe) -> {
probe.request(3);
probe.expectNextN(3);
assertTrue(latch.await(3, TimeUnit.SECONDS));
assertEquals("abc|def|ghi|", str.toString());
expectNextUntilErrorMessage(probe, failMessage(4));
});
Expand Down Expand Up @@ -326,14 +331,15 @@ public void atLeastOnceShouldRestartFromPreviousOffset() {
ProjectionId projectionId = genRandomProjectionId();

StringBuffer str = new StringBuffer();
CountDownLatch latch = new CountDownLatch(3);

Projection<Envelope> projection =
JdbcProjection.atLeastOnce(
projectionId,
sourceProvider(entityId),
jdbcSessionCreator,
// fail on fourth offset
() -> concatHandler(str, offset -> offset == 4),
() -> concatHandler(str, latch, offset -> offset == 4),
testKit.system())
.withSaveOffset(1, Duration.ZERO);

Expand All @@ -348,6 +354,7 @@ public void atLeastOnceShouldRestartFromPreviousOffset() {
*/
probe.request(2);
probe.expectNextN(2);
assertTrue(latch.await(3, TimeUnit.SECONDS));
assertEquals("abc|def|ghi|", str.toString());
expectNextUntilErrorMessage(probe, failMessage(4));
});
Expand Down

0 comments on commit 3eeae34

Please sign in to comment.