From b207606a95d0ce508c55e69dd0dc6c598eb2fb3c Mon Sep 17 00:00:00 2001 From: Aleksandr Pilipenko Date: Fri, 9 Feb 2024 21:02:03 +0000 Subject: [PATCH] [FLINK-34407][Connectors/Kinesis] Fix unstable test --- .../FakeKinesisFanOutBehavioursFactory.java | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java index 9985fbf5..0b49ac5a 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java @@ -56,9 +56,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.ACTIVE; import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.CREATING; import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.DELETING; @@ -479,23 +476,18 @@ public CompletableFuture subscribeToShard( .build()); } - Subscription subscription = mock(Subscription.class); Iterator iterator = eventsToSend.iterator(); - doAnswer( - a -> { - if (!iterator.hasNext()) { - completeSubscription(subscriber); - } else { + Subscription subscription = + new FakeSubscription( + (n) -> { + if (iterator.hasNext()) { subscriber.onNext(iterator.next()); + } else { + completeSubscription(subscriber); } - - return null; - }) - .when(subscription) - .request(anyLong()); - + }); subscriber.onSubscribe(subscription); }); return null; @@ -692,6 +684,25 @@ public void close() { } } + private static class FakeSubscription implements Subscription { + + private final java.util.function.Consumer onRequest; + + public FakeSubscription(java.util.function.Consumer onRequest) { + this.onRequest = onRequest; + } + + @Override + public void request(long n) { + onRequest.accept(n); + } + + @Override + public void cancel() { + // Nothing to do + } + } + private static Record createRecord(final AtomicInteger sequenceNumber) { return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber); }