Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31986][Connectors/Kinesis] Setup integration tests for FLIP-27 Kinesis source #172

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,8 @@ public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configPr
return createAsyncHttpClient(configProperties, NettyNioAsyncHttpClient.builder());
}

public static SdkAsyncHttpClient createAsyncHttpClient(
final Properties configProperties,
final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
@VisibleForTesting
static AttributeMap getSdkHttpConfigurationOptions(final Properties configProperties) {
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);

Expand Down Expand Up @@ -335,7 +334,15 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
protocol ->
clientConfiguration.put(
SdkHttpConfigurationOption.PROTOCOL, protocol));
return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder);

return clientConfiguration.build();
}

public static SdkAsyncHttpClient createAsyncHttpClient(
final Properties configProperties,
final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
return createAsyncHttpClient(
getSdkHttpConfigurationOptions(configProperties), httpClientBuilder);
}

public static SdkAsyncHttpClient createAsyncHttpClient(
Expand All @@ -355,6 +362,12 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
}

public static SdkHttpClient createSyncHttpClient(
final Properties configProperties, final ApacheHttpClient.Builder httpClientBuilder) {
return createSyncHttpClient(
getSdkHttpConfigurationOptions(configProperties), httpClientBuilder);
}

public static SdkHttpClient createSyncHttpClient(
final AttributeMap config, final ApacheHttpClient.Builder httpClientBuilder) {
httpClientBuilder.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ public static void createBucket(S3Client s3Client, String bucketName) {
}

public static void createIAMRole(IamClient iam, String roleName) {
CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();
CreateRoleRequest request =
CreateRoleRequest.builder()
.roleName(roleName)
.assumeRolePolicyDocument("{}")
.build();

iam.createRole(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.connector.aws.util;

import org.apache.flink.connector.aws.config.AWSConfigConstants;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
Expand All @@ -28,7 +30,9 @@
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
Expand All @@ -50,6 +54,7 @@
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.connector.aws.util.AWSClientUtil.formatFlinkUserAgentPrefix;
import static org.apache.flink.connector.aws.util.AWSGeneralUtil.getSdkHttpConfigurationOptions;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -204,6 +209,21 @@ void testCreateAwsSyncClientWithOverrideConfiguration() {
.isTrue();
}

@Test
void testGetSdkHttpConfigurationOptions() {
Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
properties.setProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true");
properties.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
AttributeMap options = getSdkHttpConfigurationOptions(properties);

assertThat(options.get(SdkHttpConfigurationOption.TCP_KEEPALIVE).booleanValue()).isTrue();
assertThat(options.containsKey(SdkHttpConfigurationOption.MAX_CONNECTIONS)).isFalse();
assertThat(options.containsKey(SdkHttpConfigurationOption.READ_TIMEOUT)).isFalse();
assertThat(options.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue())
.isTrue();
assertThat(options.get(SdkHttpConfigurationOption.PROTOCOL)).isEqualTo(Protocol.HTTP1_1);
}

@Test
void testCreateKinesisAsyncClientWithEndpointOverride() {
Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>test</scope>
</dependency>

Comment on lines +125 to +130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we test using s3? Can we use kinesis instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are actually testing Kinesis only. The S3 dependency is required because the LocalStack container in the connector base arbitrarily creates an S3 bucket and lists its contents to test readiness. LocalstackContainer.java#L61-L83

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - thanks!

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.utils.AttributeMap;

import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -193,10 +192,6 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
}

private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
AttributeMap.builder().build(), ApacheHttpClient.builder());

String region =
AWSGeneralUtil.getRegionFromArn(streamArn)
.orElseThrow(
Expand All @@ -219,6 +214,10 @@ private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig
AWSConfigOptions
.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)));

SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
kinesisClientProperties, ApacheHttpClient.builder());

AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
KinesisClient kinesisClient =
AWSClientUtil.createAwsSyncClient(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package org.apache.flink.connector.kinesis.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;

import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;

/** IT cases for using {@code KinesisStreamsSource} using a localstack container. */
@Testcontainers
@ExtendWith(MiniClusterExtension.class)
public class KinesisStreamsSourceITCase {

private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceITCase.class);
private static final String LOCALSTACK_DOCKER_IMAGE_VERSION = "localstack/localstack:3.7.2";

@Container
private static final LocalstackContainer MOCK_KINESIS_CONTAINER =
new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION));

private StreamExecutionEnvironment env;
private SdkHttpClient httpClient;
private KinesisClient kinesisClient;

@BeforeEach
void setUp() {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

httpClient = AWSServicesTestUtils.createHttpClient();
kinesisClient =
AWSServicesTestUtils.createAwsSyncClient(
MOCK_KINESIS_CONTAINER.getEndpoint(), httpClient, KinesisClient.builder());
}

@AfterEach
void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
AWSGeneralUtil.closeResources(httpClient, kinesisClient);
}

@Test
void nonExistentStreamShouldResultInFailure() {
hlteoh37 marked this conversation as resolved.
Show resolved Hide resolved
Assertions.assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(
() ->
new Scenario()
.localstackStreamName("stream-exists")
.withSourceConnectionStreamArn(
"arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists")
.runScenario())
.withStackTraceContaining(
"Stream arn arn:aws:kinesis:ap-southeast-1:000000000000:stream/stream-not-exists not found");
}

@Test
void validStreamIsConsumed() throws Exception {
new Scenario()
.localstackStreamName("valid-stream")
.withSourceConnectionStreamArn(
"arn:aws:kinesis:ap-southeast-1:000000000000:stream/valid-stream")
.runScenario();
}

private Configuration getDefaultConfiguration() {
Configuration configuration = new Configuration();
configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint());
configuration.setString(AWS_ACCESS_KEY_ID, "accessKeyId");
configuration.setString(AWS_SECRET_ACCESS_KEY, "secretAccessKey");
configuration.setString(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
configuration.setString(TRUST_ALL_CERTIFICATES, "true");
configuration.setString(HTTP_PROTOCOL_VERSION, "HTTP1_1");
return configuration;
}

private class Scenario {
private final int expectedElements = 1000;
private String localstackStreamName = null;
private String sourceConnectionStreamArn;
private final Configuration configuration =
KinesisStreamsSourceITCase.this.getDefaultConfiguration();

public void runScenario() throws Exception {
if (localstackStreamName != null) {
prepareStream(localstackStreamName);
}

putRecords(localstackStreamName, expectedElements);

KinesisStreamsSource<String> kdsSource =
KinesisStreamsSource.<String>builder()
.setStreamArn(sourceConnectionStreamArn)
.setSourceConfig(configuration)
.setDeserializationSchema(new SimpleStringSchema())
.build();

List<String> result =
env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
.returns(TypeInformation.of(String.class))
.executeAndCollect(expectedElements);

Assertions.assertThat(result.size()).isEqualTo(expectedElements);
}

public Scenario withSourceConnectionStreamArn(String sourceConnectionStreamArn) {
this.sourceConnectionStreamArn = sourceConnectionStreamArn;
return this;
}

public Scenario localstackStreamName(String localstackStreamName) {
this.localstackStreamName = localstackStreamName;
return this;
}

private void prepareStream(String streamName) throws Exception {
final RateLimiter rateLimiter =
RateLimiterBuilder.newBuilder()
.withRate(1, SECONDS)
.withConstantThroughput()
.build();

kinesisClient.createStream(
CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());

Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) {
if (deadline.isOverdue()) {
throw new RuntimeException("Failed to create stream within time");
}
}
}

private void putRecords(String streamName, int numRecords) {
List<byte[]> messages =
IntStream.range(0, numRecords)
.mapToObj(String::valueOf)
.map(String::getBytes)
.collect(Collectors.toList());

for (List<byte[]> partition : Lists.partition(messages, 500)) {
List<PutRecordsRequestEntry> entries =
partition.stream()
.map(
msg ->
PutRecordsRequestEntry.builder()
.partitionKey("fakePartitionKey")
.data(SdkBytes.fromByteArray(msg))
.build())
.collect(Collectors.toList());
PutRecordsRequest requests =
PutRecordsRequest.builder().streamName(streamName).records(entries).build();
PutRecordsResponse putRecordResult = kinesisClient.putRecords(requests);
for (PutRecordsResultEntry result : putRecordResult.records()) {
LOG.debug("Added record: {}", result.sequenceNumber());
}
}
}

private boolean streamExists(final String streamName) {
try {
return kinesisClient
.describeStream(
DescribeStreamRequest.builder()
.streamName(streamName)
.build())
.streamDescription()
.streamStatus()
== StreamStatus.ACTIVE;
} catch (Exception e) {
return false;
}
}
}
}
Loading