Skip to content

Commit

Permalink
[FLINK-36329][Connectors/DynamoDB] Changing retry strategy to use SDK…
Browse files Browse the repository at this point in the history
… retries to catch all retryable exceptions
  • Loading branch information
gguptp authored and hlteoh37 committed Sep 24, 2024
1 parent 4d4f04b commit c525e9a
Showing 1 changed file with 5 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.awscore.internal.AwsErrorCode;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
Expand Down Expand Up @@ -202,40 +201,19 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo
consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);

AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
int maxDescribeStreamCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
Duration minDescribeStreamDelay =
sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
Duration maxDescribeStreamDelay =
sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
BackoffStrategy backoffStrategy =
BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
AdaptiveRetryStrategy adaptiveRetryStrategy =
AdaptiveRetryStrategy.builder()
.maxAttempts(maxDescribeStreamCallAttempts)
SdkDefaultRetryStrategy.adaptiveRetryStrategy()
.toBuilder()
.maxAttempts(maxApiCallAttempts)
.backoffStrategy(backoffStrategy)
.throttlingBackoffStrategy(backoffStrategy)
.retryOnException(
throwable -> {
if (throwable instanceof AwsServiceException) {
AwsServiceException exception =
(AwsServiceException) throwable;
return (AwsErrorCode.RETRYABLE_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode()))
|| (AwsErrorCode.THROTTLING_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode()));
}
return false;
})
.treatAsThrottling(
throwable -> {
if (throwable instanceof AwsServiceException) {
AwsServiceException exception =
(AwsServiceException) throwable;
return AwsErrorCode.THROTTLING_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode());
}
return false;
})
.build();
DynamoDbStreamsClient dynamoDbStreamsClient =
AWSClientUtil.createAwsSyncClient(
Expand Down

0 comments on commit c525e9a

Please sign in to comment.