Skip to content

Commit

Permalink
Addressed feedback on delete configuration and functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed committed Jul 31, 2023
1 parent eb8d0a3 commit 3547928
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 25 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/s3-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ All Duration values are a string that represents a duration. They support ISO_86

* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.

* `delete_s3_objects` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after processing. If `acknowledgments` is enabled, S3 objects will be deleted only if positive acknowledgment is received by S3 source. Defaults to `false`.
* `delete_on_read` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete each S3 object after all the events from that object have been successfully acknowledged by all sinks. `acknowledgments` must also be enabled; if it is not, S3 objects are not deleted. Defaults to `false`.

### <a name="s3_select_configuration">S3 Select Configuration</a>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer_and_del
final int numberOfRecords = 100;
final int numberOfRecordsToAccumulate = 50;

when(s3SourceConfig.isDeleteS3Objects()).thenReturn(deleteS3Objects);
when(s3SourceConfig.isDeleteOnRead()).thenReturn(deleteS3Objects);
String keyPrefix = "s3source/s3-scan/" + recordsGenerator.getFileExtension() + "/" + Instant.now().toEpochMilli();
final String key = getKeyString(keyPrefix,recordsGenerator, shouldCompress);
final String buketOptionYaml = "name: " + bucket + "\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public class S3SourceConfig {
@Valid
private S3ScanScanOptions s3ScanScanOptions;

@JsonProperty("delete_s3_objects")
private boolean deleteS3Objects = false;
@JsonProperty("delete_on_read")
private boolean deleteOnRead = false;

@AssertTrue(message = "A codec is required for reading objects.")
boolean isCodecProvidedWhenNeeded() {
Expand Down Expand Up @@ -138,7 +138,7 @@ public S3ScanScanOptions getS3ScanScanOptions() {
return s3ScanScanOptions;
}

public boolean isDeleteS3Objects() {
return deleteS3Objects;
public boolean isDeleteOnRead() {
return deleteOnRead;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ScanObjectWorker implements Runnable{

private static final int STANDARD_BACKOFF_MILLIS = 30_000;

private static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE;
static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE;
static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter";

private final S3Client s3Client;
Expand All @@ -65,7 +65,7 @@ public class ScanObjectWorker implements Runnable{

// Should the source have a configurable duration or end time after which processing stops? Otherwise, processing will only stop when Data Prepper is stopped.
private final boolean shouldStopProcessing = false;
private final boolean deleteS3Objects;
private final boolean deleteOnRead;
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
private final PluginMetrics pluginMetrics;
private final Counter acknowledgementSetCallbackCounter;
Expand All @@ -87,7 +87,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.s3ScanSchedulingOptions = s3SourceConfig.getS3ScanScanOptions().getSchedulingOptions();
this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements();
this.acknowledgementSetManager = acknowledgementSetManager;
this.deleteS3Objects = s3SourceConfig.isDeleteS3Objects();
this.deleteOnRead = s3SourceConfig.isDeleteOnRead();
this.s3ObjectDeleteWorker = s3ObjectDeleteWorker;
this.pluginMetrics = pluginMetrics;
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME);
Expand Down Expand Up @@ -165,10 +165,10 @@ private void startProcessingObject(final int waitTimeMillis) {
private Optional<DeleteObjectRequest> processS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final SourcePartition<S3SourceProgressState> sourcePartition){
final SourcePartition<S3SourceProgressState> sourcePartition) {
try {
s3ObjectHandler.parseS3Object(s3ObjectReference, acknowledgementSet, sourceCoordinator, sourcePartition.getPartitionKey());
if (deleteS3Objects && sourcePartition.getPartitionClosedCount() + 1 >= s3ScanSchedulingOptions.getJobCount()) {
if (deleteOnRead && endToEndAcknowledgementsEnabled && sourcePartition.getPartitionClosedCount() + 1 >= s3ScanSchedulingOptions.getJobCount()) {
final DeleteObjectRequest deleteObjectRequest = s3ObjectDeleteWorker.buildDeleteObjectRequest(s3ObjectReference.getBucketName(), s3ObjectReference.getKey());
return Optional.of(deleteObjectRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.opensearch.dataprepper.plugins.source;


import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -28,10 +29,12 @@
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -42,12 +45,14 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;

@ExtendWith(MockitoExtension.class)
class S3ScanObjectWorkerTest {
Expand Down Expand Up @@ -88,6 +93,9 @@ class S3ScanObjectWorkerTest {
@Mock
private PluginMetrics pluginMetrics;

@Mock
private Counter counter;

private List<ScanOptions> scanOptionsList;

@BeforeEach
Expand All @@ -98,6 +106,7 @@ void setup() {
private ScanObjectWorker createObjectUnderTest() {
when(s3ScanScanOptions.getSchedulingOptions()).thenReturn(s3ScanSchedulingOptions);
when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions);
when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(counter);
final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider,
sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics);
verify(sourceCoordinator).initialize();
Expand Down Expand Up @@ -151,14 +160,14 @@ void partition_from_getNextPartition_is_processed_correctly() throws IOException
}

@Test
void deleteS3Object_should_be_invoked_after_processing_when_deleteS3Objects_is_true() throws IOException {
void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Objects_and_acknowledgements_is_true() throws IOException {
final String bucket = UUID.randomUUID().toString();
final String objectKey = UUID.randomUUID().toString();
final String partitionKey = bucket + "|" + objectKey;


when(s3SourceConfig.getAcknowledgements()).thenReturn(false);
when(s3SourceConfig.isDeleteS3Objects()).thenReturn(true);
when(s3SourceConfig.getAcknowledgements()).thenReturn(true);
when(s3SourceConfig.isDeleteOnRead()).thenReturn(true);
when(s3ObjectDeleteWorker.buildDeleteObjectRequest(bucket, objectKey)).thenReturn(deleteObjectRequest);
when(s3ScanSchedulingOptions.getJobCount()).thenReturn(1);

Expand All @@ -170,17 +179,58 @@ void deleteS3Object_should_be_invoked_after_processing_when_deleteS3Objects_is_t
given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess));

final ArgumentCaptor<S3ObjectReference> objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class);
doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey));
doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey));
doNothing().when(sourceCoordinator).closePartition(anyString(), any(), anyInt());

final ScanObjectWorker scanObjectWorker = createObjectUnderTest();

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
consumer.accept(true);
return acknowledgementSet;
}).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class));

scanObjectWorker.runWithoutInfiniteLoop();

verifyNoInteractions(acknowledgementSetManager);
verify(sourceCoordinator).closePartition(partitionKey, s3ScanSchedulingOptions.getRate(), s3ScanSchedulingOptions.getJobCount());
verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey);
verify(s3ObjectDeleteWorker).deleteS3Object(deleteObjectRequest);
verify(acknowledgementSet).complete();
verify(counter).increment();

final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue();
assertThat(processedObject.getBucketName(), equalTo(bucket));
assertThat(processedObject.getKey(), equalTo(objectKey));
}

@Test
void buildDeleteObjectRequest_should_not_be_invoked_after_processing_when_deleteS3Objects_is_true_acknowledgements_is_false() throws IOException {
final String bucket = UUID.randomUUID().toString();
final String objectKey = UUID.randomUUID().toString();
final String partitionKey = bucket + "|" + objectKey;


when(s3SourceConfig.getAcknowledgements()).thenReturn(false);
when(s3SourceConfig.isDeleteOnRead()).thenReturn(true);
when(s3ScanSchedulingOptions.getJobCount()).thenReturn(1);

final SourcePartition<S3SourceProgressState> partitionToProcess = SourcePartition.builder(S3SourceProgressState.class)
.withPartitionKey(partitionKey)
.withPartitionClosedCount(0L)
.build();

given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess));

final ArgumentCaptor<S3ObjectReference> objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class);
doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey));
doNothing().when(sourceCoordinator).closePartition(anyString(), any(), anyInt());

final ScanObjectWorker scanObjectWorker = createObjectUnderTest();

scanObjectWorker.runWithoutInfiniteLoop();

verify(sourceCoordinator).closePartition(partitionKey, s3ScanSchedulingOptions.getRate(), s3ScanSchedulingOptions.getJobCount());
verifyNoInteractions(s3ObjectDeleteWorker);
verifyNoInteractions(acknowledgementSetManager);

final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue();
assertThat(processedObject.getBucketName(), equalTo(bucket));
Expand All @@ -195,7 +245,7 @@ void deleteS3Object_should_not_be_invoked_after_processing_when_deleteS3Objects_


when(s3SourceConfig.getAcknowledgements()).thenReturn(false);
when(s3SourceConfig.isDeleteS3Objects()).thenReturn(false);
when(s3SourceConfig.isDeleteOnRead()).thenReturn(false);
when(s3ScanSchedulingOptions.getJobCount()).thenReturn(1);

final SourcePartition<S3SourceProgressState> partitionToProcess = SourcePartition.builder(S3SourceProgressState.class)
Expand Down Expand Up @@ -223,12 +273,13 @@ void deleteS3Object_should_not_be_invoked_after_processing_when_deleteS3Objects_
}

@Test
void deleteS3Object_should_be_invoked_after_closed_count_greater_than_or_equal_to_job_count() throws IOException {
void buildDeleteObjectRequest_should_be_invoked_after_closed_count_greater_than_or_equal_to_job_count() throws IOException {
final String bucket = UUID.randomUUID().toString();
final String objectKey = UUID.randomUUID().toString();
final String partitionKey = bucket + "|" + objectKey;

when(s3SourceConfig.isDeleteS3Objects()).thenReturn(true);
when(s3SourceConfig.isDeleteOnRead()).thenReturn(true);
when(s3SourceConfig.getAcknowledgements()).thenReturn(true);
when(s3ObjectDeleteWorker.buildDeleteObjectRequest(bucket, objectKey)).thenReturn(deleteObjectRequest);
when(s3ScanSchedulingOptions.getJobCount()).thenReturn(2);

Expand All @@ -240,14 +291,20 @@ void deleteS3Object_should_be_invoked_after_closed_count_greater_than_or_equal_t
given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess));

final ArgumentCaptor<S3ObjectReference> objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class);
doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey));
doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey));
doNothing().when(sourceCoordinator).closePartition(anyString(), any(), anyInt());
doNothing().when(acknowledgementSet).complete();

final ScanObjectWorker scanObjectWorker = createObjectUnderTest();

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
consumer.accept(true);
return acknowledgementSet;
}).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class));

scanObjectWorker.runWithoutInfiniteLoop();

verifyNoInteractions(acknowledgementSetManager);
verify(sourceCoordinator).closePartition(partitionKey, s3ScanSchedulingOptions.getRate(), s3ScanSchedulingOptions.getJobCount());
// no interactions when closed count < job count
verifyNoInteractions(s3ObjectDeleteWorker);
Expand All @@ -265,7 +322,6 @@ void deleteS3Object_should_be_invoked_after_closed_count_greater_than_or_equal_t
scanObjectWorker.runWithoutInfiniteLoop();

verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey);
verify(s3ObjectDeleteWorker).deleteS3Object(deleteObjectRequest);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void default_end_to_end_acknowledgements_test() {

@Test
void default_delete_s3_objects_test() {
assertThat(new S3SourceConfig().isDeleteS3Objects(), equalTo(false));
assertThat(new S3SourceConfig().isDeleteOnRead(), equalTo(false));
}

@Test
Expand Down

0 comments on commit 3547928

Please sign in to comment.