From b0f7a16e16ca6bf710aa4036160024631c641c76 Mon Sep 17 00:00:00 2001 From: Elphas Toringepi Date: Fri, 27 Sep 2024 17:14:50 +0100 Subject: [PATCH] [FLINK-31986][Connectors/Kinesis] Setup integration tests for source --- .../connector/aws/util/AWSGeneralUtil.java | 21 +- .../aws/testutils/AWSServicesTestUtils.java | 6 +- .../connector/aws/util/AWSClientUtilTest.java | 20 ++ .../pom.xml | 6 + .../kinesis/source/KinesisStreamsSource.java | 9 +- .../source/KinesisStreamsSourceITCase.java | 220 ++++++++++++++++++ 6 files changed, 272 insertions(+), 10 deletions(-) create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java index cea82483..5325b43a 100644 --- a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java +++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java @@ -297,9 +297,8 @@ public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configPr return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder()); } - public static SdkAsyncHttpClient createAsyncHttpClient( - final Properties configProperties, - final NettyNioAsyncHttpClient.Builder httpClientBuilder) { + @VisibleForTesting + static AttributeMap getSdkHttpConfigurationOptions(final Properties configProperties) { final AttributeMap.Builder clientConfiguration = AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true); @@ -335,7 +334,15 @@ public static SdkAsyncHttpClient createAsyncHttpClient( protocol -> clientConfiguration.put( SdkHttpConfigurationOption.PROTOCOL, protocol)); - return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder); + + 
return clientConfiguration.build(); + } + + public static SdkAsyncHttpClient createAsyncHttpClient( + final Properties configProperties, + final NettyNioAsyncHttpClient.Builder httpClientBuilder) { + return createAsyncHttpClient( + getSdkHttpConfigurationOptions(configProperties), httpClientBuilder); } public static SdkAsyncHttpClient createAsyncHttpClient( @@ -355,6 +362,12 @@ public static SdkAsyncHttpClient createAsyncHttpClient( return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS)); } + public static SdkHttpClient createSyncHttpClient( + final Properties configProperties, final ApacheHttpClient.Builder httpClientBuilder) { + return createSyncHttpClient( + getSdkHttpConfigurationOptions(configProperties), httpClientBuilder); + } + public static SdkHttpClient createSyncHttpClient( final AttributeMap config, final ApacheHttpClient.Builder httpClientBuilder) { httpClientBuilder.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT); diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java index 207f567d..5bea73bb 100644 --- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java +++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java @@ -115,7 +115,11 @@ public static void createBucket(S3Client s3Client, String bucketName) { } public static void createIAMRole(IamClient iam, String roleName) { - CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build(); + CreateRoleRequest request = + CreateRoleRequest.builder() + .roleName(roleName) + .assumeRolePolicyDocument("{}") + .build(); iam.createRole(request); } diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java 
b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java index 0c2c2a95..67331e3d 100644 --- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java +++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java @@ -17,6 +17,8 @@ package org.apache.flink.connector.aws.util; +import org.apache.flink.connector.aws.config.AWSConfigConstants; + import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; @@ -28,7 +30,9 @@ import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; import software.amazon.awssdk.core.client.config.SdkClientOption; +import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; @@ -50,6 +54,7 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT; import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; import static org.apache.flink.connector.aws.util.AWSClientUtil.formatFlinkUserAgentPrefix; +import static org.apache.flink.connector.aws.util.AWSGeneralUtil.getSdkHttpConfigurationOptions; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; @@ -204,6 +209,21 @@ void testCreateAwsSyncClientWithOverrideConfiguration() { .isTrue(); } + @Test + void testGetSdkHttpConfigurationOptions() { + Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2"); 
+ properties.setProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true"); + properties.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1"); + AttributeMap options = getSdkHttpConfigurationOptions(properties); + + assertThat(options.get(SdkHttpConfigurationOption.TCP_KEEPALIVE).booleanValue()).isTrue(); + assertThat(options.containsKey(SdkHttpConfigurationOption.MAX_CONNECTIONS)).isFalse(); + assertThat(options.containsKey(SdkHttpConfigurationOption.READ_TIMEOUT)).isFalse(); + assertThat(options.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue()) + .isTrue(); + assertThat(options.get(SdkHttpConfigurationOption.PROTOCOL)).isEqualTo(Protocol.HTTP1_1); + } + @Test void testCreateKinesisAsyncClientWithEndpointOverride() { Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2"); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml index 639dbbb8..dc3f6a12 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml @@ -122,6 +122,12 @@ under the License. 
test + + software.amazon.awssdk + s3 + test + + com.fasterxml.jackson.datatype jackson-datatype-jsr310 diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 1ba37045..78383570 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -57,7 +57,6 @@ import software.amazon.awssdk.retries.api.BackoffStrategy; import software.amazon.awssdk.retries.api.RetryStrategy; import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.utils.AttributeMap; import java.time.Duration; import java.util.Map; @@ -193,10 +192,6 @@ public SimpleVersionedSerializer getSplitSerializer() { } private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) { - SdkHttpClient httpClient = - AWSGeneralUtil.createSyncHttpClient( - AttributeMap.builder().build(), ApacheHttpClient.builder()); - String region = AWSGeneralUtil.getRegionFromArn(streamArn) .orElseThrow( @@ -219,6 +214,10 @@ private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig AWSConfigOptions .RETRY_STRATEGY_MAX_ATTEMPTS_OPTION))); + SdkHttpClient httpClient = + AWSGeneralUtil.createSyncHttpClient( + kinesisClientProperties, ApacheHttpClient.builder()); + AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties); KinesisClient kinesisClient = AWSClientUtil.createAwsSyncClient( diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
new file mode 100644
index 00000000..4a2adec2
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
@@ -0,0 +1,255 @@
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/** Integration tests for {@code KinesisStreamsSource} running against a Localstack container. */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+public class KinesisStreamsSourceITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceITCase.class);
+    private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";
+
+    @Container
+    private static final LocalstackContainer MOCK_KINESIS_CONTAINER =
+            new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION));
+
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private KinesisClient kinesisClient;
+
+    /**
+     * Disables CBOR in the AWS SDK via a system property, creates a parallelism-1 execution
+     * environment, and builds an HTTP client plus a Kinesis client for the Localstack endpoint.
+     */
+    @BeforeEach
+    void setUp() {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        kinesisClient =
+                AWSServicesTestUtils.createAwsSyncClient(
+                        MOCK_KINESIS_CONTAINER.getEndpoint(), httpClient, KinesisClient.builder());
+    }
+
+    /** Clears the CBOR system property and closes the HTTP and Kinesis clients. */
+    @AfterEach
+    void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+    }
+
+    /**
+     * Creates and fills {@code stream-exists} but points the source at the ARN of a stream that
+     * was never created; the job must fail with an error naming the missing ARN.
+     */
+    @Test
+    void nonExistentStreamShouldResultInFailure() {
+        Assertions.assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                new Scenario()
+                                        .localstackStreamName("stream-exists")
+                                        .withSourceConnectionStreamArn(
+                                                "arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists")
+                                        .runScenario())
+                .withStackTraceContaining(
+                        "Stream arn arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists not found");
+    }
+
+    /** Creates and fills {@code valid-stream} and expects the source to read every record. */
+    @Test
+    void validStreamIsConsumed() throws Exception {
+        new Scenario()
+                .localstackStreamName("valid-stream")
+                .withSourceConnectionStreamArn(
+                        "arn:aws:kinesis:ap-southeast-1:000000000000:stream/valid-stream")
+                .runScenario();
+    }
+
+    /**
+     * Builds the source configuration: Localstack endpoint, static dummy credentials, the
+     * {@code AP_SOUTHEAST_1} region, trust-all-certificates, and HTTP/1.1.
+     */
+    private Configuration getDefaultConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint());
+        configuration.setString(AWS_ACCESS_KEY_ID, "accessKeyId");
+        configuration.setString(AWS_SECRET_ACCESS_KEY, "secretAccessKey");
+        configuration.setString(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+        configuration.setString(TRUST_ALL_CERTIFICATES, "true");
+        configuration.setString(HTTP_PROTOCOL_VERSION, "HTTP1_1");
+        return configuration;
+    }
+
+    /**
+     * Fluent description of one test run: the stream to create and fill in Localstack, and the
+     * stream ARN the source under test is told to read.
+     */
+    private class Scenario {
+        private final int expectedElements = 1000;
+        private String localstackStreamName = null;
+        private String sourceConnectionStreamArn;
+        private final Configuration configuration =
+                KinesisStreamsSourceITCase.this.getDefaultConfiguration();
+
+        /**
+         * Creates the Localstack stream when a name was set, puts {@code expectedElements}
+         * records, then runs the source and asserts that the same number was collected.
+         *
+         * @throws Exception if stream preparation or job execution fails
+         */
+        public void runScenario() throws Exception {
+            if (localstackStreamName != null) {
+                prepareStream(localstackStreamName);
+            }
+
+            putRecords(localstackStreamName, expectedElements);
+
+            KinesisStreamsSource<String> kdsSource =
+                    KinesisStreamsSource.<String>builder()
+                            .setStreamArn(sourceConnectionStreamArn)
+                            .setSourceConfig(configuration)
+                            .setDeserializationSchema(new SimpleStringSchema())
+                            .build();
+
+            List<String> result =
+                    env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
+                            .returns(TypeInformation.of(String.class))
+                            .executeAndCollect(expectedElements);
+
+            Assertions.assertThat(result.size()).isEqualTo(expectedElements);
+        }
+
+        /** Sets the stream ARN passed to the source under test. */
+        public Scenario withSourceConnectionStreamArn(String sourceConnectionStreamArn) {
+            this.sourceConnectionStreamArn = sourceConnectionStreamArn;
+            return this;
+        }
+
+        /** Sets the name of the stream to create and fill in Localstack. */
+        public Scenario localstackStreamName(String localstackStreamName) {
+            this.localstackStreamName = localstackStreamName;
+            return this;
+        }
+
+        /**
+         * Creates a single-shard stream and polls through a one-per-second rate limiter until
+         * it is {@code ACTIVE}, giving up once a one-minute deadline has passed.
+         */
+        private void prepareStream(String streamName) throws Exception {
+            final RateLimiter rateLimiter =
+                    RateLimiterBuilder.newBuilder()
+                            .withRate(1, SECONDS)
+                            .withConstantThroughput()
+                            .build();
+
+            kinesisClient.createStream(
+                    CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
+
+            Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
+            while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) {
+                if (deadline.isOverdue()) {
+                    throw new RuntimeException("Failed to create stream within time");
+                }
+            }
+        }
+
+        /**
+         * Writes {@code numRecords} records, each carrying its index as a decimal string, in
+         * {@code PutRecords} batches of at most 500 with a single shared partition key.
+         */
+        private void putRecords(String streamName, int numRecords) {
+            List<byte[]> messages =
+                    IntStream.range(0, numRecords)
+                            .mapToObj(String::valueOf)
+                            .map(String::getBytes)
+                            .collect(Collectors.toList());
+
+            for (List<byte[]> partition : Lists.partition(messages, 500)) {
+                List<PutRecordsRequestEntry> entries =
+                        partition.stream()
+                                .map(
+                                        msg ->
+                                                PutRecordsRequestEntry.builder()
+                                                        .partitionKey("fakePartitionKey")
+                                                        .data(SdkBytes.fromByteArray(msg))
+                                                        .build())
+                                .collect(Collectors.toList());
+                PutRecordsRequest requests =
+                        PutRecordsRequest.builder().streamName(streamName).records(entries).build();
+                PutRecordsResponse putRecordResult = kinesisClient.putRecords(requests);
+                for (PutRecordsResultEntry result : putRecordResult.records()) {
+                    LOG.debug("Added record: {}", result.sequenceNumber());
+                }
+            }
+        }
+
+        /** Returns whether the stream is {@code ACTIVE}; any exception counts as not ready. */
+        private boolean streamExists(final String streamName) {
+            try {
+                return kinesisClient
+                                .describeStream(
+                                        DescribeStreamRequest.builder()
+                                                .streamName(streamName)
+                                                .build())
+                                .streamDescription()
+                                .streamStatus()
+                        == StreamStatus.ACTIVE;
+            } catch (Exception e) {
+                return false;
+            }
+        }
+    }
+}