diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartition.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartition.java index 9f86e56ada..da73a7591b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartition.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartition.java @@ -18,12 +18,14 @@ public class SourcePartition { private final String partitionKey; private final T partitionState; + private final Long partitionClosedCount; private SourcePartition(final Builder builder) { Objects.requireNonNull(builder.partitionKey); this.partitionKey = builder.partitionKey; this.partitionState = builder.partitionState; + this.partitionClosedCount = builder.partitionClosedCount; } public String getPartitionKey() { @@ -34,6 +36,10 @@ public Optional getPartitionState() { return Optional.ofNullable(partitionState); } + public Long getPartitionClosedCount() { + return partitionClosedCount; + } + public static Builder builder(Class clazz) { return new Builder<>(clazz); } @@ -42,6 +48,7 @@ public static class Builder { private String partitionKey; private T partitionState; + private Long partitionClosedCount; public Builder(Class clazz) { @@ -57,6 +64,11 @@ public Builder withPartitionState(final T partitionState) { return this; } + public Builder withPartitionClosedCount(final Long partitionClosedCount) { + this.partitionClosedCount = partitionClosedCount; + return this; + } + public SourcePartition build() { return new SourcePartition(this); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartitionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartitionTest.java index 5a2dbf56e2..c7118621c9 100644 --- 
a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartitionTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/SourcePartitionTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; +import java.util.Random; import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; @@ -31,15 +32,18 @@ void sourcePartitionBuilderWithNullPartitionThrowsNullPointerException() { void sourcePartitionBuilder_returns_expected_SourcePartition() { final String partitionKey = UUID.randomUUID().toString(); final String partitionState = UUID.randomUUID().toString(); + final Long partitionClosedCount = new Random().nextLong(); final SourcePartition sourcePartition = SourcePartition.builder(String.class) .withPartitionKey(partitionKey) .withPartitionState(partitionState) + .withPartitionClosedCount(partitionClosedCount) .build(); assertThat(sourcePartition, notNullValue()); assertThat(sourcePartition.getPartitionKey(), equalTo(partitionKey)); assertThat(sourcePartition.getPartitionState().isPresent(), equalTo(true)); assertThat(sourcePartition.getPartitionState().get(), equalTo(partitionState)); + assertThat(sourcePartition.getPartitionClosedCount(), equalTo(partitionClosedCount)); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java index 45353774f5..a5eced3dcd 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -168,6 +168,7 @@ public Optional> getNextPartition(final Function sourcePartition = SourcePartition.builder(partitionProgressStateClass) .withPartitionKey(ownedPartitions.get().getSourcePartitionKey()) 
.withPartitionState(convertStringToPartitionProgressStateClass(ownedPartitions.get().getPartitionProgressState())) + .withPartitionClosedCount(ownedPartitions.get().getClosedCount()) + .build(); partitionManager.setActivePartition(sourcePartition); diff --git a/data-prepper-plugins/s3-source/README.md b/data-prepper-plugins/s3-source/README.md index 9be36e27d3..63ea62bd69 100644 --- a/data-prepper-plugins/s3-source/README.md +++ b/data-prepper-plugins/s3-source/README.md @@ -117,6 +117,8 @@ All Duration values are a string that represents a duration. They support ISO_86 * `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`. +* `delete_s3_objects_on_read` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after all the events from the S3 object are successfully acknowledged by all sinks. `acknowledgments` should be enabled for deleting S3 objects. Defaults to `false`. + ### S3 Select Configuration * `expression` (Required if s3_select enabled) : Provide s3 select query to process the data using S3 select for the particular bucket. @@ -151,6 +153,7 @@ All Duration values are a string that represents a duration. They support ISO_86 * `start_time` (Optional) : Provide the start time to scan objects from all the buckets. This parameter defines a time range together with either end_time or range. Example: `2023-01-23T10:00:00`. * `end_time` (Optional) : Provide the end time to scan objects from all the buckets. This parameter defines a time range together with either start_time or range. Example: `2023-01-23T10:00:00`. * `range` (Optional) : Provide the duration to scan objects from all the buckets. This parameter defines a time range together with either start_time or end_time.
+* `scheduling` (Optional): See [Scheduling Configuration](#scheduling_configuration) for details * `bucket`: Provide S3 bucket information * `name` (Required if bucket block is used): Provide S3 bucket name. * `key_prefix` (Optional) : Provide include and exclude the list items. @@ -162,6 +165,18 @@ All Duration values are a string that represents a duration. They support ISO_86 > Note: If a time range is not specified, all objects will be included by default. To set a time range, specify any two and only two configurations from start_time, end_time and range. The time range configured on a specific bucket will override the time range specified on the top level +### Scheduling Configuration + +Schedule frequency and amount of times an object should be processed when using S3 Scan. For example, +a `rate` of `PT1H` and a `job_count` of 3 would result in each object getting processed 3 times, starting after source is ready +and then every hour after the first time the object is processed. + +* `rate` (Optional) : A String that indicates the rate to process an S3 object based on the `job_count`. + Supports ISO_8601 notation Strings ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms"). + Defaults to 8 hours, and is only applicable when `job_count` is greater than 1. +* `job_count` (Optional) : An Integer that specifies how many times each S3 object should be processed. Defaults to 1. + + ### AWS Configuration The AWS configuration is the same for both SQS and S3. @@ -179,10 +194,12 @@ The following policy shows the necessary permissions for S3 source. 
`kms:Decrypt "Sid": "s3policy", "Effect": "Allow", "Action": [ - "s3:GetObject", - "sqs:DeleteMessage", - "sqs:ReceiveMessage", - "kms:Decrypt" + "s3:GetObject", + "s3:ListBucket", + "s3:DeleteObject", + "sqs:DeleteMessage", + "sqs:ReceiveMessage", + "kms:Decrypt" ], "Resource": "*" } @@ -204,6 +221,9 @@ The following policy shows the necessary permissions for S3 source. `kms:Decrypt * `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the S3 Source. * `sqsMessagesFailed` - The number of SQS messages that the S3 Source failed to parse. * `sqsMessagesDeleteFailed` - The number of SQS messages that the S3 Source failed to delete from the SQS queue. +* `s3ObjectsDeleted` - The number of S3 objects deleted by the S3 source. +* `s3ObjectsDeleteFailed` - The number of S3 objects that the S3 source failed to delete. +* `acknowledgementSetCallbackCounter` - The number of times End-to-end acknowledgments created an acknowledgment set. ### Timers diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java index 72e54d6c71..0a55ab9a7d 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java @@ -147,7 +147,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n private void parseObject(final String key, final S3ObjectWorker objectUnderTest) throws IOException { final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build(); - objectUnderTest.parseS3Object(s3ObjectReference, null); + objectUnderTest.parseS3Object(s3ObjectReference, null, null, null); } static class IntegrationTestArguments implements 
ArgumentsProvider { diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java index 9649a4f01f..c75dfa14c5 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerIT.java @@ -11,13 +11,19 @@ import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.noop.NoopTimer; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -28,6 +34,8 @@ import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption; +import org.opensearch.dataprepper.plugins.source.configuration.S3ScanScanOptions; 
+import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectSerializationFormatOption; @@ -63,12 +71,18 @@ import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETED_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETE_FAILED_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME; +@ExtendWith(MockitoExtension.class) public class S3ScanObjectWorkerIT { private static final int TIMEOUT_IN_MILLIS = 200; @@ -84,6 +98,21 @@ public class S3ScanObjectWorkerIT { private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); private SourceCoordinator sourceCoordinator; + @Mock + private S3SourceConfig s3SourceConfig; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private S3ScanScanOptions s3ScanScanOptions; + @Mock + private S3ScanSchedulingOptions s3ScanSchedulingOptions; + + final Counter acknowledgementCounter = mock(Counter.class); + final Counter s3DeletedCounter = 
mock(Counter.class); + final Counter s3DeleteFailedCounter = mock(Counter.class); + private S3ObjectHandler createObjectUnderTest(final S3ObjectRequest s3ObjectRequest){ if(Objects.nonNull(s3ObjectRequest.getExpression())) @@ -111,14 +140,14 @@ void setUp() { final Counter counter = mock(Counter.class); final DistributionSummary distributionSummary = mock(DistributionSummary.class); final Timer timer = new NoopTimer(new Meter.Id("test", Tags.empty(), null, null, Meter.Type.TIMER)); - when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(counter); - when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(counter); - when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(counter); - when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(counter); - when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(distributionSummary); - when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(distributionSummary); - when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(distributionSummary); - when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(timer); + lenient().when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(counter); + lenient().when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(counter); + lenient().when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(counter); + lenient().when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(counter); + lenient().when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(distributionSummary); + lenient().when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(distributionSummary); + lenient().when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(distributionSummary); + lenient().when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(timer); bucketOwnerProvider = b -> Optional.empty(); final 
SourceCoordinationStore inMemoryStore = new InMemorySourceCoordinationStore(new PluginSetting("in_memory", Collections.emptyMap())); @@ -129,7 +158,7 @@ void setUp() { } private void stubBufferWriter(final Consumer additionalEventAssertions, final String key) throws Exception { - doAnswer(a -> { + lenient().doAnswer(a -> { final Collection> recordsCollection = a.getArgument(0); assertThat(recordsCollection.size(), greaterThanOrEqualTo(1)); for (Record eventRecord : recordsCollection) { @@ -169,8 +198,22 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen .codec(recordsGenerator.getCodec()) .compressionType(shouldCompress ? CompressionType.GZIP : CompressionType.NONE) .s3SelectResponseHandlerFactory(new S3SelectResponseHandlerFactory()).build(); + + when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(acknowledgementCounter); + when(pluginMetrics.counter(S3_OBJECTS_DELETED_METRIC_NAME)).thenReturn(s3DeletedCounter); + when(pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME)).thenReturn(s3DeleteFailedCounter); + S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3Client, pluginMetrics); + + when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); + when(s3ScanScanOptions.getSchedulingOptions()).thenReturn(s3ScanSchedulingOptions); + lenient().when(s3ScanSchedulingOptions.getRate()).thenReturn(Duration.ofHours(1)); + lenient().when(s3ScanSchedulingOptions.getCount()).thenReturn(1); + + ExecutorService executor = Executors.newFixedThreadPool(2); + acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); + return new ScanObjectWorker(s3Client,List.of(scanOptions),createObjectUnderTest(s3ObjectRequest) - ,bucketOwnerProvider, sourceCoordinator); + ,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics); } @ParameterizedTest @@ -267,6 +310,62 @@ void 
parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer( executorService.shutdownNow(); } + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer_and_deletes_s3_object(final boolean deleteS3Objects) throws Exception { + final RecordsGenerator recordsGenerator = new NewlineDelimitedRecordsGenerator(); + final boolean shouldCompress = true; + final int numberOfRecords = 100; + final int numberOfRecordsToAccumulate = 50; + + when(s3SourceConfig.isDeleteS3ObjectsOnRead()).thenReturn(deleteS3Objects); + String keyPrefix = "s3source/s3-scan/" + recordsGenerator.getFileExtension() + "/" + Instant.now().toEpochMilli(); + final String key = getKeyString(keyPrefix,recordsGenerator, shouldCompress); + final String buketOptionYaml = "name: " + bucket + "\n" + + "key_prefix:\n" + + " include:\n" + + " - " + keyPrefix; + + final ScanOptions.Builder startTimeAndEndTimeScanOptions = ScanOptions.builder() + .setStartDateTime(LocalDateTime.now().minus(Duration.ofMinutes(10))) + .setEndDateTime(LocalDateTime.now().plus(Duration.ofHours(1))); + + List scanOptions = List.of(startTimeAndEndTimeScanOptions); + final ScanOptions.Builder s3ScanOptionsBuilder = scanOptions.get(0).setBucketOption(objectMapper.readValue(buketOptionYaml, S3ScanBucketOption.class)); + + s3ObjectGenerator.write(numberOfRecords, key, recordsGenerator, shouldCompress); + stubBufferWriter(recordsGenerator::assertEventIsCorrect, key); + + final ScanObjectWorker scanObjectWorker = createObjectUnderTest(recordsGenerator, + numberOfRecordsToAccumulate, + recordsGenerator.getS3SelectExpression(), + shouldCompress, + s3ScanOptionsBuilder.build(), + ThreadLocalRandom.current().nextBoolean()); + + + final int expectedWrites = numberOfRecords / numberOfRecordsToAccumulate; + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(scanObjectWorker::run); + + + 
await().atMost(Duration.ofSeconds(60)).until(() -> waitForAllRecordsToBeProcessed(numberOfRecords)); + + verify(buffer, times(expectedWrites)).writeAll(anyCollection(), eq(TIMEOUT_IN_MILLIS)); + assertThat(recordsReceived, equalTo(numberOfRecords)); + + // wait for S3 objects to be deleted to verify metrics + Thread.sleep(500); + + if (deleteS3Objects) + verify(s3DeletedCounter, times(1)).increment(); + verifyNoMoreInteractions(s3DeletedCounter); + verifyNoInteractions(s3DeleteFailedCounter); + + executorService.shutdownNow(); + } + private String getKeyString(final String keyPrefix , final RecordsGenerator recordsGenerator, final boolean shouldCompress) { diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerIT.java index ec93bc2cd4..a2db276b9b 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerIT.java @@ -193,7 +193,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n private void parseObject(final String key, final S3SelectObjectWorker objectUnderTest) throws IOException { final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build(); - objectUnderTest.parseS3Object(s3ObjectReference,null); + objectUnderTest.parseS3Object(s3ObjectReference,null, null, null); } static class IntegrationTestArguments implements ArgumentsProvider { diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectDeleteWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectDeleteWorker.java new file mode 100644 
index 0000000000..948eb3e726 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectDeleteWorker.java @@ -0,0 +1,49 @@ +package org.opensearch.dataprepper.plugins.source; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; + +public class S3ObjectDeleteWorker { + private static final Logger LOG = LoggerFactory.getLogger(S3ObjectDeleteWorker.class); + static final String S3_OBJECTS_DELETED_METRIC_NAME = "s3ObjectsDeleted"; + static final String S3_OBJECTS_DELETE_FAILED_METRIC_NAME = "s3ObjectsDeleteFailed"; + private final S3Client s3Client; + private final Counter s3ObjectsDeletedCounter; + private final Counter s3ObjectsDeleteFailedCounter; + + public S3ObjectDeleteWorker(final S3Client s3Client, final PluginMetrics pluginMetrics) { + this.s3Client = s3Client; + + s3ObjectsDeletedCounter = pluginMetrics.counter(S3_OBJECTS_DELETED_METRIC_NAME); + s3ObjectsDeleteFailedCounter = pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME); + } + + public void deleteS3Object(final DeleteObjectRequest deleteObjectRequest) { + try { + final DeleteObjectResponse deleteObjectResponse = s3Client.deleteObject(deleteObjectRequest); + if (deleteObjectResponse.sdkHttpResponse().isSuccessful()) { + LOG.info("Deleted object: {} in S3 bucket: {}. ", deleteObjectRequest.key(), deleteObjectRequest.bucket()); + s3ObjectsDeletedCounter.increment(); + } else { + s3ObjectsDeleteFailedCounter.increment(); + } + } catch (final SdkException e) { + LOG.error("Failed to delete object: {} from S3 bucket: {}. 
", deleteObjectRequest.key(), deleteObjectRequest.bucket(), e); + s3ObjectsDeleteFailedCounter.increment(); + } + } + + public DeleteObjectRequest buildDeleteObjectRequest(final String bucketName, final String key) { + return DeleteObjectRequest.builder() + .bucket(bucketName) + .key(key) + .build(); + } + +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java index b8072d8dff..1e0466552e 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectHandler.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import java.io.IOException; @@ -21,5 +22,7 @@ public interface S3ObjectHandler { * @throws IOException exception is thrown every time because this is not supported */ void parseS3Object(final S3ObjectReference s3ObjectReference, - final AcknowledgementSet acknowledgementSet) throws IOException; + final AcknowledgementSet acknowledgementSet, + final SourceCoordinator sourceCoordinator, + final String partitionKey) throws IOException; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java index faf5729f9e..c7ea5fd296 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java @@ -11,6 +11,7 @@ import 
org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import org.slf4j.Logger; @@ -20,6 +21,7 @@ import java.io.IOException; import java.time.Duration; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; /** @@ -28,6 +30,7 @@ */ class S3ObjectWorker implements S3ObjectHandler { private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class); + static final int RECORDS_TO_ACCUMULATE_TO_SAVE_STATE = 10_000; private final S3Client s3Client; private final Buffer> buffer; @@ -51,11 +54,14 @@ public S3ObjectWorker(final S3ObjectRequest s3ObjectRequest) { this.s3ObjectPluginMetrics = s3ObjectRequest.getS3ObjectPluginMetrics(); } - public void parseS3Object(final S3ObjectReference s3ObjectReference, final AcknowledgementSet acknowledgementSet) throws IOException { + public void parseS3Object(final S3ObjectReference s3ObjectReference, + final AcknowledgementSet acknowledgementSet, + final SourceCoordinator sourceCoordinator, + final String partitionKey) throws IOException { final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, numberOfRecordsToAccumulate, bufferTimeout); try { s3ObjectPluginMetrics.getS3ObjectReadTimer().recordCallable((Callable) () -> { - doParseObject(acknowledgementSet, s3ObjectReference, bufferAccumulator); + doParseObject(acknowledgementSet, s3ObjectReference, bufferAccumulator, sourceCoordinator, partitionKey); return null; }); } catch (final IOException | RuntimeException e) { @@ -68,7 +74,11 @@ public void parseS3Object(final S3ObjectReference s3ObjectReference, final Ackno 
s3ObjectPluginMetrics.getS3ObjectsSucceededCounter().increment(); } - private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3ObjectReference s3ObjectReference, final BufferAccumulator> bufferAccumulator) throws IOException { + private void doParseObject(final AcknowledgementSet acknowledgementSet, + final S3ObjectReference s3ObjectReference, + final BufferAccumulator> bufferAccumulator, + final SourceCoordinator sourceCoordinator, + final String partitionKey) throws IOException { final long s3ObjectSize; final long totalBytesRead; @@ -79,6 +89,7 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3 final CompressionOption fileCompressionOption = compressionOption != CompressionOption.AUTOMATIC ? compressionOption : CompressionOption.fromFileName(s3ObjectReference.getKey()); + final AtomicInteger saveStateCounter = new AtomicInteger(); try { s3ObjectSize = inputFile.getLength(); @@ -93,6 +104,12 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3 acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); + int recordsWrittenAfterLastSaveState = bufferAccumulator.getTotalWritten() - saveStateCounter.get() * RECORDS_TO_ACCUMULATE_TO_SAVE_STATE; + // Saving state to renew source coordination ownership for every 10,000 records, ownership time is 10 minutes + if (recordsWrittenAfterLastSaveState >= RECORDS_TO_ACCUMULATE_TO_SAVE_STATE && sourceCoordinator != null && partitionKey != null) { + sourceCoordinator.saveProgressStateForPartition(partitionKey, null); + saveStateCounter.getAndIncrement(); + } } catch (final Exception e) { LOG.error("Failed writing S3 objects to buffer due to: {}", e.getMessage()); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java index 60cde0dd6e..19d8a9d679 100644 --- 
a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanService.java @@ -4,6 +4,8 @@ */ package org.opensearch.dataprepper.plugins.source; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; @@ -18,6 +20,7 @@ * objects and spawn a thread {@link S3SelectObjectWorker} */ public class S3ScanService { + private final S3SourceConfig s3SourceConfig; private final List s3ScanBucketOptions; private final S3ClientBuilderFactory s3ClientBuilderFactory; private final LocalDateTime endDateTime; @@ -30,12 +33,19 @@ public class S3ScanService { private final BucketOwnerProvider bucketOwnerProvider; private final SourceCoordinator sourceCoordinator; + private final AcknowledgementSetManager acknowledgementSetManager; + private final S3ObjectDeleteWorker s3ObjectDeleteWorker; + private final PluginMetrics pluginMetrics; public S3ScanService(final S3SourceConfig s3SourceConfig, final S3ClientBuilderFactory s3ClientBuilderFactory, final S3ObjectHandler s3ObjectHandler, final BucketOwnerProvider bucketOwnerProvider, - final SourceCoordinator sourceCoordinator) { + final SourceCoordinator sourceCoordinator, + final AcknowledgementSetManager acknowledgementSetManager, + final S3ObjectDeleteWorker s3ObjectDeleteWorker, + final PluginMetrics pluginMetrics) { + this.s3SourceConfig = s3SourceConfig; this.s3ScanBucketOptions = s3SourceConfig.getS3ScanScanOptions().getBuckets(); this.s3ClientBuilderFactory = s3ClientBuilderFactory; this.endDateTime = s3SourceConfig.getS3ScanScanOptions().getEndTime(); @@ -44,11 
+54,14 @@ public S3ScanService(final S3SourceConfig s3SourceConfig, this.s3ObjectHandler = s3ObjectHandler; this.bucketOwnerProvider = bucketOwnerProvider; this.sourceCoordinator = sourceCoordinator; + this.acknowledgementSetManager = acknowledgementSetManager; + this.s3ObjectDeleteWorker = s3ObjectDeleteWorker; + this.pluginMetrics = pluginMetrics; } public void start() { scanObjectWorkerThread = new Thread(new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(), - getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator)); + getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics)); scanObjectWorkerThread.start(); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java index 84993f92a5..100f9385cd 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorker.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectSerializationFormatOption; @@ -99,7 +100,10 @@ public S3SelectObjectWorker(final S3ObjectRequest s3ObjectRequest) { this.bucketOwnerProvider = s3ObjectRequest.getBucketOwnerProvider(); } - public void parseS3Object(final S3ObjectReference 
s3ObjectReference, final AcknowledgementSet acknowledgementSet) throws IOException { + public void parseS3Object(final S3ObjectReference s3ObjectReference, + final AcknowledgementSet acknowledgementSet, + final SourceCoordinator sourceCoordinator, + final String partitionKey) throws IOException { try{ LOG.info("Read S3 object: {}", s3ObjectReference); selectObject(s3ObjectReference, acknowledgementSet); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Service.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Service.java index f87271d50c..99b0626a7e 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Service.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Service.java @@ -15,7 +15,7 @@ public class S3Service { } void addS3Object(final S3ObjectReference s3ObjectReference, AcknowledgementSet acknowledgementSet) throws IOException { - s3ObjectHandler.parseS3Object(s3ObjectReference, acknowledgementSet); + s3ObjectHandler.parseS3Object(s3ObjectReference, acknowledgementSet, null, null); } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java index b078652e2a..7a0717d707 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java @@ -87,6 +87,8 @@ public void start(Buffer> buffer) { s3SourceConfig.getBufferTimeout(), s3ObjectPluginMetrics); final BiConsumer eventMetadataModifier = new EventMetadataModifier( s3SourceConfig.getMetadataRootKey()); + final S3ObjectDeleteWorker s3ObjectDeleteWorker = new 
S3ObjectDeleteWorker(s3ClientBuilderFactory.getS3Client(), pluginMetrics); + if (s3SelectOptional.isPresent()) { S3SelectCSVOption csvOption = (s3SelectOptional.get().getS3SelectCSVOption() != null) ? s3SelectOptional.get().getS3SelectCSVOption() : new S3SelectCSVOption(); @@ -122,7 +124,7 @@ public void start(Buffer> buffer) { sqsService.start(); } if(s3ScanScanOptional.isPresent()) { - s3ScanService = new S3ScanService(s3SourceConfig,s3ClientBuilderFactory,s3Handler,bucketOwnerProvider, sourceCoordinator); + s3ScanService = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3Handler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics); s3ScanService.start(); } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java index 9259e56b4d..817a6b7ca4 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3SourceConfig.java @@ -79,6 +79,9 @@ public class S3SourceConfig { @Valid private S3ScanScanOptions s3ScanScanOptions; + @JsonProperty("delete_s3_objects_on_read") + private boolean deleteS3ObjectsOnRead = false; + @AssertTrue(message = "A codec is required for reading objects.") boolean isCodecProvidedWhenNeeded() { if(s3SelectOptions == null) @@ -142,6 +145,10 @@ public S3ScanScanOptions getS3ScanScanOptions() { return s3ScanScanOptions; } + public boolean isDeleteS3ObjectsOnRead() { + return deleteS3ObjectsOnRead; + } + public Map getBucketOwners() { return bucketOwners; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java 
b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java index f8e46e1bc6..2b1d8388aa 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java @@ -4,21 +4,33 @@ */ package org.opensearch.dataprepper.plugins.source; +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException; import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; +import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import java.util.function.Function; /** @@ -31,6 +43,9 @@ public 
class ScanObjectWorker implements Runnable{ private static final int STANDARD_BACKOFF_MILLIS = 30_000; + static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; + static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; + private final S3Client s3Client; private final List scanOptionsBuilderList; @@ -43,19 +58,39 @@ public class ScanObjectWorker implements Runnable{ private final Function, List> partitionCreationSupplier; + private final S3ScanSchedulingOptions s3ScanSchedulingOptions; + + private final boolean endToEndAcknowledgementsEnabled; + private final AcknowledgementSetManager acknowledgementSetManager; + // Should there be a duration or time that is configured in the source to stop processing? Otherwise will only stop when data prepper is stopped private final boolean shouldStopProcessing = false; + private final boolean deleteS3ObjectsOnRead; + private final S3ObjectDeleteWorker s3ObjectDeleteWorker; + private final PluginMetrics pluginMetrics; + private final Counter acknowledgementSetCallbackCounter; public ScanObjectWorker(final S3Client s3Client, final List scanOptionsBuilderList, final S3ObjectHandler s3ObjectHandler, final BucketOwnerProvider bucketOwnerProvider, - final SourceCoordinator sourceCoordinator){ + final SourceCoordinator sourceCoordinator, + final S3SourceConfig s3SourceConfig, + final AcknowledgementSetManager acknowledgementSetManager, + final S3ObjectDeleteWorker s3ObjectDeleteWorker, + final PluginMetrics pluginMetrics){ this.s3Client = s3Client; this.scanOptionsBuilderList = scanOptionsBuilderList; this.s3ObjectHandler= s3ObjectHandler; this.bucketOwnerProvider = bucketOwnerProvider; this.sourceCoordinator = sourceCoordinator; + this.s3ScanSchedulingOptions = s3SourceConfig.getS3ScanScanOptions().getSchedulingOptions(); + this.endToEndAcknowledgementsEnabled = s3SourceConfig.getAcknowledgements(); + this.acknowledgementSetManager = acknowledgementSetManager; + 
this.deleteS3ObjectsOnRead = s3SourceConfig.isDeleteS3ObjectsOnRead(); + this.s3ObjectDeleteWorker = s3ObjectDeleteWorker; + this.pluginMetrics = pluginMetrics; + acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME); this.sourceCoordinator.initialize(); this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList); @@ -91,19 +126,55 @@ private void startProcessingObject(final int waitTimeMillis) { final String objectKey = objectToProcess.get().getPartitionKey().split("\\|")[1]; try { - processS3Object(S3ObjectReference.bucketAndKey(bucket, objectKey).build()); - sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + List waitingForAcknowledgements = new ArrayList<>(); + AcknowledgementSet acknowledgementSet = null; + CompletableFuture completableFuture = new CompletableFuture<>(); + + if (endToEndAcknowledgementsEnabled) { + acknowledgementSet = acknowledgementSetManager.create((result) -> { + acknowledgementSetCallbackCounter.increment(); + // Delete only if this is positive acknowledgement + if (result) { + sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + waitingForAcknowledgements.forEach(s3ObjectDeleteWorker::deleteS3Object); + } + completableFuture.complete(result); + }, Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS)); + } + + + final Optional deleteObjectRequest = processS3Object(S3ObjectReference.bucketAndKey(bucket, objectKey).build(), + acknowledgementSet, sourceCoordinator, objectToProcess.get()); + + if (endToEndAcknowledgementsEnabled) { + deleteObjectRequest.ifPresent(waitingForAcknowledgements::add); + acknowledgementSet.complete(); + completableFuture.get(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } else { + sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + deleteObjectRequest.ifPresent(s3ObjectDeleteWorker::deleteS3Object); + 
} } catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) { LOG.warn("S3 scan object worker received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}", objectKey, e.getMessage()); sourceCoordinator.giveUpPartitions(); + } catch (final ExecutionException | TimeoutException | InterruptedException e) { + LOG.error("Exception occurred while waiting for acknowledgement.", e); } } - private void processS3Object(final S3ObjectReference s3ObjectReference){ + private Optional processS3Object(final S3ObjectReference s3ObjectReference, + final AcknowledgementSet acknowledgementSet, + final SourceCoordinator sourceCoordinator, + final SourcePartition sourcePartition) { try { - s3ObjectHandler.parseS3Object(s3ObjectReference,null); - } catch (IOException ex) { + s3ObjectHandler.parseS3Object(s3ObjectReference, acknowledgementSet, sourceCoordinator, sourcePartition.getPartitionKey()); + if (deleteS3ObjectsOnRead && endToEndAcknowledgementsEnabled) { + final DeleteObjectRequest deleteObjectRequest = s3ObjectDeleteWorker.buildDeleteObjectRequest(s3ObjectReference.getBucketName(), s3ObjectReference.getKey()); + return Optional.of(deleteObjectRequest); + } + } catch (final IOException ex) { LOG.error("Error while process the parseS3Object. 
",ex); } + return Optional.empty(); } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java index 02f5745ef5..824e2d61bb 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java @@ -42,6 +42,9 @@ public class S3ScanScanOptions { @JsonProperty("buckets") private List buckets; + @JsonProperty("scheduling") + private S3ScanSchedulingOptions schedulingOptions = new S3ScanSchedulingOptions(); + @AssertTrue(message = "At most two options from start_time, end_time and range can be specified at the same time") public boolean hasValidTimeOptions() { return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3; @@ -60,4 +63,8 @@ public LocalDateTime getStartTime() { public List getBuckets() { return buckets; } + + public S3ScanSchedulingOptions getSchedulingOptions() { + return schedulingOptions; + } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java new file mode 100644 index 0000000000..9dbe4b8840 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java @@ -0,0 +1,24 @@ +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Min; + +import java.time.Duration; + +public class S3ScanSchedulingOptions { + @JsonProperty("rate") 
+ private Duration rate = Duration.ofHours(8); + + @Min(1) + @JsonProperty("count") + private int count = 1; + + public Duration getRate() { + return rate; + } + + public int getCount() { + return count; + } + +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/exception/S3RetriesExhaustedException.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/exception/S3RetriesExhaustedException.java new file mode 100644 index 0000000000..35e85c214e --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/exception/S3RetriesExhaustedException.java @@ -0,0 +1,8 @@ +package org.opensearch.dataprepper.plugins.source.exception; + +public class S3RetriesExhaustedException extends RuntimeException { + + public S3RetriesExhaustedException(final String errorMessage) { + super(errorMessage); + } +} diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectDeleteWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectDeleteWorkerTest.java new file mode 100644 index 0000000000..ca2e45d2fb --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectDeleteWorkerTest.java @@ -0,0 +1,108 @@ +package org.opensearch.dataprepper.plugins.source; + +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import 
software.amazon.awssdk.services.s3.model.DeleteObjectResponse; + +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETED_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETE_FAILED_METRIC_NAME; + +@ExtendWith(MockitoExtension.class) +class S3ObjectDeleteWorkerTest { + @Mock + private S3Client s3Client; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private DeleteObjectRequest deleteObjectRequest; + + @Mock + private DeleteObjectResponse deleteObjectResponse; + + @Mock + private SdkHttpResponse sdkHttpResponse; + + private Counter s3ObjectsDeletedCounter; + + private Counter s3ObjectsDeleteFailedCounter; + + private S3ObjectDeleteWorker createObjectUnderTest() { + s3ObjectsDeletedCounter = mock(Counter.class); + s3ObjectsDeleteFailedCounter = mock(Counter.class); + + when(pluginMetrics.counter(S3_OBJECTS_DELETED_METRIC_NAME)).thenReturn(s3ObjectsDeletedCounter); + when(pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME)).thenReturn(s3ObjectsDeleteFailedCounter); + + return new S3ObjectDeleteWorker(s3Client, pluginMetrics); + } + + @Test + void buildDeleteObjectRequest_test_should_create_deleteObjectRequest_with_correct_bucket_and_key() { + final String testBucketName = UUID.randomUUID().toString(); + final String testKey = UUID.randomUUID().toString(); + final S3ObjectDeleteWorker objectUnderTest = createObjectUnderTest(); + final DeleteObjectRequest deleteObjectRequest = objectUnderTest.buildDeleteObjectRequest(testBucketName, testKey); + + assertThat(deleteObjectRequest.bucket(), equalTo(testBucketName)); + 
assertThat(deleteObjectRequest.key(), equalTo(testKey)); + } + + @Test + void deleteS3Object_with_successful_response_should_increment_success_metric() { + final S3ObjectDeleteWorker objectUnderTest = createObjectUnderTest(); + when(s3Client.deleteObject(deleteObjectRequest)).thenReturn(deleteObjectResponse); + when(deleteObjectResponse.sdkHttpResponse()).thenReturn(sdkHttpResponse); + when(sdkHttpResponse.isSuccessful()).thenReturn(true); + + objectUnderTest.deleteS3Object(deleteObjectRequest); + + verify(s3Client).deleteObject(deleteObjectRequest); + verify(s3ObjectsDeletedCounter).increment(); + verifyNoMoreInteractions(s3ObjectsDeleteFailedCounter); + } + + @Test + void deleteS3Object_with_failed_response_should_increment_failed_metric() { + final S3ObjectDeleteWorker objectUnderTest = createObjectUnderTest(); + when(s3Client.deleteObject(deleteObjectRequest)).thenReturn(deleteObjectResponse); + when(deleteObjectResponse.sdkHttpResponse()).thenReturn(sdkHttpResponse); + when(sdkHttpResponse.isSuccessful()).thenReturn(false); + + objectUnderTest.deleteS3Object(deleteObjectRequest); + + verify(s3Client).deleteObject(deleteObjectRequest); + verify(s3ObjectsDeleteFailedCounter).increment(); + verifyNoMoreInteractions(s3ObjectsDeletedCounter); + } + + @Test + void deleteS3Object_with_exception_should_increment_failed_metric() { + final S3ObjectDeleteWorker objectUnderTest = createObjectUnderTest(); + when(s3Client.deleteObject(deleteObjectRequest)).thenThrow(SdkException.class); + + objectUnderTest.deleteS3Object(deleteObjectRequest); + + verify(s3Client).deleteObject(deleteObjectRequest); + verify(s3ObjectsDeleteFailedCounter).increment(); + verifyNoMoreInteractions(s3ObjectsDeletedCounter); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java 
index 36e43fb96e..438a7feadf 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java @@ -24,6 +24,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.io.InputFile; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.sync.ResponseTransformer; @@ -61,6 +62,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.S3ObjectWorker.RECORDS_TO_ACCUMULATE_TO_SAVE_STATE; @ExtendWith(MockitoExtension.class) class S3ObjectWorkerTest { @@ -123,6 +125,8 @@ class S3ObjectWorkerTest { private long objectSize; @Mock private S3ObjectPluginMetrics s3ObjectPluginMetrics; + @Mock + private SourceCoordinator sourceCoordinator; private int numEventsAdded; @@ -179,7 +183,7 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest() throws IOExce when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); } @Test @@ -196,7 +200,7 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg c.accept(record); return null; }).when(codec).parse(any(InputFile.class), any(DecompressionEngine.class), 
any(Consumer.class)); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); assertThat(numEventsAdded, equalTo(1)); verifyNoInteractions(s3ObjectNoRecordsFound); } @@ -210,7 +214,7 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_when_bucketOwne when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter); when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); } @Test @@ -221,7 +225,7 @@ void parseS3Object_calls_Codec_parse_on_S3InputStream() throws Exception { when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); final ArgumentCaptor inputFileArgumentCaptor = ArgumentCaptor.forClass(InputFile.class); verify(codec).parse(inputFileArgumentCaptor.capture(), any(DecompressionEngine.class), any(Consumer.class)); @@ -238,7 +242,7 @@ void parseS3Object_codec_parse_exception() throws Exception { assertThrows( IOException.class, - () -> createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet)); + () -> createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null)); final ArgumentCaptor inputFileArgumentCaptor = 
ArgumentCaptor.forClass(InputFile.class); verify(codec).parse(inputFileArgumentCaptor.capture(), any(DecompressionEngine.class), any(Consumer.class)); @@ -258,7 +262,7 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) .thenReturn(bufferAccumulator); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); } final ArgumentCaptor>> eventConsumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); @@ -277,6 +281,41 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato inOrder.verify(bufferAccumulator).add(record); } + @Test + void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulator_and_saves_state() throws Exception { + when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse); + when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(s3ObjectEventsSummary); + when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(s3ObjectsSucceededCounter); + when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); + when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); + + final String testPartitionKey = UUID.randomUUID().toString(); + + final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class); + try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) + .thenReturn(bufferAccumulator); + 
createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, sourceCoordinator, testPartitionKey); + } + + final ArgumentCaptor>> eventConsumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); + verify(codec).parse(any(InputFile.class), any(DecompressionEngine.class), eventConsumerArgumentCaptor.capture()); + + final Consumer> consumerUnderTest = eventConsumerArgumentCaptor.getValue(); + + final Record record = mock(Record.class); + final Event event = mock(Event.class); + when(record.getData()).thenReturn(event); + when(bufferAccumulator.getTotalWritten()).thenReturn(RECORDS_TO_ACCUMULATE_TO_SAVE_STATE + 1); + + consumerUnderTest.accept(record); + + final InOrder inOrder = inOrder(eventConsumer, bufferAccumulator, sourceCoordinator); + inOrder.verify(eventConsumer).accept(event, s3ObjectReference); + inOrder.verify(bufferAccumulator).add(record); + inOrder.verify(sourceCoordinator).saveProgressStateForPartition(testPartitionKey, null); + } + @Test void parseS3Object_calls_BufferAccumulator_flush_after_Codec_parse() throws Exception { when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse); @@ -289,7 +328,7 @@ void parseS3Object_calls_BufferAccumulator_flush_after_Codec_parse() throws Exce try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) .thenReturn(bufferAccumulator); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); } final InOrder inOrder = inOrder(codec, bufferAccumulator); @@ -308,7 +347,7 @@ void parseS3Object_increments_success_counter_after_parsing_S3_object() throws I when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); final 
S3ObjectHandler objectUnderTest = createObjectUnderTest(s3ObjectPluginMetrics); - objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet); + objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet, null, null); verify(s3ObjectsSucceededCounter).increment(); verifyNoInteractions(s3ObjectsFailedCounter); @@ -326,7 +365,7 @@ void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_t .when(codec).parse(any(InputFile.class), any(DecompressionEngine.class), any(Consumer.class)); final S3ObjectHandler objectUnderTest = createObjectUnderTest(s3ObjectPluginMetrics); - final IOException actualException = assertThrows(IOException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet)); + final IOException actualException = assertThrows(IOException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet, null, null)); assertThat(actualException, sameInstance(expectedException)); @@ -343,7 +382,7 @@ void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_t when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(s3ObjectsFailedCounter); final S3ObjectHandler objectUnderTest = createObjectUnderTest(s3ObjectPluginMetrics); - final RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet)); + final RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet, null, null)); assertThat(actualException, sameInstance(expectedException)); @@ -361,7 +400,7 @@ void parseS3Object_calls_HeadObject_after_Callable() throws Exception { when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); final S3ObjectHandler objectUnderTest = createObjectUnderTest(s3ObjectPluginMetrics); - objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet); + 
objectUnderTest.parseS3Object(s3ObjectReference, acknowledgementSet, null, null); final InOrder inOrder = inOrder(s3ObjectReadTimer, s3Client); @@ -383,7 +422,7 @@ void parseS3Object_records_BufferAccumulator_getTotalWritten() throws IOExceptio try (final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout)) .thenReturn(bufferAccumulator); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); } verify(s3ObjectEventsSummary).record(totalWritten); @@ -397,7 +436,7 @@ void parseS3Object_records_S3_ObjectSize() throws IOException { when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); verify(s3ObjectSizeSummary).record(objectSize); } @@ -410,7 +449,7 @@ void parseS3Object_records_S3_Object_No_Records_Found() throws IOException { when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); verify(s3ObjectNoRecordsFound).increment(); } @@ -436,7 +475,7 @@ void parseS3Object_records_input_file_bytes_read() throws IOException { when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(s3ObjectSizeSummary); 
when(s3ObjectPluginMetrics.getS3ObjectNoRecordsFound()).thenReturn(s3ObjectNoRecordsFound); - createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest(s3ObjectPluginMetrics).parseS3Object(s3ObjectReference, acknowledgementSet, null, null); verify(s3ObjectSizeProcessedSummary).record(inputStringLength); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java index 8d71d25ce9..5f59aa76c6 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanObjectWorkerTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -13,31 +14,44 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException; import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; +import 
org.opensearch.dataprepper.plugins.source.configuration.S3ScanScanOptions; +import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME; @ExtendWith(MockitoExtension.class) class S3ScanObjectWorkerTest { @@ -54,6 +68,33 @@ class S3ScanObjectWorkerTest { @Mock private SourceCoordinator sourceCoordinator; + @Mock + private S3SourceConfig s3SourceConfig; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private S3ScanScanOptions s3ScanScanOptions; + + @Mock + private S3ObjectDeleteWorker s3ObjectDeleteWorker; + + @Mock + private AcknowledgementSet acknowledgementSet; + + @Mock + private DeleteObjectRequest deleteObjectRequest; + + @Mock + private S3ScanSchedulingOptions s3ScanSchedulingOptions; + + @Mock + private 
PluginMetrics pluginMetrics; + + @Mock + private Counter counter; + private List scanOptionsList; @BeforeEach @@ -62,7 +103,11 @@ void setup() { } private ScanObjectWorker createObjectUnderTest() { - final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider, sourceCoordinator); + when(s3ScanScanOptions.getSchedulingOptions()).thenReturn(s3ScanSchedulingOptions); + when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); + when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(counter); + final ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client, scanOptionsList, s3ObjectHandler, bucketOwnerProvider, + sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics); verify(sourceCoordinator).initialize(); return objectUnderTest; } @@ -80,7 +125,7 @@ void giveUpPartitions_is_called_when_a_PartitionException_is_thrown_from_parseS3 given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); - doThrow(exception).when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null)); + doThrow(exception).when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); doNothing().when(sourceCoordinator).giveUpPartitions(); createObjectUnderTest().runWithoutInfiniteLoop(); @@ -95,13 +140,16 @@ void partition_from_getNextPartition_is_processed_correctly() throws IOException final String partitionKey = bucket + "|" + objectKey; - final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class).withPartitionKey(partitionKey).build(); + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class) + 
.withPartitionKey(partitionKey) + .withPartitionClosedCount(0L) + .build(); given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); - doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null)); - doNothing().when(sourceCoordinator).completePartition(partitionKey); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); + doNothing().when(sourceCoordinator).completePartition(anyString()); createObjectUnderTest().runWithoutInfiniteLoop(); @@ -110,6 +158,116 @@ void partition_from_getNextPartition_is_processed_correctly() throws IOException assertThat(processedObject.getKey(), equalTo(objectKey)); } + @Test + void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Objects_and_acknowledgements_is_true() throws IOException { + final String bucket = UUID.randomUUID().toString(); + final String objectKey = UUID.randomUUID().toString(); + final String partitionKey = bucket + "|" + objectKey; + + + when(s3SourceConfig.getAcknowledgements()).thenReturn(true); + when(s3SourceConfig.isDeleteS3ObjectsOnRead()).thenReturn(true); + when(s3ObjectDeleteWorker.buildDeleteObjectRequest(bucket, objectKey)).thenReturn(deleteObjectRequest); + + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class) + .withPartitionKey(partitionKey) + .withPartitionClosedCount(0L) + .build(); + + given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); + + final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey)); + 
doNothing().when(sourceCoordinator).completePartition(anyString()); + + final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); + + doAnswer(invocation -> { + Consumer consumer = invocation.getArgument(0); + consumer.accept(true); + return acknowledgementSet; + }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); + + scanObjectWorker.runWithoutInfiniteLoop(); + + verify(sourceCoordinator).completePartition(partitionKey); + verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey); + verify(acknowledgementSet).complete(); + verify(counter).increment(); + + final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); + assertThat(processedObject.getBucketName(), equalTo(bucket)); + assertThat(processedObject.getKey(), equalTo(objectKey)); + } + + @Test + void buildDeleteObjectRequest_should_not_be_invoked_after_processing_when_deleteS3Objects_is_true_acknowledgements_is_false() throws IOException { + final String bucket = UUID.randomUUID().toString(); + final String objectKey = UUID.randomUUID().toString(); + final String partitionKey = bucket + "|" + objectKey; + + + when(s3SourceConfig.getAcknowledgements()).thenReturn(false); + when(s3SourceConfig.isDeleteS3ObjectsOnRead()).thenReturn(true); + + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class) + .withPartitionKey(partitionKey) + .withPartitionClosedCount(0L) + .build(); + + given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); + + final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); + doNothing().when(sourceCoordinator).completePartition(anyString()); + + final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); + + 
scanObjectWorker.runWithoutInfiniteLoop(); + + verify(sourceCoordinator).completePartition(partitionKey); + verifyNoInteractions(s3ObjectDeleteWorker); + verifyNoInteractions(acknowledgementSetManager); + + final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); + assertThat(processedObject.getBucketName(), equalTo(bucket)); + assertThat(processedObject.getKey(), equalTo(objectKey)); + } + + @Test + void deleteS3Object_should_not_be_invoked_after_processing_when_deleteS3Objects_is_false() throws IOException { + final String bucket = UUID.randomUUID().toString(); + final String objectKey = UUID.randomUUID().toString(); + final String partitionKey = bucket + "|" + objectKey; + + + when(s3SourceConfig.getAcknowledgements()).thenReturn(false); + when(s3SourceConfig.isDeleteS3ObjectsOnRead()).thenReturn(false); + + final SourcePartition partitionToProcess = SourcePartition.builder(S3SourceProgressState.class) + .withPartitionKey(partitionKey) + .withPartitionClosedCount(0L) + .build(); + + given(sourceCoordinator.getNextPartition(any(Function.class))).willReturn(Optional.of(partitionToProcess)); + + final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); + doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); + doNothing().when(sourceCoordinator).completePartition(anyString()); + + final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); + + scanObjectWorker.runWithoutInfiniteLoop(); + + verifyNoInteractions(acknowledgementSetManager); + verify(sourceCoordinator).completePartition(partitionKey); + verifyNoInteractions(s3ObjectDeleteWorker); + + final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); + assertThat(processedObject.getBucketName(), equalTo(bucket)); + assertThat(processedObject.getKey(), equalTo(objectKey)); + } + @Test void 
getNextPartition_supplier_is_expected_partitionCreationSupplier() { given(sourceCoordinator.getNextPartition(any(S3ScanPartitionCreationSupplier.class))).willReturn(Optional.empty()); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java index d4e732478f..9e5dc128a5 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java @@ -5,6 +5,11 @@ package org.opensearch.dataprepper.plugins.source; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOptions; @@ -12,7 +17,6 @@ import org.opensearch.dataprepper.plugins.source.configuration.S3ScanScanOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; - import java.time.Duration; import java.time.LocalDateTime; import java.util.List; @@ -25,7 +29,22 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class S3ScanServiceTest { +@ExtendWith(MockitoExtension.class) +class S3ScanServiceTest { + @Mock + private S3ObjectDeleteWorker s3ObjectDeleteWorker; + @Mock + private SourceCoordinator sourceCoordinator; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + @Mock + private S3ClientBuilderFactory 
s3ClientBuilderFactory; + @Mock + private S3ObjectHandler s3ObjectHandler; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private PluginMetrics pluginMetrics; @Test void scan_service_test_and_verify_thread_invoking() { @@ -35,7 +54,7 @@ void scan_service_test_and_verify_thread_invoking() { } @Test - public void scan_service_with_valid_s3_scan_configuration_test_and_verify() { + void scan_service_with_valid_s3_scan_configuration_test_and_verify() { final String bucketName="my-bucket-5"; final LocalDateTime startDateTime = LocalDateTime.parse("2023-03-07T10:00:00"); final Duration range = Duration.parse("P2DT1H"); @@ -54,8 +73,7 @@ public void scan_service_with_valid_s3_scan_configuration_test_and_verify() { when(bucket.getS3ScanBucketOption()).thenReturn(s3ScanBucketOption); when(s3ScanScanOptions.getBuckets()).thenReturn(List.of(bucket)); when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); - S3ScanService service = new S3ScanService(s3SourceConfig,mock(S3ClientBuilderFactory.class), - mock(S3ObjectHandler.class),mock(BucketOwnerProvider.class), mock(SourceCoordinator.class)); + S3ScanService service = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3ObjectHandler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics); final List scanOptionsBuilder = service.getScanOptions(); assertThat(scanOptionsBuilder.get(0).getBucketOption().getkeyPrefix().getS3scanIncludeOptions(),sameInstance(includeKeyPathList)); assertThat(scanOptionsBuilder.get(0).getBucketOption().getName(),sameInstance(bucketName)); @@ -64,7 +82,7 @@ public void scan_service_with_valid_s3_scan_configuration_test_and_verify() { } @Test - public void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() { + void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() { final String bucketName="my-bucket-5"; final LocalDateTime startDateTime = 
LocalDateTime.parse("2023-03-07T10:00:00"); final Duration range = Duration.parse("P2DT1H"); @@ -82,8 +100,7 @@ public void scan_service_with_valid_bucket_time_range_configuration_test_and_ver when(bucket.getS3ScanBucketOption()).thenReturn(s3ScanBucketOption); when(s3ScanScanOptions.getBuckets()).thenReturn(List.of(bucket)); when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); - S3ScanService service = new S3ScanService(s3SourceConfig,mock(S3ClientBuilderFactory.class), - mock(S3ObjectHandler.class),mock(BucketOwnerProvider.class), mock(SourceCoordinator.class)); + S3ScanService service = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3ObjectHandler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics); final List scanOptionsBuilder = service.getScanOptions(); assertThat(scanOptionsBuilder.get(0).getBucketOption().getkeyPrefix().getS3scanIncludeOptions(),sameInstance(includeKeyPathList)); assertThat(scanOptionsBuilder.get(0).getBucketOption().getName(),sameInstance(bucketName)); @@ -92,7 +109,7 @@ public void scan_service_with_valid_bucket_time_range_configuration_test_and_ver } @Test - public void scan_service_with_no_time_range_configuration_test_and_verify() { + void scan_service_with_no_time_range_configuration_test_and_verify() { final String bucketName="my-bucket-5"; final List includeKeyPathList = List.of("file1.csv","file2.csv"); final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); @@ -108,8 +125,7 @@ public void scan_service_with_no_time_range_configuration_test_and_verify() { when(bucket.getS3ScanBucketOption()).thenReturn(s3ScanBucketOption); when(s3ScanScanOptions.getBuckets()).thenReturn(List.of(bucket)); when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions); - S3ScanService service = new S3ScanService(s3SourceConfig,mock(S3ClientBuilderFactory.class), - mock(S3ObjectHandler.class),mock(BucketOwnerProvider.class), 
mock(SourceCoordinator.class)); + S3ScanService service = new S3ScanService(s3SourceConfig, s3ClientBuilderFactory, s3ObjectHandler, bucketOwnerProvider, sourceCoordinator, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics); final List scanOptionsBuilder = service.getScanOptions(); assertThat(scanOptionsBuilder.get(0).getBucketOption().getkeyPrefix().getS3scanIncludeOptions(),sameInstance(includeKeyPathList)); assertThat(scanOptionsBuilder.get(0).getBucketOption().getName(),sameInstance(bucketName)); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerTest.java index 09c4fe555c..7393fab562 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SelectObjectWorkerTest.java @@ -178,7 +178,7 @@ void parseS3Object_where_s3SelectResponseHandler_returns_exception_throws_IO_exc given(selectObjectResponseFuture.join()).willReturn(mock(Void.class)); given(s3AsyncClient.selectObjectContent(any(SelectObjectContentRequest.class), eq(selectResponseHandler))).willReturn(selectObjectResponseFuture); - assertThrows(IOException.class, () -> createObjectUnderTest().parseS3Object(s3ObjectReference, null)); + assertThrows(IOException.class, () -> createObjectUnderTest().parseS3Object(s3ObjectReference, null, null, null)); assertHeadObjectRequestIsCorrect(); @@ -198,7 +198,7 @@ void parseS3Object_with_no_events_in_SelectObjectContentStream_does_not_throw_ex given(selectObjectResponseFuture.join()).willReturn(mock(Void.class)); given(s3AsyncClient.selectObjectContent(any(SelectObjectContentRequest.class), eq(selectResponseHandler))).willReturn(selectObjectResponseFuture); - 
createObjectUnderTest().parseS3Object(s3ObjectReference, null); + createObjectUnderTest().parseS3Object(s3ObjectReference, null, null, null); assertHeadObjectRequestIsCorrect(); @@ -232,7 +232,7 @@ void parseS3Object_with_different_formats_and_events_in_the_inputstream_works_as given(s3AsyncClient.selectObjectContent(any(SelectObjectContentRequest.class), eq(selectResponseHandler))).willReturn(selectObjectResponseFuture); doAnswer(invocation -> null).when(eventConsumer).accept(any(Event.class), eq(s3ObjectReference)); - createObjectUnderTest().parseS3Object(s3ObjectReference, null); + createObjectUnderTest().parseS3Object(s3ObjectReference, null, null, null); if (isBatchingExpected) { assertHeadObjectRequestIsCorrect(); @@ -270,7 +270,7 @@ void parseS3Object_with_different_formats_and_events_in_the_inputstream_works_as doAnswer(invocation -> null).when(eventConsumer).accept(any(Event.class), eq(s3ObjectReference)); numEventsAdded = 0; - createObjectUnderTest().parseS3Object(s3ObjectReference, acknowledgementSet); + createObjectUnderTest().parseS3Object(s3ObjectReference, acknowledgementSet, null, null); assertThat(numEventsAdded, equalTo(1)); if (isBatchingExpected) { @@ -305,7 +305,7 @@ void parseS3Object_retries_bufferAccumulatorFlush_when_bufferWriteAll_throwsTime doThrow(TimeoutException.class).doNothing().when(buffer).writeAll(any(Collection.class), anyInt()); - createObjectUnderTest().parseS3Object(s3ObjectReference, null); + createObjectUnderTest().parseS3Object(s3ObjectReference, null, null, null); assertHeadObjectRequestIsCorrect(); @@ -325,7 +325,7 @@ void parseS3Object_does_not_use_batching_for_unsupported_json_configuration(fina given(selectJsonOption.getType()).willReturn(jsonType); mockNonBatchedS3SelectCall(compressionType); - createObjectUnderTest().parseS3Object(s3ObjectReference, null); + createObjectUnderTest().parseS3Object(s3ObjectReference, null, null, null); verifyNonBatchedS3SelectCall(); } @@ -345,7 +345,7 @@ void 
parseS3Object_does_not_use_batching_for_unsupported_csv_configuration(final given(selectCSVOption.getQuiteEscape()).willReturn(quoteEscape); mockNonBatchedS3SelectCall(compressionType); - createObjectUnderTest().parseS3Object(s3ObjectReference, null); + createObjectUnderTest().parseS3Object(s3ObjectReference, null, null, null); verifyNonBatchedS3SelectCall(); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java index e1ba936dd3..83a2f8990d 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3SourceConfigTest.java @@ -44,6 +44,11 @@ void default_end_to_end_acknowledgements_test() { assertThat(new S3SourceConfig().getAcknowledgements(), equalTo(false)); } + @Test + void default_delete_s3_objects_test() { + assertThat(new S3SourceConfig().isDeleteS3ObjectsOnRead(), equalTo(false)); + } + @Test void default_notification_source_test() { assertThat(new S3SourceConfig().getNotificationSource(), equalTo(NotificationSourceOption.S3)); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptionsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptionsTest.java new file mode 100644 index 0000000000..9d707a2606 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptionsTest.java @@ -0,0 +1,27 @@ +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import 
com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.Test; + +import java.time.Duration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class S3ScanSchedulingOptionsTest { + private final ObjectMapper objectMapper = + new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)).registerModule(new JavaTimeModule()); + + @Test + public void s3_scan_bucket_option_yaml_configuration_test() throws JsonProcessingException { + + final String schedulingOptionsYaml = "rate: \"PT1H\" \ncount: 2 \n"; + final S3ScanSchedulingOptions s3ScanSchedulingOptions = objectMapper.readValue(schedulingOptionsYaml, S3ScanSchedulingOptions.class); + assertThat(s3ScanSchedulingOptions.getCount(), equalTo(2)); + assertThat(s3ScanSchedulingOptions.getRate(), equalTo(Duration.ofHours(1))); + } +} \ No newline at end of file