Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sns Sink Plugin with junit test cases #2995

Merged
merged 28 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
643d06c
Sns Sink Plugin with junit test cases
udaych20 Jul 10, 2023
dce26a7
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 12, 2023
23b12c7
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 13, 2023
69a0ca0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 17, 2023
caf5d1c
Sns Sink DLQ changes
udaych20 Jul 17, 2023
c459013
Incorporated FIFO Topic related Changes
udaych20 Jul 17, 2023
96d09b0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 18, 2023
1b5b45b
SNS Sink incorporated the review comments.
udaych20 Jul 18, 2023
eb012af
SNS Sink incorporated the review comments.
udaych20 Jul 18, 2023
d46b778
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 20, 2023
370d57e
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 24, 2023
08b1146
Dlq changes for sns sink
udaych20 Jul 24, 2023
0ca43a0
local date stamp removed from dlq
udaych20 Jul 25, 2023
6a9ef69
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 25, 2023
818a03f
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 26, 2023
1027c78
SNS Sink removed threshold and pushed records in specified batch
udaych20 Jul 26, 2023
536d6f0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 27, 2023
2008138
SNS Sink Integration Tests
udaych20 Jul 27, 2023
7b94e33
Merge branch 'sns-sink-plugin' of [email protected]:udaych20/data-preppe…
udaych20 Jul 27, 2023
b7b6e5e
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 31, 2023
4849418
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 1, 2023
325bf8a
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 1, 2023
526b521
Sns Sink Review Changes
udaych20 Aug 1, 2023
c779a1f
Sns Sink Test Case Changes
udaych20 Aug 1, 2023
67f10d9
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 2, 2023
3ba60d7
Sns Sink Review changes
udaych20 Aug 2, 2023
3a19a4e
Merge branch 'sns-sink-plugin' of [email protected]:udaych20/data-preppe…
udaych20 Aug 2, 2023
7088650
Merge branch 'main' into sns-sink-plugin
udaych20 Aug 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions data-prepper-plugins/sns-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'software.amazon.awssdk:sns'
implementation 'software.amazon.awssdk:sts'
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.8.21'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation project(':data-prepper-plugins:failures-common')
testImplementation project(':data-prepper-test-common')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket')
systemProperty 'tests.s3ink.region', System.getProperty('tests.s3sink.region')

filter {
includeTestsMatching '*IT'
udaych20 marked this conversation as resolved.
Show resolved Hide resolved
}
}
udaych20 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.sns.SnsClient;

public class SNSClientFactory {

public static SnsClient createSNSClient(final SNSSinkConfig snsSinkConfig,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(snsSinkConfig.getAwsAuthenticationOptions());
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);

return SnsClient.builder()
.region(snsSinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(awsCredentialsProvider)
.overrideConfiguration(createOverrideConfiguration(snsSinkConfig)).build();
}

private static ClientOverrideConfiguration createOverrideConfiguration(final SNSSinkConfig snsSinkConfig) {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(snsSinkConfig.getMaxConnectionRetries()).build();
return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.build();
}

private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) {
return AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationOptions.getAwsRegion())
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.accumulator.LocalFileBufferFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sns.SnsClient;

import java.util.Collection;

/**
* Implementation class of sns-sink plugin. It is responsible for receive the collection of
* {@link Event} and upload to amazon sns based on thresholds configured.
*/
@DataPrepperPlugin(name = "sns", pluginType = Sink.class, pluginConfigurationType = SNSSinkConfig.class)
public class SNSSink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(SNSSink.class);
private final SNSSinkConfig snsSinkConfig;
private volatile boolean sinkInitialized;
private final SNSSinkService snsSinkService;
private final BufferFactory bufferFactory;

/**
* @param pluginSetting dp plugin settings.
* @param snsSinkConfig sns sink configurations.
* @param pluginFactory dp plugin factory.
*/
@DataPrepperPluginConstructor
public SNSSink(final PluginSetting pluginSetting,
final SNSSinkConfig snsSinkConfig,
final PluginFactory pluginFactory,
final SinkContext sinkContext,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);
this.snsSinkConfig = snsSinkConfig;
final PluginModel codecConfiguration = snsSinkConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
// TODO: Sink codec changes are pending
// codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
sinkInitialized = Boolean.FALSE;

if (snsSinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) {
bufferFactory = new LocalFileBufferFactory();
} else {
bufferFactory = new InMemoryBufferFactory();
}
final SnsClient snsClient = SNSClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier);
snsSinkService = new SNSSinkService(snsSinkConfig, bufferFactory,snsClient,pluginMetrics,pluginFactory,pluginSetting);

}

@Override
public boolean isReady() {
return sinkInitialized;
}

@Override
public void doInitialize() {
try {
doInitializeInternal();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Invalid plugin configuration, Hence failed to initialize sns-sink plugin.");
this.shutdown();
throw e;
} catch (Exception e) {
LOG.error("Failed to initialize sns-sink plugin.");
this.shutdown();
throw e;
}
}

/**
* Initialize {@link SNSSinkService}
*/
private void doInitializeInternal() {
sinkInitialized = Boolean.TRUE;
}

/**
* @param records Records to be output
*/
@Override
public void doOutput(final Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
}
snsSinkService.output(records);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions;

import java.util.Map;
import java.util.Objects;

/**
* sns sink configuration class contains properties, used to read yaml configuration.
*/
public class SNSSinkConfig {

private static final int DEFAULT_CONNECTION_RETRIES = 5;
private static final int DEFAULT_UPLOAD_RETRIES = 5;
public static final String STS_REGION = "sts_region";
udaych20 marked this conversation as resolved.
Show resolved Hide resolved
public static final String STS_ROLE_ARN = "sts_role_arn";

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("topic")
udaych20 marked this conversation as resolved.
Show resolved Hide resolved
@NotNull
@NotEmpty
private String topicArn;

@JsonProperty("id")
private String id;

@JsonProperty("threshold")
@NotNull
private ThresholdOptions thresholdOptions;

@JsonProperty("codec")
@NotNull
private PluginModel codec;

@JsonProperty("dlq")
private PluginModel dlq;

@JsonProperty("dlq_file")
private String dlqFile;

@JsonProperty("buffer_type")
private BufferTypeOptions bufferType = BufferTypeOptions.IN_MEMORY;

private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;

@JsonProperty("max_retries")
private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES;

public PluginModel getDlq() {
return dlq;
}

public String getDlqFile() {
return dlqFile;
}

/**
* Aws Authentication configuration Options.
* @return aws authentication options.
*/
public AwsAuthenticationOptions getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
}

/**
* Threshold configuration Options.
* @return threshold option object.
*/
public ThresholdOptions getThresholdOptions() {
return thresholdOptions;
}

public String getTopicArn() {
return topicArn;
}

public String getId() {
return id;
}

/**
* Sink codec configuration Options.
* @return codec plugin model.
*/
public PluginModel getCodec() {
return codec;
}

/**
* Buffer type configuration Options.
* @return buffer type option object.
*/
public BufferTypeOptions getBufferType() {
return bufferType;
}

/**
* SNS client connection retries configuration Options.
* @return max connection retries value.
*/
public int getMaxConnectionRetries() {
return maxConnectionRetries;
}

/**
* SNS object upload retries configuration Options.
* @return maximum upload retries value.
*/
public int getMaxUploadRetries() {
return maxUploadRetries;
}

public String getDlqStsRoleARN(){
return Objects.nonNull(getDlqPluginSetting().get(STS_ROLE_ARN)) ?
String.valueOf(getDlqPluginSetting().get(STS_ROLE_ARN)) :
awsAuthenticationOptions.getAwsStsRoleArn();
}

public String getDlqStsRegion(){
return Objects.nonNull(getDlqPluginSetting().get(STS_REGION)) ?
String.valueOf(getDlqPluginSetting().get(STS_REGION)) :
awsAuthenticationOptions.getAwsRegion().toString();
}

public Map<String, Object> getDlqPluginSetting(){
return dlq != null ? dlq.getPluginSettings() : Map.of();
}
}
Loading