Skip to content

Commit

Permalink
[FLINK-31183][Connector/Kinesis] Fix bug where EFO Consumer can fail …
Browse files Browse the repository at this point in the history
…to stop gracefully during stop-with-savepoint
  • Loading branch information
dannycranmer committed Feb 23, 2023
1 parent 98cde3d commit fdfe982
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void run() {
// we can close this consumer thread once we've reached the end of the
// subscribed shard
break;
} else if (result == CANCELLED) {
} else if (isRunning() && result == CANCELLED) {
final String errorMessage =
"Shard consumer cancelled: " + subscribedShard.getShard().getShardId();
LOG.info(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
Expand Down Expand Up @@ -70,6 +71,8 @@ public class FanOutRecordPublisher implements RecordPublisher {

private final FanOutRecordPublisherConfiguration configuration;

private final Supplier<Boolean> runningSupplier;

/** The current attempt in the case of subsequent recoverable errors. */
private int attempt = 0;

Expand All @@ -84,20 +87,23 @@ public class FanOutRecordPublisher implements RecordPublisher {
* @param subscribedShard the shard to consumer from
* @param kinesisProxy the proxy used to talk to Kinesis services
* @param configuration the record publisher configuration
* @param runningSupplier a callback to query if the consumer is still running
*/
public FanOutRecordPublisher(
final StartingPosition startingPosition,
final String consumerArn,
final StreamShardHandle subscribedShard,
final KinesisProxyAsyncV2Interface kinesisProxy,
final FanOutRecordPublisherConfiguration configuration,
final FullJitterBackoff backoff) {
final FullJitterBackoff backoff,
final Supplier<Boolean> runningSupplier) {
this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);
this.consumerArn = Preconditions.checkNotNull(consumerArn);
this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
this.configuration = Preconditions.checkNotNull(configuration);
this.backoff = Preconditions.checkNotNull(backoff);
this.runningSupplier = runningSupplier;
}

@Override
Expand Down Expand Up @@ -161,11 +167,12 @@ private RecordPublisherRunResult runWithBackoff(
consumerArn,
subscribedShard.getShard().getShardId(),
kinesisProxy,
configuration.getSubscribeToShardTimeout());
boolean complete;
configuration.getSubscribeToShardTimeout(),
runningSupplier);
RecordPublisherRunResult result;

try {
complete =
result =
fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
attempt = 0;
Expand Down Expand Up @@ -209,7 +216,7 @@ private RecordPublisherRunResult runWithBackoff(
return INCOMPLETE;
}

return complete ? COMPLETE : INCOMPLETE;
return result;
}

private void backoff(final Throwable ex) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class FanOutRecordPublisherFactory implements RecordPublisherFactory {
*/
private final KinesisProxyAsyncV2Interface kinesisProxy;

/** A flag to indicate whether the FanOutRecordPublisherFactory has been closed. */
private boolean closed = false;

/**
* Instantiate a factory responsible for creating {@link FanOutRecordPublisher}.
*
Expand Down Expand Up @@ -89,11 +92,13 @@ public FanOutRecordPublisher create(
streamShardHandle,
kinesisProxy,
configuration,
BACKOFF);
BACKOFF,
() -> !closed);
}

@Override
public void close() {
kinesisProxy.close();
closed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -46,8 +47,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;

/**
* This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records
Expand Down Expand Up @@ -114,6 +119,8 @@ public class FanOutShardSubscriber {

private final Duration queueWaitTimeout;

private final Supplier<Boolean> runningSupplier;

/**
* Create a new Fan Out Shard subscriber.
*
Expand All @@ -122,13 +129,21 @@ public class FanOutShardSubscriber {
* @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
* @param subscribeToShardTimeout A timeout when waiting for a shard subscription to be
* established
* @param runningSupplier a callback to query if the consumer is still running
*/
FanOutShardSubscriber(
final String consumerArn,
final String shardId,
final KinesisProxyAsyncV2Interface kinesis,
final Duration subscribeToShardTimeout) {
this(consumerArn, shardId, kinesis, subscribeToShardTimeout, DEFAULT_QUEUE_TIMEOUT);
final Duration subscribeToShardTimeout,
final Supplier<Boolean> runningSupplier) {
this(
consumerArn,
shardId,
kinesis,
subscribeToShardTimeout,
DEFAULT_QUEUE_TIMEOUT,
runningSupplier);
}

/**
Expand All @@ -139,20 +154,23 @@ public class FanOutShardSubscriber {
* @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
* @param subscribeToShardTimeout A timeout when waiting for a shard subscription to be
* established
* @param queueWaitTimeout A timeout when enqueuing/de-queueing
* @param queueWaitTimeout A timeout when enqueuing/de-queueing param runningSupplier a callback
* to query if the consumer is still running
*/
@VisibleForTesting
FanOutShardSubscriber(
final String consumerArn,
final String shardId,
final KinesisProxyAsyncV2Interface kinesis,
final Duration subscribeToShardTimeout,
final Duration queueWaitTimeout) {
final Duration queueWaitTimeout,
final Supplier<Boolean> runningSupplier) {
this.kinesis = Preconditions.checkNotNull(kinesis);
this.consumerArn = Preconditions.checkNotNull(consumerArn);
this.shardId = Preconditions.checkNotNull(shardId);
this.subscribeToShardTimeout = subscribeToShardTimeout;
this.queueWaitTimeout = queueWaitTimeout;
this.runningSupplier = runningSupplier;
}

/**
Expand All @@ -162,12 +180,12 @@ public class FanOutShardSubscriber {
*
* @param startingPosition the position in the stream in which to start receiving records
* @param eventConsumer the consumer to deliver received events to
* @return true if there are no more messages (complete), false if a subsequent subscription
* should be obtained
* @return complete if there are no more messages (complete), incomplete if a subsequent
* subscription should be obtained and cancelled if the consumer was cancelled
* @throws FanOutSubscriberException when an exception is propagated from the networking stack
* @throws InterruptedException when the thread is interrupted
*/
boolean subscribeToShardAndConsumeRecords(
RecordPublisherRunResult subscribeToShardAndConsumeRecords(
final StartingPosition startingPosition,
final Consumer<SubscribeToShardEvent> eventConsumer)
throws InterruptedException, FanOutSubscriberException {
Expand Down Expand Up @@ -322,19 +340,24 @@ private boolean isInterrupted(final Throwable throwable) {
*
* @param eventConsumer the event consumer to deliver records to
* @param subscription the subscription we are subscribed to
* @return true if there are no more messages (complete), false if a subsequent subscription
* should be obtained
* @return complete if there are no more messages (complete), incomplete if a subsequent
* subscription should be obtained and cancelled if the consumer was cancelled
* @throws FanOutSubscriberException when an exception is propagated from the networking stack
* @throws InterruptedException when the thread is interrupted
*/
private boolean consumeAllRecordsFromKinesisShard(
private RecordPublisherRunResult consumeAllRecordsFromKinesisShard(
final Consumer<SubscribeToShardEvent> eventConsumer,
final FanOutShardSubscription subscription)
throws InterruptedException, FanOutSubscriberException {
String continuationSequenceNumber;
boolean result = true;
RecordPublisherRunResult result = COMPLETE;

do {
if (!runningSupplier.get()) {
LOG.info("FanOutShardSubscriber cancelled - {} ({})", shardId, consumerArn);
return CANCELLED;
}

FanOutSubscriptionEvent subscriptionEvent;
if (subscriptionErrorEvent.get() != null) {
subscriptionEvent = subscriptionErrorEvent.get();
Expand All @@ -348,7 +371,7 @@ private boolean consumeAllRecordsFromKinesisShard(
"Timed out polling events from network, reacquiring subscription - {} ({})",
shardId,
consumerArn);
result = false;
result = INCOMPLETE;
break;
} else if (subscriptionEvent.isSubscribeToShardEvent()) {
// Request for KDS to send the next record batch
Expand All @@ -361,10 +384,10 @@ private boolean consumeAllRecordsFromKinesisShard(
}
} else if (subscriptionEvent.isSubscriptionComplete()) {
// The subscription is complete, but the shard might not be, so we return incomplete
return false;
return INCOMPLETE;
} else {
handleError(subscriptionEvent.getThrowable());
result = false;
result = INCOMPLETE;
break;
}
} while (continuationSequenceNumber != null);
Expand All @@ -391,9 +414,13 @@ private FanOutShardSubscription(final CountDownLatch waitForSubscriptionLatch) {
this.waitForSubscriptionLatch = waitForSubscriptionLatch;
}

private boolean isCancelled() {
return cancelled || !runningSupplier.get();
}

/** Flag to the producer that we are ready to receive more events. */
void requestRecord() {
if (!cancelled) {
if (!isCancelled()) {
LOG.debug(
"Requesting more records from EFO subscription - {} ({})",
shardId,
Expand Down Expand Up @@ -452,7 +479,7 @@ public void onComplete() {
}

private void cancelSubscription() {
if (cancelled) {
if (isCancelled()) {
return;
}
cancelled = true;
Expand All @@ -468,7 +495,7 @@ private void cancelSubscription() {
* @param event the event to enqueue
*/
private void enqueueEvent(final FanOutSubscriptionEvent event) {
if (cancelled) {
if (isCancelled()) {
return;
}

Expand Down
Loading

0 comments on commit fdfe982

Please sign in to comment.