Skip to content

Commit

Permalink
[FLINK-34407][Connectors/Kinesis] Fix unstable test
Browse files Browse the repository at this point in the history
  • Loading branch information
z3d1k authored and hlteoh37 committed Feb 10, 2024
1 parent 05fdf37 commit b207606
Showing 1 changed file with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.ACTIVE;
import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.CREATING;
import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.DELETING;
Expand Down Expand Up @@ -479,23 +476,18 @@ public CompletableFuture<Void> subscribeToShard(
.build());
}

Subscription subscription = mock(Subscription.class);
Iterator<SubscribeToShardEvent> iterator =
eventsToSend.iterator();

doAnswer(
a -> {
if (!iterator.hasNext()) {
completeSubscription(subscriber);
} else {
Subscription subscription =
new FakeSubscription(
(n) -> {
if (iterator.hasNext()) {
subscriber.onNext(iterator.next());
} else {
completeSubscription(subscriber);
}

return null;
})
.when(subscription)
.request(anyLong());

});
subscriber.onSubscribe(subscription);
});
return null;
Expand Down Expand Up @@ -692,6 +684,25 @@ public void close() {
}
}

private static class FakeSubscription implements Subscription {

private final java.util.function.Consumer<Long> onRequest;

public FakeSubscription(java.util.function.Consumer<Long> onRequest) {
this.onRequest = onRequest;
}

@Override
public void request(long n) {
onRequest.accept(n);
}

@Override
public void cancel() {
// Nothing to do
}
}

private static Record createRecord(final AtomicInteger sequenceNumber) {
return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber);
}
Expand Down

0 comments on commit b207606

Please sign in to comment.