Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly add compression extensions to the generated S3 sink keys #3196

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void verify_flushed_object_count_into_s3_bucket() {

void configureNewLineCodec() {
codec = new NdjsonOutputCodec(ndjsonOutputConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, codec);
keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE));
}

@Test
Expand Down Expand Up @@ -356,7 +356,7 @@ private void configureParquetCodec() {
parquetOutputCodecConfig.setSchema(parseSchema().toString());
parquetOutputCodecConfig.setPathPrefix(PATH_PREFIX);
codec = new ParquetOutputCodec(parquetOutputCodecConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, codec);
keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE));
}

private Collection<Record<Event>> getRecordList() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.plugins.sink.s3;

/**
 * Supplies the file extension used when building S3 object keys.
 * KeyGenerator passes the returned value to ObjectKey.objectFileName.
 */
public interface ExtensionProvider {
/**
 * Returns the extension to use for generated object names.
 * NOTE(review): callers appear to add the leading "." themselves, so the
 * value is presumably returned without one - confirm against ObjectKey.
 *
 * @return the extension string
 */
String getExtension();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@

package org.opensearch.dataprepper.plugins.sink.s3;

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey;

public class KeyGenerator {
private final S3SinkConfig s3SinkConfig;
private final OutputCodec outputCodec;
private final ExtensionProvider extensionProvider;

public KeyGenerator(S3SinkConfig s3SinkConfig, OutputCodec outputCodec) {
public KeyGenerator(S3SinkConfig s3SinkConfig, ExtensionProvider extensionProvider) {
this.s3SinkConfig = s3SinkConfig;
this.outputCodec = outputCodec;
this.extensionProvider = extensionProvider;
}

/**
Expand All @@ -24,7 +23,7 @@ public KeyGenerator(S3SinkConfig s3SinkConfig, OutputCodec outputCodec) {
*/
String generateKey() {
final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig);
final String namePattern = ObjectKey.objectFileName(s3SinkConfig, outputCodec.getExtension());
final String namePattern = ObjectKey.objectFileName(s3SinkConfig, extensionProvider.getExtension());
return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -64,12 +65,15 @@ public S3Sink(final PluginSetting pluginSetting,
sinkInitialized = Boolean.FALSE;

final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, codec);
final BufferFactory innerBufferFactory = s3SinkConfig.getBufferType().getBufferFactory();
final CompressionEngine compressionEngine = s3SinkConfig.getCompression().getCompressionEngine();
CompressionOption compressionOption = s3SinkConfig.getCompression();
final CompressionEngine compressionEngine = compressionOption.getCompressionEngine();
bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec);

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), s3SinkConfig.getCompression());
ExtensionProvider extensionProvider = StandardExtensionProvider.create(codec, compressionOption);
KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider);

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, pluginMetrics);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.plugins.sink.s3;

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

class StandardExtensionProvider implements ExtensionProvider {
private final String extension;

static ExtensionProvider create(OutputCodec outputCodec, CompressionOption compressionOption) {

String codecExtension = outputCodec.getExtension();

if(outputCodec.isCompressionInternal()) {
return new StandardExtensionProvider(codecExtension);
}

String extension = compressionOption.getExtension()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the configured extension be ignored if there is an internal compression?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be ignored.

With internal compression the file itself is not compressed. Different parts are independently compressed. A user should not try to run it through GZip/Snappy decompression.

With Parquet specifically, the compression is stored as metadata inside the file. And compressed files retain the .parquet extension.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test verifies that exact behavior: getExtension_returns_extension_of_codec_when_compression_internal.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification.

.map(compressionExtension -> codecExtension + "." + compressionExtension)
.orElse(codecExtension);


return new StandardExtensionProvider(extension);
}

/**
 * Stores an already-computed extension. The constructor is private, so
 * instances are obtained through the static {@code create} factory.
 *
 * @param extension the full extension this provider will return
 */
private StandardExtensionProvider(String extension) {
this.extension = extension;
}

/**
 * Returns the extension fixed when this provider was constructed.
 *
 * @return the stored extension string
 */
@Override
public String getExtension() {
return extension;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public enum CompressionOption {
NONE("none", NoneCompressionEngine::new),
GZIP("gzip", GZipCompressionEngine::new),
SNAPPY("snappy", SnappyCompressionEngine::new);
NONE("none", null, NoneCompressionEngine::new),
GZIP("gzip", "gz", GZipCompressionEngine::new),
SNAPPY("snappy", "snappy", SnappyCompressionEngine::new);

private static final Map<String, CompressionOption> OPTIONS_MAP = Arrays.stream(CompressionOption.values())
.collect(Collectors.toMap(
Expand All @@ -25,9 +26,11 @@ public enum CompressionOption {

private final String option;

private final String extension;
private final Supplier<CompressionEngine> compressionEngineSupplier;
CompressionOption(final String option, final Supplier<CompressionEngine> compressionEngineSupplier) {
CompressionOption(final String option, String extension, final Supplier<CompressionEngine> compressionEngineSupplier) {
this.option = option.toLowerCase();
this.extension = extension;
this.compressionEngineSupplier = compressionEngineSupplier;
}

Expand All @@ -39,6 +42,10 @@ public String getOption() {
return option;
}

/**
 * Returns the file extension associated with this compression option.
 * The extension field may be null (NONE is declared with a null extension),
 * in which case an empty Optional is returned rather than null.
 *
 * @return the extension wrapped in an Optional; empty when no extension is defined
 */
public Optional<String> getExtension() {
return Optional.ofNullable(extension);
}

@JsonCreator
public static CompressionOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.when;

Expand All @@ -32,7 +33,7 @@ class KeyGeneratorTest {
private S3SinkConfig s3SinkConfig;

@Mock
private OutputCodec outputCodec;
private ExtensionProvider extensionProvider;

@Mock
private ObjectKeyOptions objectKeyOptions;
Expand All @@ -44,7 +45,7 @@ void setUp() {
}

private KeyGenerator createObjectUnderTest() {
return new KeyGenerator(s3SinkConfig, outputCodec);
return new KeyGenerator(s3SinkConfig, extensionProvider);
}

@Test
Expand Down Expand Up @@ -74,4 +75,15 @@ void test_generateKey_with_date_prefix() {
assertThat(key, true);
assertThat(key, key.contains(pathPrefix + dateString));
}

@Test
void generateKey_ends_with_extension() {
    // A random extension proves the key suffix comes from the provider, not a constant.
    final String expectedExtension = UUID.randomUUID().toString();
    final String configuredPrefix = "events/";
    when(extensionProvider.getExtension()).thenReturn(expectedExtension);
    when(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).thenReturn(configuredPrefix);

    final String generatedKey = createObjectUnderTest().generateKey();

    assertThat(generatedKey, notNullValue());
    // The generated key must finish with ".<extension>".
    final String expectedSuffix = "." + expectedExtension;
    assertThat(generatedKey, generatedKey.endsWith(expectedSuffix));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.opensearch.dataprepper.plugins.sink.s3;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link StandardExtensionProvider#create}: verifies how the codec
 * extension and the compression extension are combined into the final extension.
 */
@ExtendWith(MockitoExtension.class)
class StandardExtensionProviderTest {

    @Mock
    private OutputCodec outputCodec;

    @Mock
    private CompressionOption compressionOption;

    // Randomised per test so assertions cannot pass against a hard-coded value.
    private String codecExtension;

    @BeforeEach
    void setUp() {
        codecExtension = UUID.randomUUID().toString();
    }

    /** Builds the provider under test from the two mocks. */
    private ExtensionProvider createObjectUnderTest() {
        return StandardExtensionProvider.create(outputCodec, compressionOption);
    }

    @Test
    void getExtension_returns_extension_of_codec_when_compression_internal() {
        when(outputCodec.getExtension()).thenReturn(codecExtension);
        when(outputCodec.isCompressionInternal()).thenReturn(true);

        final ExtensionProvider objectUnderTest = createObjectUnderTest();

        assertThat(objectUnderTest, notNullValue());
        assertThat(objectUnderTest.getExtension(), equalTo(codecExtension));

        // With internal compression the compression option must not be consulted at all.
        verify(compressionOption, never()).getExtension();
    }

    @Test
    void getExtension_returns_extension_of_codec_compression_has_no_extension() {
        when(outputCodec.getExtension()).thenReturn(codecExtension);
        when(compressionOption.getExtension()).thenReturn(Optional.empty());

        final ExtensionProvider objectUnderTest = createObjectUnderTest();

        assertThat(objectUnderTest, notNullValue());
        assertThat(objectUnderTest.getExtension(), equalTo(codecExtension));

        verify(compressionOption).getExtension();
    }

    @Test
    void getExtension_returns_extension_of_codec_compression_has_extension() {
        final String compressionExtension = UUID.randomUUID().toString();
        when(outputCodec.getExtension()).thenReturn(codecExtension);
        when(compressionOption.getExtension()).thenReturn(Optional.of(compressionExtension));

        final ExtensionProvider objectUnderTest = createObjectUnderTest();

        assertThat(objectUnderTest, notNullValue());
        // Expected form is "<codec extension>.<compression extension>".
        final String expectedExtension = String.join(".", codecExtension, compressionExtension);
        assertThat(objectUnderTest.getExtension(), equalTo(expectedExtension));
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.Optional;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;

Expand All @@ -38,6 +40,29 @@ void getCompressionEngine_returns_expected_engine_type(final CompressionOption o
assertThat(option.getCompressionEngine(), instanceOf(expectedEngineType));
}

// Runs once per CompressionOption constant: getExtension() must hand back an
// Optional object (possibly empty), never a null reference.
@ParameterizedTest
@EnumSource(CompressionOption.class)
void getExtension_returns_non_null_Optional(final CompressionOption option) {
assertThat(option.getExtension(), notNullValue());
}

// Checks each (option, extension) pair supplied by OptionToExpectedExtension.
@ParameterizedTest
@ArgumentsSource(OptionToExpectedExtension.class)
void getExtension_returns_expected_extension(final CompressionOption option, final String expectedExtension) {
    final Optional<String> actualExtension = option.getExtension();

    assertThat(actualExtension, notNullValue());
    final boolean extensionMissing = actualExtension.isEmpty();
    assertThat(extensionMissing, equalTo(false));
    assertThat(actualExtension.orElseThrow(), equalTo(expectedExtension));
}

// Only NONE is listed here: it is the option expected to carry no extension.
@ParameterizedTest
@EnumSource(value = CompressionOption.class, names = {"NONE"})
void getExtension_returns_empty_Optional_when_no_extension(final CompressionOption option) {
    final Optional<String> actualExtension = option.getExtension();

    assertThat(actualExtension, notNullValue());
    final boolean extensionMissing = actualExtension.isEmpty();
    assertThat(extensionMissing, equalTo(true));
}

static class OptionToExpectedEngine implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand All @@ -48,4 +73,14 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
);
}
}

/**
 * Supplies (CompressionOption, expected extension) pairs for the options
 * that are expected to define an extension.
 */
static class OptionToExpectedExtension implements ArgumentsProvider {
    @Override
    public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
        final Arguments gzipCase = arguments(CompressionOption.GZIP, "gz");
        final Arguments snappyCase = arguments(CompressionOption.SNAPPY, "snappy");
        return Stream.of(gzipCase, snappyCase);
    }
}
}
Loading