Skip to content

Commit

Permalink
Fix timestamp to use 24 hour instead of 12 hour, add compression exte…
Browse files Browse the repository at this point in the history
…nsion to files

Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Aug 15, 2023
1 parent 60f69b3 commit 22e846d
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
*/
protected String generateKey(OutputCodec codec) {
final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig);
final String namePattern = ObjectKey.objectFileName(s3SinkConfig, codec.getExtension());
final String namePattern = ObjectKey.objectFileName(s3SinkConfig, codec.getExtension(), s3SinkConfig.getCompression().getExtension());
return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import java.util.Objects;
import java.util.regex.Pattern;

import org.opensearch.dataprepper.plugins.s3keyindex.S3ObjectIndexUtility;
import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig;
import org.slf4j.Logger;
Expand All @@ -18,7 +20,7 @@
public class ObjectKey {

public static final Logger LOG = LoggerFactory.getLogger(ObjectKey.class);
private static final String DEFAULT_CODEC_FILE_EXTENSION = "json";
private static final String FILE_EXTENSION_FORMAT = "%s%s";
private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}";
private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(TIME_PATTERN_REGULAR_EXPRESSION);

Expand Down Expand Up @@ -53,15 +55,18 @@ public static String buildingPathPrefix(final S3SinkConfig s3SinkConfig) {
* @param codecExtension extension
* @return s3 object name with prefix
*/
public static String objectFileName(S3SinkConfig s3SinkConfig, String codecExtension) {
public static String objectFileName(S3SinkConfig s3SinkConfig, String codecExtension, String compressionExtension) {
Objects.requireNonNull(codecExtension);
Objects.requireNonNull(compressionExtension);

final String fileExtension = String.format(FILE_EXTENSION_FORMAT, codecExtension, compressionExtension);
String configNamePattern = s3SinkConfig.getObjectKeyOptions().getNamePattern();

int extensionIndex = configNamePattern.lastIndexOf('.');
if (extensionIndex > 0) {
return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern.substring(0, extensionIndex)) + "."
+ (codecExtension!=null? codecExtension :configNamePattern.substring(extensionIndex + 1));
return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern.substring(0, extensionIndex)) + "." + fileExtension;
} else {
return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern) + "." +
(codecExtension!=null? codecExtension : DEFAULT_CODEC_FILE_EXTENSION);
return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern) + "." + fileExtension;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import java.util.stream.Collectors;

public enum CompressionOption {
NONE("none", NoneCompressionEngine::new),
GZIP("gzip", GZipCompressionEngine::new);
NONE("none", NoneCompressionEngine::new, ""),
GZIP("gzip", GZipCompressionEngine::new, ".gz");

private static final Map<String, CompressionOption> OPTIONS_MAP = Arrays.stream(CompressionOption.values())
.collect(Collectors.toMap(
Expand All @@ -24,16 +24,22 @@ public enum CompressionOption {

private final String option;
private final Supplier<CompressionEngine> compressionEngineSupplier;
private final String extension;

CompressionOption(final String option, final Supplier<CompressionEngine> compressionEngineSupplier) {
CompressionOption(final String option, final Supplier<CompressionEngine> compressionEngineSupplier, final String extension) {
this.option = option.toLowerCase();
this.compressionEngineSupplier = compressionEngineSupplier;
this.extension = extension;
}

public CompressionEngine getCompressionEngine() {
return compressionEngineSupplier.get();
}

public String getExtension() {
return this.extension;
}

@JsonCreator
public static CompressionOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* An implementation class of path prefix and file pattern configuration Options
*/
public class ObjectKeyOptions {
private static final String DEFAULT_OBJECT_NAME_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}";
private static final String DEFAULT_OBJECT_NAME_PATTERN = "events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}";

@JsonProperty("path_prefix")
private String pathPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBuffer;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
Expand Down Expand Up @@ -125,6 +126,8 @@ void setUp() {
when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION));
when(s3SinkConfig.getCodec()).thenReturn(pluginModel);
when(codec.getExtension()).thenReturn(UUID.randomUUID().toString());
when(s3SinkConfig.getCompression()).thenReturn(CompressionOption.NONE);
when(pluginModel.getPluginName()).thenReturn(CODEC_PLUGIN_NAME);
when(pluginFactory.loadPlugin(OutputCodec.class, pluginSetting)).thenReturn(codec);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;

Expand All @@ -23,18 +22,9 @@

@ExtendWith(MockitoExtension.class)
class ObjectKeyTest {

@Mock
private ObjectKey objectKey;
@Mock
private S3SinkConfig s3SinkConfig;
@Mock
private PluginModel pluginModel;
@Mock
private PluginSetting pluginSetting;
@Mock
private PluginFactory pluginFactory;
@Mock
private ObjectKeyOptions objectKeyOptions;

@BeforeEach
Expand All @@ -51,32 +41,24 @@ void test_buildingPathPrefix() {
assertThat(pathPrefix, startsWith("events"));
}

@Test
void test_objectFileName() {
@ParameterizedTest
@CsvSource({"csv,.gz","csv,''"})
void test_objectFileName(final String codecExtension, final String compressionExtension) {
when(objectKeyOptions.getNamePattern()).thenReturn("my-elb");
String objectFileName = ObjectKey.objectFileName(s3SinkConfig, codecExtension, compressionExtension);

when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}");
String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null);
Assertions.assertNotNull(objectFileName);
assertThat(objectFileName, startsWith("my-elb"));
Assertions.assertEquals(objectFileName, "my-elb." + codecExtension + compressionExtension);
}

@Test
void test_objectFileName_with_fileExtension() {

@ParameterizedTest
@CsvSource({"pdf,.gz","pdf,''"})
void test_objectFileName_with_fileExtension(final String codecExtension, final String compressionExtension) {
when(s3SinkConfig.getObjectKeyOptions().getNamePattern())
.thenReturn("events-%{yyyy-MM-dd'T'hh-mm-ss}.pdf");
String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null);
Assertions.assertNotNull(objectFileName);
Assertions.assertTrue(objectFileName.contains(".pdf"));
}
.thenReturn("events." + codecExtension);
String objectFileName = ObjectKey.objectFileName(s3SinkConfig, codecExtension, compressionExtension);

@Test
void test_objectFileName_default_fileExtension() {

when(s3SinkConfig.getObjectKeyOptions().getNamePattern())
.thenReturn("events-%{yyyy-MM-dd'T'hh-mm-ss}");
String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null);
Assertions.assertNotNull(objectFileName);
Assertions.assertTrue(objectFileName.contains(".json"));
Assertions.assertEquals(objectFileName, "events." + codecExtension + compressionExtension);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class ObjectKeyOptionsTest {

private static final String DEFAULT_FILE_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}";
private static final String DEFAULT_FILE_PATTERN = "events-%{yyyy-MM-dd'T'HH-mm-ss'Z'}";

@Test
void default_file_pattern_test() {
Expand Down

0 comments on commit 22e846d

Please sign in to comment.