From 4bcde00a0624a9a4e4ef11976264994e1befcd57 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Fri, 16 Aug 2024 09:03:42 -0700 Subject: [PATCH] Add support for extensions to configure lease coordination table. Signed-off-by: Souvik Bose --- .../kinesis-source/build.gradle | 4 ++ .../DefaultKinesisLeaseConfigSupplier.java | 17 ++++++ .../kinesis/extension/KinesisLeaseConfig.java | 10 ++++ .../KinesisLeaseConfigExtension.java | 21 ++++++++ .../extension/KinesisLeaseConfigProvider.java | 23 ++++++++ .../extension/KinesisLeaseConfigSupplier.java | 10 ++++ .../source}/ClientFactory.java | 4 +- .../source}/KinesisMultiStreamTracker.java | 6 +-- .../source}/KinesisService.java | 23 ++++---- .../source}/KinesisSource.java | 13 +++-- .../source}/WorkerIdentifierGenerator.java | 2 +- .../AwsAuthenticationConfig.java | 2 +- .../configuration/ConsumerStrategy.java | 2 +- .../configuration/KinesisSourceConfig.java | 6 +-- .../configuration/KinesisStreamConfig.java | 2 +- .../KinesisStreamPollingConfig.java | 2 +- .../metrics/MicrometerFilterMetricsScope.java | 38 +++++++++++++ .../metrics/MicrometerMetricFactory.java | 18 +++++++ .../processor/KinesisRecordProcessor.java | 8 +-- .../KinesisShardRecordProcessorFactory.java | 4 +- ...DefaultKinesisLeaseConfigSupplierTest.java | 44 +++++++++++++++ .../KinesisLeaseConfigExtensionTest.java | 38 +++++++++++++ .../KinesisLeaseConfigProviderTest.java | 47 ++++++++++++++++ .../extension/KinesisLeaseConfigTest.java | 53 +++++++++++++++++++ .../source}/ClientFactoryTest.java | 4 +- .../KinesisMultiStreamTrackerTest.java | 6 +-- .../source}/KinesisServiceTest.java | 34 ++++++++---- .../source}/KinesisSourceTest.java | 24 ++++++--- .../AwsAuthenticationConfigTest.java | 2 +- .../KinesisSourceConfigTest.java | 2 +- .../KinesisStreamPollingConfigTest.java | 2 +- .../processor/KinesisRecordProcessorTest.java | 12 ++--- ...inesisShardRecordProcessorFactoryTest.java | 6 +-- .../simple_pipeline_with_extensions.yaml | 3 ++ 34 files changed, 425 insertions(+), 67 deletions(-) create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplier.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProvider.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/ClientFactory.java (94%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/KinesisMultiStreamTracker.java (95%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/KinesisService.java (88%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/KinesisSource.java (79%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/WorkerIdentifierGenerator.java (90%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/AwsAuthenticationConfig.java (98%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/ConsumerStrategy.java (86%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/KinesisSourceConfig.java (91%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/KinesisStreamConfig.java (93%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/KinesisStreamPollingConfig.java (88%) create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerFilterMetricsScope.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerMetricFactory.java rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/processor/KinesisRecordProcessor.java (97%) rename data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/processor/KinesisShardRecordProcessorFactory.java (94%) create mode 100644 data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplierTest.java create mode 100644 data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtensionTest.java create mode 100644 data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProviderTest.java create mode 100644 data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/ClientFactoryTest.java (94%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/KinesisMultiStreamTrackerTest.java (97%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/KinesisServiceTest.java (89%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/KinesisSourceTest.java (83%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/AwsAuthenticationConfigTest.java (99%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/KinesisSourceConfigTest.java (99%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/configuration/KinesisStreamPollingConfigTest.java (90%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/processor/KinesisRecordProcessorTest.java (97%) rename data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/{source/kinesis => kinesis/source}/processor/KinesisShardRecordProcessorFactoryTest.java (94%) create mode 100644 data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml diff --git a/data-prepper-plugins/kinesis-source/build.gradle b/data-prepper-plugins/kinesis-source/build.gradle index 400fe22c00..f5bb41385a 100644 --- a/data-prepper-plugins/kinesis-source/build.gradle +++ b/data-prepper-plugins/kinesis-source/build.gradle @@ -29,6 +29,10 @@ dependencies { testImplementation platform('org.junit:junit-bom:5.9.1') testImplementation 'org.junit.jupiter:junit-jupiter' testImplementation project(':data-prepper-test-event') + testImplementation project(':data-prepper-core') + testImplementation project(':data-prepper-plugin-framework') + testImplementation project(':data-prepper-pipeline-parser') + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplier.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplier.java new file mode 100644 index 0000000000..f7e4bd9e87 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplier.java @@ -0,0 +1,17 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import java.util.Optional; + +public class DefaultKinesisLeaseConfigSupplier implements KinesisLeaseConfigSupplier { + + private KinesisLeaseConfig kinesisLeaseConfig; + + public DefaultKinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) { + this.kinesisLeaseConfig = kinesisLeaseConfig; + } + + @Override + public Optional getKinesisExtensionLeaseConfig() { + return kinesisLeaseConfig != null ? Optional.of(kinesisLeaseConfig) : Optional.empty(); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java new file mode 100644 index 0000000000..fbafce9d7c --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java @@ -0,0 +1,10 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; + +@Getter +public class KinesisLeaseConfig { + @JsonProperty("lease_coordination_table") + private String leaseCoordinationTable; +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java new file mode 100644 index 0000000000..5e6ad46efe --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtension.java @@ -0,0 +1,21 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; +import org.opensearch.dataprepper.model.plugin.ExtensionPoints; + +@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis_lease_config", allowInPipelineConfigurations = true) +public class KinesisLeaseConfigExtension implements ExtensionPlugin { + + private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; + @DataPrepperPluginConstructor + public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) { + this.kinesisLeaseConfigSupplier = new DefaultKinesisLeaseConfigSupplier(kinesisLeaseConfig); + } + + @Override + public void apply(final ExtensionPoints extensionPoints) { + extensionPoints.addExtensionProvider(new KinesisLeaseConfigProvider(this.kinesisLeaseConfigSupplier)); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProvider.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProvider.java new file mode 100644 index 0000000000..30165c001c --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProvider.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import org.opensearch.dataprepper.model.plugin.ExtensionProvider; + +import java.util.Optional; + +class KinesisLeaseConfigProvider implements ExtensionProvider { + private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; + + public KinesisLeaseConfigProvider(final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) { + this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier; + } + + @Override + public Optional provideInstance(Context context) { + return Optional.of(this.kinesisLeaseConfigSupplier); + } + + @Override + public Class supportedClass() { + return KinesisLeaseConfigSupplier.class; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java new file mode 100644 index 0000000000..3db85d3ca5 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplier.java @@ -0,0 +1,10 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import java.util.Optional; + +public interface KinesisLeaseConfigSupplier { + + default Optional getKinesisExtensionLeaseConfig() { + return Optional.empty(); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/ClientFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactory.java similarity index 94% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/ClientFactory.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactory.java index 5abf9185d0..dd12324f1e 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/ClientFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactory.java @@ -1,8 +1,8 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java similarity index 95% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java index 5da7a3d160..166be331ea 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTracker.java @@ -1,7 +1,7 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java similarity index 88% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 0d48aaf161..dd0ad61249 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -1,6 +1,5 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; -import io.micrometer.core.instrument.util.StringUtils; import lombok.Setter; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -9,9 +8,10 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.ConsumerStrategy; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.processor.KinesisShardRecordProcessorFactory; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisShardRecordProcessorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; @@ -56,8 +56,9 @@ public KinesisService(final KinesisSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final PipelineDescription pipelineDescription, - final AcknowledgementSetManager acknowledgementSetManager - ){ + final AcknowledgementSetManager acknowledgementSetManager, + final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier + ){ this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; this.pluginFactory = pluginFactory; @@ -67,8 +68,10 @@ public KinesisService(final KinesisSourceConfig sourceConfig, this.cloudWatchClient = clientFactory.buildCloudWatchAsyncClient(); this.pipelineName = pipelineDescription.getPipelineName(); this.applicationName = pipelineName; - this.tableName = StringUtils.isNotEmpty(sourceConfig.getLeaseCoordinationTable()) ? - sourceConfig.getLeaseCoordinationTable() : applicationName; + if (kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().isEmpty()) { + throw new IllegalStateException("Lease Coordination table should be provided!"); + } + this.tableName = kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable(); this.executorService = Executors.newFixedThreadPool(1); } @@ -115,7 +118,7 @@ public Scheduler createScheduler(final Buffer> buffer) { new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName), applicationName, kinesisClient, dynamoDbClient, cloudWatchClient, new WorkerIdentifierGenerator().generate(), processorFactory - ).tableName(applicationName); + ).tableName(tableName); ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy(); if (consumerStrategy == ConsumerStrategy.POLLING) { diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java similarity index 79% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java index 98069b5bb8..a88cff0de0 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSource.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; import lombok.Setter; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; @@ -12,7 +12,8 @@ import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +21,7 @@ public class KinesisSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); private final KinesisSourceConfig kinesisSourceConfig; + private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; @Setter private KinesisService kinesisService; @@ -30,10 +32,13 @@ public KinesisSource(final KinesisSourceConfig kinesisSourceConfig, final PluginFactory pluginFactory, final PipelineDescription pipelineDescription, final AwsCredentialsSupplier awsCredentialsSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) { this.kinesisSourceConfig = kinesisSourceConfig; + this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier; ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, kinesisSourceConfig.getAwsAuthenticationConfig()); - this.kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager); + this.kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); } @Override public void start(final Buffer> buffer) { diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/WorkerIdentifierGenerator.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/WorkerIdentifierGenerator.java similarity index 90% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/WorkerIdentifierGenerator.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/WorkerIdentifierGenerator.java index 9390e25920..7686861a87 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/WorkerIdentifierGenerator.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/WorkerIdentifierGenerator.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; import java.net.InetAddress; import java.net.UnknownHostException; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/AwsAuthenticationConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java similarity index 98% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/AwsAuthenticationConfig.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java index d51a7ee6a4..27774828ef 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/AwsAuthenticationConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfig.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; /* * Copyright OpenSearch Contributors diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/ConsumerStrategy.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/ConsumerStrategy.java similarity index 86% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/ConsumerStrategy.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/ConsumerStrategy.java index 7b7a7ddf9d..fa507f55d5 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/ConsumerStrategy.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/ConsumerStrategy.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.annotation.JsonValue; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisSourceConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java similarity index 91% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisSourceConfig.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java index 3aefe7bece..4be5bb7f95 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisSourceConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; @@ -48,10 +48,6 @@ public class KinesisSourceConfig { @JsonProperty("polling") private KinesisStreamPollingConfig pollingConfig; - @JsonProperty("lease_coordination_table") - @Getter - private String leaseCoordinationTable; - @Getter @JsonProperty("codec") private PluginModel codec; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java similarity index 93% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamConfig.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java index 49e3a5743f..3faf55469c 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamConfig.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamPollingConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfig.java similarity index 88% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamPollingConfig.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfig.java index e48908d7f8..36dbedbdd6 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamPollingConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfig.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerFilterMetricsScope.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerFilterMetricsScope.java new file mode 100644 index 0000000000..bfab36170f --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerFilterMetricsScope.java @@ -0,0 +1,38 @@ +//package org.opensearch.dataprepper.plugins.kinesis.source.metrics; +// +//import com.google.common.collect.ImmutableSet; +//import org.opensearch.dataprepper.metrics.PluginMetrics; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; +//import software.amazon.kinesis.metrics.FilteringMetricsScope; +//import software.amazon.kinesis.metrics.MetricsLevel; +// +//public class MicrometerFilterMetricsScope extends FilteringMetricsScope { +// +// private static final Logger LOG = LoggerFactory.getLogger(MicrometerFilterMetricsScope.class); +// +// private final PluginMetrics pluginMetrics; +// +// public MicrometerFilterMetricsScope(final PluginMetrics pluginMetrics) { +// super(MetricsLevel.SUMMARY, ImmutableSet.of(METRICS_DIMENSIONS_ALL)); +// this.pluginMetrics = pluginMetrics; +// } +// +// @Override +// public void addData(String name, double value, StandardUnit unit) { +// // TODO: report metrics. +// LOG.info("{} {} {}", name, value, unit); +// } +// +// @Override +// public void addData(String name, double value, StandardUnit unit, MetricsLevel level) { +// // TODO: report metrics. +// LOG.info("{} {} {} {}", name, value, unit, level); +// } +// +// @Override +// public void end() { +// // TODO: Update this +// } +//} \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerMetricFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerMetricFactory.java new file mode 100644 index 0000000000..114e1ca4fa --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/metrics/MicrometerMetricFactory.java @@ -0,0 +1,18 @@ +//package org.opensearch.dataprepper.plugins.kinesis.source.metrics; +// +//import org.opensearch.dataprepper.metrics.PluginMetrics; +//import software.amazon.kinesis.metrics.MetricsFactory; +//import software.amazon.kinesis.metrics.MetricsScope; +// +//public class MicrometerMetricFactory implements MetricsFactory { +// +// final PluginMetrics pluginMetrics; +// +// public MicrometerMetricFactory(PluginMetrics pluginMetrics) { +// this.pluginMetrics = pluginMetrics; +// } +// +// public MetricsScope createMetrics() { +// return new MicrometerFilterMetricsScope(pluginMetrics); +// } +//} \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java similarity index 97% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisRecordProcessor.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index 0a7c7c0ebe..bfa7c2c023 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.processor; +package org.opensearch.dataprepper.plugins.kinesis.source.processor; import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -11,9 +11,9 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.kinesis.KinesisSource; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.KinesisSource; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.kinesis.common.StreamIdentifier; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisShardRecordProcessorFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java similarity index 94% rename from data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisShardRecordProcessorFactory.java rename to data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java index 91294857b4..f551c503e5 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisShardRecordProcessorFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.processor; +package org.opensearch.dataprepper.plugins.kinesis.source.processor; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -6,7 +6,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplierTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplierTest.java new file mode 100644 index 0000000000..9914c30fbf --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/DefaultKinesisLeaseConfigSupplierTest.java @@ -0,0 +1,44 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class DefaultKinesisLeaseConfigSupplierTest { + private static final String LEASE_COORDINATION_TABLE = "lease-table"; + @Mock + KinesisLeaseConfig kinesisLeaseConfig; + + private DefaultKinesisLeaseConfigSupplier createObjectUnderTest() { + return new DefaultKinesisLeaseConfigSupplier(kinesisLeaseConfig); + } + + @Test + void testGetters() { + when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(LEASE_COORDINATION_TABLE); + KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier = createObjectUnderTest(); + assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable(), equalTo(LEASE_COORDINATION_TABLE)); + } + + @Test + void testGettersWithNullTableConfig() { + when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(null); + DefaultKinesisLeaseConfigSupplier defaultKinesisLeaseConfigSupplier = createObjectUnderTest(); + assertThat(defaultKinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable(), equalTo(null)); + + } + + @Test + void testGettersWithNullConfig() { + KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier = new DefaultKinesisLeaseConfigSupplier(null); + assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig(), equalTo(Optional.empty())); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtensionTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtensionTest.java new file mode 100644 index 0000000000..4476d28def --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigExtensionTest.java @@ -0,0 +1,38 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.opensearch.dataprepper.model.plugin.ExtensionPoints; +import org.opensearch.dataprepper.model.plugin.ExtensionProvider; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class KinesisLeaseConfigExtensionTest { + @Mock + private ExtensionPoints extensionPoints; + + @Mock + private KinesisLeaseConfig kinesisLeaseConfig; + + private KinesisLeaseConfigExtension createObjectUnderTest() { + return new KinesisLeaseConfigExtension(kinesisLeaseConfig); + } + + @Test + void applyShouldAddExtensionProvider() { + extensionPoints = mock(ExtensionPoints.class); + createObjectUnderTest().apply(extensionPoints); + final ArgumentCaptor extensionProviderArgumentCaptor = + ArgumentCaptor.forClass(ExtensionProvider.class); + + verify(extensionPoints).addExtensionProvider(extensionProviderArgumentCaptor.capture()); + + final ExtensionProvider actualExtensionProvider = extensionProviderArgumentCaptor.getValue(); + + assertThat(actualExtensionProvider, instanceOf(KinesisLeaseConfigProvider.class)); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProviderTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProviderTest.java new file mode 100644 index 0000000000..50bc02d628 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigProviderTest.java @@ -0,0 +1,47 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.plugin.ExtensionProvider; + +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; + +@ExtendWith(MockitoExtension.class) +public class KinesisLeaseConfigProviderTest { + @Mock + private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; + + @Mock + private ExtensionProvider.Context context; + + private KinesisLeaseConfigProvider createObjectUnderTest() { + return new KinesisLeaseConfigProvider(kinesisLeaseConfigSupplier); + } + + @Test + void supportedClassReturnsKinesisSourceConfigSupplier() { + assertThat(createObjectUnderTest().supportedClass(), equalTo(KinesisLeaseConfigSupplier.class)); + } + + @Test + void provideInstanceReturnsKinesisSourceConfigSupplierFromConstructor() { + final KinesisLeaseConfigProvider objectUnderTest = createObjectUnderTest(); + + final Optional optionalKinesisSourceConfigSupplier = objectUnderTest.provideInstance(context); + assertThat(optionalKinesisSourceConfigSupplier, notNullValue()); + assertThat(optionalKinesisSourceConfigSupplier.isPresent(), equalTo(true)); + assertThat(optionalKinesisSourceConfigSupplier.get(), equalTo(kinesisLeaseConfigSupplier)); + + final Optional anotherOptionalKinesisSourceConfigSupplier = objectUnderTest.provideInstance(context); + assertThat(anotherOptionalKinesisSourceConfigSupplier, notNullValue()); + assertThat(anotherOptionalKinesisSourceConfigSupplier.isPresent(), equalTo(true)); + assertThat(anotherOptionalKinesisSourceConfigSupplier.get(), sameInstance(optionalKinesisSourceConfigSupplier.get())); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java new file mode 100644 index 0000000000..9731a7fda9 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java @@ -0,0 +1,53 @@ +package org.opensearch.dataprepper.plugins.kinesis.extension; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; +import org.opensearch.dataprepper.pipeline.parser.ByteCountDeserializer; +import org.opensearch.dataprepper.pipeline.parser.DataPrepperDurationDeserializer; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import java.io.File; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.time.Duration; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class KinesisLeaseConfigTest { + private static SimpleModule simpleModule = new SimpleModule() + .addDeserializer(Duration.class, new DataPrepperDurationDeserializer()) + .addDeserializer(ByteCount.class, new ByteCountDeserializer()); + private static ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()).registerModule(simpleModule); + + private KinesisLeaseConfig makeConfig(String filePath) throws IOException { + final File configurationFile = new File(filePath); + final DataPrepperConfiguration dataPrepperConfiguration = OBJECT_MAPPER.readValue(configurationFile, DataPrepperConfiguration.class); + assertThat(dataPrepperConfiguration, notNullValue()); + assertThat(dataPrepperConfiguration.getPipelineExtensions(), notNullValue()); + final Map kinesisLeaseConfigMap = + (Map) dataPrepperConfiguration.getPipelineExtensions().getExtensionMap().get("kinesis_lease_config"); + String json = OBJECT_MAPPER.writeValueAsString(kinesisLeaseConfigMap); + Reader reader = new StringReader(json); + return OBJECT_MAPPER.readValue(reader, KinesisLeaseConfig.class); + } + + + @Test + void testConfigWithTestExtension() throws IOException { + final KinesisLeaseConfig kinesisLeaseConfig = makeConfig( + "src/test/resources/simple_pipeline_with_extensions.yaml"); + + assertNotNull(kinesisLeaseConfig.getLeaseCoordinationTable()); + assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable(), "kinesis-pipeline-kcl"); + + } + +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/ClientFactoryTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactoryTest.java similarity index 94% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/ClientFactoryTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactoryTest.java index 3365a1e690..f2e4ed7797 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/ClientFactoryTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/ClientFactoryTest.java @@ -1,8 +1,8 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTrackerTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTrackerTest.java similarity index 97% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTrackerTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTrackerTest.java index 637cb16375..4f90243939 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTrackerTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisMultiStreamTrackerTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Assertions; @@ -6,8 +6,8 @@ import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java similarity index 89% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisServiceTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 3a631dae97..d523a1c768 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -10,11 +10,13 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.AwsAuthenticationConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.ConsumerStrategy; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamPollingConfig; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamPollingConfig; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -31,6 +33,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -98,6 +101,12 @@ public class KinesisServiceTest { @Mock private Scheduler scheduler; + @Mock + KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; + + @Mock + KinesisLeaseConfig kinesisLeaseConfig; + @BeforeEach void setup() { awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); @@ -111,6 +120,10 @@ void setup() { scheduler = mock(Scheduler.class); pipelineDescription = mock(PipelineDescription.class); buffer = mock(Buffer.class); + kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class); + kinesisLeaseConfig = mock(KinesisLeaseConfig.class); + when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn("kinesis-lease-table"); + when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.ofNullable(kinesisLeaseConfig)); when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.of("us-west-2")); when(awsAuthenticationConfig.getAwsStsRoleArn()).thenReturn(UUID.randomUUID().toString()); @@ -156,7 +169,8 @@ void setup() { } public KinesisService createObjectUnderTest() { - return new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager); + return new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); } @Test @@ -168,7 +182,8 @@ void testServiceStart() { @Test void testCreateScheduler() { - KinesisService kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); assertNotNull(schedulerObjectUnderTest); @@ -185,7 +200,8 @@ void testCreateScheduler() { @Test void testCreateSchedulerWithPollingStrategy() { when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); - KinesisService kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); assertNotNull(schedulerObjectUnderTest); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSourceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java similarity index 83% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSourceTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java index d367780571..69249b82e4 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSourceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis; +package org.opensearch.dataprepper.plugins.kinesis.source; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -11,13 +11,16 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.AwsAuthenticationConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import software.amazon.awssdk.regions.Region; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; @@ -58,6 +61,12 @@ public class KinesisSourceTest { @Mock KinesisService kinesisService; + @Mock + KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; + + @Mock + KinesisLeaseConfig kinesisLeaseConfig; + @BeforeEach void setup() { pluginMetrics = mock(PluginMetrics.class); @@ -68,7 +77,10 @@ void setup() { awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); kinesisService = mock(KinesisService.class); - + kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class); + kinesisLeaseConfig = mock(KinesisLeaseConfig.class); + when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn("kinesis-lease-table"); + when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.ofNullable(kinesisLeaseConfig)); when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.of("us-west-2")); when(awsAuthenticationConfig.getAwsStsRoleArn()).thenReturn(UUID.randomUUID().toString()); when(awsAuthenticationConfig.getAwsStsExternalId()).thenReturn(UUID.randomUUID().toString()); @@ -79,7 +91,7 @@ void setup() { } public KinesisSource createObjectUnderTest() { - return new KinesisSource(kinesisSourceConfig, pluginMetrics, pluginFactory, pipelineDescription, awsCredentialsSupplier, acknowledgementSetManager); + return new KinesisSource(kinesisSourceConfig, pluginMetrics, pluginFactory, pipelineDescription, awsCredentialsSupplier, acknowledgementSetManager, kinesisLeaseConfigSupplier); } @Test diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/AwsAuthenticationConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfigTest.java similarity index 99% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/AwsAuthenticationConfigTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfigTest.java index bfb6debc6f..09c64966f7 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/AwsAuthenticationConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/AwsAuthenticationConfigTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.databind.ObjectMapper; import org.hamcrest.CoreMatchers; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisSourceConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java similarity index 99% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisSourceConfigTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java index 1f2b723919..863e7fb4b2 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisSourceConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamPollingConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfigTest.java similarity index 90% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamPollingConfigTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfigTest.java index 342e9d70f6..21e83e4e39 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/configuration/KinesisStreamPollingConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisStreamPollingConfigTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.configuration; +package org.opensearch.dataprepper.plugins.kinesis.source.configuration; import org.junit.jupiter.api.Test; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java similarity index 97% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisRecordProcessorTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index 8ccfbd235a..bb851837ac 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.processor; +package org.opensearch.dataprepper.plugins.kinesis.source.processor; import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; @@ -17,8 +17,8 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; @@ -48,9 +48,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.source.kinesis.processor.KinesisRecordProcessor.KINESIS_CHECKPOINT_FAILURES; -import static org.opensearch.dataprepper.plugins.source.kinesis.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS; -import static org.opensearch.dataprepper.plugins.source.kinesis.processor.KinesisRecordProcessor.KINESIS_STREAM_TAG_KEY; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_CHECKPOINT_FAILURES; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_STREAM_TAG_KEY; public class KinesisRecordProcessorTest { private KinesisRecordProcessor kinesisRecordProcessor; diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisShardRecordProcessorFactoryTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactoryTest.java similarity index 94% rename from data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisShardRecordProcessorFactoryTest.java rename to data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactoryTest.java index 1bfe08ad6a..a0cdf07ea8 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/source/kinesis/processor/KinesisShardRecordProcessorFactoryTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactoryTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.source.kinesis.processor; +package org.opensearch.dataprepper.plugins.kinesis.source.processor; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -12,8 +12,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig; -import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import java.util.Collections; diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml new file mode 100644 index 0000000000..a1c9be1e9d --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml @@ -0,0 +1,3 @@ +extensions: + kinesis_lease_config: + lease_coordination_table: "kinesis-pipeline-kcl" \ No newline at end of file