From b96b69dfd3c9da8b028ad3d1c3c574f90bf41697 Mon Sep 17 00:00:00 2001 From: Hong Teoh Date: Tue, 24 Sep 2024 09:25:05 +0100 Subject: [PATCH] [FLINK-36357][Connectors/Kinesis] Set up AWS SDK default retry strategy configurations --- .../kinesis/source/KinesisStreamsSource.java | 27 ++----------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 2977005f..1ba37045 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -50,12 +50,10 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.UserCodeClassLoader; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.awscore.internal.AwsErrorCode; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.retries.StandardRetryStrategy; import software.amazon.awssdk.retries.api.BackoffStrategy; import software.amazon.awssdk.retries.api.RetryStrategy; import software.amazon.awssdk.services.kinesis.KinesisClient; @@ -254,31 +252,10 @@ private RetryStrategy createExpBackoffRetryStrategy( final BackoffStrategy backoffStrategy = BackoffStrategy.exponentialDelayHalfJitter(initialDelay, maxDelay); - return StandardRetryStrategy.builder() + return SdkDefaultRetryStrategy.standardRetryStrategyBuilder() .backoffStrategy(backoffStrategy) .throttlingBackoffStrategy(backoffStrategy) .maxAttempts(maxAttempts) - .retryOnException( - throwable -> { - if (throwable instanceof AwsServiceException) { - AwsServiceException exception = (AwsServiceException) throwable; - return (AwsErrorCode.RETRYABLE_ERROR_CODES.contains( - exception.awsErrorDetails().errorCode())) - || (AwsErrorCode.THROTTLING_ERROR_CODES.contains( - exception.awsErrorDetails().errorCode())); - } - return false; - }) - .treatAsThrottling( - throwable -> { - if (throwable instanceof AwsServiceException) { - AwsServiceException exception = (AwsServiceException) throwable; - return AwsErrorCode.THROTTLING_ERROR_CODES.contains( - exception.awsErrorDetails().errorCode()); - } - return false; - }) - .circuitBreakerEnabled(false) .build(); } }