Sns Sink Plugin with junit test cases #2995

Merged: 28 commits from udaych20:sns-sink-plugin into opensearch-project:main, Aug 4, 2023
Changes from 5 commits

Commits (28):
643d06c
Sns Sink Plugin with junit test cases
udaych20 Jul 10, 2023
dce26a7
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 12, 2023
23b12c7
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 13, 2023
69a0ca0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 17, 2023
caf5d1c
Sns Sink DLQ changes
udaych20 Jul 17, 2023
c459013
Incorporated FIFO Topic related Changes
udaych20 Jul 17, 2023
96d09b0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 18, 2023
1b5b45b
SNS Sink incorporated the review comments.
udaych20 Jul 18, 2023
eb012af
SNS Sink incorporated the review comments.
udaych20 Jul 18, 2023
d46b778
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 20, 2023
370d57e
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 24, 2023
08b1146
Dlq changes for sns sink
udaych20 Jul 24, 2023
0ca43a0
local date stamp removed from dlq
udaych20 Jul 25, 2023
6a9ef69
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 25, 2023
818a03f
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 26, 2023
1027c78
SNS Sink removed threshold and pushed records in specified batch
udaych20 Jul 26, 2023
536d6f0
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 27, 2023
2008138
SNS Sink Integration Tests
udaych20 Jul 27, 2023
7b94e33
Merge branch 'sns-sink-plugin' of [email protected]:udaych20/data-preppe…
udaych20 Jul 27, 2023
b7b6e5e
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Jul 31, 2023
4849418
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 1, 2023
325bf8a
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 1, 2023
526b521
Sns Sink Review Changes
udaych20 Aug 1, 2023
c779a1f
Sns Sink Test Case Changes
udaych20 Aug 1, 2023
67f10d9
Merge branch 'opensearch-project:main' into sns-sink-plugin
udaych20 Aug 2, 2023
3ba60d7
Sns Sink Review changes
udaych20 Aug 2, 2023
3a19a4e
Merge branch 'sns-sink-plugin' of [email protected]:udaych20/data-preppe…
udaych20 Aug 2, 2023
7088650
Merge branch 'main' into sns-sink-plugin
udaych20 Aug 4, 2023
14 changes: 11 additions & 3 deletions data-prepper-plugins/sns-sink/README.md
@@ -13,7 +13,9 @@ pipeline:
...
sink:
- sns:
topic: arn:aws:sns:ap-south-1:524239988922:my-topic
topic_arn: arn:aws:sns:ap-south-1:524239988922:my-topic
message_group_id: /type
message_deduplication_id: /id
batch_size: 10
aws:
region: ap-south-1
@@ -29,10 +31,14 @@ pipeline:

## SNS Pipeline Configuration

- `topic` (Optional) : The SNS Topic Arn of the Topic to push events.
- `topic_arn` (Optional) : The ARN of the SNS topic to which events are published.

- `batch_size` (Optional) : An integer specifying the maximum number of events to publish to the SNS topic in a single batch. Defaults to 10. (An illustrative batching sketch appears after the integration-test commands below.)

- `message_group_id` (Optional) : A string used as the `message_group_id` for messages published to a FIFO SNS topic. Defaults to an auto-generated random key.

- `message_deduplication_id` (Optional) : A string used as the `message_deduplication_id` for message deduplication when publishing to a FIFO SNS topic. Defaults to an auto-generated random key.

- `dlq_file` (Optional) : A string giving the absolute file path for failed (DLQ) output records. Defaults to null.
If not provided, failed records are written to the default Data Prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this, an error is thrown.

@@ -66,6 +72,8 @@ The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

Note: Subscribe the SNS topics to SQS queues before running the integration tests.

```
./gradlew :data-prepper-plugins:sns-sink:integrationTest -Dtests.sns.sink.region=<<aws-region>> -Dtests.sns.sink.sts.role.arn=<<aws-sts-role-arn>> -Dtests.sns.sink.standard.topic=<<standard-topic-arn>> -Dtests.sns.sink.fifo.topic=<<fifo-topic-arn>> -Dtests.sns.sink.dlq.file.path=<<dlq-file-path>>
./gradlew :data-prepper-plugins:sns-sink:integrationTest -Dtests.sns.sink.region=<<aws-region>> -Dtests.sns.sink.sts.role.arn=<<aws-sts-role-arn>> -Dtests.sns.sink.standard.topic=<<standard-topic-arn>> -Dtests.sns.sink.fifo.topic=<<fifo-topic-arn>> -Dtests.sns.sink.dlq.file.path=<<dlq-file-path>> -Dtests.sns.sink.standard.sqs.queue.url=<<sqs-standard-queue>> -Dtests.sns.sink.fifo.sqs.queue.url=<<sqs-fifo-queue>>
```
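The `batch_size` option above caps how many events the sink sends to SNS in one request. As a rough, hypothetical illustration of that flow (this is not the `SnsSinkService` code from this PR; the class and method names below are invented), batched publishing with the AWS SDK v2 `publishBatch` API could look like:

```java
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishBatchRequest;
import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch only; not the plugin's actual implementation.
class BatchPublishSketch {
    private final SnsClient snsClient;
    private final String topicArn;
    private final int batchSize;                      // maps to the batch_size option; SNS allows at most 10 entries per batch call
    private final List<String> buffer = new ArrayList<>();

    BatchPublishSketch(final SnsClient snsClient, final String topicArn, final int batchSize) {
        this.snsClient = snsClient;
        this.topicArn = topicArn;
        this.batchSize = batchSize;
    }

    void add(final String messageBody) {
        buffer.add(messageBody);
        if (buffer.size() >= batchSize) {
            flush();                                  // push a full batch as soon as batch_size is reached
        }
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        final List<PublishBatchRequestEntry> entries = new ArrayList<>();
        for (final String body : buffer) {
            entries.add(PublishBatchRequestEntry.builder()
                    .id(UUID.randomUUID().toString()) // each entry needs a batch-unique id
                    // For FIFO topics, messageGroupId/messageDeduplicationId would also be set here.
                    .message(body)
                    .build());
        }
        snsClient.publishBatch(PublishBatchRequest.builder()
                .topicArn(topicArn)
                .publishBatchRequestEntries(entries)
                .build());
        buffer.clear();
    }
}
```

A caller would invoke `add(...)` per event and `flush()` once at the end of each `output(...)` call to drain any partial batch.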
6 changes: 3 additions & 3 deletions data-prepper-plugins/sns-sink/build.gradle
@@ -10,12 +10,10 @@ dependencies {
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'software.amazon.awssdk:sns'
implementation 'software.amazon.awssdk:sts'
testImplementation 'software.amazon.awssdk:sqs'
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.8.21'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
implementation 'org.apache.commons:commons-lang3:3.12.0'
@@ -56,6 +54,8 @@ task integrationTest(type: Test) {
systemProperty 'tests.sns.sink.sts.role.arn', System.getProperty('tests.sns.sink.sts.role.arn')
systemProperty 'tests.sns.sink.standard.topic', System.getProperty('tests.sns.sink.standard.topic')
systemProperty 'tests.sns.sink.fifo.topic', System.getProperty('tests.sns.sink.fifo.topic')
systemProperty 'tests.sns.sink.standard.sqs.queue.url', System.getProperty('tests.sns.sink.standard.sqs.queue.url')
systemProperty 'tests.sns.sink.fifo.sqs.queue.url', System.getProperty('tests.sns.sink.fifo.sqs.queue.url')

filter {
includeTestsMatching '*IT'
@@ -2,16 +2,18 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink;
package org.opensearch.dataprepper.plugins.sink.sns;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
@@ -20,27 +22,36 @@
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.SNSSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED;
import static org.opensearch.dataprepper.plugins.sink.SNSSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS;
import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED;
import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS;

public class SNSSinkServiceIT{
public class SnsSinkServiceIT {

private SnsClient snsClient;

@@ -53,7 +64,7 @@ public class SNSSinkServiceIT{
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


private static final String SNS_SINK_CONFIG_YAML = " topic: {0}\n" +
private static final String SNS_SINK_CONFIG_YAML = " topic_arn: {0}\n" +
" batch_size: 10\n" +
" aws:\n" +
" region: {1}\n" +
@@ -73,11 +84,17 @@ public class SNSSinkServiceIT{

private String dlqFilePath;


private Counter snsSinkObjectsEventsSucceeded;

private Counter numberOfRecordsFailedCounter;

private String standardSqsQueue;

private SqsClient sqsClient;

private String fifoSqsQueue;

private DlqPushHandler dlqPushHandler;

@BeforeEach
public void setup() {
@@ -86,7 +103,10 @@ public void setup() {
this.region = System.getProperty("tests.sns.sink.region");
this.stsRoleArn = System.getProperty("tests.sns.sink.sts.role.arn");
this.dlqFilePath = System.getProperty("tests.sns.sink.dlq.file.path");
this.standardSqsQueue = System.getProperty("tests.sns.sink.standard.sqs.queue.url");
this.fifoSqsQueue = System.getProperty("tests.sns.sink.fifo.sqs.queue.url");

this.dlqPushHandler = mock(DlqPushHandler.class);
this.pluginMetrics = mock(PluginMetrics.class);
this.pluginFactory = mock(PluginFactory.class);
this.pluginSetting = mock(PluginSetting.class);
@@ -95,6 +115,7 @@ public void setup() {
this.snsClient = SnsClient.builder()
.region(Region.of(region))
.build();
this.sqsClient = SqsClient.builder().region(Region.of(region)).build();
when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS)).thenReturn(snsSinkObjectsEventsSucceeded);
when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(numberOfRecordsFailedCounter);

@@ -108,45 +129,97 @@ private Collection<Record<Event>> setEventQueue(final int records) {
}

private static Record<Event> createRecord() {
final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\""+ UUID.randomUUID() +"\"}]").build();
final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
event.setEventHandle(mock(EventHandle.class));
return new Record<>(event);
}

public SNSSinkService createObjectUnderTest(final String topicName) throws JsonProcessingException {
public SnsSinkService createObjectUnderTest(final String topicName) throws JsonProcessingException {
String[] values = { topicName,region,stsRoleArn,dlqFilePath };
final String configYaml = MessageFormat.format(SNS_SINK_CONFIG_YAML, values);
final SNSSinkConfig snsSinkConfig = objectMapper.readValue(configYaml, SNSSinkConfig.class);
return new SNSSinkService(snsSinkConfig,snsClient,pluginMetrics,pluginFactory,pluginSetting);
final SnsSinkConfig snsSinkConfig = objectMapper.readValue(configYaml, SnsSinkConfig.class);
return new SnsSinkService(snsSinkConfig,snsClient,pluginMetrics,pluginFactory,pluginSetting,mock(ExpressionEvaluator.class));
}

@ParameterizedTest
@ValueSource(ints = {1, 5, 10})
public void sns_sink_service_test_with_standard_queue_with_multiple_records(final int recordCount) throws JsonProcessingException {
final SNSSinkService objectUnderTest = createObjectUnderTest(standardTopic);
@ValueSource(ints = {5,9,10})
public void sns_sink_service_test_with_standard_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
final Collection<Record<Event>> records = setEventQueue(recordCount);
final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue);
assertThat(inputRecords, is(topicData));
assertThat(inputRecords.size(), equalTo(topicData.size()));
verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
}

@Test
public void sns_sink_service_test_with_standard_queue_with_multiple_batch() throws JsonProcessingException, InterruptedException {
final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
final Collection<Record<Event>> records = setEventQueue(11);
final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue);
assertThat(inputRecords, is(topicData));
assertThat(inputRecords.size(), equalTo(topicData.size()));
verify(snsSinkObjectsEventsSucceeded,times(2)).increment(anyDouble());
}

private List<String> readMessagesFromSNSTopicQueue(List<String> inputRecords, final String sqsQueue) {
final List<Message> messages = new ArrayList<>();
do{
[Inline review comment from dlvenable (Member): This should use Awaitility to wait for a maximum period of time. Perhaps 1 minute. As it is, this can get stuck indefinitely.]

[Reply from udaych20 (Contributor, Author): Thanks @dlvenable for your feedback, we have incorporated the changes and available for review.]
messages.addAll(sqsClient.receiveMessage(builder -> builder.queueUrl(sqsQueue)).messages());
if(messages.size() >= inputRecords.size()){
break;
}
} while(true);
List<String> topicData = messages.stream().map(Message::body).map(obj-> {
try {
Map<String,String> map = objectMapper.readValue(obj,Map.class);
return map.get("Message");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
return topicData;
}
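As the review comment above suggests, the unbounded `do/while` poll can hang indefinitely. A minimal sketch of a bounded alternative using Awaitility (assuming `org.awaitility:awaitility` were added to the test classpath; this is not the code as merged):

```java
// Hypothetical bounded-poll variant of readMessagesFromSNSTopicQueue, per the review suggestion.
// Assumes: import org.awaitility.Awaitility; (Duration, List, ArrayList, and
// software.amazon.awssdk.services.sqs.model.Message are already imported above).
private List<Message> pollMessagesBounded(final String sqsQueue, final int expectedCount) {
    final List<Message> messages = new ArrayList<>();
    Awaitility.await()
            .atMost(Duration.ofMinutes(1))            // fail after 1 minute instead of spinning forever
            .pollInterval(Duration.ofSeconds(2))      // avoid hammering SQS between checks
            .until(() -> {
                messages.addAll(sqsClient.receiveMessage(
                        builder -> builder.queueUrl(sqsQueue)).messages());
                return messages.size() >= expectedCount;
            });
    return messages;
}
```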

private void deleteSqsMessages(String sqsQueue, List<Message> messages) throws InterruptedException {
for (Message message : messages) {
sqsClient.deleteMessage(builder -> builder.queueUrl(sqsQueue).receiptHandle(message.receiptHandle()));
Thread.sleep(Duration.ofSeconds(2).toMillis());
}
}

@ParameterizedTest
@ValueSource(ints = {1, 5, 10})
public void sns_sink_service_test_with_fifo_queue_with_multiple_records(final int recordCount) throws JsonProcessingException {
final SNSSinkService objectUnderTest = createObjectUnderTest(fifoTopic);
public void sns_sink_service_test_with_fifo_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
final SnsSinkService objectUnderTest = createObjectUnderTest(fifoTopic);
final Collection<Record<Event>> records = setEventQueue(recordCount);
final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords,fifoSqsQueue);
assertThat(inputRecords, is(topicData));
assertThat(inputRecords.size(), equalTo(topicData.size()));
verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
}



@ParameterizedTest
@ValueSource(ints = {1,5,9})
public void sns_sink_service_test_fail_to_push(final int recordCount) throws IOException {
public void sns_sink_service_test_fail_to_push(final int recordCount) throws IOException, InterruptedException {
final ObjectMapper mapper = new ObjectMapper();
final String topic = "test";
Files.deleteIfExists(Path.of(dlqFilePath));
final SNSSinkService objectUnderTest = createObjectUnderTest(topic);
final SnsSinkService objectUnderTest = createObjectUnderTest(topic);
final Collection<Record<Event>> records = setEventQueue(recordCount);
objectUnderTest.output(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
verify(numberOfRecordsFailedCounter).increment(recordCount);
final Map<String,String> map = mapper.readValue(new String(Files.readAllBytes(Path.of(dlqFilePath))).replaceAll("(\\r|\\n)", ""), Map.class);
assertThat(map.get("topic"),equalTo(topic));
@@ -3,19 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;
package org.opensearch.dataprepper.plugins.sink.sns;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.sns.SnsClient;

public class SNSClientFactory {
public class SnsClientFactory {

public static SnsClient createSNSClient(final SNSSinkConfig snsSinkConfig,
public static SnsClient createSNSClient(final SnsSinkConfig snsSinkConfig,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(snsSinkConfig.getAwsAuthenticationOptions());
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);
@@ -26,7 +26,7 @@ public static SnsClient createSNSClient(final SNSSinkConfig snsSinkConfig,
.overrideConfiguration(createOverrideConfiguration(snsSinkConfig)).build();
}

private static ClientOverrideConfiguration createOverrideConfiguration(final SNSSinkConfig snsSinkConfig) {
private static ClientOverrideConfiguration createOverrideConfiguration(final SnsSinkConfig snsSinkConfig) {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(snsSinkConfig.getMaxConnectionRetries()).build();
return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
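For context, a hypothetical call site for the factory above (the real wiring happens inside the plugin and is not shown in this diff; `snsSinkConfig` and `awsCredentialsSupplier` are assumed to be supplied by Data Prepper's plugin framework):

```java
// Illustrative usage only.
final SnsClient snsClient = SnsClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier);
```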