diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java index 60d48c97d3..dc09cd989c 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java @@ -345,7 +345,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks(final int numberO ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60)); - when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); final SqsWorker objectUnderTest = createObjectUnderTest(); final int sqsMessagesProcessed = objectUnderTest.processSqsMessages(); @@ -420,7 +420,7 @@ void processSqsMessages_with_acks_and_progress_check_callbacks_expires(final int ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); when(sqsOptions.getMaxVisibilityTimeoutExtension()).thenReturn(Duration.ofSeconds(60)); - when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true); acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); final SqsWorker objectUnderTest = createObjectUnderTest(); final int sqsMessagesProcessed = objectUnderTest.processSqsMessages(); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index d02d8f214c..a0f7a0bb16 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -239,15 +239,15 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { final int progressCheckInterval = visibilityTimeout/2 - 1; if (endToEndAcknowledgementsEnabled) { int expiryTimeout = visibilityTimeout - 2; - final boolean extendedVisibilityTimeoutEnabled = sqsOptions.getExtendVisibilityTimeout(); - if (extendedVisibilityTimeoutEnabled) { + final boolean visibilityDuplicateProtectionEnabled = sqsOptions.getVisibilityDuplicateProtection(); + if (visibilityDuplicateProtectionEnabled) { expiryTimeout = maxVisibilityTimeout; } acknowledgementSet = acknowledgementSetManager.create( (result) -> { acknowledgementSetCallbackCounter.increment(); // Delete only if this is positive acknowledgement - if (extendedVisibilityTimeoutEnabled) { + if (visibilityDuplicateProtectionEnabled) { parsedMessageVisibilityTimesMap.remove(parsedMessage); } if (result == true) { @@ -255,7 +255,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } }, Duration.ofSeconds(expiryTimeout)); - if (extendedVisibilityTimeoutEnabled) { + if (visibilityDuplicateProtectionEnabled) { acknowledgementSet.addProgressCheck( (ratio) -> { final int newVisibilityTimeoutSeconds = visibilityTimeout; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java index 6c34c715db..c4acd3abfd 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java @@ -16,7 +16,7 @@ public class SqsOptions { private static final int DEFAULT_MAXIMUM_MESSAGES = 10; - private static final Boolean DEFAULT_EXTEND_VISIBILITY_TIMEOUT = false; + private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false; private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30); private static final Duration DEFAULT_MAX_VISIBILITY_TIMEOUT_EXTENSION = Duration.ofSeconds(1800); // 30 minutes private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20); @@ -36,8 +36,8 @@ public class SqsOptions { @DurationMax(seconds = 43200) private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS; - @JsonProperty("extend_visibility_timeout") - private Boolean extendVisibilityTimeout = DEFAULT_EXTEND_VISIBILITY_TIMEOUT; + @JsonProperty("visibility_duplication_protection") + private Boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION; @JsonProperty("max_visibility_timeout_extension") @DurationMin(seconds = 30) @@ -69,8 +69,8 @@ public Duration getMaxVisibilityTimeoutExtension() { return maxVisibilityTimeoutExtension; } - public Boolean getExtendVisibilityTimeout() { - return extendVisibilityTimeout; + public Boolean getVisibilityDuplicateProtection() { + return visibilityDuplicateProtection; } public Duration getWaitTime() { diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java index dcab3184c5..9fc800eac0 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java @@ -221,7 +221,7 @@ void processSqsMessages_should_return_number_of_messages_processed_with_acknowle @ParameterizedTest @ValueSource(strings = {"ObjectCreated:Put", "ObjectCreated:Post", "ObjectCreated:Copy", "ObjectCreated:CompleteMultipartUpload"}) void processSqsMessages_should_return_number_of_messages_processed_with_acknowledgements_and_progress_check(final String eventName) throws IOException { - when(sqsOptions.getExtendVisibilityTimeout()).thenReturn(true); + when(sqsOptions.getVisibilityDuplicateProtection()).thenReturn(true); when(sqsOptions.getVisibilityTimeout()).thenReturn(Duration.ofSeconds(6)); when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); when(s3SourceConfig.getAcknowledgements()).thenReturn(true);