Skip to content

Commit

Permalink
[FLINK-31986][Connectors/Kinesis] Setup integration tests for source
Browse files Browse the repository at this point in the history
  • Loading branch information
elphastori committed Oct 3, 2024
1 parent 7fbca55 commit 95b17b9
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,8 @@ public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configPr
return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder());
}

public static SdkAsyncHttpClient createAsyncHttpClient(
final Properties configProperties,
final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
@VisibleForTesting
static AttributeMap getSdkHttpConfigurationOptions(final Properties configProperties) {
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);

Expand Down Expand Up @@ -335,7 +334,15 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
protocol ->
clientConfiguration.put(
SdkHttpConfigurationOption.PROTOCOL, protocol));
return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder);

return clientConfiguration.build();
}

/**
 * Creates an asynchronous HTTP client from connector properties.
 *
 * <p>The properties are first translated into SDK HTTP options via {@code
 * getSdkHttpConfigurationOptions}; the result is then handed, together with the builder, to the
 * {@code AttributeMap}-based overload.
 *
 * @param configProperties properties from which the SDK HTTP options are derived
 * @param httpClientBuilder Netty client builder forwarded to the {@code AttributeMap} overload
 * @return the client returned by the {@code AttributeMap} overload
 */
public static SdkAsyncHttpClient createAsyncHttpClient(
        final Properties configProperties,
        final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
    final AttributeMap httpOptions = getSdkHttpConfigurationOptions(configProperties);
    return createAsyncHttpClient(httpOptions, httpClientBuilder);
}

public static SdkAsyncHttpClient createAsyncHttpClient(
Expand All @@ -355,6 +362,12 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
}

/**
 * Creates a synchronous HTTP client from connector properties.
 *
 * <p>The properties are first translated into SDK HTTP options via {@code
 * getSdkHttpConfigurationOptions}; the result is then handed, together with the builder, to the
 * {@code AttributeMap}-based overload.
 *
 * @param configProperties properties from which the SDK HTTP options are derived
 * @param httpClientBuilder Apache client builder forwarded to the {@code AttributeMap} overload
 * @return the client returned by the {@code AttributeMap} overload
 */
public static SdkHttpClient createSyncHttpClient(
        final Properties configProperties, final ApacheHttpClient.Builder httpClientBuilder) {
    final AttributeMap httpOptions = getSdkHttpConfigurationOptions(configProperties);
    return createSyncHttpClient(httpOptions, httpClientBuilder);
}

public static SdkHttpClient createSyncHttpClient(
final AttributeMap config, final ApacheHttpClient.Builder httpClientBuilder) {
httpClientBuilder.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ public static void createBucket(S3Client s3Client, String bucketName) {
}

/**
 * Creates an IAM role with the given name using the supplied client.
 *
 * @param iam client used to issue the {@code CreateRole} call
 * @param roleName name of the role to create
 */
public static void createIAMRole(IamClient iam, String roleName) {
    // CreateRole requires an assume-role (trust) policy document; an empty JSON object is
    // sent as a placeholder. Only one request is declared: the variant without the policy
    // document has been dropped.
    CreateRoleRequest request =
            CreateRoleRequest.builder()
                    .roleName(roleName)
                    .assumeRolePolicyDocument("{}")
                    .build();

    iam.createRole(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.connector.aws.util;

import org.apache.flink.connector.aws.config.AWSConfigConstants;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
Expand All @@ -28,7 +30,9 @@
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
Expand All @@ -50,6 +54,7 @@
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.connector.aws.util.AWSClientUtil.formatFlinkUserAgentPrefix;
import static org.apache.flink.connector.aws.util.AWSGeneralUtil.getSdkHttpConfigurationOptions;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -204,6 +209,21 @@ void testCreateAwsSyncClientWithOverrideConfiguration() {
.isTrue();
}

/**
 * Checks the options derived from properties that set only the region, trust-all-certificates
 * and the HTTP protocol version: TCP keep-alive is on, the two configured values are carried
 * over, and max-connections / read-timeout are absent.
 */
@Test
void testGetSdkHttpConfigurationOptions() {
    final Properties config = TestUtil.properties(AWS_REGION, "eu-west-2");
    config.setProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true");
    config.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");

    final AttributeMap httpOptions = getSdkHttpConfigurationOptions(config);

    // Always present, independent of the supplied properties
    final boolean tcpKeepAlive =
            httpOptions.get(SdkHttpConfigurationOption.TCP_KEEPALIVE).booleanValue();
    assertThat(tcpKeepAlive).isTrue();

    // Not configured above, so they must not show up in the map
    final boolean hasMaxConnections =
            httpOptions.containsKey(SdkHttpConfigurationOption.MAX_CONNECTIONS);
    assertThat(hasMaxConnections).isFalse();
    final boolean hasReadTimeout =
            httpOptions.containsKey(SdkHttpConfigurationOption.READ_TIMEOUT);
    assertThat(hasReadTimeout).isFalse();

    // Explicitly configured values
    final boolean trustAllCertificates =
            httpOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue();
    assertThat(trustAllCertificates).isTrue();
    final Protocol protocol = httpOptions.get(SdkHttpConfigurationOption.PROTOCOL);
    assertThat(protocol).isEqualTo(Protocol.HTTP1_1);
}

@Test
void testCreateKinesisAsyncClientWithEndpointOverride() {
Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ under the License.
<scope>test</scope>
</dependency>

<!-- AWS SDK S3 module, on the test classpath only (scope: test). -->
<!-- No <version> is declared here; presumably managed by a parent POM or BOM (confirm). -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.utils.AttributeMap;

import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -193,10 +192,6 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
}

private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
AttributeMap.builder().build(), ApacheHttpClient.builder());

String region =
AWSGeneralUtil.getRegionFromArn(streamArn)
.orElseThrow(
Expand All @@ -219,6 +214,10 @@ private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig
AWSConfigOptions
.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)));

SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
kinesisClientProperties, ApacheHttpClient.builder());

AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
KinesisClient kinesisClient =
AWSClientUtil.createAwsSyncClient(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package org.apache.flink.connector.kinesis.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;

import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;

/** IT cases for using {@code KinesisStreamsSource} using a localstack container. */
@Testcontainers
@ExtendWith(MiniClusterExtension.class)
public class KinesisStreamsSourceITCase {

private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceITCase.class);
private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";

@Container
private static final LocalstackContainer MOCK_KINESIS_CONTAINER =
new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION));

private StreamExecutionEnvironment env;
private SdkHttpClient httpClient;
private KinesisClient kinesisClient;

@BeforeEach
void setUp() {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

httpClient = AWSServicesTestUtils.createHttpClient();
kinesisClient =
AWSServicesTestUtils.createAwsSyncClient(
MOCK_KINESIS_CONTAINER.getEndpoint(), httpClient, KinesisClient.builder());
}

@AfterEach
void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
AWSGeneralUtil.closeResources(httpClient, kinesisClient);
}

@Test
void nonExistentStreamShouldResultInFailure() {
Assertions.assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(
() ->
new Scenario()
.localstackStreamName("stream-exists")
.withSourceConnectionStreamArn(
"arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists")
.runScenario())
.havingCause()
.havingCause()
.havingCause()
.havingCause()
.havingCause()
.havingCause()
.havingCause()
.withMessageContaining(
"Stream arn arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists not found");
}

private Configuration getDefaultConfiguration() {
Configuration configuration = new Configuration();
configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint());
configuration.setString(AWS_ACCESS_KEY_ID, "accessKeyId");
configuration.setString(AWS_SECRET_ACCESS_KEY, "secretAccessKey");
configuration.setString(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
configuration.setString(TRUST_ALL_CERTIFICATES, "true");
configuration.setString(HTTP_PROTOCOL_VERSION, "HTTP1_1");
return configuration;
}

private class Scenario {
private final int expectedElements = 1000;
private String localstackStreamName = null;
private String sourceConnectionStreamArn;
private final Configuration configuration =
KinesisStreamsSourceITCase.this.getDefaultConfiguration();

public void runScenario() throws Exception {
if (localstackStreamName != null) {
prepareStream(localstackStreamName);
}

putRecords(localstackStreamName, expectedElements);

KinesisStreamsSource<String> kdsSource =
KinesisStreamsSource.<String>builder()
.setStreamArn(sourceConnectionStreamArn)
.setSourceConfig(configuration)
.setDeserializationSchema(new SimpleStringSchema())
.build();

List<String> result =
env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
.returns(TypeInformation.of(String.class))
.executeAndCollect(expectedElements);

Assertions.assertThat(result.size()).isEqualTo(expectedElements);
}

public Scenario withSourceConnectionStreamArn(String sourceConnectionStreamArn) {
this.sourceConnectionStreamArn = sourceConnectionStreamArn;
return this;
}

public Scenario localstackStreamName(String localstackStreamName) {
this.localstackStreamName = localstackStreamName;
return this;
}

private void prepareStream(String streamName) throws Exception {
final RateLimiter rateLimiter =
RateLimiterBuilder.newBuilder()
.withRate(1, SECONDS)
.withConstantThroughput()
.build();

kinesisClient.createStream(
CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());

Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) {
if (deadline.isOverdue()) {
throw new RuntimeException("Failed to create stream within time");
}
}
}

private void putRecords(String streamName, int numRecords) {
List<byte[]> messages =
IntStream.range(0, numRecords)
.mapToObj(String::valueOf)
.map(String::getBytes)
.collect(Collectors.toList());

for (List<byte[]> partition : Lists.partition(messages, 500)) {
List<PutRecordsRequestEntry> entries =
partition.stream()
.map(
msg ->
PutRecordsRequestEntry.builder()
.partitionKey("fakePartitionKey")
.data(SdkBytes.fromByteArray(msg))
.build())
.collect(Collectors.toList());
PutRecordsRequest requests =
PutRecordsRequest.builder().streamName(streamName).records(entries).build();
PutRecordsResponse putRecordResult = kinesisClient.putRecords(requests);
for (PutRecordsResultEntry result : putRecordResult.records()) {
LOG.debug("Added record: {}", result.sequenceNumber());
}
}
}

private boolean streamExists(final String streamName) {
try {
return kinesisClient
.describeStream(
DescribeStreamRequest.builder()
.streamName(streamName)
.build())
.streamDescription()
.streamStatus()
== StreamStatus.ACTIVE;
} catch (Exception e) {
return false;
}
}
}
}

0 comments on commit 95b17b9

Please sign in to comment.