From c525e9aa66dede30d1a9d2f70a2ec92ee5ab15e5 Mon Sep 17 00:00:00 2001 From: gguptp Date: Thu, 19 Sep 2024 19:05:23 +0530 Subject: [PATCH] [FLINK-36329][Connectors/DynamoDB] Changing retry strategy to use SDK retries to catch all retryable exceptions --- .../source/DynamoDbStreamsSource.java | 32 +++---------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java index 5a4d60a5..14f94e7b 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java @@ -49,9 +49,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.UserCodeClassLoader; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.awscore.internal.AwsErrorCode; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.retries.AdaptiveRetryStrategy; @@ -202,7 +201,7 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties); AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties); - int maxDescribeStreamCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT); + int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT); Duration minDescribeStreamDelay = sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY); Duration maxDescribeStreamDelay = @@ -210,32 +209,11 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo BackoffStrategy backoffStrategy = BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay); AdaptiveRetryStrategy adaptiveRetryStrategy = - AdaptiveRetryStrategy.builder() - .maxAttempts(maxDescribeStreamCallAttempts) + SdkDefaultRetryStrategy.adaptiveRetryStrategy() + .toBuilder() + .maxAttempts(maxApiCallAttempts) .backoffStrategy(backoffStrategy) .throttlingBackoffStrategy(backoffStrategy) - .retryOnException( - throwable -> { - if (throwable instanceof AwsServiceException) { - AwsServiceException exception = - (AwsServiceException) throwable; - return (AwsErrorCode.RETRYABLE_ERROR_CODES.contains( - exception.awsErrorDetails().errorCode())) - || (AwsErrorCode.THROTTLING_ERROR_CODES.contains( - exception.awsErrorDetails().errorCode())); - } - return false; - }) - .treatAsThrottling( - throwable -> { - if (throwable instanceof AwsServiceException) { - AwsServiceException exception = - (AwsServiceException) throwable; - return AwsErrorCode.THROTTLING_ERROR_CODES.contains( - exception.awsErrorDetails().errorCode()); - } - return false; - }) .build(); DynamoDbStreamsClient dynamoDbStreamsClient = AWSClientUtil.createAwsSyncClient(