diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java index 0b411e32b8..07c7997cdb 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java @@ -69,7 +69,7 @@ public SqsWorker(final SqsClient sqsClient, @Override public void run() { - while(true) { + while (true) { int messagesProcessed = 0; try { messagesProcessed = processSqsMessages(); @@ -152,26 +152,35 @@ private ParsedMessage convertS3EventMessages(final Message message) { } private List processS3EventNotificationRecords(final Collection s3EventNotificationRecords) { - List deleteMessageBatchRequestEntryCollection = new ArrayList<>(); + final List deleteMessageBatchRequestEntryCollection = new ArrayList<>(); + final List parsedMessagesToRead = new ArrayList<>(); + for (ParsedMessage parsedMessage : s3EventNotificationRecords) { if (parsedMessage.failedParsing) { sqsMessagesFailedCounter.increment(); if (s3SourceConfig.getOnErrorOption().equals(OnErrorOption.DELETE_MESSAGES)) { deleteMessageBatchRequestEntryCollection.add(buildDeleteMessageBatchRequestEntry(parsedMessage.message)); } - } - else { + } else { final List notificationRecords = parsedMessage.notificationRecords; - if(!notificationRecords.isEmpty() && isEventNameCreated(notificationRecords.get(0))) { - final S3ObjectReference s3ObjectReference = populateS3Reference(notificationRecords.get(0)); - final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference); - deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add); + if (!notificationRecords.isEmpty() && isEventNameCreated(notificationRecords.get(0))) { + parsedMessagesToRead.add(parsedMessage); } else { // Add SQS message to delete collection if the eventName is not ObjectCreated deleteMessageBatchRequestEntryCollection.add(buildDeleteMessageBatchRequestEntry(parsedMessage.message)); } } } + + LOG.info("Received {} messages from SQS. Read {} messages from S3.", s3EventNotificationRecords.size(), parsedMessagesToRead.size()); + + for (ParsedMessage parsedMessage : parsedMessagesToRead) { + final List notificationRecords = parsedMessage.notificationRecords; + final S3ObjectReference s3ObjectReference = populateS3Reference(notificationRecords.get(0)); + final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference); + deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add); + } + return deleteMessageBatchRequestEntryCollection; } @@ -194,6 +203,7 @@ private Optional processS3Object( } private void deleteSqsMessages(final List deleteMessageBatchRequestEntryCollection) { + LOG.debug("Deleting {} messages from SQS.", deleteMessageBatchRequestEntryCollection.size()); final DeleteMessageBatchRequest deleteMessageBatchRequest = buildDeleteMessageBatchRequest(deleteMessageBatchRequestEntryCollection); final DeleteMessageBatchResponse deleteMessageBatchResponse = sqsClient.deleteMessageBatch(deleteMessageBatchRequest); if (deleteMessageBatchResponse.hasSuccessful()) { @@ -224,7 +234,7 @@ private boolean isEventNameCreated(final S3EventNotification.S3EventNotification private S3ObjectReference populateS3Reference(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) { final S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getS3(); return S3ObjectReference.bucketAndKey(s3Entity.getBucket().getName(), - s3Entity.getObject().getUrlDecodedKey()) + s3Entity.getObject().getUrlDecodedKey()) .build(); }