-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sns Sink Plugin with junit test cases #2995
Conversation
Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Kumar Chintala <[email protected]>
...r-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkService.java
Outdated
Show resolved
Hide resolved
...r-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkService.java
Outdated
Show resolved
Hide resolved
...r-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkService.java
Outdated
Show resolved
Hide resolved
...r-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkService.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Kumar Chintala <[email protected]>
...er-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkConfig.java
Outdated
Show resolved
Hide resolved
...r-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkService.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/dlq/DlqPushHandler.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/dlq/DlqPushHandler.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceTest.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Kumar Chintala <[email protected]>
…r.git into sns-sink-plugin
...-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceIT.java
Outdated
Show resolved
Hide resolved
...-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceIT.java
Outdated
Show resolved
Hide resolved
...-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceIT.java
Outdated
Show resolved
Hide resolved
...-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceIT.java
Outdated
Show resolved
Hide resolved
...er-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkConfig.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceTest.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceTest.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceTest.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceTest.java
Outdated
Show resolved
Hide resolved
...ugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/SNSSinkServiceTest.java
Outdated
Show resolved
Hide resolved
...plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSClientFactory.java
Outdated
Show resolved
Hide resolved
...er-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkConfig.java
Outdated
Show resolved
Hide resolved
...r-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/SNSSinkService.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Kumar Chintala <[email protected]>
|
||
private List<String> readMessagesFromSNSTopicQueue(List<String> inputRecords, final String sqsQueue) { | ||
final List<Message> messages = new ArrayList<>(); | ||
do{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should use Awaitility to wait for a maximum period of time. Perhaps 1 minute.
As it is, this can get stuck indefinitely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dlvenable for your feedback, we have incorporated the changes and available for review.
private String topicArn; | ||
|
||
@JsonProperty("message_group_id") | ||
private String messageGroupId; | ||
public String messageGroupId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be private
and provide a public getter - getMessageGroupId()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dlvenable for your feedback, we have incorporated the changes and available for review.
Collection<Record<Event>> records = List.of(eventRecord); | ||
snsSinkService.output(records); | ||
verify(numberOfRecordsSuccessCounter).increment(records.size()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mentioned this in an earlier comment.
#2995 (comment)
We need to verify a couple of things:
- The sink calls the client.
- The data that the sink provides to the client is correct.
It should look somewhat like the following.
final PublishBatchRequest request = createPublicRequestByTopic(topicName, processRecordsList);
final PublishBatchResponse publishBatchResponse = snsClient.publishBatch(request);
final ArgumentCaptor<PublishBatchRequest> publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);
verify(snsClient).publishBash(publishBatchRequestCaptor.capture());
final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue();
assertThat(actualRequest.topicArn(), equalTo(topicArn));
assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(1));
// Dive into the first publish batch request entry and verify that it has the correct string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dlvenable for your feedback, we have incorporated the changes and available for review.
...s/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java
Outdated
Show resolved
Hide resolved
...s/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java
Show resolved
Hide resolved
...s/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java
Show resolved
Hide resolved
...s/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java
Show resolved
Hide resolved
Signed-off-by: Uday Kumar Chintala <[email protected]>
…r.git into sns-sink-plugin
@udaych20 , Please rebase your branch from |
Signed-off-by: Uday Chintala <[email protected]>
@dlvenable done. |
This PR was previously approved by @graytaylor0 , and has only been rebased. So I'll go ahead and merge. |
Description
Draft PR for sns sink #2938
Issues Resolved
This PR consists sns-sink plugin code with junit test cases.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.