Adds S3 sink compression. Resolves opensearch-project#3130.
Signed-off-by: David Venable <[email protected]>
dlvenable committed Aug 10, 2023
1 parent 47a9bc0 commit e46c859
Showing 18 changed files with 578 additions and 65 deletions.
8 changes: 5 additions & 3 deletions data-prepper-plugins/s3-sink/build.gradle
@@ -23,8 +23,10 @@ dependencies {
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
implementation libs.commons.lang3
testImplementation project(':data-prepper-test-common')
implementation project(':data-prepper-plugins:parquet-codecs')
implementation project(':data-prepper-plugins:parse-json-processor')
testImplementation project(':data-prepper-plugins:parquet-codecs')
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation project(':data-prepper-plugins:csv-processor')
testImplementation project(':data-prepper-plugins:avro-codecs')
}

test {
@@ -55,7 +57,7 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket')
systemProperty 'tests.s3ink.region', System.getProperty('tests.s3sink.region')
systemProperty 'tests.s3sink.region', System.getProperty('tests.s3sink.region')

filter {
includeTestsMatching '*IT'
@@ -5,10 +5,13 @@

package org.opensearch.dataprepper.plugins.sink.s3;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
@@ -24,9 +27,9 @@
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -43,13 +46,16 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig;
import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec;
import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodecConfig;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
@@ -66,6 +72,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
@@ -129,7 +136,7 @@ class S3SinkServiceIT {

@BeforeEach
public void setUp() {
s3region = System.getProperty("tests.s3ink.region");
s3region = System.getProperty("tests.s3sink.region");

s3Client = S3Client.builder().region(Region.of(s3region)).build();
bucketName = System.getProperty("tests.s3sink.bucket");
@@ -168,19 +175,65 @@ void configureNewLineCodec() {
}

@Test
void verify_flushed_records_into_s3_bucketNewLine() {
void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException {
configureNewLineCodec();
S3SinkService s3SinkService = createObjectUnderTest();
Collection<Record<Event>> recordsData = setEventQueue();

s3SinkService.output(recordsData);
String objectData = getS3Object();
String objectData = new String(getS3Object());

final ObjectMapper objectMapperForDeserialization = new ObjectMapper();
int count = 0;
String[] objectDataArr = objectData.split("\r\n");

String[] objectDataArr = objectData.split(System.lineSeparator());
assertThat(objectDataArr.length, equalTo(recordsData.size()));

for (Record<Event> recordData : recordsData) {
String objectRecord = recordData.getData().toJsonString();
assertThat(objectDataArr[count], CoreMatchers.containsString(objectRecord));
final String actualLine = objectDataArr[count];
final Map actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class);

final Map<String, Object> expectedMap = new HashMap<>(recordData.getData().toMap());
expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags()));
assertThat(actualDeserializedJson, equalTo(expectedMap));

final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString();
assertThat(actualLine, equalTo(expectedJsonString));
count++;
}
}

@Test
void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException {
configureNewLineCodec();
bufferFactory = new CompressionBufferFactory(bufferFactory, CompressionOption.GZIP.getCompressionEngine());
S3SinkService s3SinkService = createObjectUnderTest();
Collection<Record<Event>> recordsData = setEventQueue();

s3SinkService.output(recordsData);
byte[] s3ObjectBytes = getS3Object();

ByteArrayInputStream s3ObjectInputStream = new ByteArrayInputStream(s3ObjectBytes);
InputStream decompressingInputStream = new GZipDecompressionEngine().createInputStream(s3ObjectInputStream);

String objectData = IOUtils.toString(decompressingInputStream, Charset.defaultCharset());

final ObjectMapper objectMapperForDeserialization = new ObjectMapper();
int count = 0;

String[] objectDataArr = objectData.split(System.lineSeparator());
assertThat(objectDataArr.length, equalTo(recordsData.size()));

for (Record<Event> recordData : recordsData) {
final String actualLine = objectDataArr[count];
final Map actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class);

final Map<String, Object> expectedMap = new HashMap<>(recordData.getData().toMap());
expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags()));
assertThat(actualDeserializedJson, equalTo(expectedMap));

final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString();
assertThat(actualLine, equalTo(expectedJsonString));
count++;
}
}
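
The decompression step in the test above relies on GZipDecompressionEngine from the shared codec plugins. An equivalent check can be written with only the JDK's java.util.zip; the fragment below is an illustrative alternative, not part of this commit, and reuses the getS3Object() helper from this test class.

// Illustrative JDK-only alternative to GZipDecompressionEngine (not part of this commit).
byte[] s3ObjectBytes = getS3Object();
try (InputStream decompressingInputStream = new java.util.zip.GZIPInputStream(new ByteArrayInputStream(s3ObjectBytes))) {
    String objectData = new String(decompressingInputStream.readAllBytes(), Charset.defaultCharset());
    // objectData now holds the NDJSON lines and can be split and compared exactly as the test above does.
}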
@@ -202,7 +255,7 @@ private int gets3ObjectCount() {
return s3ObjectCount;
}

private String getS3Object() {
private byte[] getS3Object() {

ListObjectsRequest listObjects = ListObjectsRequest.builder()
.bucket(bucketName)
@@ -220,8 +273,7 @@ private String getS3Object() {
.bucket(bucketName).build();

ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(objectRequest);
byte[] data = objectBytes.asByteArray();
return new String(data);
return objectBytes.asByteArray();
}

private String getPathPrefix() {
@@ -240,20 +292,19 @@ private static Record<Event> createRecord() {
final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder().
withEventType(EventType.LOG.toString()).
withTags(testTags).build();
Map<String, Object> json = generateJson(testTags);
Map<String, Object> json = generateJson();
final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build();
event.setEventHandle(mock(EventHandle.class));
return new Record<>(event);
}

private static Map<String, Object> generateJson(Set<String> testTags) {
private static Map<String, Object> generateJson() {
final Map<String, Object> jsonObject = new LinkedHashMap<>();
for (int i = 0; i < 2; i++) {
jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(),
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
jsonObject.put("Tag", testTags.toArray());
return jsonObject;
}

@@ -280,14 +331,15 @@ private static List<HashMap> generateRecords(int numberOfRecords) {
}

@Test
@Disabled
void verify_flushed_records_into_s3_bucket_Parquet() throws IOException {
configureParquetCodec();
S3SinkService s3SinkService = createObjectUnderTest();
Collection<Record<Event>> recordsData = getRecordList();

s3SinkService.output(recordsData);

List<HashMap<String, Object>> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object().getBytes()));
List<HashMap<String, Object>> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object()));
int index = 0;
for (final HashMap<String, Object> actualMap : actualRecords) {
assertThat(actualMap, notNullValue());
@@ -21,8 +21,10 @@
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.LocalFileBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
@@ -64,11 +66,14 @@ public S3Sink(final PluginSetting pluginSetting,
codec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
sinkInitialized = Boolean.FALSE;

final BufferFactory innerBufferFactory;
if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) {
bufferFactory = new LocalFileBufferFactory();
innerBufferFactory = new LocalFileBufferFactory();
} else {
bufferFactory = new InMemoryBufferFactory();
innerBufferFactory = new InMemoryBufferFactory();
}
final CompressionEngine compressionEngine = s3SinkConfig.getCompression().getCompressionEngine();
bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine);
final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, OutputCodecContext.fromSinkContext(sinkContext), s3Client, pluginMetrics);
}
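
The CompressionEngine retrieved via getCompression().getCompressionEngine() lives in the new compression package, which is among the changed files not shown in this excerpt. Below is a minimal sketch of what that contract plausibly looks like: the interface name and the createOutputStream signature are confirmed by their use in CompressionBuffer further down, while NoneCompressionEngine and GZipCompressionEngine are assumed names.

package org.opensearch.dataprepper.plugins.sink.s3.compression;

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public interface CompressionEngine {
    OutputStream createOutputStream(OutputStream outputStream) throws IOException;
}

// Assumed pass-through engine for CompressionOption.NONE: wrapping the buffer factory
// unconditionally stays safe because no extra bytes are introduced.
class NoneCompressionEngine implements CompressionEngine {
    @Override
    public OutputStream createOutputStream(final OutputStream outputStream) {
        return outputStream;
    }
}

// Assumed GZIP engine built on the JDK; compresses bytes as they are written toward the inner buffer.
class GZipCompressionEngine implements CompressionEngine {
    @Override
    public OutputStream createOutputStream(final OutputStream outputStream) throws IOException {
        return new GZIPOutputStream(outputStream);
    }
}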
@@ -11,6 +11,7 @@
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
@@ -36,6 +37,9 @@ public class S3SinkConfig {
@JsonProperty("object_key")
private ObjectKeyOptions objectKeyOptions;

@JsonProperty("compression")
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("threshold")
@NotNull
private ThresholdOptions thresholdOptions;
@@ -118,4 +122,8 @@ public int getMaxConnectionRetries() {
public int getMaxUploadRetries() {
return maxUploadRetries;
}

public CompressionOption getCompression() {
return compression;
}
}
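
CompressionOption itself is also outside the shown portion of the diff. A hedged sketch of how the new compression setting might map onto engines: the enum name, the NONE default, GZIP, and getCompressionEngine() are confirmed by the code above, while the option strings, the Supplier wiring, and the engine class names (from the earlier sketch) are assumptions.

package org.opensearch.dataprepper.plugins.sink.s3.compression;

import com.fasterxml.jackson.annotation.JsonValue;

import java.util.function.Supplier;

public enum CompressionOption {
    NONE("none", NoneCompressionEngine::new),
    GZIP("gzip", GZipCompressionEngine::new);

    private final String option;
    private final Supplier<CompressionEngine> engineSupplier;

    CompressionOption(final String option, final Supplier<CompressionEngine> engineSupplier) {
        this.option = option;
        this.engineSupplier = engineSupplier;
    }

    // Each option produces a fresh engine instance for the sink to hand to the buffer factory.
    public CompressionEngine getCompressionEngine() {
        return engineSupplier.get();
    }

    @JsonValue
    public String getOption() {
        return option;
    }
}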
@@ -6,7 +6,6 @@
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import software.amazon.awssdk.services.s3.S3Client;
import java.io.IOException;
import java.io.OutputStream;

/**
@@ -22,11 +21,9 @@ public interface Buffer {
int getEventCount();

long getDuration();
boolean isCodecStarted();
void setCodecStarted(boolean codecStarted);

void flushToS3(S3Client s3Client, String bucket, String key) ;
void writeEvent(byte[] bytes) throws IOException;

OutputStream getOutputStream();

void setEventCount(int eventCount);
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

class CompressionBuffer implements Buffer {
private final Buffer innerBuffer;
private final CompressionEngine compressionEngine;
private volatile OutputStream outputStream;

CompressionBuffer(final Buffer innerBuffer, final CompressionEngine compressionEngine) {
this.innerBuffer = Objects.requireNonNull(innerBuffer);
this.compressionEngine = Objects.requireNonNull(compressionEngine);
}

@Override
public long getSize() {
return innerBuffer.getSize();
}

@Override
public int getEventCount() {
return innerBuffer.getEventCount();
}

@Override
public long getDuration() {
return innerBuffer.getDuration();
}

@Override
public void flushToS3(final S3Client s3Client, final String bucket, final String key) {
innerBuffer.flushToS3(s3Client, bucket, key);
}

@Override
public OutputStream getOutputStream() {
if(outputStream == null) {
synchronized (this) {
if(outputStream == null) {
final OutputStream innerBufferOutputStream = innerBuffer.getOutputStream();
try {
outputStream = compressionEngine.createOutputStream(innerBufferOutputStream);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
}
return outputStream;
}

@Override
public void setEventCount(final int eventCount) {
innerBuffer.setEventCount(eventCount);
}
}
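
getOutputStream() above wraps the inner buffer's stream in a compressing stream exactly once, using double-checked locking on a volatile field, so every writer sees the same stream. The fragment below is an illustration only, assuming the GZipCompressionEngine sketched earlier; since CompressionBuffer is package-private, such code would sit in the same package, for example a unit test.

// Illustration only; not part of this commit.
final Buffer inner = new InMemoryBufferFactory().getBuffer();
final Buffer buffer = new CompressionBuffer(inner, new GZipCompressionEngine());

final OutputStream outputStream = buffer.getOutputStream();   // first call wraps the inner buffer's stream
assert outputStream == buffer.getOutputStream();              // later calls reuse the same compressing stream

outputStream.write("{\"message\":\"hello\"}\n".getBytes(StandardCharsets.UTF_8));
outputStream.close();                                          // closing writes the gzip trailer, leaving a complete gzip object in the inner buffer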
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;

import java.util.Objects;

public class CompressionBufferFactory implements BufferFactory {
private final BufferFactory innerBufferFactory;
private final CompressionEngine compressionEngine;

public CompressionBufferFactory(final BufferFactory innerBufferFactory, final CompressionEngine compressionEngine) {
this.innerBufferFactory = Objects.requireNonNull(innerBufferFactory);
this.compressionEngine = Objects.requireNonNull(compressionEngine);
}

@Override
public Buffer getBuffer() {
return new CompressionBuffer(innerBufferFactory.getBuffer(), compressionEngine);
}
}