-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support AWS Kinesis Data Streams as a Source #4836
Changes from 7 commits
c2dcf76
1523b8a
f8f89fb
0567858
63a2f91
0fd763b
92a797b
806f78d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
plugins { | ||
id 'java' | ||
} | ||
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(':data-prepper-plugins:common') | ||
implementation project(path: ':data-prepper-plugins:buffer-common') | ||
implementation project(path: ':data-prepper-plugins:aws-plugin-api') | ||
implementation 'com.fasterxml.jackson.core:jackson-core' | ||
implementation 'io.micrometer:micrometer-core' | ||
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0' | ||
compileOnly 'org.projectlombok:lombok:1.18.20' | ||
annotationProcessor 'org.projectlombok:lombok:1.18.20' | ||
|
||
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' | ||
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' | ||
testImplementation project(':data-prepper-test-common') | ||
testImplementation project(':data-prepper-test-event') | ||
testImplementation project(':data-prepper-core') | ||
testImplementation project(':data-prepper-plugin-framework') | ||
testImplementation project(':data-prepper-pipeline-parser') | ||
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' | ||
testImplementation project(':data-prepper-plugins:parse-json-processor') | ||
testImplementation project(':data-prepper-plugins:newline-codecs') | ||
} | ||
|
||
jacocoTestCoverageVerification { | ||
dependsOn jacocoTestReport | ||
violationRules { | ||
rule { //in addition to core projects rule | ||
limit { | ||
minimum = 1.0 | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.extension; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import lombok.Getter; | ||
|
||
@Getter | ||
public class KinesisLeaseConfig { | ||
@JsonProperty("lease_coordination") | ||
private KinesisLeaseCoordinationTableConfig leaseCoordinationTable; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.extension; | ||
|
||
import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; | ||
import org.opensearch.dataprepper.model.plugin.ExtensionPoints; | ||
|
||
@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis", allowInPipelineConfigurations = true) | ||
public class KinesisLeaseConfigExtension implements ExtensionPlugin { | ||
|
||
private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; | ||
@DataPrepperPluginConstructor | ||
public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) { | ||
this.kinesisLeaseConfigSupplier = new KinesisLeaseConfigSupplier(kinesisLeaseConfig); | ||
} | ||
|
||
@Override | ||
public void apply(final ExtensionPoints extensionPoints) { | ||
extensionPoints.addExtensionProvider(new KinesisLeaseConfigProvider(this.kinesisLeaseConfigSupplier)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.extension; | ||
|
||
import org.opensearch.dataprepper.model.plugin.ExtensionProvider; | ||
|
||
import java.util.Optional; | ||
|
||
class KinesisLeaseConfigProvider implements ExtensionProvider<KinesisLeaseConfigSupplier> { | ||
private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; | ||
|
||
public KinesisLeaseConfigProvider(final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) { | ||
this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier; | ||
} | ||
|
||
@Override | ||
public Optional<KinesisLeaseConfigSupplier> provideInstance(Context context) { | ||
return Optional.of(this.kinesisLeaseConfigSupplier); | ||
} | ||
|
||
@Override | ||
public Class<KinesisLeaseConfigSupplier> supportedClass() { | ||
return KinesisLeaseConfigSupplier.class; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.extension; | ||
|
||
import java.util.Optional; | ||
|
||
public class KinesisLeaseConfigSupplier { | ||
|
||
private KinesisLeaseConfig kinesisLeaseConfig; | ||
|
||
public KinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) { | ||
this.kinesisLeaseConfig = kinesisLeaseConfig; | ||
} | ||
|
||
public Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() { | ||
return Optional.ofNullable(kinesisLeaseConfig); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.extension; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import lombok.Getter; | ||
import lombok.NonNull; | ||
import software.amazon.awssdk.regions.Region; | ||
|
||
@Getter | ||
public class KinesisLeaseCoordinationTableConfig { | ||
|
||
@JsonProperty("table_name") | ||
@NonNull | ||
private String tableName; | ||
dlvenable marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@JsonProperty("region") | ||
@NonNull | ||
sb2k16 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private String region; | ||
|
||
public Region getAwsRegion() { | ||
return Region.of(region); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.source; | ||
|
||
import java.net.InetAddress; | ||
import java.net.UnknownHostException; | ||
|
||
/** | ||
* Generate a unique ID to represent a consumer application instance. | ||
*/ | ||
public class HostNameWorkerIdentifierGenerator implements WorkerIdentifierGenerator { | ||
|
||
private static final String hostName; | ||
|
||
static { | ||
try { | ||
hostName = InetAddress.getLocalHost().getHostName(); | ||
} catch (final UnknownHostException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
|
||
/** | ||
* @return Default to use host name. | ||
*/ | ||
@Override | ||
public String generate() { | ||
return hostName; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.source; | ||
|
||
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; | ||
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; | ||
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; | ||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; | ||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; | ||
import software.amazon.kinesis.common.KinesisClientUtil; | ||
|
||
public class KinesisClientFactory { | ||
private final AwsCredentialsProvider awsCredentialsProvider; | ||
private final AwsCredentialsProvider defaultCredentialsProvider; | ||
private final AwsAuthenticationConfig awsAuthenticationConfig; | ||
|
||
public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier, | ||
final AwsAuthenticationConfig awsAuthenticationConfig) { | ||
awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() | ||
.withRegion(awsAuthenticationConfig.getAwsRegion()) | ||
.withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn()) | ||
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId()) | ||
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides()) | ||
.build()); | ||
defaultCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions()); | ||
this.awsAuthenticationConfig = awsAuthenticationConfig; | ||
} | ||
|
||
public DynamoDbAsyncClient buildDynamoDBClient(Region region) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it recommended to use the Async client? If so why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @graytaylor0 for your review. This is required to construct ConfigsBuilder class in KCL |
||
return DynamoDbAsyncClient.builder() | ||
.credentialsProvider(defaultCredentialsProvider) | ||
.region(region) | ||
.build(); | ||
} | ||
|
||
public KinesisAsyncClient buildKinesisAsyncClient(Region region) { | ||
return KinesisClientUtil.createKinesisAsyncClient( | ||
KinesisAsyncClient.builder() | ||
.credentialsProvider(awsCredentialsProvider) | ||
.region(region) | ||
); | ||
} | ||
|
||
public CloudWatchAsyncClient buildCloudWatchAsyncClient(Region region) { | ||
return CloudWatchAsyncClient.builder() | ||
.credentialsProvider(defaultCredentialsProvider) | ||
.region(region) | ||
.build(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.kinesis.source; | ||
|
||
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; | ||
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; | ||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; | ||
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; | ||
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; | ||
import software.amazon.awssdk.services.kinesis.model.StreamDescription; | ||
import software.amazon.kinesis.common.InitialPositionInStreamExtended; | ||
import software.amazon.kinesis.common.StreamConfig; | ||
import software.amazon.kinesis.common.StreamIdentifier; | ||
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; | ||
import software.amazon.kinesis.processor.MultiStreamTracker; | ||
|
||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
|
||
public class KinesisMultiStreamTracker implements MultiStreamTracker { | ||
private static final String COLON = ":"; | ||
|
||
private final KinesisAsyncClient kinesisClient; | ||
private final KinesisSourceConfig sourceConfig; | ||
private final String applicationName; | ||
|
||
public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) { | ||
this.kinesisClient = kinesisClient; | ||
this.sourceConfig = sourceConfig; | ||
this.applicationName = applicationName; | ||
} | ||
|
||
@Override | ||
public List<StreamConfig> streamConfigList() { | ||
List<StreamConfig> streamConfigList = new ArrayList<>(); | ||
for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) { | ||
StreamConfig streamConfig; | ||
try { | ||
streamConfig = getStreamConfig(kinesisStreamConfig); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What kind of exceptions get thrown here? If it's a config error we should throw InvalidPluginConfigurationException and provide context with the message where we can There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an internal |
||
} | ||
streamConfigList.add(streamConfig); | ||
} | ||
return streamConfigList; | ||
} | ||
|
||
private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) throws Exception { | ||
StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig); | ||
return new StreamConfig(sourceStreamIdentifier, | ||
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())); | ||
} | ||
|
||
private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) throws Exception { | ||
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() | ||
.streamName(kinesisStreamConfig.getName()) | ||
.build(); | ||
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).get(); | ||
String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription()); | ||
return StreamIdentifier.multiStreamInstance(streamIdentifierString); | ||
} | ||
|
||
private String getStreamIdentifierString(StreamDescription streamDescription) { | ||
String accountId = streamDescription.streamARN().split(COLON)[4]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this an actual ARN, or just an identifier? If Arn we should use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @graytaylor0. I will fix this. |
||
long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond(); | ||
return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond)); | ||
} | ||
|
||
/** | ||
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec | ||
*/ | ||
@Override | ||
public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() { | ||
return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() { | ||
@Override | ||
public Duration waitPeriodToDeleteFormerStreams() { | ||
return Duration.ofSeconds(10); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are trade offs for having this higher or lower? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is the strategy to perform cleaning up of the leases for streams that just has been deleted. The wait period is to allow for any pending processing of records before cleaning up the lease from the database. |
||
} | ||
}; | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every class needs a license header. Please add to all files.
See: https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers
Hint: If you use IntelliJ you can configure it to automatically add this. And you can have it add the headers to all the files in this Gradle module (since they are all new).