From 4e2e911c3eef81ac20d1c6664b56d78806c0aab0 Mon Sep 17 00:00:00 2001
From: Taylor Gray
Date: Fri, 11 Aug 2023 10:54:42 -0500
Subject: [PATCH 01/10] Add support for scheduled scan to s3 scan (#3140)

Add support for scheduled scan to s3 scan

Signed-off-by: Taylor Gray
---
 .../S3ScanPartitionCreationSupplier.java      | 114 +++++++++++++-
 .../plugins/source/ScanObjectWorker.java      |   2 +-
 .../configuration/S3ScanBucketOption.java     |   2 -
 .../configuration/S3ScanScanOptions.java      |  13 +-
 .../S3ScanSchedulingOptions.java              |  13 +-
 .../S3ScanPartitionCreationSupplierTest.java  | 142 +++++++++++++++++-
 .../configuration/S3ScanScanOptionsTest.java  |   3 -
 7 files changed, 266 insertions(+), 23 deletions(-)

diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java
index cadcaf71e8..731803f7cf 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java
@@ -7,10 +7,14 @@
 import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
 import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption;
+import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions;
 import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.utils.Pair;

 import java.time.Instant;
@@ -18,6 +22,7 @@
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -26,23 +31,38 @@
 public class S3ScanPartitionCreationSupplier implements Function<Map<String, Object>, List<PartitionIdentifier>> {

+    private static final Logger LOG = LoggerFactory.getLogger(S3ScanPartitionCreationSupplier.class);
+
     private static final String BUCKET_OBJECT_PARTITION_KEY_FORMAT = "%s|%s";
+    static final String SCAN_COUNT = "SCAN_COUNT";
+    static final String LAST_SCAN_TIME = "LAST_SCAN_TIME";

     private final S3Client s3Client;
     private final BucketOwnerProvider bucketOwnerProvider;
     private final List<ScanOptions> scanOptionsList;
+    private final S3ScanSchedulingOptions schedulingOptions;

     public S3ScanPartitionCreationSupplier(final S3Client s3Client,
                                            final BucketOwnerProvider bucketOwnerProvider,
-                                           final List<ScanOptions> scanOptionsList) {
+                                           final List<ScanOptions> scanOptionsList,
+                                           final S3ScanSchedulingOptions schedulingOptions) {
         this.s3Client = s3Client;
         this.bucketOwnerProvider = bucketOwnerProvider;
         this.scanOptionsList = scanOptionsList;
+        this.schedulingOptions = schedulingOptions;
     }

     @Override
     public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap) {
+        if (globalStateMap.isEmpty()) {
+            initializeGlobalStateMap(globalStateMap);
+        }
+
+        if (shouldScanBeSkipped(globalStateMap)) {
+            return Collections.emptyList();
+        }
+
         final List<PartitionIdentifier> objectsToProcess = new ArrayList<>();

         for (final ScanOptions scanOptions : scanOptionsList) {
@@ -60,27 +80,33 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
                 s3ScanKeyPathOption.getS3scanIncludePrefixOptions().forEach(includePath -> {
                     listObjectsV2Request.prefix(includePath);
                     objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request,
-                            scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
+                            scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap));
                 });
             else
                 objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request,
-                        scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
+                        scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap));
         }

+        globalStateMap.put(SCAN_COUNT, (Integer) globalStateMap.get(SCAN_COUNT) + 1);
+        globalStateMap.put(LAST_SCAN_TIME, Instant.now().toEpochMilli());
+
         return objectsToProcess;
     }

     private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<String> excludeKeyPaths,
-                                                              final ListObjectsV2Request.Builder listObjectsV2Request,
-                                                              final String bucket,
-                                                              final LocalDateTime startDateTime,
-                                                              final LocalDateTime endDateTime) {
+                                                                     final ListObjectsV2Request.Builder listObjectsV2Request,
+                                                                     final String bucket,
+                                                                     final LocalDateTime startDateTime,
+                                                                     final LocalDateTime endDateTime,
+                                                                     final Map<String, Object> globalStateMap) {
+        Instant mostRecentLastModifiedTimestamp = globalStateMap.containsKey(bucket) ? Instant.parse((String) globalStateMap.get(bucket)) : null;
         final List<PartitionIdentifier> allPartitionIdentifiers = new ArrayList<>();
         ListObjectsV2Response listObjectsV2Response = null;
         do {
             listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build());
             allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream()
+                    .filter(s3Object -> isLastModifiedTimeAfterMostRecentScanForBucket(bucket, s3Object, globalStateMap))
                     .map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified())))
                     .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
                     .filter(keyTimestampPair -> excludeKeyPaths.stream()
@@ -89,8 +115,11 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<String> excludeKeyPaths,
                             .noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem)))
                     .filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime))
                     .map(Pair::left)
                     .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())
                     .collect(Collectors.toList()));
+
+            mostRecentLastModifiedTimestamp = getMostRecentLastModifiedTimestamp(listObjectsV2Response, mostRecentLastModifiedTimestamp);
         } while (listObjectsV2Response.isTruncated());

+        globalStateMap.put(bucket, Objects.nonNull(mostRecentLastModifiedTimestamp) ? mostRecentLastModifiedTimestamp.toString() : null);
         return allPartitionIdentifiers;
     }

@@ -106,9 +135,78 @@ private LocalDateTime instantToLocalDateTime(final Instant instant) {

     private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime,
                                                  final LocalDateTime startDateTime,
                                                  final LocalDateTime endDateTime){
-        if (Objects.isNull(startDateTime) || Objects.isNull(endDateTime)) {
+        if (Objects.isNull(startDateTime) || Objects.isNull(endDateTime) || Objects.nonNull(schedulingOptions)) {
             return true;
         }
         return lastModifiedTime.isAfter(startDateTime) && lastModifiedTime.isBefore(endDateTime);
     }
+
+    private void initializeGlobalStateMap(final Map<String, Object> globalStateMap) {
+        globalStateMap.put(SCAN_COUNT, 0);
+    }
+
+    private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final String bucketName,
+                                                                   final S3Object s3Object,
+                                                                   final Map<String, Object> globalStateMap) {
+        if (!globalStateMap.containsKey(bucketName) || Objects.isNull(globalStateMap.get(bucketName))) {
+            return true;
+        }
+
+        final Instant lastProcessedObjectTimestamp = Instant.parse((String) globalStateMap.get(bucketName));
+
+        return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp) > 0;
+    }
+
+    private Instant getMostRecentLastModifiedTimestamp(final ListObjectsV2Response listObjectsV2Response,
+                                                       Instant mostRecentLastModifiedTimestamp) {
+
+        if (Objects.isNull(schedulingOptions)) {
+            return null;
+        }
+
+        for (final S3Object s3Object : listObjectsV2Response.contents()) {
+            if (Objects.isNull(mostRecentLastModifiedTimestamp) || s3Object.lastModified().isAfter(mostRecentLastModifiedTimestamp)) {
+                mostRecentLastModifiedTimestamp = s3Object.lastModified();
+            }
+        }
+
+        return mostRecentLastModifiedTimestamp;
+    }
+
+    private boolean shouldScanBeSkipped(final Map<String, Object> globalStateMap) {
+        if (Objects.isNull(schedulingOptions) && hasAlreadyBeenScanned(globalStateMap)) {
+            LOG.info("Skipping scan because the buckets have already been scanned once");
+            return true;
+        }
+
+        if (Objects.nonNull(schedulingOptions) &&
+                (hasReachedMaxScanCount(globalStateMap) || !hasReachedScheduledScanTime(globalStateMap))) {
+
+            if (hasReachedMaxScanCount(globalStateMap)) {
+                LOG.info("Skipping scan as the max scan count {} has been reached", schedulingOptions.getCount());
+            } else {
+                LOG.info("Skipping scan as the interval of {} seconds has not been reached yet", schedulingOptions.getInterval().toSeconds());
+            }

+            return true;
+        }
+
+        return false;
+    }
+
+    private boolean hasAlreadyBeenScanned(final Map<String, Object> globalStateMap) {
+        return (Integer) globalStateMap.get(SCAN_COUNT) > 0;
+    }
+
+    private boolean hasReachedMaxScanCount(final Map<String, Object> globalStateMap) {
+        return (Integer) globalStateMap.get(SCAN_COUNT) >= schedulingOptions.getCount();
+    }
+
+    private boolean hasReachedScheduledScanTime(final Map<String, Object> globalStateMap) {
+        if (!globalStateMap.containsKey(LAST_SCAN_TIME)) {
+            return true;
+        }
+
+        return Instant.now().minus(schedulingOptions.getInterval()).isAfter(Instant.ofEpochMilli((Long) globalStateMap.get(LAST_SCAN_TIME)));
+    }
 }
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java
index 2b1d8388aa..1257efeaf4 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java
@@ -93,7 +93,7 @@ public
ScanObjectWorker(final S3Client s3Client, acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME); this.sourceCoordinator.initialize(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions); } @Override diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java index c840b82907..323b0480f4 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer; @@ -30,7 +29,6 @@ public class S3ScanBucketOption { @JsonProperty("end_time") private LocalDateTime endTime; - @JsonDeserialize(using = DurationDeserializer.class) @JsonProperty("range") private Duration range; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java index 6948597119..3db6abb179 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer; +import jakarta.validation.Valid; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer; @@ -20,7 +20,7 @@ * Class consists the scan options list bucket configuration properties. 
 */
 public class S3ScanScanOptions {

-    @JsonDeserialize(using = DurationDeserializer.class)
+
     @JsonProperty("range")
     private Duration range;

@@ -33,16 +33,23 @@ public class S3ScanScanOptions {
     private LocalDateTime endTime;

     @JsonProperty("buckets")
+    @Valid
     private List<S3ScanBucketOptions> buckets;

     @JsonProperty("scheduling")
-    private S3ScanSchedulingOptions schedulingOptions = new S3ScanSchedulingOptions();
+    @Valid
+    private S3ScanSchedulingOptions schedulingOptions;

     @AssertTrue(message = "At most two options from start_time, end_time and range can be specified at the same time")
     public boolean hasValidTimeOptions() {
         return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3;
     }

+    @AssertTrue(message = "start_time, end_time, and range are not valid options when using scheduling with s3 scan")
+    public boolean hasValidTimeOptionsWithScheduling() {
+        return !Objects.nonNull(schedulingOptions) || Stream.of(startTime, endTime, range).noneMatch(Objects::nonNull);
+    }
+
     public Duration getRange() {
         return range;
     }
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java
index c474675818..604b8debc2 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java
@@ -7,16 +7,23 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotNull;
+import org.hibernate.validator.constraints.time.DurationMax;
+import org.hibernate.validator.constraints.time.DurationMin;

 import java.time.Duration;

 public class S3ScanSchedulingOptions {
+
     @JsonProperty("interval")
-    private Duration interval = Duration.ofHours(8);
+    @NotNull
+    @DurationMin(seconds = 30L, message = "S3 scan interval must be at least 30 seconds")
+    @DurationMax(days = 365L, message = "S3 scan interval must be less than or equal to 365 days")
+    private Duration interval;

-    @Min(1)
+    @Min(2)
     @JsonProperty("count")
-    private int count = 1;
+    private int count = Integer.MAX_VALUE;

     public Duration getInterval() {
         return interval;
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java
index d3c557aa06..40a9501766 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java
@@ -15,17 +15,20 @@
 import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
 import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption;
 import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption;
+import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions;
 import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
 import software.amazon.awssdk.services.s3.S3Client;
 import
software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; +import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +42,10 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.opensearch.dataprepper.plugins.source.S3ScanPartitionCreationSupplier.LAST_SCAN_TIME; +import static org.opensearch.dataprepper.plugins.source.S3ScanPartitionCreationSupplier.SCAN_COUNT; @ExtendWith(MockitoExtension.class) public class S3ScanPartitionCreationSupplierTest { @@ -51,6 +58,8 @@ public class S3ScanPartitionCreationSupplierTest { private List scanOptionsList; + private S3ScanSchedulingOptions schedulingOptions; + @BeforeEach void setup() { scanOptionsList = new ArrayList<>(); @@ -58,11 +67,12 @@ void setup() { private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions); } @Test - void getNextPartition_supplier_returns_expected_PartitionIdentifiers() { + void getNextPartition_supplier_without_scheduling_options_returns_expected_PartitionIdentifiers() { + schedulingOptions = null; final String firstBucket = UUID.randomUUID().toString(); final String secondBucket = UUID.randomUUID().toString(); @@ -130,11 +140,137 @@ void getNextPartition_supplier_returns_expected_PartitionIdentifiers() { final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); - final List resultingPartitions = partitionCreationSupplier.apply(new HashMap<>()); + final Map globalStateMap = new HashMap<>(); + final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1)); + + assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); assertThat(resultingPartitions, notNullValue()); assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); } + + @Test + void getNextPartition_supplier_with_scheduling_options_returns_expected_PartitionIdentifiers() { + schedulingOptions = mock(S3ScanSchedulingOptions.class); + given(schedulingOptions.getInterval()).willReturn(Duration.ofMillis(0)); + given(schedulingOptions.getCount()).willReturn(2); + + final String firstBucket = "bucket-one"; + final String secondBucket = "bucket-two"; + + final ScanOptions firstBucketScanOptions = mock(ScanOptions.class); + final S3ScanBucketOption 
firstBucketScanBucketOption = mock(S3ScanBucketOption.class); + given(firstBucketScanOptions.getBucketOption()).willReturn(firstBucketScanBucketOption); + given(firstBucketScanBucketOption.getName()).willReturn(firstBucket); + given(firstBucketScanOptions.getUseStartDateTime()).willReturn(null); + given(firstBucketScanOptions.getUseEndDateTime()).willReturn(null); + final S3ScanKeyPathOption firstBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(firstBucketScanBucketOption.getS3ScanFilter()).willReturn(firstBucketScanKeyPath); + given(firstBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(List.of(UUID.randomUUID().toString())); + given(firstBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(List.of(".invalid")); + scanOptionsList.add(firstBucketScanOptions); + + final ScanOptions secondBucketScanOptions = mock(ScanOptions.class); + final S3ScanBucketOption secondBucketScanBucketOption = mock(S3ScanBucketOption.class); + given(secondBucketScanOptions.getBucketOption()).willReturn(secondBucketScanBucketOption); + given(secondBucketScanBucketOption.getName()).willReturn(secondBucket); + given(secondBucketScanOptions.getUseStartDateTime()).willReturn(null); + given(secondBucketScanOptions.getUseEndDateTime()).willReturn(null); + final S3ScanKeyPathOption secondBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(secondBucketScanBucketOption.getS3ScanFilter()).willReturn(secondBucketScanKeyPath); + given(secondBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(null); + given(secondBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(null); + scanOptionsList.add(secondBucketScanOptions); + + final Function, List> partitionCreationSupplier = createObjectUnderTest(); + + final List expectedPartitionIdentifiers = new ArrayList<>(); + + final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); + final List s3ObjectsList = new ArrayList<>(); + + final S3Object invalidFolderObject = mock(S3Object.class); + given(invalidFolderObject.key()).willReturn("folder-key/"); + given(invalidFolderObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(invalidFolderObject); + + final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class); + given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); + given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(invalidForFirstBucketSuffixObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + + final Instant mostRecentFirstScan = Instant.now().plusSeconds(1); + final S3Object validObject = mock(S3Object.class); + given(validObject.key()).willReturn("valid"); + given(validObject.lastModified()).willReturn(mostRecentFirstScan); + s3ObjectsList.add(validObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + + final S3Object secondScanObject = mock(S3Object.class); + final Instant mostRecentSecondScan = Instant.now().plusSeconds(10); + given(secondScanObject.key()).willReturn("second-scan"); + given(secondScanObject.lastModified()).willReturn(mostRecentSecondScan); + + final List expectedPartitionIdentifiersSecondScan = new ArrayList<>(); + 
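// Illustration (not part of this patch): the expectations below hinge on the per-bucket
// timestamp recorded in the global state map. The first scan stores mostRecentFirstScan for
// each bucket, and the second scan only returns objects whose lastModified is strictly after
// that timestamp, so only secondScanObject (lastModified = mostRecentSecondScan) should yield
// partitions the second time around. For reference, a scheduled scan would be configured in
// pipeline YAML roughly as follows (bucket name and values are illustrative; the ISO-8601
// duration form is an assumption based on Jackson's default java.time handling):
//   scan:
//     scheduling:
//       interval: "PT1H"
//       count: 2
//     buckets:
//       - bucket:
//           name: example-bucket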
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); + + final List secondScanObjects = new ArrayList<>(s3ObjectsList); + secondScanObjects.add(secondScanObject); + given(listObjectsResponse.contents()) + .willReturn(s3ObjectsList) + .willReturn(s3ObjectsList) + .willReturn(s3ObjectsList) + .willReturn(s3ObjectsList) + .willReturn(secondScanObjects) + .willReturn(secondScanObjects) + .willReturn(secondScanObjects) + .willReturn(secondScanObjects); + + final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); + given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + + final Map globalStateMap = new HashMap<>(); + final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); + + assertThat(resultingPartitions, notNullValue()); + assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) + .map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1)); + assertThat(globalStateMap.containsKey(firstBucket), equalTo(true)); + assertThat(globalStateMap.get(firstBucket), equalTo(mostRecentFirstScan.toString())); + assertThat(globalStateMap.containsKey(secondBucket), equalTo(true)); + assertThat(globalStateMap.get(secondBucket), equalTo(mostRecentFirstScan.toString())); + + final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); + assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); + assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(2)); + assertThat(globalStateMap.containsKey(firstBucket), equalTo(true)); + assertThat(globalStateMap.get(firstBucket), equalTo(mostRecentSecondScan.toString())); + assertThat(globalStateMap.containsKey(secondBucket), equalTo(true)); + assertThat(globalStateMap.get(secondBucket), equalTo(mostRecentSecondScan.toString())); + assertThat(Instant.ofEpochMilli((Long) globalStateMap.get(LAST_SCAN_TIME)).isBefore(Instant.now()), equalTo(true)); + + assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); + + verify(listObjectsResponse, times(8)).contents(); + } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java index e0c7890520..f0e31de164 100644 --- 
a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import org.junit.Test; -import java.time.Duration; import java.time.LocalDateTime; import java.util.List; @@ -25,7 +24,6 @@ public class S3ScanScanOptionsTest { @Test public void s3scan_options_test_with_scan_yaml_configuration_test() throws JsonProcessingException { final String scanYaml = " start_time: 2023-01-21T18:00:00\n" + - " range: P90DT3H4M\n" + " end_time: 2023-04-21T18:00:00\n" + " buckets:\n" + " - bucket:\n" + @@ -38,7 +36,6 @@ public void s3scan_options_test_with_scan_yaml_configuration_test() throws JsonP final S3ScanScanOptions s3ScanScanOptions = objectMapper.readValue(scanYaml, S3ScanScanOptions.class); assertThat(s3ScanScanOptions.getStartTime(),equalTo(LocalDateTime.parse("2023-01-21T18:00:00"))); assertThat(s3ScanScanOptions.getEndTime(),equalTo(LocalDateTime.parse("2023-04-21T18:00:00"))); - assertThat(s3ScanScanOptions.getRange(),equalTo(Duration.parse("P90DT3H4M"))); assertThat(s3ScanScanOptions.getBuckets(),instanceOf(List.class)); assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getName(),equalTo("test-s3-source-test-output")); assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3ScanExcludeSuffixOptions(),instanceOf(List.class)); From f3f63b56c4843f3d7c373032457b41bc81317681 Mon Sep 17 00:00:00 2001 From: David Venable Date: Fri, 11 Aug 2023 10:58:22 -0500 Subject: [PATCH 02/10] Adds S3 sink compression. Resolves #3130. (#3138) Signed-off-by: David Venable --- data-prepper-plugins/s3-sink/build.gradle | 8 +- .../plugins/sink/s3/S3SinkServiceIT.java | 80 ++++++++-- .../dataprepper/plugins/sink/s3/S3Sink.java | 9 +- .../plugins/sink/s3/S3SinkConfig.java | 8 + .../plugins/sink/s3/accumulator/Buffer.java | 5 +- .../s3/accumulator/CompressionBuffer.java | 66 +++++++++ .../accumulator/CompressionBufferFactory.java | 25 ++++ .../sink/s3/accumulator/InMemoryBuffer.java | 22 --- .../sink/s3/accumulator/LocalFileBuffer.java | 20 --- .../s3/compression/CompressionEngine.java | 13 ++ .../s3/compression/CompressionOption.java | 41 +++++ .../s3/compression/GZipCompressionEngine.java | 18 +++ .../s3/compression/NoneCompressionEngine.java | 15 ++ .../plugins/sink/s3/S3SinkTest.java | 2 + .../CompressionBufferFactoryTest.java | 73 +++++++++ .../s3/accumulator/CompressionBufferTest.java | 140 ++++++++++++++++++ .../GZipCompressionEngineTest.java | 60 ++++++++ .../NoneCompressionEngineTest.java | 38 +++++ 18 files changed, 578 insertions(+), 65 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java create mode 100644 
data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngineTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngineTest.java diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle index 00bc6d0f11..5d74fd169d 100644 --- a/data-prepper-plugins/s3-sink/build.gradle +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -23,8 +23,10 @@ dependencies { implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21' implementation libs.commons.lang3 testImplementation project(':data-prepper-test-common') - implementation project(':data-prepper-plugins:parquet-codecs') - implementation project(':data-prepper-plugins:parse-json-processor') + testImplementation project(':data-prepper-plugins:parquet-codecs') + testImplementation project(':data-prepper-plugins:parse-json-processor') + testImplementation project(':data-prepper-plugins:csv-processor') + testImplementation project(':data-prepper-plugins:avro-codecs') } test { @@ -55,7 +57,7 @@ task integrationTest(type: Test) { classpath = sourceSets.integrationTest.runtimeClasspath systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket') - systemProperty 'tests.s3ink.region', System.getProperty('tests.s3sink.region') + systemProperty 'tests.s3sink.region', System.getProperty('tests.s3sink.region') filter { includeTestsMatching '*IT' diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index 7134dc47fc..d679663f11 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -5,10 +5,13 @@ package org.opensearch.dataprepper.plugins.sink.s3; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.ParquetReadOptions; @@ -24,9 +27,9 @@ import org.apache.parquet.io.RecordReader; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; -import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -43,13 +46,16 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import 
org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig; import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec; import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodecConfig; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; @@ -66,6 +72,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.time.Duration; @@ -129,7 +136,7 @@ class S3SinkServiceIT { @BeforeEach public void setUp() { - s3region = System.getProperty("tests.s3ink.region"); + s3region = System.getProperty("tests.s3sink.region"); s3Client = S3Client.builder().region(Region.of(s3region)).build(); bucketName = System.getProperty("tests.s3sink.bucket"); @@ -168,19 +175,65 @@ void configureNewLineCodec() { } @Test - void verify_flushed_records_into_s3_bucketNewLine() { + void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException { configureNewLineCodec(); S3SinkService s3SinkService = createObjectUnderTest(); Collection> recordsData = setEventQueue(); s3SinkService.output(recordsData); - String objectData = getS3Object(); + String objectData = new String(getS3Object()); + final ObjectMapper objectMapperForDeserialization = new ObjectMapper(); int count = 0; - String[] objectDataArr = objectData.split("\r\n"); + + String[] objectDataArr = objectData.split(System.lineSeparator()); + assertThat(objectDataArr.length, equalTo(recordsData.size())); + for (Record recordData : recordsData) { - String objectRecord = recordData.getData().toJsonString(); - assertThat(objectDataArr[count], CoreMatchers.containsString(objectRecord)); + final String actualLine = objectDataArr[count]; + final Map actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class); + + final Map expectedMap = new HashMap<>(recordData.getData().toMap()); + expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags())); + assertThat(actualDeserializedJson, equalTo(expectedMap)); + + final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString(); + assertThat(actualLine, equalTo(expectedJsonString)); + count++; + } + } + + @Test + void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException { + configureNewLineCodec(); + bufferFactory = new CompressionBufferFactory(bufferFactory, CompressionOption.GZIP.getCompressionEngine()); + S3SinkService s3SinkService = createObjectUnderTest(); + Collection> recordsData = setEventQueue(); + + s3SinkService.output(recordsData); + byte[] s3ObjectBytes = getS3Object(); + + ByteArrayInputStream s3ObjectInputStream = 
new ByteArrayInputStream(s3ObjectBytes); + InputStream decompressingInputStream = new GZipDecompressionEngine().createInputStream(s3ObjectInputStream); + + String objectData = IOUtils.toString(decompressingInputStream, Charset.defaultCharset()); + + final ObjectMapper objectMapperForDeserialization = new ObjectMapper(); + int count = 0; + + String[] objectDataArr = objectData.split(System.lineSeparator()); + assertThat(objectDataArr.length, equalTo(recordsData.size())); + + for (Record recordData : recordsData) { + final String actualLine = objectDataArr[count]; + final Map actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class); + + final Map expectedMap = new HashMap<>(recordData.getData().toMap()); + expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags())); + assertThat(actualDeserializedJson, equalTo(expectedMap)); + + final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString(); + assertThat(actualLine, equalTo(expectedJsonString)); count++; } } @@ -202,7 +255,7 @@ private int gets3ObjectCount() { return s3ObjectCount; } - private String getS3Object() { + private byte[] getS3Object() { ListObjectsRequest listObjects = ListObjectsRequest.builder() .bucket(bucketName) @@ -220,8 +273,7 @@ private String getS3Object() { .bucket(bucketName).build(); ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); - byte[] data = objectBytes.asByteArray(); - return new String(data); + return objectBytes.asByteArray(); } private String getPathPrefix() { @@ -240,20 +292,19 @@ private static Record createRecord() { final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder(). withEventType(EventType.LOG.toString()). withTags(testTags).build(); - Map json = generateJson(testTags); + Map json = generateJson(); final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build(); event.setEventHandle(mock(EventHandle.class)); return new Record<>(event); } - private static Map generateJson(Set testTags) { + private static Map generateJson() { final Map jsonObject = new LinkedHashMap<>(); for (int i = 0; i < 2; i++) { jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); } jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString())); - jsonObject.put("Tag", testTags.toArray()); return jsonObject; } @@ -280,6 +331,7 @@ private static List generateRecords(int numberOfRecords) { } @Test + @Disabled void verify_flushed_records_into_s3_bucket_Parquet() throws IOException { configureParquetCodec(); S3SinkService s3SinkService = createObjectUnderTest(); @@ -287,7 +339,7 @@ void verify_flushed_records_into_s3_bucket_Parquet() throws IOException { s3SinkService.output(recordsData); - List> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object().getBytes())); + List> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object())); int index = 0; for (final HashMap actualMap : actualRecords) { assertThat(actualMap, notNullValue()); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 11aa67637d..c880a72464 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ 
b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -21,8 +21,10 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.LocalFileBufferFactory; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; @@ -64,11 +66,14 @@ public S3Sink(final PluginSetting pluginSetting, codec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings); sinkInitialized = Boolean.FALSE; + final BufferFactory innerBufferFactory; if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) { - bufferFactory = new LocalFileBufferFactory(); + innerBufferFactory = new LocalFileBufferFactory(); } else { - bufferFactory = new InMemoryBufferFactory(); + innerBufferFactory = new InMemoryBufferFactory(); } + final CompressionEngine compressionEngine = s3SinkConfig.getCompression().getCompressionEngine(); + bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine); final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, OutputCodecContext.fromSinkContext(sinkContext), s3Client, pluginMetrics); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java index 1b18994f66..e39856cb12 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java @@ -11,6 +11,7 @@ import jakarta.validation.constraints.NotNull; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; @@ -36,6 +37,9 @@ public class S3SinkConfig { @JsonProperty("object_key") private ObjectKeyOptions objectKeyOptions; + @JsonProperty("compression") + private CompressionOption compression = CompressionOption.NONE; + @JsonProperty("threshold") @NotNull private ThresholdOptions thresholdOptions; @@ -118,4 +122,8 @@ public int getMaxConnectionRetries() { public int getMaxUploadRetries() { return maxUploadRetries; } + + public CompressionOption getCompression() { + return compression; + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java index b90775ed47..afd695db2b 100644 --- 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import software.amazon.awssdk.services.s3.S3Client; -import java.io.IOException; import java.io.OutputStream; /** @@ -22,11 +21,9 @@ public interface Buffer { int getEventCount(); long getDuration(); - boolean isCodecStarted(); - void setCodecStarted(boolean codecStarted); void flushToS3(S3Client s3Client, String bucket, String key) ; - void writeEvent(byte[] bytes) throws IOException; + OutputStream getOutputStream(); void setEventCount(int eventCount); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java new file mode 100644 index 0000000000..440c030ac0 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; +import software.amazon.awssdk.services.s3.S3Client; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +class CompressionBuffer implements Buffer { + private final Buffer innerBuffer; + private final CompressionEngine compressionEngine; + private volatile OutputStream outputStream; + + CompressionBuffer(final Buffer innerBuffer, final CompressionEngine compressionEngine) { + this.innerBuffer = Objects.requireNonNull(innerBuffer); + this.compressionEngine = Objects.requireNonNull(compressionEngine); + } + + @Override + public long getSize() { + return innerBuffer.getSize(); + } + + @Override + public int getEventCount() { + return innerBuffer.getEventCount(); + } + + @Override + public long getDuration() { + return innerBuffer.getDuration(); + } + + @Override + public void flushToS3(final S3Client s3Client, final String bucket, final String key) { + innerBuffer.flushToS3(s3Client, bucket, key); + } + + @Override + public OutputStream getOutputStream() { + if(outputStream == null) { + synchronized (this) { + if(outputStream == null) { + final OutputStream innerBufferOutputStream = innerBuffer.getOutputStream(); + try { + outputStream = compressionEngine.createOutputStream(innerBufferOutputStream); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + } + return outputStream; + } + + @Override + public void setEventCount(final int eventCount) { + innerBuffer.setEventCount(eventCount); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java new file mode 100644 index 0000000000..5dcb652f0f --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; + +import java.util.Objects; + +public class CompressionBufferFactory implements BufferFactory { + private final BufferFactory innerBufferFactory; + private final CompressionEngine compressionEngine; + + public CompressionBufferFactory(final BufferFactory innerBufferFactory, final CompressionEngine compressionEngine) { + this.innerBufferFactory = Objects.requireNonNull(innerBufferFactory); + this.compressionEngine = Objects.requireNonNull(compressionEngine); + } + + @Override + public Buffer getBuffer() { + return new CompressionBuffer(innerBufferFactory.getBuffer(), compressionEngine); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java index ea1f3bc697..58121912d7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java @@ -10,7 +10,6 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.TimeUnit; @@ -61,27 +60,6 @@ public void flushToS3(S3Client s3Client, String bucket, String key) { RequestBody.fromBytes(byteArray)); } - /** - * write byte array to output stream. - * - * @param bytes byte array. - * @throws IOException while writing to output stream fails. - */ - @Override - public void writeEvent(byte[] bytes) throws IOException { - byteArrayOutputStream.write(bytes); - byteArrayOutputStream.write(System.lineSeparator().getBytes()); - eventCount++; - } - @Override - public boolean isCodecStarted() { - return isCodecStarted; - } - - @Override - public void setCodecStarted(boolean codecStarted) { - isCodecStarted = codecStarted; - } @Override public void setEventCount(int eventCount) { this.eventCount = eventCount; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java index 733a2b86fa..52b6229d92 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java @@ -77,18 +77,6 @@ public void flushToS3(S3Client s3Client, String bucket, String key) { removeTemporaryFile(); } - /** - * write byte array to output stream. - * @param bytes byte array. - * @throws IOException while writing to output stream fails. - */ - @Override - public void writeEvent(byte[] bytes) throws IOException { - outputStream.write(bytes); - outputStream.write(System.lineSeparator().getBytes()); - eventCount++; - } - /** * Flushing the buffered data into the output stream. 
 */
@@ -113,15 +101,7 @@ protected void removeTemporaryFile() {
             }
         }
     }
-    @Override
-    public boolean isCodecStarted() {
-        return isCodecStarted;
-    }
-    @Override
-    public void setCodecStarted(boolean codecStarted) {
-        isCodecStarted = codecStarted;
-    }
     @Override
     public void setEventCount(int eventCount) {
         this.eventCount = eventCount;
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java
new file mode 100644
index 0000000000..46ffc503ad
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java
@@ -0,0 +1,13 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.s3.compression;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface CompressionEngine {
+    OutputStream createOutputStream(OutputStream outputStream) throws IOException;
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java
new file mode 100644
index 0000000000..7e759909d5
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.s3.compression;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public enum CompressionOption {
+    NONE("none", NoneCompressionEngine::new),
+    GZIP("gzip", GZipCompressionEngine::new);
+
+    private static final Map<String, CompressionOption> OPTIONS_MAP = Arrays.stream(CompressionOption.values())
+            .collect(Collectors.toMap(
+                    value -> value.option,
+                    value -> value
+            ));
+
+    private final String option;
+    private final Supplier<CompressionEngine> compressionEngineSupplier;
+
+    CompressionOption(final String option, final Supplier<CompressionEngine> compressionEngineSupplier) {
+        this.option = option.toLowerCase();
+        this.compressionEngineSupplier = compressionEngineSupplier;
+    }
+
+    public CompressionEngine getCompressionEngine() {
+        return compressionEngineSupplier.get();
+    }
+
+    @JsonCreator
+    public static CompressionOption fromOptionValue(final String option) {
+        return OPTIONS_MAP.get(option.toLowerCase());
+    }
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java
new file mode 100644
index 0000000000..f59956a8ed
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.s3.compression;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class
GZipCompressionEngine implements CompressionEngine { + @Override + public OutputStream createOutputStream(final OutputStream outputStream) throws IOException { + return new GzipCompressorOutputStream(outputStream); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java new file mode 100644 index 0000000000..9c852b4f85 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import java.io.OutputStream; + +public class NoneCompressionEngine implements CompressionEngine { + @Override + public OutputStream createOutputStream(final OutputStream outputStream) { + return outputStream; + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java index 61d27cecae..75ae2dde1c 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; @@ -70,6 +71,7 @@ void setUp() { when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(MAX_EVENTS); when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse(MAXIMUM_SIZE)); when(s3SinkConfig.getThresholdOptions().getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(MAX_RETRIES)); + when(s3SinkConfig.getCompression()).thenReturn(CompressionOption.NONE); when(objectKeyOptions.getNamePattern()).thenReturn(OBJECT_KEY_NAME_PATTERN); when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java new file mode 100644 index 0000000000..a27798f3df --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CompressionBufferFactoryTest { + @Mock + private BufferFactory innerBufferFactory; + + @Mock + private CompressionEngine compressionEngine; + + private CompressionBufferFactory createObjectUnderTest() { + return new CompressionBufferFactory(innerBufferFactory, compressionEngine); + } + + @Test + void constructor_throws_if_inner_BufferFactory_is_null() { + innerBufferFactory = null; + + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_CompressionEngine_is_null() { + compressionEngine = null; + + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Nested + class WithBuffer { + @Mock + private Buffer innerBuffer; + + @BeforeEach + void setUp() { + when(innerBufferFactory.getBuffer()).thenReturn(innerBuffer); + } + + @Test + void getBuffer_returns_CompressionBuffer() { + final Buffer buffer = createObjectUnderTest().getBuffer(); + assertThat(buffer, instanceOf(CompressionBuffer.class)); + } + + @Test + void getBuffer_returns_new_on_each_call() { + final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); + final Buffer firstBuffer = objectUnderTest.getBuffer(); + + assertThat(objectUnderTest.getBuffer(), not(equalTo(firstBuffer))); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java new file mode 100644 index 0000000000..3a7055414b --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java @@ -0,0 +1,140 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; +import software.amazon.awssdk.services.s3.S3Client; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CompressionBufferTest { + @Mock + private Buffer innerBuffer; + + @Mock + private CompressionEngine compressionEngine; + + 
private Random random; + + @BeforeEach + void setUp() { + random = new Random(); + } + + private CompressionBuffer createObjectUnderTest() { + return new CompressionBuffer(innerBuffer, compressionEngine); + } + + @Test + void constructor_throws_if_innerBuffer_is_null() { + innerBuffer = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_compressionEngine_is_null() { + compressionEngine = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void getSize_returns_inner_getSize() { + final long size = random.nextInt(10_000) + 1_000; + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + when(innerBuffer.getSize()).thenReturn(size); + + assertThat(objectUnderTest.getSize(), equalTo(size)); + } + + @Test + void getEventCount_returns_inner_getEventCount() { + final int eventCount = random.nextInt(10_000) + 1_000; + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + when(innerBuffer.getEventCount()).thenReturn(eventCount); + + assertThat(objectUnderTest.getEventCount(), equalTo(eventCount)); + } + + @Test + void getDuration_returns_inner_getDuration() { + final long duration = random.nextInt(10_000) + 1_000; + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + when(innerBuffer.getDuration()).thenReturn(duration); + + assertThat(objectUnderTest.getDuration(), equalTo(duration)); + } + + @Test + void flushToS3_calls_inner_flushToS3() { + final S3Client s3Client = mock(S3Client.class); + final String bucket = UUID.randomUUID().toString(); + final String key = UUID.randomUUID().toString(); + + createObjectUnderTest().flushToS3(s3Client, bucket, key); + + verify(innerBuffer).flushToS3(s3Client, bucket, key); + } + + @Test + void getOutputStream_returns_outputStream_via_CompressionEngine() throws IOException { + final OutputStream innerBufferOutputStream = mock(OutputStream.class); + when(innerBuffer.getOutputStream()).thenReturn(innerBufferOutputStream); + final OutputStream compressionEngineOutputStream = mock(OutputStream.class); + when(compressionEngine.createOutputStream(innerBufferOutputStream)).thenReturn(compressionEngineOutputStream); + + final OutputStream actualOutputStream = createObjectUnderTest().getOutputStream(); + + + assertThat(actualOutputStream, sameInstance(compressionEngineOutputStream)); + } + + @Test + void getOutputStream_wraps_OutputStream_only_once() throws IOException { + final OutputStream innerBufferOutputStream = mock(OutputStream.class); + when(innerBuffer.getOutputStream()).thenReturn(innerBufferOutputStream); + final OutputStream compressionEngineOutputStream = mock(OutputStream.class); + when(compressionEngine.createOutputStream(innerBufferOutputStream)).thenReturn(compressionEngineOutputStream); + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + final OutputStream outputStream = objectUnderTest.getOutputStream(); + assertThat(objectUnderTest.getOutputStream(), sameInstance(outputStream)); + assertThat(objectUnderTest.getOutputStream(), sameInstance(outputStream)); + assertThat(objectUnderTest.getOutputStream(), sameInstance(outputStream)); + + verify(compressionEngine, times(1)).createOutputStream(any()); + } + + @Test + void setEventCount_calls_inner_setEventCount() { + final int eventCount = random.nextInt(10_000) + 1_000; + + createObjectUnderTest().setEventCount(eventCount); + + verify(innerBuffer).setEventCount(eventCount); + } +} \ No newline at end of file diff --git 
a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngineTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngineTest.java new file mode 100644 index 0000000000..a92930e958 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngineTest.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +class GZipCompressionEngineTest { + + private GZipCompressionEngine createObjectUnderTest() { + return new GZipCompressionEngine(); + } + + @Test + void createOutputStream_should_return_GzipCompressorOutputStream() throws IOException { + final OutputStream innerOutputStream = mock(OutputStream.class); + final OutputStream outputStream = createObjectUnderTest().createOutputStream(innerOutputStream); + + assertThat(outputStream, instanceOf(GzipCompressorOutputStream.class)); + } + + @Test + void createOutputStream_should_write_compressed_data() throws IOException { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + final OutputStream outputStream = createObjectUnderTest().createOutputStream(byteArrayOutputStream); + + final byte[] inputBytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + + outputStream.write(inputBytes); + outputStream.close(); + + final byte[] writtenBytes = byteArrayOutputStream.toByteArray(); + + assertTrue(GzipCompressorInputStream.matches(writtenBytes, 2)); + + final ByteArrayInputStream verificationInputStream = new ByteArrayInputStream(writtenBytes); + + final GzipCompressorInputStream uncompressingInputStream = new GzipCompressorInputStream(verificationInputStream); + final byte[] uncompressedBytes = uncompressingInputStream.readAllBytes(); + assertThat(uncompressedBytes, equalTo(inputBytes)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngineTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngineTest.java new file mode 100644 index 0000000000..17c581b0c7 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngineTest.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.OutputStream; + +import static org.hamcrest.CoreMatchers.sameInstance; +import static 
org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; + +class NoneCompressionEngineTest { + + private OutputStream innerOutputStream; + + @BeforeEach + void setUp() { + innerOutputStream = mock(OutputStream.class); + } + + private NoneCompressionEngine createObjectUnderTest() { + return new NoneCompressionEngine(); + } + + @Test + void createOutputStream_returns_innerOutputStream() { + OutputStream outputStream = createObjectUnderTest().createOutputStream(innerOutputStream); + + assertThat(outputStream, sameInstance(innerOutputStream)); + verifyNoInteractions(innerOutputStream); + } +} \ No newline at end of file From 675c2fa73cfa1898579eb191fa2b54134d6388b4 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Fri, 11 Aug 2023 11:14:43 -0500 Subject: [PATCH 03/10] Add default value to cardinality keys (#3144) Signed-off-by: Jonah Calvo --- .../anomalydetector/AnomalyDetectorProcessorConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java index c92fdb9000..7e796e660a 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java @@ -10,6 +10,7 @@ import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; +import java.util.Collections; import java.util.List; public class AnomalyDetectorProcessorConfig { @@ -22,7 +23,7 @@ public class AnomalyDetectorProcessorConfig { private List keys; @JsonProperty("identification_keys") - private List identificationKeys; + private List identificationKeys = Collections.emptyList(); @JsonProperty("verbose") private Boolean verbose = false; From a5c4fe2255eeef4939e26d3fb21f286ab6c86462 Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Fri, 11 Aug 2023 10:28:08 -0700 Subject: [PATCH 04/10] Glue registry fixes. Fixed a bug in getMSKBootstrapServers (#3142) * Glue registry fixes. 
Fixed a bug in getMSKBootstrapServers Signed-off-by: Krishna Kondaka * Changed default auto commit reset to earliest Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../kafka/configuration/TopicConfig.java | 2 +- .../consumer/KafkaSourceCustomConsumer.java | 2 +- .../plugins/kafka/source/KafkaSource.java | 21 ++++++------- .../util/KafkaSourceSecurityConfigurer.java | 30 +++++++++++++++++-- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java index d613db3cf5..e8244098d6 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java @@ -22,7 +22,7 @@ public class TopicConfig { static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE; - static final String DEFAULT_AUTO_OFFSET_RESET = "latest"; + static final String DEFAULT_AUTO_OFFSET_RESET = "earliest"; static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5); static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4); static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index d083b4e98b..805cfb6497 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -162,7 +162,7 @@ public void consumeRecords() throws Exception { Thread.sleep(10000); } catch (RecordDeserializationException e) { LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record", - e.topicPartition().topic(), e.topicPartition().partition(), e.offset()); + e.topicPartition().topic(), e.topicPartition().partition(), e.offset(), e); topicMetrics.getNumberOfDeserializationErrors().increment(); consumer.seek(e.topicPartition(), e.offset()+1); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 92cf2527f8..b0c752c200 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -48,8 +48,6 @@ import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; -import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; -import 
com.amazonaws.services.schemaregistry.utils.AvroRecordType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +98,8 @@ public class KafkaSource implements Source> { private static final String SCHEMA_TYPE = "schemaType"; private final AcknowledgementSetManager acknowledgementSetManager; private static CachedSchemaRegistryClient schemaRegistryClient; + private GlueSchemaRegistryKafkaDeserializer glueDeserializer; + private StringDeserializer stringDeserializer; @DataPrepperPluginConstructor public KafkaSource(final KafkaSourceConfig sourceConfig, @@ -110,13 +110,14 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; this.pipelineName = pipelineDescription.getPipelineName(); + this.stringDeserializer = new StringDeserializer(); shutdownInProgress = new AtomicBoolean(false); } @Override public void start(Buffer> buffer) { Properties authProperties = new Properties(); - KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); + glueDeserializer = KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); @@ -135,7 +136,11 @@ public void start(Buffer> buffer) { break; case PLAINTEXT: default: - kafkaConsumer = new KafkaConsumer(consumerProperties); + if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) { + kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer); + } else { + kafkaConsumer = new KafkaConsumer(consumerProperties); + } break; } consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics); @@ -296,7 +301,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi } if (schemaConfig.getType() == SchemaRegistryType.AWS_GLUE) { - setPropertiesForGlueSchemaRegistry(properties); return; } @@ -309,13 +313,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi } } - private void setPropertiesForGlueSchemaRegistry(Properties properties) { - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); - properties.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion()); - properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); - } - private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) { MessageFormat dataFormat = topicConfig.getSerdeFormat(); schemaType = dataFormat.toString(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java index 4a6aaf30da..e9e612266f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java @@ 
-11,6 +11,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; import org.apache.kafka.clients.consumer.ConsumerConfig; import software.amazon.awssdk.services.kafka.KafkaClient; @@ -25,9 +26,17 @@ import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.regions.Region; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; +import software.amazon.awssdk.services.glue.model.Compatibility; + import org.slf4j.Logger; import java.util.Base64; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.UUID; @@ -73,6 +82,8 @@ public class KafkaSourceSecurityConfigurer { private static final int MAX_KAFKA_CLIENT_RETRIES = 360; // for one hour every 10 seconds + private static AwsCredentialsProvider credentialsProvider; + /*public static void setSaslPlainTextProperties(final KafkaSourceConfig kafkaSourConfig, final Properties properties) { @@ -173,7 +184,6 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu } public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuthConfig, final AwsConfig awsConfig, final Logger LOG) { - AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) { String sessionName = "data-prepper-kafka-session" + UUID.randomUUID(); StsClient stsClient = StsClient.builder() @@ -216,10 +226,11 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth try { Thread.sleep(10000); } catch (InterruptedException exp) {} + retryable = true; } catch (Exception e) { throw new RuntimeException("Failed to get bootstrap server information from MSK.", e); } - } while (numRetries++ < MAX_KAFKA_CLIENT_RETRIES); + } while (retryable && numRetries++ < MAX_KAFKA_CLIENT_RETRIES); if (Objects.isNull(result)) { throw new RuntimeException("Failed to get bootstrap server information from MSK after trying multiple times with retryable exceptions."); } @@ -234,11 +245,14 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth } } - public static void setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) { + public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) { final AwsConfig awsConfig = sourceConfig.getAwsConfig(); final AuthConfig authConfig = sourceConfig.getAuthConfig(); final KafkaSourceConfig.EncryptionConfig encryptionConfig = sourceConfig.getEncryptionConfig(); final EncryptionType encryptionType = encryptionConfig.getType(); + GlueSchemaRegistryKafkaDeserializer glueDeserializer = null; + + credentialsProvider = DefaultCredentialsProvider.create(); String bootstrapServers = sourceConfig.getBootStrapServers(); AwsIamAuthConfig awsIamAuthConfig = null; @@ -269,6 +283,15 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon 
properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class); } } + if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) { + Map configs = new HashMap(); + configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion()); + configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); + configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); + configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); + configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); + glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs); + } if (Objects.isNull(authConfig) || Objects.isNull(authConfig.getSaslAuthConfig())) { if (encryptionType == EncryptionType.SSL) { properties.put(SECURITY_PROTOCOL, "SSL"); @@ -278,6 +301,7 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon throw new RuntimeException("Bootstrap servers are not specified"); } properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return glueDeserializer; } } From 28fdf903b791ec7365e5783022b859a7910040eb Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Fri, 11 Aug 2023 18:35:17 -0700 Subject: [PATCH 05/10] Fix Null pointer exception when schema registry not specified (#3147) * Fix Null pointer exception when schema registry not specified Signed-off-by: Krishna Kondaka * Fix failing test cases Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../plugins/kafka/source/KafkaSource.java | 5 ++-- .../util/KafkaSourceSecurityConfigurer.java | 29 ++++++++++++------- .../plugins/kafka/source/KafkaSourceTest.java | 27 +++++++++++++++++ 3 files changed, 48 insertions(+), 13 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index b0c752c200..a388d3ee6e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -117,7 +117,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, @Override public void start(Buffer> buffer) { Properties authProperties = new Properties(); - glueDeserializer = KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); + KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); sourceConfig.getTopics().forEach(topic -> { consumerGroupID = topic.getGroupId(); KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics); @@ -136,7 +136,8 @@ public void start(Buffer> buffer) { break; case PLAINTEXT: default: - if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) { + glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig); + if (Objects.nonNull(glueDeserializer)) { kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer); } else { kafkaConsumer = new KafkaConsumer(consumerProperties); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java index e9e612266f..77fcd6e2fc 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; @@ -83,6 +84,7 @@ public class KafkaSourceSecurityConfigurer { private static final int MAX_KAFKA_CLIENT_RETRIES = 360; // for one hour every 10 seconds private static AwsCredentialsProvider credentialsProvider; + private static GlueSchemaRegistryKafkaDeserializer glueDeserializer; /*public static void setSaslPlainTextProperties(final KafkaSourceConfig kafkaSourConfig, @@ -245,12 +247,11 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth } } - public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) { + public static void setAuthProperties(Properties properties, final KafkaSourceConfig sourceConfig, final Logger LOG) { final AwsConfig awsConfig = sourceConfig.getAwsConfig(); final AuthConfig authConfig = sourceConfig.getAuthConfig(); final KafkaSourceConfig.EncryptionConfig encryptionConfig = sourceConfig.getEncryptionConfig(); final EncryptionType encryptionType = encryptionConfig.getType(); - GlueSchemaRegistryKafkaDeserializer glueDeserializer = null; credentialsProvider = DefaultCredentialsProvider.create(); @@ -283,15 +284,6 @@ public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties p properties.put("ssl.engine.factory.class", InsecureSslEngineFactory.class); } } - if (sourceConfig.getSchemaConfig().getType() == SchemaRegistryType.AWS_GLUE) { - Map configs = new HashMap(); - configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion()); - configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); - configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); - configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); - configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); - glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs); - } if (Objects.isNull(authConfig) || Objects.isNull(authConfig.getSaslAuthConfig())) { if (encryptionType == EncryptionType.SSL) { properties.put(SECURITY_PROTOCOL, "SSL"); @@ -301,7 +293,22 @@ public static GlueSchemaRegistryKafkaDeserializer setAuthProperties(Properties p throw new RuntimeException("Bootstrap servers are not specified"); } properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + + public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaSourceConfig sourceConfig) { + SchemaConfig schemaConfig = 
sourceConfig.getSchemaConfig(); + if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) { + return null; + } + Map configs = new HashMap(); + configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion()); + configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); + configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); + configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); + configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); + glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs); return glueDeserializer; } + } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java index be868b3e6f..05843ed1a9 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java @@ -13,6 +13,8 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.junit.jupiter.api.Assertions; @@ -25,8 +27,10 @@ import org.mockito.quality.Strictness; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Objects; import java.time.Duration; @ExtendWith(MockitoExtension.class) @@ -37,6 +41,9 @@ class KafkaSourceTest { @Mock private KafkaSourceConfig sourceConfig; + @Mock + private KafkaSourceConfig.EncryptionConfig encryptionConfig; + @Mock private PluginMetrics pluginMetrics; @@ -64,6 +71,7 @@ public KafkaSource createObjectUnderTest() { @BeforeEach void setUp() throws Exception { sourceConfig = mock(KafkaSourceConfig.class); + encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class); pipelineDescription = mock(PipelineDescription.class); pluginMetrics = mock(PluginMetrics.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); @@ -79,12 +87,21 @@ void setUp() throws Exception { when(topic2.getConsumerMaxPollRecords()).thenReturn(1); when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID); when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID); + when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); + when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); + when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5)); + when(topic2.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5)); when(topic1.getAutoCommit()).thenReturn(false); + when(topic1.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); + when(topic2.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); when(topic2.getAutoCommit()).thenReturn(false); when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); 
when(topic2.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10)); when(sourceConfig.getBootStrapServers()).thenReturn("http://localhost:1234"); when(sourceConfig.getTopics()).thenReturn(Arrays.asList(topic1, topic2)); + when(sourceConfig.getSchemaConfig()).thenReturn(null); + when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE); } /* @Test @@ -108,4 +125,14 @@ void test_kafkaSource_start_execution_exception() { kafkaSource = createObjectUnderTest(); Assertions.assertThrows(Exception.class, () -> kafkaSource.start(buffer)); } + + @Test + void test_kafkaSource_basicFunctionality() { + when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); + when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); + kafkaSource = createObjectUnderTest(); + assertTrue(Objects.nonNull(kafkaSource)); + kafkaSource.start(buffer); + assertTrue(Objects.nonNull(kafkaSource.getConsumer())); + } } From 754839062318912b5e74b46cab1767af5e4e4c4c Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Mon, 14 Aug 2023 13:10:21 -0500 Subject: [PATCH 06/10] Fix flakey test caused by RCF variance. Update metric for RCF Instances (#3145) * Fix flakey test caused by RCF variance Signed-off-by: Jonah Calvo * Change metric name and type. Update test readability Signed-off-by: Jonah Calvo * Fix unit test to account for metric change Signed-off-by: Jonah Calvo --------- Signed-off-by: Jonah Calvo --- .../anomalydetector/AnomalyDetectorProcessor.java | 11 ++++++----- .../AnomalyDetectorProcessorTests.java | 5 +++-- .../modes/RandomCutForestModeTests.java | 6 +++++- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java index 57872c7ecd..0eb33f979f 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java @@ -5,7 +5,7 @@ package org.opensearch.dataprepper.plugins.processor.anomalydetector; -import io.micrometer.core.instrument.Counter; + import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -23,19 +23,20 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; @DataPrepperPlugin(name = "anomaly_detector", pluginType = Processor.class, pluginConfigurationType = AnomalyDetectorProcessorConfig.class) public class AnomalyDetectorProcessor extends AbstractProcessor, Record> { public static final String DEVIATION_KEY = "deviation_from_expected"; public static final String GRADE_KEY = "grade"; - static final String NUMBER_RCF_INSTANCES = "numberRCFInstances"; + static final String NUMBER_RCF_INSTANCES = "RCFInstances"; private final Boolean verbose; private final IdentificationKeysHasher identificationKeysHasher; - private final Counter numberRCFInstances; private final List keys; private final PluginFactory pluginFactory; private 
final HashMap forestMap; + private final AtomicInteger cardinality; private final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig; @DataPrepperPluginConstructor @@ -44,9 +45,9 @@ public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDete this.identificationKeysHasher = new IdentificationKeysHasher(anomalyDetectorProcessorConfig.getIdentificationKeys()); this.anomalyDetectorProcessorConfig = anomalyDetectorProcessorConfig; this.pluginFactory = pluginFactory; - this.numberRCFInstances = pluginMetrics.counter(NUMBER_RCF_INSTANCES); this.keys = anomalyDetectorProcessorConfig.getKeys(); this.verbose = anomalyDetectorProcessorConfig.getVerbose(); + this.cardinality = pluginMetrics.gauge(NUMBER_RCF_INSTANCES, new AtomicInteger()); forestMap = new HashMap<>(); } @@ -71,10 +72,10 @@ public Collection> doExecute(Collection> records) { forest = loadAnomalyDetectorMode(pluginFactory); forest.initialize(keys, verbose); forestMap.put(identificationKeysMap.hashCode(), forest); - this.numberRCFInstances.increment(); } recordsOut.addAll(forestMap.get(identificationKeysMap.hashCode()).handleEvents(List.of(record))); } + cardinality.set(forestMap.size()); return recordsOut; } diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java index 65c39518d2..302a692dd7 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -48,7 +49,7 @@ public class AnomalyDetectorProcessorTests { @Mock private PluginMetrics pluginMetrics; @Mock - private Counter numberRCFInstances; + private AtomicInteger numberRCFInstances; @Mock private Counter recordsIn; @@ -81,7 +82,7 @@ void setUp() { when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class))) .thenAnswer(invocation -> new RandomCutForestMode(randomCutForestModeConfig)); - when(pluginMetrics.counter(AnomalyDetectorProcessor.NUMBER_RCF_INSTANCES)).thenReturn(numberRCFInstances); + when(pluginMetrics.gauge(eq(AnomalyDetectorProcessor.NUMBER_RCF_INSTANCES), any())).thenReturn(numberRCFInstances); when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(recordsIn); when(pluginMetrics.counter(MetricNames.RECORDS_OUT)).thenReturn(recordsOut); when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed); diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java index c52788433f..818ab14d7b 100644 --- 
a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java @@ -21,6 +21,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.greaterThan; import org.junit.jupiter.api.Test; @@ -28,6 +29,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Mockito.mock; import org.mockito.Mock; import static org.mockito.Mockito.when; @@ -205,6 +208,7 @@ void testRandomCutForestModeVerboseFalse() { } final List> anomalyRecords = randomCutForestMode.handleEvents(recordsWithAnomaly).stream().collect(toList());; - assertThat(anomalyRecords.size(), equalTo(1)); + // Due to inherent variance in the RCF algorithm, 1-3 anomalies will be detected after the level shift. + assertThat(anomalyRecords.size(), both(greaterThanOrEqualTo(1)).and(lessThanOrEqualTo(3))); } } From 0f70d524c90193c98c6576c01ce440afaee82488 Mon Sep 17 00:00:00 2001 From: David Venable Date: Mon, 14 Aug 2023 16:02:01 -0500 Subject: [PATCH 07/10] Fix flaky integration test by wrapping a list in a new list to avoid a ConcurrentModificationException. Resolves #3139. (#3152) Signed-off-by: David Venable --- .../opensearch/dataprepper/plugins/InMemorySinkAccessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java index 77d659f816..21f359f361 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java @@ -34,7 +34,7 @@ public class InMemorySinkAccessor { public List> get(final String testingKey) { lock.lock(); try { - return recordsMap.getOrDefault(testingKey, Collections.emptyList()); + return new ArrayList<>(recordsMap.getOrDefault(testingKey, Collections.emptyList())); } finally { lock.unlock(); } @@ -49,7 +49,7 @@ public List> get(final String testingKey) { public List> getAndClear(final String testingKey) { lock.lock(); try { - final List> records = recordsMap.getOrDefault(testingKey, Collections.emptyList()); + final List> records = get(testingKey); recordsMap.remove(testingKey); From 60f69b3c565e7a10e6459d6bbe694d74ae6e6d96 Mon Sep 17 00:00:00 2001 From: David Venable Date: Mon, 14 Aug 2023 16:02:18 -0500 Subject: [PATCH 08/10] Fixes two flaky unit tests that have failed recently (#3150) Fixes two unit tests that have failed and are probably flaky. The ParseTreeEvaluatorListenerTest appears to be using negative values sometimes, which seems to be unsupported. The OTelLogsSourceTest test failed as well, but it appears this code may not always be executed because it was mixing Hamcrest and Mockito. 
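To make the matcher mix-up concrete, here is a minimal sketch of the assertion the patch lands on (class and method names are illustrative, not from the test). Hamcrest's contains matches a collection against an exact sequence of elements, while Mockito's ArgumentMatchers.contains is a stubbing-time argument matcher, so neither expresses "this collection has no 'server' entry"; hasItem does, and lower-casing first accounts for HTTP header names being case-insensitive:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;

import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

class HeaderAssertionSketch {
    static void assertNoServerHeader(final List<String> rawHeaderKeys) {
        // HTTP header names are case-insensitive, so normalize before matching.
        final List<String> headerKeys = rawHeaderKeys.stream()
                .map(key -> key.toLowerCase(Locale.ROOT))
                .collect(Collectors.toList());
        // hasItem asserts membership; Hamcrest's contains would assert the whole sequence.
        assertThat("Response Header Keys", headerKeys, not(hasItem("server")));
    }
}
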
Signed-off-by: David Venable --- .../ParseTreeEvaluatorListenerTest.java | 4 ++-- .../source/otellogs/OTelLogsSourceTest.java | 23 ++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java index 9b517d8ae0..f0e1fdb9f4 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java @@ -193,7 +193,7 @@ void testSimpleRelationalOperatorExpressionWithInValidLiteralType() { @Test void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() { final String testKey = "testKey"; - final int testValue = random.nextInt(1000); + final int testValue = random.nextInt(1000) + 2; final Map data = Map.of(testKey, testValue); final Event testEvent = createTestEvent(data); final String greaterThanStatement = String.format(" /%s > %d", testKey, testValue - 1); @@ -207,7 +207,7 @@ void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() { } @Test - void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValue() { + void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValueWithPositiveInteger() { final String testKey = "testKey"; final boolean testValue = true; final Map data = Map.of(testKey, testValue); diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java index eee926a698..5543e7c21e 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java @@ -5,11 +5,6 @@ package org.opensearch.dataprepper.plugins.source.otellogs; -import com.linecorp.armeria.common.HttpHeaderNames; -import com.linecorp.armeria.common.grpc.GrpcStatusFunction; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; -import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; @@ -18,11 +13,13 @@ import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.grpc.GrpcStatusFunction; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; @@ -69,6 +66,9 @@ import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import 
org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.health.HealthGrpcService; +import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -89,25 +89,22 @@ import java.util.stream.Stream; import java.util.zip.GZIPOutputStream; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_PORT; -import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS; -import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.contains; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doThrow; @@ -117,6 +114,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_PORT; +import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS; +import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL; @ExtendWith(MockitoExtension.class) class OTelLogsSourceTest { @@ -800,8 +800,9 @@ private void assertSecureResponseWithStatusCode(final AggregatedHttpResponse res .stream() .map(Map.Entry::getKey) .map(AsciiString::toString) + .map(String::toLowerCase) .collect(Collectors.toList()); - assertThat("Response Header Keys", headerKeys, not(contains("server"))); + assertThat("Response Header Keys", headerKeys, not(hasItem("server"))); } private byte[] createGZipCompressedPayload(final String payload) throws IOException { From f11d8824fdf8fc9188cd78928273f0adbc681233 Mon Sep 17 00:00:00 2001 From: Asif Sohail Mohammed Date: Tue, 15 Aug 2023 10:39:01 -0500 Subject: [PATCH 09/10] Support s3:// prefix (#3156) Support s3:// prefix Signed-off-by: Asif Sohail Mohammed --- .../dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java | 11 ++++++++--- .../plugins/dlq/s3/S3DlqWriterConfigTest.java | 9 +++++++++ .../dataprepper/plugins/sink/s3/S3SinkConfig.java | 7 ++++++- .../dataprepper/plugins/sink/s3/S3SinkConfigTest.java | 10 ++++++++++ .../source/configuration/S3ScanBucketOption.java | 9 +++++++++ .../dataprepper/plugins/source/ScanOptionsTest.java | 7 +++++-- 
 .../source/configuration/S3ScanBucketOptionTest.java |  7 +++++++
 7 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java
index 8eb8e401fd..f21b109ab5 100644
--- a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java
+++ b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java
@@ -6,7 +6,7 @@
 package org.opensearch.dataprepper.plugins.dlq.s3;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.Size;
 import software.amazon.awssdk.arns.Arn;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -33,9 +33,11 @@ public class S3DlqWriterConfig {
     private static final String DEFAULT_AWS_REGION = "us-east-1";
     private static final String AWS_IAM_ROLE = "role";
     private static final String AWS_IAM = "iam";
+    private static final String S3_PREFIX = "s3://";
+
     @JsonProperty("bucket")
-    @NotNull
-    @Size(min = 3, max = 63, message = "bucket lengthy should be between 3 and 63 characters")
+    @NotEmpty
+    @Size(min = 3, max = 500, message = "bucket length should be between 3 and 500 characters")
     private String bucket;
 
     @JsonProperty("key_path_prefix")
@@ -55,6 +57,9 @@ public class S3DlqWriterConfig {
     private String stsExternalId;
 
     public String getBucket() {
+        if (bucket.startsWith(S3_PREFIX)) {
+            return bucket.substring(S3_PREFIX.length());
+        }
         return bucket;
     }
 
diff --git a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java
index d1f61ae14e..0629256277 100644
--- a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java
+++ b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java
@@ -7,6 +7,7 @@
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import software.amazon.awssdk.regions.Region;
@@ -42,6 +43,14 @@ public void getS3ClientWithInvalidStsRoleArnThrowException(final String stsRoleArn) {
         assertThrows(IllegalArgumentException.class, config::getS3Client);
     }
 
+    @ParameterizedTest
+    @CsvSource({"bucket-name, bucket-name", "s3://bucket-name, bucket-name"})
+    public void getS3BucketNameShouldReturnCorrectBucketName(final String bucketName, final String expectedBucketName) throws NoSuchFieldException, IllegalAccessException {
+        final S3DlqWriterConfig config = new S3DlqWriterConfig();
+        reflectivelySetField(config, "bucket", bucketName);
+        assertThat(config.getBucket(), is(equalTo(expectedBucketName)));
+    }
+
     @ParameterizedTest
     @NullSource
     @ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"})
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java
index e39856cb12..6124f20538 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java
@@ -9,6 +9,7 @@
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.Size;
 import org.opensearch.dataprepper.model.configuration.PluginModel;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
 import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
@@ -20,6 +21,7 @@
  * s3 sink configuration class contains properties, used to read yaml configuration.
  */
 public class S3SinkConfig {
+    static final String S3_PREFIX = "s3://";
 
     private static final int DEFAULT_CONNECTION_RETRIES = 5;
     private static final int DEFAULT_UPLOAD_RETRIES = 5;
@@ -30,8 +32,8 @@ public class S3SinkConfig {
     private AwsAuthenticationOptions awsAuthenticationOptions;
 
     @JsonProperty("bucket")
-    @NotNull
     @NotEmpty
+    @Size(min = 3, max = 500, message = "bucket length should be between 3 and 500 characters")
     private String bucketName;
 
     @JsonProperty("object_key")
@@ -77,6 +79,9 @@ public ThresholdOptions getThresholdOptions() {
      * @return bucket name.
      */
     public String getBucketName() {
+        if (bucketName.startsWith(S3_PREFIX)) {
+            return bucketName.substring(S3_PREFIX.length());
+        }
         return bucketName;
     }
 
diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java
index 297a1ef818..d1660ebc63 100644
--- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java
+++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java
@@ -16,6 +16,7 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig.S3_PREFIX;
 
 class S3SinkConfigTest {
 
@@ -45,6 +46,15 @@ void get_bucket_name_test() throws NoSuchFieldException, IllegalAccessException {
         assertThat(objectUnderTest.getBucketName(), equalTo(bucketName));
     }
 
+    @Test
+    void get_bucket_name_with_s3_prefix_test() throws NoSuchFieldException, IllegalAccessException {
+        final String bucketName = UUID.randomUUID().toString();
+        final String bucketNameWithPrefix = S3_PREFIX + bucketName;
+        final S3SinkConfig objectUnderTest = new S3SinkConfig();
+        ReflectivelySetField.setField(S3SinkConfig.class, objectUnderTest, "bucketName", bucketNameWithPrefix);
+        assertThat(objectUnderTest.getBucketName(), equalTo(bucketName));
+    }
+
     @Test
     void get_object_key_test() {
         assertThat("Object key is not an instance of ObjectKeyOptions",
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java
index 323b0480f4..b54dab4075 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java
@@ -7,6 +7,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import jakarta.validation.constraints.AssertTrue;
+import jakarta.validation.constraints.NotEmpty;
+import jakarta.validation.constraints.Size;
 import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer;
 
 import java.time.Duration;
@@ -18,7 +20,11 @@
  * Class consists the bucket related configuration properties.
  */
 public class S3ScanBucketOption {
+    private static final String S3_PREFIX = "s3://";
+
     @JsonProperty("name")
+    @NotEmpty
+    @Size(min = 3, max = 500, message = "bucket length should be between 3 and 500 characters")
     private String name;
 
     @JsonDeserialize(using = CustomLocalDateTimeDeserializer.class)
@@ -41,6 +47,9 @@ public boolean hasValidTimeOptions() {
     }
 
     public String getName() {
+        if (name.startsWith(S3_PREFIX)) {
+            return name.substring(S3_PREFIX.length());
+        }
         return name;
     }
 
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java
index e4d50ef9ba..804d6e1e52 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java
@@ -42,12 +42,15 @@ public void s3scan_options_with_valid_global_time_range_build_success(
     @ParameterizedTest
     @MethodSource("invalidTimeRangeOptions")
     public void s3scan_options_with_invalid_global_time_range_throws_exception_when_build(
-            LocalDateTime startDateTime, LocalDateTime endDateTime, Duration range) {
+            LocalDateTime startDateTime, LocalDateTime endDateTime, Duration range) throws NoSuchFieldException, IllegalAccessException {
+        S3ScanBucketOption bucketOption = new S3ScanBucketOption();
+        setField(S3ScanBucketOption.class, bucketOption, "name", "bucket_name");
+
         assertThrows(IllegalArgumentException.class, () -> ScanOptions.builder()
                 .setStartDateTime(startDateTime)
                 .setEndDateTime(endDateTime)
                 .setRange(range)
-                .setBucketOption(new S3ScanBucketOption())
+                .setBucketOption(bucketOption)
                 .build());
     }
 
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java
index 69661e6efd..629c6726e7 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java
@@ -41,4 +41,11 @@ public void s3scan_bucket_options_with_scan_buckets_yaml_configuration_test() throws JsonProcessingException {
         assertThat(s3ScanBucketOption.getS3ScanFilter().getS3ScanExcludeSuffixOptions(),instanceOf(List.class));
         assertThat(s3ScanBucketOption.getS3ScanFilter().getS3ScanExcludeSuffixOptions().get(1),equalTo(".png"));
     }
+
+    @Test
+    public void s3scan_bucket_options_with_s3_prefix_test() throws JsonProcessingException {
+        final String bucketOptionsYaml = "---\nname: s3://test-s3-source-test-output";
+        final S3ScanBucketOption s3ScanBucketOption = objectMapper.readValue(bucketOptionsYaml, S3ScanBucketOption.class);
+        assertThat(s3ScanBucketOption.getName(), equalTo("test-s3-source-test-output"));
+    }
 }
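The three getters changed above (S3DlqWriterConfig#getBucket, S3SinkConfig#getBucketName, and S3ScanBucketOption#getName) all apply the same normalization: a leading "s3://" scheme is stripped, so pipeline authors may configure a bucket as either "my-bucket" or "s3://my-bucket". A minimal, self-contained sketch of that behavior follows; the helper class and main() driver are illustrative only, since the patch itself inlines this logic in each getter rather than sharing a utility:

    // Sketch of the normalization the three getters now share; illustrative only.
    public final class S3BucketNameNormalizer {
        private static final String S3_PREFIX = "s3://";

        private S3BucketNameNormalizer() {}

        // Strips a leading "s3://" scheme, mirroring the patched getters.
        public static String normalize(final String bucket) {
            if (bucket != null && bucket.startsWith(S3_PREFIX)) {
                return bucket.substring(S3_PREFIX.length());
            }
            return bucket;
        }

        public static void main(final String[] args) {
            System.out.println(normalize("s3://my-bucket")); // prints: my-bucket
            System.out.println(normalize("my-bucket"));      // prints: my-bucket
        }
    }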
From 252a0dd874fcb09c7bd5bb1a4818d72c7b742d5f Mon Sep 17 00:00:00 2001
From: Taylor Gray
Date: Tue, 15 Aug 2023 15:36:04 -0500
Subject: [PATCH 10/10] Add catching and logging of exceptions for s3 scan worker (#3159)

Signed-off-by: Taylor Gray
---
 .../source/S3ScanPartitionCreationSupplier.java |  4 ++++
 .../plugins/source/ScanObjectWorker.java        | 14 +++++++++++++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java
index 731803f7cf..e1ebea9fa0 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java
@@ -116,10 +116,14 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<String> excludeKeyPaths,
                     .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())
                     .collect(Collectors.toList()));
 
+            LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket);
+
             mostRecentLastModifiedTimestamp = getMostRecentLastModifiedTimestamp(listObjectsV2Response, mostRecentLastModifiedTimestamp);
         } while (listObjectsV2Response.isTruncated());
 
         globalStateMap.put(bucket, Objects.nonNull(mostRecentLastModifiedTimestamp) ? mostRecentLastModifiedTimestamp.toString() : null);
+
+        LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket);
         return allPartitionIdentifiers;
     }
 
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java
index 1257efeaf4..39a251cddc 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java
@@ -42,6 +42,7 @@ public class ScanObjectWorker implements Runnable{
     private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class);
 
     private static final int STANDARD_BACKOFF_MILLIS = 30_000;
+    private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;
     static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE;
     static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
 
@@ -99,7 +100,18 @@ public ScanObjectWorker(final S3Client s3Client,
     @Override
     public void run() {
         while (!shouldStopProcessing) {
-            startProcessingObject(STANDARD_BACKOFF_MILLIS);
+
+            try {
+                startProcessingObject(STANDARD_BACKOFF_MILLIS);
+            } catch (final Exception e) {
+                LOG.error("Received an exception while processing S3 objects, backing off and retrying", e);
+                try {
+                    Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
+                } catch (InterruptedException ex) {
+                    LOG.error("S3 Scan worker thread interrupted while backing off.", ex);
+                }
+            }
+
         }
     }
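The run() change above keeps the scan worker thread alive across unexpected failures: each scan pass is wrapped in a catch-all that logs the error and sleeps for RETRY_BACKOFF_ON_EXCEPTION_MILLIS before the loop retries. A self-contained sketch of the same pattern follows; the class and method names are illustrative, and unlike the patch, the sketch also restores the interrupt flag and stops on interruption, a common hardening of this loop that the patch itself does not include:

    // Illustrative retry-with-backoff worker loop; not part of the patch.
    public class BackoffWorker implements Runnable {
        private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;

        private volatile boolean shouldStopProcessing = false;

        @Override
        public void run() {
            while (!shouldStopProcessing) {
                try {
                    processOnce(); // stand-in for startProcessingObject(STANDARD_BACKOFF_MILLIS)
                } catch (final Exception e) {
                    System.err.println("Processing failed, backing off and retrying: " + e);
                    try {
                        Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
                    } catch (final InterruptedException ex) {
                        Thread.currentThread().interrupt(); // preserve interrupt status
                        shouldStopProcessing = true;        // stop instead of spinning
                    }
                }
            }
        }

        private void processOnce() {
            // Simulated work; a real worker would list and process S3 objects here.
        }
    }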