Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36357][Connectors/Kinesis] Set up AWS SDK default retry strate… #170

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.awscore.internal.AwsErrorCode;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.retries.StandardRetryStrategy;
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.kinesis.KinesisClient;
Expand Down Expand Up @@ -254,31 +252,10 @@ private RetryStrategy createExpBackoffRetryStrategy(
final BackoffStrategy backoffStrategy =
BackoffStrategy.exponentialDelayHalfJitter(initialDelay, maxDelay);

return StandardRetryStrategy.builder()
return SdkDefaultRetryStrategy.standardRetryStrategyBuilder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any thoughts on how this would work with a user defined retryable exceptions improvement like the one in the old connector?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any thoughts on how this would work with a user defined retryable exceptions improvement like the one in the old connector?

Good qn - we can add more predicates by calling retryOnException(), but we won't be able to override the ones already marked as non-retryable in the SDK defaults, because I think it does eager checks for non-retryable errors.

.backoffStrategy(backoffStrategy)
.throttlingBackoffStrategy(backoffStrategy)
.maxAttempts(maxAttempts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Circuit breaker is enabled in this strategy by default. Is this intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering that - is there a reason we really don't want to enable circuit breaker for our setup?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. I believe it will be useful, we just need to document this as a new change as it might be surprising for users.

.retryOnException(
throwable -> {
if (throwable instanceof AwsServiceException) {
AwsServiceException exception = (AwsServiceException) throwable;
return (AwsErrorCode.RETRYABLE_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode()))
|| (AwsErrorCode.THROTTLING_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode()));
}
return false;
})
.treatAsThrottling(
throwable -> {
if (throwable instanceof AwsServiceException) {
AwsServiceException exception = (AwsServiceException) throwable;
return AwsErrorCode.THROTTLING_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode());
}
return false;
})
.circuitBreakerEnabled(false)
.build();
}
}
Loading