From 35479280b77dec5dc01d8f00e92f7a464e9ca809 Mon Sep 17 00:00:00 2001 From: Asif Sohail Mohammed Date: Mon, 31 Jul 2023 00:16:53 -0500 Subject: [PATCH] Addressed feedback on delete configuration and functionality Signed-off-by: Asif Sohail Mohammed --- data-prepper-plugins/s3-source/README.md | 2 +- .../plugins/source/S3ScanObjectWorkerIT.java | 2 +- .../plugins/source/S3SourceConfig.java | 8 +- .../plugins/source/ScanObjectWorker.java | 10 +-- .../source/S3ScanObjectWorkerTest.java | 82 ++++++++++++++++--- .../plugins/source/S3SourceConfigTest.java | 2 +- 6 files changed, 81 insertions(+), 25 deletions(-) diff --git a/data-prepper-plugins/s3-source/README.md b/data-prepper-plugins/s3-source/README.md index fd2e0a10d4..63ea62bd69 100644 --- a/data-prepper-plugins/s3-source/README.md +++ b/data-prepper-plugins/s3-source/README.md @@ -117,7 +117,7 @@ All Duration values are a string that represents a duration. They support ISO_86 * `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`. -* `delete_s3_objects` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after processing. If `acknowledgments` is enabled, S3 objects will be deleted only if positive acknowledgment is received by S3 source. Defaults to `false`. +* `delete_on_read` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after all the events from the S3 object are successfully acknowledged by all sinks. `acknowledgments` must be enabled for S3 objects to be deleted; if it is not enabled, no objects are deleted. Defaults to `false`. 
### S3 Select Configuration diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java index 46cd011fb2..95956c79c4 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java @@ -318,7 +318,7 @@ void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer_and_del final int numberOfRecords = 100; final int numberOfRecordsToAccumulate = 50; - when(s3SourceConfig.isDeleteS3Objects()).thenReturn(deleteS3Objects); + when(s3SourceConfig.isDeleteOnRead()).thenReturn(deleteS3Objects); String keyPrefix = "s3source/s3-scan/" + recordsGenerator.getFileExtension() + "/" + Instant.now().toEpochMilli(); final String key = getKeyString(keyPrefix,recordsGenerator, shouldCompress); final String buketOptionYaml = "name: " + bucket + "\n" + diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java index 34551bf406..1cc1881f69 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java @@ -72,8 +72,8 @@ public class S3SourceConfig { @Valid private S3ScanScanOptions s3ScanScanOptions; - @JsonProperty("delete_s3_objects") - private boolean deleteS3Objects = false; + @JsonProperty("delete_on_read") + private boolean deleteOnRead = false; @AssertTrue(message = "A codec is required for reading objects.") boolean 
isCodecProvidedWhenNeeded() { @@ -138,7 +138,7 @@ public S3ScanScanOptions getS3ScanScanOptions() { return s3ScanScanOptions; } - public boolean isDeleteS3Objects() { - return deleteS3Objects; + public boolean isDeleteOnRead() { + return deleteOnRead; } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java index d06458d475..409c190c97 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java @@ -43,7 +43,7 @@ public class ScanObjectWorker implements Runnable{ private static final int STANDARD_BACKOFF_MILLIS = 30_000; - private static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; + static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; private final S3Client s3Client; @@ -65,7 +65,7 @@ public class ScanObjectWorker implements Runnable{ // Should there be a duration or time that is configured in the source to stop processing? 
Otherwise will only stop when data prepper is stopped private final boolean shouldStopProcessing = false; - private final boolean deleteS3Objects; + private final boolean deleteOnRead; private final S3ObjectDeleteWorker s3ObjectDeleteWorker; private final PluginMetrics pluginMetrics; private final Counter acknowledgementSetCallbackCounter; @@ -87,7 +87,7 @@ public ScanObjectWorker(final S3Client s3Client, this.s3ScanSchedulingOptions = s3SourceConfig.getS3ScanScanOptions().getSchedulingOptions(); this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements(); this.acknowledgementSetManager = acknowledgementSetManager; - this.deleteS3Objects = s3SourceConfig.isDeleteS3Objects(); + this.deleteOnRead = s3SourceConfig.isDeleteOnRead(); this.s3ObjectDeleteWorker = s3ObjectDeleteWorker; this.pluginMetrics = pluginMetrics; acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME); @@ -165,10 +165,10 @@ private void startProcessingObject(final int waitTimeMillis) { private Optional processS3Object(final S3ObjectReference s3ObjectReference, final AcknowledgementSet acknowledgementSet, final SourceCoordinator sourceCoordinator, - final SourcePartition sourcePartition){ + final SourcePartition sourcePartition) { try { s3ObjectHandler.parseS3Object(s3ObjectReference, acknowledgementSet, sourceCoordinator, sourcePartition.getPartitionKey()); - if (deleteS3Objects && sourcePartition.getPartitionClosedCount() + 1 >= s3ScanSchedulingOptions.getJobCount()) { + if (deleteOnRead && endToEndAcknowledgementsEnabled && sourcePartition.getPartitionClosedCount() + 1 >= s3ScanSchedulingOptions.getJobCount()) { final DeleteObjectRequest deleteObjectRequest = s3ObjectDeleteWorker.buildDeleteObjectRequest(s3ObjectReference.getBucketName(), s3ObjectReference.getKey()); return Optional.of(deleteObjectRequest); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java 
b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java index 4754442b3e..f4f1259cc3 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,10 +29,12 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -42,12 +45,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME; @ExtendWith(MockitoExtension.class) class S3ScanObjectWorkerTest { @@ -88,6 +93,9 @@ class S3ScanObjectWorkerTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private Counter counter; + private List scanOptionsList; @BeforeEach @@ -98,6 +106,7 @@ void setup() { private ScanObjectWorker createObjectUnderTest() { 
when(s3ScanScanOptions.getSchedulingOptions()).thenReturn(s3ScanSchedulingOptions); when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); + when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(counter); final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics); verify(sourceCoordinator).initialize(); @@ -151,14 +160,14 @@ void partition_from_getNextPartition_is_processed_correctly() throws IOException } @Test - void deleteS3Object_should_be_invoked_after_processing_when_deleteS3Objects_is_true() throws IOException { + void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Objects_and_acknowledgements_is_true() throws IOException { final String bucket = UUID.randomUUID().toString(); final String objectKey = UUID.randomUUID().toString(); final String partitionKey = bucket + "|" + objectKey; - when(s3SourceConfig.getAcknowledgements()).thenReturn(false); - when(s3SourceConfig.isDeleteS3Objects()).thenReturn(true); + when(s3SourceConfig.getAcknowledgements()).thenReturn(true); + when(s3SourceConfig.isDeleteOnRead()).thenReturn(true); when(s3ObjectDeleteWorker.buildDeleteObjectRequest(bucket, objectKey)).thenReturn(deleteObjectRequest); when(s3ScanSchedulingOptions.getJobCount()).thenReturn(1); @@ -170,17 +179,58 @@ void deleteS3Object_should_be_invoked_after_processing_when_deleteS3Objects_is_t given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); - doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), 
eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey)); doNothing().when(sourceCoordinator).closePartition(anyString(), any(), anyInt()); final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(0); + consumer.accept(true); + return acknowledgementSet; + }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); + scanObjectWorker.runWithoutInfiniteLoop(); - verifyNoInteractions(acknowledgementSetManager); verify(sourceCoordinator).closePartition(partitionKey, s3ScanSchedulingOptions.getRate(), s3ScanSchedulingOptions.getJobCount()); verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey); - verify(s3ObjectDeleteWorker).deleteS3Object(deleteObjectRequest); + verify(acknowledgementSet).complete(); + verify(counter).increment(); + + final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); + assertThat(processedObject.getBucketName(), equalTo(bucket)); + assertThat(processedObject.getKey(), equalTo(objectKey)); + } + + @Test + void buildDeleteObjectRequest_should_not_be_invoked_after_processing_when_deleteS3Objects_is_true_acknowledgements_is_false() throws IOException { + final String bucket = UUID.randomUUID().toString(); + final String objectKey = UUID.randomUUID().toString(); + final String partitionKey = bucket + "|" + objectKey; + + + when(s3SourceConfig.getAcknowledgements()).thenReturn(false); + when(s3SourceConfig.isDeleteOnRead()).thenReturn(true); + when(s3ScanSchedulingOptions.getJobCount()).thenReturn(1); + + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class) + .withPartitionKey(partitionKey) + .withPartitionClosedCount(0L) + .build(); + + given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); + + final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); + 
doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); + doNothing().when(sourceCoordinator).closePartition(anyString(), any(), anyInt()); + + final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); + + scanObjectWorker.runWithoutInfiniteLoop(); + + verify(sourceCoordinator).closePartition(partitionKey, s3ScanSchedulingOptions.getRate(), s3ScanSchedulingOptions.getJobCount()); + verifyNoInteractions(s3ObjectDeleteWorker); + verifyNoInteractions(acknowledgementSetManager); final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); assertThat(processedObject.getBucketName(), equalTo(bucket)); @@ -195,7 +245,7 @@ void deleteS3Object_should_not_be_invoked_after_processing_when_deleteS3Objects_ when(s3SourceConfig.getAcknowledgements()).thenReturn(false); - when(s3SourceConfig.isDeleteS3Objects()).thenReturn(false); + when(s3SourceConfig.isDeleteOnRead()).thenReturn(false); when(s3ScanSchedulingOptions.getJobCount()).thenReturn(1); final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class) @@ -223,12 +273,13 @@ void deleteS3Object_should_not_be_invoked_after_processing_when_deleteS3Objects_ } @Test - void deleteS3Object_should_be_invoked_after_closed_count_greater_than_or_equal_to_job_count() throws IOException { + void buildDeleteObjectRequest_should_be_invoked_after_closed_count_greater_than_or_equal_to_job_count() throws IOException { final String bucket = UUID.randomUUID().toString(); final String objectKey = UUID.randomUUID().toString(); final String partitionKey = bucket + "|" + objectKey; - when(s3SourceConfig.isDeleteS3Objects()).thenReturn(true); + when(s3SourceConfig.isDeleteOnRead()).thenReturn(true); + when(s3SourceConfig.getAcknowledgements()).thenReturn(true); when(s3ObjectDeleteWorker.buildDeleteObjectRequest(bucket, objectKey)).thenReturn(deleteObjectRequest); 
when(s3ScanSchedulingOptions.getJobCount()).thenReturn(2); @@ -240,14 +291,20 @@ void deleteS3Object_should_be_invoked_after_closed_count_greater_than_or_equal_t given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); - doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey)); doNothing().when(sourceCoordinator).closePartition(anyString(), any(), anyInt()); + doNothing().when(acknowledgementSet).complete(); final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(0); + consumer.accept(true); + return acknowledgementSet; + }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); + scanObjectWorker.runWithoutInfiniteLoop(); - verifyNoInteractions(acknowledgementSetManager); verify(sourceCoordinator).closePartition(partitionKey, s3ScanSchedulingOptions.getRate(), s3ScanSchedulingOptions.getJobCount()); // no interactions when closed count < job count verifyNoInteractions(s3ObjectDeleteWorker); @@ -265,7 +322,6 @@ void deleteS3Object_should_be_invoked_after_closed_count_greater_than_or_equal_t scanObjectWorker.runWithoutInfiniteLoop(); verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey); - verify(s3ObjectDeleteWorker).deleteS3Object(deleteObjectRequest); } @Test diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java index ba54d8ac3a..22e49ee478 100644 --- 
a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java @@ -46,7 +46,7 @@ void default_end_to_end_acknowledgements_test() { @Test void default_delete_s3_objects_test() { - assertThat(new S3SourceConfig().isDeleteS3Objects(), equalTo(false)); + assertThat(new S3SourceConfig().isDeleteOnRead(), equalTo(false)); } @Test