Skip to content

Commit

Permalink
Add support for extensions to configure lease coordination table.
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sbose2k21 committed Aug 16, 2024
1 parent 50f02c8 commit 4bcde00
Show file tree
Hide file tree
Showing 34 changed files with 425 additions and 67 deletions.
4 changes: 4 additions & 0 deletions data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ dependencies {
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
testImplementation project(':data-prepper-pipeline-parser')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.dataprepper.plugins.kinesis.extension;

import java.util.Optional;

public class DefaultKinesisLeaseConfigSupplier implements KinesisLeaseConfigSupplier {

private KinesisLeaseConfig kinesisLeaseConfig;

public DefaultKinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfig = kinesisLeaseConfig;
}

@Override
public Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() {
return kinesisLeaseConfig != null ? Optional.of(kinesisLeaseConfig) : Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.dataprepper.plugins.kinesis.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;

@Getter
public class KinesisLeaseConfig {
@JsonProperty("lease_coordination_table")
private String leaseCoordinationTable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;
import org.opensearch.dataprepper.model.plugin.ExtensionPoints;

@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis_lease_config", allowInPipelineConfigurations = true)
public class KinesisLeaseConfigExtension implements ExtensionPlugin {

private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;
@DataPrepperPluginConstructor
public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfigSupplier = new DefaultKinesisLeaseConfigSupplier(kinesisLeaseConfig);
}

@Override
public void apply(final ExtensionPoints extensionPoints) {
extensionPoints.addExtensionProvider(new KinesisLeaseConfigProvider(this.kinesisLeaseConfigSupplier));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.plugin.ExtensionProvider;

import java.util.Optional;

class KinesisLeaseConfigProvider implements ExtensionProvider<KinesisLeaseConfigSupplier> {
private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;

public KinesisLeaseConfigProvider(final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) {
this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier;
}

@Override
public Optional<KinesisLeaseConfigSupplier> provideInstance(Context context) {
return Optional.of(this.kinesisLeaseConfigSupplier);
}

@Override
public Class<KinesisLeaseConfigSupplier> supportedClass() {
return KinesisLeaseConfigSupplier.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.dataprepper.plugins.kinesis.extension;

import java.util.Optional;

public interface KinesisLeaseConfigSupplier {

default Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.opensearch.dataprepper.plugins.source.kinesis;
package org.opensearch.dataprepper.plugins.kinesis.source;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.opensearch.dataprepper.plugins.source.kinesis;
package org.opensearch.dataprepper.plugins.kinesis.source;

import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.dataprepper.plugins.source.kinesis;
package org.opensearch.dataprepper.plugins.kinesis.source;

import io.micrometer.core.instrument.util.StringUtils;
import lombok.Setter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand All @@ -9,9 +8,10 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.ConsumerStrategy;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.source.kinesis.processor.KinesisShardRecordProcessorFactory;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisShardRecordProcessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
Expand Down Expand Up @@ -56,8 +56,9 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription,
final AcknowledgementSetManager acknowledgementSetManager
){
final AcknowledgementSetManager acknowledgementSetManager,
final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier
){
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
Expand All @@ -67,8 +68,10 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
this.cloudWatchClient = clientFactory.buildCloudWatchAsyncClient();
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
this.tableName = StringUtils.isNotEmpty(sourceConfig.getLeaseCoordinationTable()) ?
sourceConfig.getLeaseCoordinationTable() : applicationName;
if (kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().isEmpty()) {
throw new IllegalStateException("Lease Coordination table should be provided!");
}
this.tableName = kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable();
this.executorService = Executors.newFixedThreadPool(1);
}

Expand Down Expand Up @@ -115,7 +118,7 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
new WorkerIdentifierGenerator().generate(), processorFactory
).tableName(applicationName);
).tableName(tableName);

ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis;
package org.opensearch.dataprepper.plugins.kinesis.source;

import lombok.Setter;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
Expand All @@ -12,14 +12,16 @@
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name = "kinesis", pluginType = Source.class, pluginConfigurationType = KinesisSourceConfig.class)
public class KinesisSource implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
private final KinesisSourceConfig kinesisSourceConfig;
private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;

@Setter
private KinesisService kinesisService;
Expand All @@ -30,10 +32,13 @@ public KinesisSource(final KinesisSourceConfig kinesisSourceConfig,
final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription,
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) {
this.kinesisSourceConfig = kinesisSourceConfig;
this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier;
ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, kinesisSourceConfig.getAwsAuthenticationConfig());
this.kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager);
this.kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory,
pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier);
}
@Override
public void start(final Buffer<Record<Event>> buffer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis;
package org.opensearch.dataprepper.plugins.kinesis.source;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis.configuration;
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;

/*
* Copyright OpenSearch Contributors
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis.configuration;
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;

import com.fasterxml.jackson.annotation.JsonValue;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis.configuration;
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
Expand Down Expand Up @@ -48,10 +48,6 @@ public class KinesisSourceConfig {
@JsonProperty("polling")
private KinesisStreamPollingConfig pollingConfig;

@JsonProperty("lease_coordination_table")
@Getter
private String leaseCoordinationTable;

@Getter
@JsonProperty("codec")
private PluginModel codec;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis.configuration;
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis.configuration;
package org.opensearch.dataprepper.plugins.kinesis.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//package org.opensearch.dataprepper.plugins.kinesis.source.metrics;
//
//import com.google.common.collect.ImmutableSet;
//import org.opensearch.dataprepper.metrics.PluginMetrics;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
//import software.amazon.kinesis.metrics.FilteringMetricsScope;
//import software.amazon.kinesis.metrics.MetricsLevel;
//
//public class MicrometerFilterMetricsScope extends FilteringMetricsScope {
//
// private static final Logger LOG = LoggerFactory.getLogger(MicrometerFilterMetricsScope.class);
//
// private final PluginMetrics pluginMetrics;
//
// public MicrometerFilterMetricsScope(final PluginMetrics pluginMetrics) {
// super(MetricsLevel.SUMMARY, ImmutableSet.of(METRICS_DIMENSIONS_ALL));
// this.pluginMetrics = pluginMetrics;
// }
//
// @Override
// public void addData(String name, double value, StandardUnit unit) {
// // TODO: report metrics.
// LOG.info("{} {} {}", name, value, unit);
// }
//
// @Override
// public void addData(String name, double value, StandardUnit unit, MetricsLevel level) {
// // TODO: report metrics.
// LOG.info("{} {} {} {}", name, value, unit, level);
// }
//
// @Override
// public void end() {
// // TODO: Update this
// }
//}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//package org.opensearch.dataprepper.plugins.kinesis.source.metrics;
//
//import org.opensearch.dataprepper.metrics.PluginMetrics;
//import software.amazon.kinesis.metrics.MetricsFactory;
//import software.amazon.kinesis.metrics.MetricsScope;
//
//public class MicrometerMetricFactory implements MetricsFactory {
//
// final PluginMetrics pluginMetrics;
//
// public MicrometerMetricFactory(PluginMetrics pluginMetrics) {
// this.pluginMetrics = pluginMetrics;
// }
//
// public MetricsScope createMetrics() {
// return new MicrometerFilterMetricsScope(pluginMetrics);
// }
//}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.dataprepper.plugins.source.kinesis.processor;
package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -11,9 +11,9 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.kinesis.KinesisSource;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.KinesisSource;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.common.StreamIdentifier;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.opensearch.dataprepper.plugins.source.kinesis.processor;
package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
Expand Down
Loading

0 comments on commit 4bcde00

Please sign in to comment.