Skip to content

Commit

Permalink
Catch when no object exists and mark as completed in s3 scan (#3241)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Aug 24, 2023
1 parent 89e8f39 commit c2fc949
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -167,6 +168,9 @@ private void startProcessingObject(final int waitTimeMillis) {
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey());
deleteObjectRequest.ifPresent(s3ObjectDeleteWorker::deleteS3Object);
}
} catch (final NoSuchKeyException e) {
LOG.warn("Object {} from bucket {} could not be found, marking this object as complete and continuing processing", objectKey, bucket);
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey());
} catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) {
LOG.warn("S3 scan object worker received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}", objectKey, e.getMessage());
sourceCoordinator.giveUpPartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -47,10 +48,10 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -275,6 +276,26 @@ void getNextPartition_supplier_is_expected_partitionCreationSupplier() {
objectUnderTest.runWithoutInfiniteLoop();
}

/**
 * Verifies that when the object handler throws {@link NoSuchKeyException} while parsing the
 * object, the worker marks the partition as completed on the source coordinator and performs
 * no further coordinator calls.
 *
 * NOTE(review): the method name mentions "NoObjectKeyException", but the exception stubbed
 * below is {@code NoSuchKeyException}; the name is kept unchanged here.
 */
@Test
void partitionIsCompleted_when_NoObjectKeyException_is_thrown_from_process_object() throws IOException {
    final String bucket = UUID.randomUUID().toString();
    final String objectKey = UUID.randomUUID().toString();
    // The partition key is built as "<bucket>|<objectKey>".
    final String partitionKey = bucket + "|" + objectKey;

    final SourcePartition<S3SourceProgressState> partitionToProcess = SourcePartition.builder(S3SourceProgressState.class)
            .withPartitionKey(partitionKey)
            .build();

    given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess));

    // The captor acts only as a permissive matcher for the object reference argument;
    // the captured value is not inspected by this test.
    final ArgumentCaptor<S3ObjectReference> objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class);
    doThrow(NoSuchKeyException.class).when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey));
    doNothing().when(sourceCoordinator).completePartition(partitionKey);

    createObjectUnderTest().runWithoutInfiniteLoop();

    // Assert the behavior the test is named for explicitly, rather than relying on
    // strict-stub bookkeeping to notice that the completePartition stub was used.
    verify(sourceCoordinator).completePartition(partitionKey);
    verifyNoMoreInteractions(sourceCoordinator);
}

/**
 * Supplies the partition-related exception classes, in a fixed order:
 * update failure, partition not found, partition not owned.
 *
 * NOTE(review): presumably consumed as a parameterized-test argument source — the
 * consumer is not visible in this excerpt.
 *
 * @return a stream of the three exception classes
 */
static Stream<Class> exceptionProvider() {
    return Stream.<Class>of(
            PartitionUpdateException.class,
            PartitionNotFoundException.class,
            PartitionNotOwnedException.class
    );
}
Expand Down

0 comments on commit c2fc949

Please sign in to comment.