diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle index 00bc6d0f11..5d74fd169d 100644 --- a/data-prepper-plugins/s3-sink/build.gradle +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -23,8 +23,10 @@ dependencies { implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21' implementation libs.commons.lang3 testImplementation project(':data-prepper-test-common') - implementation project(':data-prepper-plugins:parquet-codecs') - implementation project(':data-prepper-plugins:parse-json-processor') + testImplementation project(':data-prepper-plugins:parquet-codecs') + testImplementation project(':data-prepper-plugins:parse-json-processor') + testImplementation project(':data-prepper-plugins:csv-processor') + testImplementation project(':data-prepper-plugins:avro-codecs') } test { @@ -55,7 +57,7 @@ task integrationTest(type: Test) { classpath = sourceSets.integrationTest.runtimeClasspath systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket') - systemProperty 'tests.s3ink.region', System.getProperty('tests.s3sink.region') + systemProperty 'tests.s3sink.region', System.getProperty('tests.s3sink.region') filter { includeTestsMatching '*IT' diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index 7134dc47fc..d679663f11 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -5,10 +5,13 @@ package org.opensearch.dataprepper.plugins.sink.s3; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; 
import io.micrometer.core.instrument.DistributionSummary; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.ParquetReadOptions; @@ -24,9 +27,9 @@ import org.apache.parquet.io.RecordReader; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; -import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -43,13 +46,16 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig; import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec; import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodecConfig; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; @@ -66,6 +72,7 @@ import java.io.File; import java.io.IOException; 
import java.io.InputStream; +import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.time.Duration; @@ -129,7 +136,7 @@ class S3SinkServiceIT { @BeforeEach public void setUp() { - s3region = System.getProperty("tests.s3ink.region"); + s3region = System.getProperty("tests.s3sink.region"); s3Client = S3Client.builder().region(Region.of(s3region)).build(); bucketName = System.getProperty("tests.s3sink.bucket"); @@ -168,19 +175,65 @@ void configureNewLineCodec() { } @Test - void verify_flushed_records_into_s3_bucketNewLine() { + void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException { configureNewLineCodec(); S3SinkService s3SinkService = createObjectUnderTest(); Collection<Record<Event>> recordsData = setEventQueue(); s3SinkService.output(recordsData); - String objectData = getS3Object(); + String objectData = new String(getS3Object()); + final ObjectMapper objectMapperForDeserialization = new ObjectMapper(); int count = 0; - String[] objectDataArr = objectData.split("\r\n"); + + String[] objectDataArr = objectData.split(System.lineSeparator()); + assertThat(objectDataArr.length, equalTo(recordsData.size())); + for (Record<Event> recordData : recordsData) { - String objectRecord = recordData.getData().toJsonString(); - assertThat(objectDataArr[count], CoreMatchers.containsString(objectRecord)); + final String actualLine = objectDataArr[count]; + final Map<String, Object> actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class); + + final Map<String, Object> expectedMap = new HashMap<>(recordData.getData().toMap()); + expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags())); + assertThat(actualDeserializedJson, equalTo(expectedMap)); + + final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString(); + assertThat(actualLine, equalTo(expectedJsonString)); count++; } } + + @Test + void 
verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException { + configureNewLineCodec(); + bufferFactory = new CompressionBufferFactory(bufferFactory, CompressionOption.GZIP.getCompressionEngine()); + S3SinkService s3SinkService = createObjectUnderTest(); + Collection<Record<Event>> recordsData = setEventQueue(); + + s3SinkService.output(recordsData); + byte[] s3ObjectBytes = getS3Object(); + + ByteArrayInputStream s3ObjectInputStream = new ByteArrayInputStream(s3ObjectBytes); + InputStream decompressingInputStream = new GZipDecompressionEngine().createInputStream(s3ObjectInputStream); + + String objectData = IOUtils.toString(decompressingInputStream, Charset.defaultCharset()); + + final ObjectMapper objectMapperForDeserialization = new ObjectMapper(); + int count = 0; + + String[] objectDataArr = objectData.split(System.lineSeparator()); + assertThat(objectDataArr.length, equalTo(recordsData.size())); + + for (Record<Event> recordData : recordsData) { + final String actualLine = objectDataArr[count]; + final Map<String, Object> actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class); + + final Map<String, Object> expectedMap = new HashMap<>(recordData.getData().toMap()); + expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags())); + assertThat(actualDeserializedJson, equalTo(expectedMap)); + + final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString(); + assertThat(actualLine, equalTo(expectedJsonString)); count++; } } @@ -202,7 +255,7 @@ private int gets3ObjectCount() { return s3ObjectCount; } - private String getS3Object() { + private byte[] getS3Object() { ListObjectsRequest listObjects = ListObjectsRequest.builder() .bucket(bucketName) @@ -220,8 +273,7 @@ private String getS3Object() { .bucket(bucketName).build(); ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(objectRequest); - byte[] data = objectBytes.asByteArray(); - return new String(data); + return objectBytes.asByteArray(); 
} private String getPathPrefix() { @@ -240,20 +292,19 @@ private static Record<Event> createRecord() { final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder(). withEventType(EventType.LOG.toString()). withTags(testTags).build(); - Map<String, Object> json = generateJson(testTags); + Map<String, Object> json = generateJson(); final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build(); event.setEventHandle(mock(EventHandle.class)); return new Record<>(event); } - private static Map<String, Object> generateJson(Set<String> testTags) { + private static Map<String, Object> generateJson() { final Map<String, Object> jsonObject = new LinkedHashMap<>(); for (int i = 0; i < 2; i++) { jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); } jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString())); - jsonObject.put("Tag", testTags.toArray()); return jsonObject; } @@ -280,6 +331,7 @@ private static List<Record<Event>> generateRecords(int numberOfRecords) { } @Test + @Disabled void verify_flushed_records_into_s3_bucket_Parquet() throws IOException { configureParquetCodec(); S3SinkService s3SinkService = createObjectUnderTest(); @@ -287,7 +339,7 @@ void verify_flushed_records_into_s3_bucket_Parquet() throws IOException { s3SinkService.output(recordsData); - List<HashMap<String, Object>> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object().getBytes())); + List<HashMap<String, Object>> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object())); int index = 0; for (final HashMap<String, Object> actualMap : actualRecords) { assertThat(actualMap, notNullValue()); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 11aa67637d..c880a72464 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ 
b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -21,8 +21,10 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.LocalFileBufferFactory; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; @@ -64,11 +66,14 @@ public S3Sink(final PluginSetting pluginSetting, codec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings); sinkInitialized = Boolean.FALSE; + final BufferFactory innerBufferFactory; if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) { - bufferFactory = new LocalFileBufferFactory(); + innerBufferFactory = new LocalFileBufferFactory(); } else { - bufferFactory = new InMemoryBufferFactory(); + innerBufferFactory = new InMemoryBufferFactory(); } + final CompressionEngine compressionEngine = s3SinkConfig.getCompression().getCompressionEngine(); + bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine); final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, OutputCodecContext.fromSinkContext(sinkContext), s3Client, pluginMetrics); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java index 1b18994f66..e39856cb12 100644 --- 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java @@ -11,6 +11,7 @@ import jakarta.validation.constraints.NotNull; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; @@ -36,6 +37,9 @@ public class S3SinkConfig { @JsonProperty("object_key") private ObjectKeyOptions objectKeyOptions; + @JsonProperty("compression") + private CompressionOption compression = CompressionOption.NONE; + @JsonProperty("threshold") @NotNull private ThresholdOptions thresholdOptions; @@ -118,4 +122,8 @@ public int getMaxConnectionRetries() { public int getMaxUploadRetries() { return maxUploadRetries; } + + public CompressionOption getCompression() { + return compression; + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java index b90775ed47..afd695db2b 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import software.amazon.awssdk.services.s3.S3Client; -import java.io.IOException; import java.io.OutputStream; /** @@ -22,11 +21,9 
@@ public interface Buffer { int getEventCount(); long getDuration(); - boolean isCodecStarted(); - void setCodecStarted(boolean codecStarted); void flushToS3(S3Client s3Client, String bucket, String key) ; - void writeEvent(byte[] bytes) throws IOException; + OutputStream getOutputStream(); void setEventCount(int eventCount); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java new file mode 100644 index 0000000000..440c030ac0 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; +import software.amazon.awssdk.services.s3.S3Client; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +class CompressionBuffer implements Buffer { + private final Buffer innerBuffer; + private final CompressionEngine compressionEngine; + private volatile OutputStream outputStream; + + CompressionBuffer(final Buffer innerBuffer, final CompressionEngine compressionEngine) { + this.innerBuffer = Objects.requireNonNull(innerBuffer); + this.compressionEngine = Objects.requireNonNull(compressionEngine); + } + + @Override + public long getSize() { + return innerBuffer.getSize(); + } + + @Override + public int getEventCount() { + return innerBuffer.getEventCount(); + } + + @Override + public long getDuration() { + return innerBuffer.getDuration(); + } + + @Override + public void flushToS3(final S3Client s3Client, final String bucket, final String key) { + innerBuffer.flushToS3(s3Client, bucket, key); + } + + @Override + 
public OutputStream getOutputStream() { + if(outputStream == null) { + synchronized (this) { + if(outputStream == null) { + final OutputStream innerBufferOutputStream = innerBuffer.getOutputStream(); + try { + outputStream = compressionEngine.createOutputStream(innerBufferOutputStream); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + } + return outputStream; + } + + @Override + public void setEventCount(final int eventCount) { + innerBuffer.setEventCount(eventCount); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java new file mode 100644 index 0000000000..5dcb652f0f --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; + +import java.util.Objects; + +public class CompressionBufferFactory implements BufferFactory { + private final BufferFactory innerBufferFactory; + private final CompressionEngine compressionEngine; + + public CompressionBufferFactory(final BufferFactory innerBufferFactory, final CompressionEngine compressionEngine) { + this.innerBufferFactory = Objects.requireNonNull(innerBufferFactory); + this.compressionEngine = Objects.requireNonNull(compressionEngine); + } + + @Override + public Buffer getBuffer() { + return new CompressionBuffer(innerBufferFactory.getBuffer(), compressionEngine); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java 
b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java index ea1f3bc697..58121912d7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java @@ -10,7 +10,6 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.TimeUnit; @@ -61,27 +60,6 @@ public void flushToS3(S3Client s3Client, String bucket, String key) { RequestBody.fromBytes(byteArray)); } - /** - * write byte array to output stream. - * - * @param bytes byte array. - * @throws IOException while writing to output stream fails. - */ - @Override - public void writeEvent(byte[] bytes) throws IOException { - byteArrayOutputStream.write(bytes); - byteArrayOutputStream.write(System.lineSeparator().getBytes()); - eventCount++; - } - @Override - public boolean isCodecStarted() { - return isCodecStarted; - } - - @Override - public void setCodecStarted(boolean codecStarted) { - isCodecStarted = codecStarted; - } @Override public void setEventCount(int eventCount) { this.eventCount = eventCount; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java index 733a2b86fa..52b6229d92 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java @@ -77,18 +77,6 @@ public void flushToS3(S3Client 
s3Client, String bucket, String key) { removeTemporaryFile(); } - /** - * write byte array to output stream. - * @param bytes byte array. - * @throws IOException while writing to output stream fails. - */ - @Override - public void writeEvent(byte[] bytes) throws IOException { - outputStream.write(bytes); - outputStream.write(System.lineSeparator().getBytes()); - eventCount++; - } - /** * Flushing the buffered data into the output stream. */ @@ -113,15 +101,7 @@ protected void removeTemporaryFile() { } } } - @Override - public boolean isCodecStarted() { - return isCodecStarted; - } - @Override - public void setCodecStarted(boolean codecStarted) { - isCodecStarted = codecStarted; - } @Override public void setEventCount(int eventCount) { this.eventCount = eventCount; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java new file mode 100644 index 0000000000..46ffc503ad --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import java.io.IOException; +import java.io.OutputStream; + +public interface CompressionEngine { + OutputStream createOutputStream(OutputStream outputStream) throws IOException; +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java new file mode 100644 index 0000000000..7e759909d5 --- /dev/null +++ 
b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public enum CompressionOption { + NONE("none", NoneCompressionEngine::new), + GZIP("gzip", GZipCompressionEngine::new); + + private static final Map<String, CompressionOption> OPTIONS_MAP = Arrays.stream(CompressionOption.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + private final Supplier<CompressionEngine> compressionEngineSupplier; + + CompressionOption(final String option, final Supplier<CompressionEngine> compressionEngineSupplier) { + this.option = option.toLowerCase(); + this.compressionEngineSupplier = compressionEngineSupplier; + } + + public CompressionEngine getCompressionEngine() { + return compressionEngineSupplier.get(); + } + + @JsonCreator + public static CompressionOption fromOptionValue(final String option) { + return OPTIONS_MAP.get(option.toLowerCase()); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java new file mode 100644 index 0000000000..f59956a8ed --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; + 
+import java.io.IOException; +import java.io.OutputStream; + +public class GZipCompressionEngine implements CompressionEngine { + @Override + public OutputStream createOutputStream(final OutputStream outputStream) throws IOException { + return new GzipCompressorOutputStream(outputStream); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java new file mode 100644 index 0000000000..9c852b4f85 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import java.io.OutputStream; + +public class NoneCompressionEngine implements CompressionEngine { + @Override + public OutputStream createOutputStream(final OutputStream outputStream) { + return outputStream; + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java index 61d27cecae..75ae2dde1c 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import 
org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; @@ -70,6 +71,7 @@ void setUp() { when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(MAX_EVENTS); when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse(MAXIMUM_SIZE)); when(s3SinkConfig.getThresholdOptions().getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(MAX_RETRIES)); + when(s3SinkConfig.getCompression()).thenReturn(CompressionOption.NONE); when(objectKeyOptions.getNamePattern()).thenReturn(OBJECT_KEY_NAME_PATTERN); when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java new file mode 100644 index 0000000000..a27798f3df --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static 
org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CompressionBufferFactoryTest { + @Mock + private BufferFactory innerBufferFactory; + + @Mock + private CompressionEngine compressionEngine; + + private CompressionBufferFactory createObjectUnderTest() { + return new CompressionBufferFactory(innerBufferFactory, compressionEngine); + } + + @Test + void constructor_throws_if_inner_BufferFactory_is_null() { + innerBufferFactory = null; + + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_CompressionEngine_is_null() { + compressionEngine = null; + + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Nested + class WithBuffer { + @Mock + private Buffer innerBuffer; + + @BeforeEach + void setUp() { + when(innerBufferFactory.getBuffer()).thenReturn(innerBuffer); + } + + @Test + void getBuffer_returns_CompressionBuffer() { + final Buffer buffer = createObjectUnderTest().getBuffer(); + assertThat(buffer, instanceOf(CompressionBuffer.class)); + } + + @Test + void getBuffer_returns_new_on_each_call() { + final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); + final Buffer firstBuffer = objectUnderTest.getBuffer(); + + assertThat(objectUnderTest.getBuffer(), not(equalTo(firstBuffer))); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java new file mode 100644 index 0000000000..3a7055414b --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java @@ -0,0 
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@code CompressionBuffer}: a decorator that delegates all buffer
 * operations to an inner {@code Buffer} while wrapping its output stream with a
 * {@code CompressionEngine} exactly once.
 */
@ExtendWith(MockitoExtension.class)
class CompressionBufferTest {
    @Mock
    private Buffer innerBuffer;

    @Mock
    private CompressionEngine compressionEngine;

    private Random random;

    @BeforeEach
    void setUp() {
        random = new Random();
    }

    private CompressionBuffer createObjectUnderTest() {
        return new CompressionBuffer(innerBuffer, compressionEngine);
    }

    @Test
    void constructor_throws_if_innerBuffer_is_null() {
        innerBuffer = null;

        assertThrows(NullPointerException.class, this::createObjectUnderTest);
    }

    @Test
    void constructor_throws_if_compressionEngine_is_null() {
        compressionEngine = null;

        assertThrows(NullPointerException.class, this::createObjectUnderTest);
    }

    @Test
    void getSize_returns_inner_getSize() {
        final long expectedSize = 1_000 + random.nextInt(10_000);
        when(innerBuffer.getSize()).thenReturn(expectedSize);

        assertThat(createObjectUnderTest().getSize(), equalTo(expectedSize));
    }

    @Test
    void getEventCount_returns_inner_getEventCount() {
        final int expectedEventCount = 1_000 + random.nextInt(10_000);
        when(innerBuffer.getEventCount()).thenReturn(expectedEventCount);

        assertThat(createObjectUnderTest().getEventCount(), equalTo(expectedEventCount));
    }

    @Test
    void getDuration_returns_inner_getDuration() {
        final long expectedDuration = 1_000 + random.nextInt(10_000);
        when(innerBuffer.getDuration()).thenReturn(expectedDuration);

        assertThat(createObjectUnderTest().getDuration(), equalTo(expectedDuration));
    }

    @Test
    void flushToS3_calls_inner_flushToS3() {
        final S3Client s3Client = mock(S3Client.class);
        final String bucketName = UUID.randomUUID().toString();
        final String objectKey = UUID.randomUUID().toString();

        createObjectUnderTest().flushToS3(s3Client, bucketName, objectKey);

        verify(innerBuffer).flushToS3(s3Client, bucketName, objectKey);
    }

    @Test
    void getOutputStream_returns_outputStream_via_CompressionEngine() throws IOException {
        final OutputStream rawStream = mock(OutputStream.class);
        final OutputStream compressedStream = mock(OutputStream.class);
        when(innerBuffer.getOutputStream()).thenReturn(rawStream);
        when(compressionEngine.createOutputStream(rawStream)).thenReturn(compressedStream);

        // The stream handed to callers must be the engine-wrapped one, not the raw inner stream.
        assertThat(createObjectUnderTest().getOutputStream(), sameInstance(compressedStream));
    }

    @Test
    void getOutputStream_wraps_OutputStream_only_once() throws IOException {
        final OutputStream rawStream = mock(OutputStream.class);
        final OutputStream compressedStream = mock(OutputStream.class);
        when(innerBuffer.getOutputStream()).thenReturn(rawStream);
        when(compressionEngine.createOutputStream(rawStream)).thenReturn(compressedStream);

        final CompressionBuffer objectUnderTest = createObjectUnderTest();
        final OutputStream firstStream = objectUnderTest.getOutputStream();

        // Repeated calls must keep returning the same wrapped stream...
        for (int call = 0; call < 3; call++) {
            assertThat(objectUnderTest.getOutputStream(), sameInstance(firstStream));
        }

        // ...and the engine must have performed the wrapping exactly once.
        verify(compressionEngine, times(1)).createOutputStream(any());
    }

    @Test
    void setEventCount_calls_inner_setEventCount() {
        final int eventCount = 1_000 + random.nextInt(10_000);

        createObjectUnderTest().setEventCount(eventCount);

        verify(innerBuffer).setEventCount(eventCount);
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.s3.compression;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

/**
 * Unit tests for {@code GZipCompressionEngine}: verifies the engine returns a
 * gzip-wrapping stream and that data written through it round-trips correctly.
 */
class GZipCompressionEngineTest {

    private GZipCompressionEngine createObjectUnderTest() {
        return new GZipCompressionEngine();
    }

    @Test
    void createOutputStream_should_return_GzipCompressorOutputStream() throws IOException {
        final OutputStream innerOutputStream = mock(OutputStream.class);
        final OutputStream outputStream = createObjectUnderTest().createOutputStream(innerOutputStream);

        assertThat(outputStream, instanceOf(GzipCompressorOutputStream.class));
    }

    @Test
    void createOutputStream_should_write_compressed_data() throws IOException {
        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        final byte[] inputBytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);

        // try-with-resources guarantees close(), which is what flushes the gzip trailer.
        try (final OutputStream outputStream = createObjectUnderTest().createOutputStream(byteArrayOutputStream)) {
            outputStream.write(inputBytes);
        }

        final byte[] writtenBytes = byteArrayOutputStream.toByteArray();

        // The first two bytes must be the gzip magic number.
        assertTrue(GzipCompressorInputStream.matches(writtenBytes, 2));

        // Round-trip: decompressing the written bytes must yield the original payload.
        // Close the decompressing stream to avoid a resource leak (the original test
        // left it open).
        try (final GzipCompressorInputStream uncompressingInputStream =
                     new GzipCompressorInputStream(new ByteArrayInputStream(writtenBytes))) {
            final byte[] uncompressedBytes = uncompressingInputStream.readAllBytes();
            assertThat(uncompressedBytes, equalTo(inputBytes));
        }
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.s3.compression;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.OutputStream;

import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;

/**
 * Unit tests for {@code NoneCompressionEngine}, the pass-through engine used
 * when compression is disabled.
 */
class NoneCompressionEngineTest {

    private OutputStream wrappedStream;

    @BeforeEach
    void setUp() {
        wrappedStream = mock(OutputStream.class);
    }

    private NoneCompressionEngine createObjectUnderTest() {
        return new NoneCompressionEngine();
    }

    @Test
    void createOutputStream_returns_innerOutputStream() {
        // The "none" engine must hand back the very same stream, untouched.
        final OutputStream result = createObjectUnderTest().createOutputStream(wrappedStream);

        assertThat(result, sameInstance(wrappedStream));
        verifyNoInteractions(wrappedStream);
    }
}