diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
index cea82483..5325b43a 100644
--- a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -297,9 +297,8 @@ public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configPr
return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder());
}
- public static SdkAsyncHttpClient createAsyncHttpClient(
- final Properties configProperties,
- final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ @VisibleForTesting
+ static AttributeMap getSdkHttpConfigurationOptions(final Properties configProperties) {
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
@@ -335,7 +334,15 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
protocol ->
clientConfiguration.put(
SdkHttpConfigurationOption.PROTOCOL, protocol));
- return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder);
+
+ return clientConfiguration.build();
+ }
+
+ public static SdkAsyncHttpClient createAsyncHttpClient(
+ final Properties configProperties,
+ final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ return createAsyncHttpClient(
+ getSdkHttpConfigurationOptions(configProperties), httpClientBuilder);
}
public static SdkAsyncHttpClient createAsyncHttpClient(
@@ -355,6 +362,12 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
}
+ public static SdkHttpClient createSyncHttpClient(
+ final Properties configProperties, final ApacheHttpClient.Builder httpClientBuilder) {
+ return createSyncHttpClient(
+ getSdkHttpConfigurationOptions(configProperties), httpClientBuilder);
+ }
+
public static SdkHttpClient createSyncHttpClient(
final AttributeMap config, final ApacheHttpClient.Builder httpClientBuilder) {
httpClientBuilder.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT);
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 207f567d..5bea73bb 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -115,7 +115,11 @@ public static void createBucket(S3Client s3Client, String bucketName) {
}
public static void createIAMRole(IamClient iam, String roleName) {
- CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();
+ CreateRoleRequest request =
+ CreateRoleRequest.builder()
+ .roleName(roleName)
+ .assumeRolePolicyDocument("{}")
+ .build();
iam.createRole(request);
}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
index 0c2c2a95..67331e3d 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
@@ -17,6 +17,8 @@
package org.apache.flink.connector.aws.util;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -28,7 +30,9 @@
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
@@ -50,6 +54,7 @@
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.connector.aws.util.AWSClientUtil.formatFlinkUserAgentPrefix;
+import static org.apache.flink.connector.aws.util.AWSGeneralUtil.getSdkHttpConfigurationOptions;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
@@ -204,6 +209,21 @@ void testCreateAwsSyncClientWithOverrideConfiguration() {
.isTrue();
}
+ @Test
+ void testGetSdkHttpConfigurationOptions() {
+ Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
+ properties.setProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true");
+ properties.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+ AttributeMap options = getSdkHttpConfigurationOptions(properties);
+
+ assertThat(options.get(SdkHttpConfigurationOption.TCP_KEEPALIVE).booleanValue()).isTrue();
+ assertThat(options.containsKey(SdkHttpConfigurationOption.MAX_CONNECTIONS)).isFalse();
+ assertThat(options.containsKey(SdkHttpConfigurationOption.READ_TIMEOUT)).isFalse();
+ assertThat(options.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue())
+ .isTrue();
+ assertThat(options.get(SdkHttpConfigurationOption.PROTOCOL)).isEqualTo(Protocol.HTTP1_1);
+ }
+
@Test
void testCreateKinesisAsyncClientWithEndpointOverride() {
Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
index 639dbbb8..dc3f6a12 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -122,6 +122,12 @@ under the License.
test
+
+ software.amazon.awssdk
+ s3
+ test
+
+
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index 1ba37045..78383570 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -57,7 +57,6 @@
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.utils.AttributeMap;
import java.time.Duration;
import java.util.Map;
@@ -193,10 +192,6 @@ public SimpleVersionedSerializer getSplitSerializer() {
}
private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
- SdkHttpClient httpClient =
- AWSGeneralUtil.createSyncHttpClient(
- AttributeMap.builder().build(), ApacheHttpClient.builder());
-
String region =
AWSGeneralUtil.getRegionFromArn(streamArn)
.orElseThrow(
@@ -219,6 +214,10 @@ private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig
AWSConfigOptions
.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)));
+ SdkHttpClient httpClient =
+ AWSGeneralUtil.createSyncHttpClient(
+ kinesisClientProperties, ApacheHttpClient.builder());
+
AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
KinesisClient kinesisClient =
AWSClientUtil.createAwsSyncClient(
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
new file mode 100644
index 00000000..4a2adec2
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
@@ -0,0 +1,220 @@
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** IT cases for using {@code KinesisStreamsSource} using a localstack container. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class KinesisStreamsSourceITCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceITCase.class);
+ private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";
+
+ @Container
+ private static final LocalstackContainer MOCK_KINESIS_CONTAINER =
+ new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION));
+
+ private StreamExecutionEnvironment env;
+ private SdkHttpClient httpClient;
+ private KinesisClient kinesisClient;
+
+ @BeforeEach
+ void setUp() {
+ System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ httpClient = AWSServicesTestUtils.createHttpClient();
+ kinesisClient =
+ AWSServicesTestUtils.createAwsSyncClient(
+ MOCK_KINESIS_CONTAINER.getEndpoint(), httpClient, KinesisClient.builder());
+ }
+
+ @AfterEach
+ void teardown() {
+ System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+ AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+ }
+
+ @Test
+ void nonExistentStreamShouldResultInFailure() {
+ Assertions.assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(
+ () ->
+ new Scenario()
+ .localstackStreamName("stream-exists")
+ .withSourceConnectionStreamArn(
+ "arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists")
+ .runScenario())
+ .withStackTraceContaining(
+ "Stream arn arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists not found");
+ }
+
+ @Test
+ void validStreamIsConsumed() throws Exception {
+ new Scenario()
+ .localstackStreamName("valid-stream")
+ .withSourceConnectionStreamArn(
+ "arn:aws:kinesis:ap-southeast-1:000000000000:stream/valid-stream")
+ .runScenario();
+ }
+
+ private Configuration getDefaultConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint());
+ configuration.setString(AWS_ACCESS_KEY_ID, "accessKeyId");
+ configuration.setString(AWS_SECRET_ACCESS_KEY, "secretAccessKey");
+ configuration.setString(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+ configuration.setString(TRUST_ALL_CERTIFICATES, "true");
+ configuration.setString(HTTP_PROTOCOL_VERSION, "HTTP1_1");
+ return configuration;
+ }
+
+ private class Scenario {
+ private final int expectedElements = 1000;
+ private String localstackStreamName = null;
+ private String sourceConnectionStreamArn;
+ private final Configuration configuration =
+ KinesisStreamsSourceITCase.this.getDefaultConfiguration();
+
+ public void runScenario() throws Exception {
+ if (localstackStreamName != null) {
+ prepareStream(localstackStreamName);
+ }
+
+ putRecords(localstackStreamName, expectedElements);
+
+ KinesisStreamsSource kdsSource =
+ KinesisStreamsSource.builder()
+ .setStreamArn(sourceConnectionStreamArn)
+ .setSourceConfig(configuration)
+ .setDeserializationSchema(new SimpleStringSchema())
+ .build();
+
+ List result =
+ env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
+ .returns(TypeInformation.of(String.class))
+ .executeAndCollect(expectedElements);
+
+ Assertions.assertThat(result.size()).isEqualTo(expectedElements);
+ }
+
+ public Scenario withSourceConnectionStreamArn(String sourceConnectionStreamArn) {
+ this.sourceConnectionStreamArn = sourceConnectionStreamArn;
+ return this;
+ }
+
+ public Scenario localstackStreamName(String localstackStreamName) {
+ this.localstackStreamName = localstackStreamName;
+ return this;
+ }
+
+ private void prepareStream(String streamName) throws Exception {
+ final RateLimiter rateLimiter =
+ RateLimiterBuilder.newBuilder()
+ .withRate(1, SECONDS)
+ .withConstantThroughput()
+ .build();
+
+ kinesisClient.createStream(
+ CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
+
+ Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
+ while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) {
+ if (deadline.isOverdue()) {
+ throw new RuntimeException("Failed to create stream within time");
+ }
+ }
+ }
+
+ private void putRecords(String streamName, int numRecords) {
+ List messages =
+ IntStream.range(0, numRecords)
+ .mapToObj(String::valueOf)
+ .map(String::getBytes)
+ .collect(Collectors.toList());
+
+ for (List partition : Lists.partition(messages, 500)) {
+ List entries =
+ partition.stream()
+ .map(
+ msg ->
+ PutRecordsRequestEntry.builder()
+ .partitionKey("fakePartitionKey")
+ .data(SdkBytes.fromByteArray(msg))
+ .build())
+ .collect(Collectors.toList());
+ PutRecordsRequest requests =
+ PutRecordsRequest.builder().streamName(streamName).records(entries).build();
+ PutRecordsResponse putRecordResult = kinesisClient.putRecords(requests);
+ for (PutRecordsResultEntry result : putRecordResult.records()) {
+ LOG.debug("Added record: {}", result.sequenceNumber());
+ }
+ }
+ }
+
+ private boolean streamExists(final String streamName) {
+ try {
+ return kinesisClient
+ .describeStream(
+ DescribeStreamRequest.builder()
+ .streamName(streamName)
+ .build())
+ .streamDescription()
+ .streamStatus()
+ == StreamStatus.ACTIVE;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ }
+}