forked from opensearch-project/data-prepper
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Improve the SQS shutdown process such that it does not prevent the pi…
…peline from shutting down and no longer results in failures. Resolves opensearch-project#4575 (opensearch-project#4748) The previous approach to shutting down the SQS thread closed the SqsClient. However, with acknowledgments enabled, asynchronous callbacks would result in further attempts to either ChangeVisibilityTimeout or DeleteMessages. These were failing because the client was closed. Also, the threads would remain and prevent Data Prepper from correctly shutting down. With this change, we correctly stop each processing thread. Then we close the client. Additionally, the SqsWorker now checks that it is not stopped before attempting to change the message visibility or delete messages. Additionally, I found some missing test cases. Also, modifying this code and especially unit testing it is becoming more difficult, so I performed some refactoring to move message parsing out of the SqsWorker. Signed-off-by: David Venable <[email protected]> Signed-off-by: Krishna Kondaka <[email protected]>
- Loading branch information
Showing
9 changed files
with
559 additions
and
236 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
44 changes: 44 additions & 0 deletions
44
...e/src/main/java/org/opensearch/dataprepper/plugins/source/s3/parser/SqsMessageParser.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.source.s3.parser; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.opensearch.dataprepper.plugins.source.s3.S3SourceConfig; | ||
import software.amazon.awssdk.services.sqs.model.Message; | ||
|
||
import java.util.Collection; | ||
import java.util.stream.Collectors; | ||
|
||
public class SqsMessageParser { | ||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); | ||
private final S3SourceConfig s3SourceConfig; | ||
private final S3NotificationParser s3NotificationParser; | ||
|
||
public SqsMessageParser(final S3SourceConfig s3SourceConfig) { | ||
this.s3SourceConfig = s3SourceConfig; | ||
s3NotificationParser = createNotificationParser(s3SourceConfig); | ||
} | ||
|
||
public Collection<ParsedMessage> parseSqsMessages(final Collection<Message> sqsMessages) { | ||
return sqsMessages.stream() | ||
.map(this::convertS3EventMessages) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
private ParsedMessage convertS3EventMessages(final Message message) { | ||
return s3NotificationParser.parseMessage(message, OBJECT_MAPPER); | ||
} | ||
|
||
private static S3NotificationParser createNotificationParser(final S3SourceConfig s3SourceConfig) { | ||
switch (s3SourceConfig.getNotificationSource()) { | ||
case EVENTBRIDGE: | ||
return new S3EventBridgeNotificationParser(); | ||
case S3: | ||
default: | ||
return new S3EventNotificationParser(); | ||
} | ||
} | ||
} |
Oops, something went wrong.