
Removes code which isn't used for the Avro and Parquet codecs. This will keep untested and errant code paths out of the project. Resolves #3201. (#3205)

Signed-off-by: David Venable <[email protected]>
dlvenable authored Aug 22, 2023
1 parent ffa8c4a commit 3d24500
Showing 12 changed files with 3 additions and 552 deletions.
@@ -52,12 +52,6 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) {

if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getFileLocation() != null) {
schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
} else if (config.getSchemaRegistryUrl() != null) {
schema = parseSchema(AvroSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl()));
} else if (checkS3SchemaValidity()) {
schema = AvroSchemaParserFromS3.parseSchema(config);
}
}

@@ -118,8 +112,4 @@ Schema parseSchema(final String schemaString) {
throw new RuntimeException("There is an error in the schema: " + e.getMessage());
}
}

private boolean checkS3SchemaValidity() {
return config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null;
}
}
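
With the alternate schema sources gone, the inline schema string is the codec's only remaining schema source, parsed by the parseSchema method shown above. A minimal standalone sketch of that surviving path (the record definition below is illustrative, not from the project):

import org.apache.avro.Schema;

public class InlineSchemaDemo {
    public static void main(String[] args) {
        // An inline Avro schema string, the only source the codec still accepts.
        final String schemaJson =
                "{\"type\":\"record\",\"name\":\"Event\"," +
                "\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}";
        // Same call as parseSchema(...) in AvroOutputCodec.
        final Schema schema = new Schema.Parser().parse(schemaJson);
        System.out.println(schema.getFullName()); // Event
    }
}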
@@ -5,69 +5,19 @@
package org.opensearch.dataprepper.plugins.codec.avro;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Size;

/**
* Configuration class for {@link AvroOutputCodec}.
*/
public class AvroOutputCodecConfig {

@JsonProperty("schema")
private String schema;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("schema_file_location")
private String fileLocation;

@Valid
@Size(max = 0, message = "Schema from schema registry is not supported.")
@JsonProperty("schema_registry_url")
private String schemaRegistryUrl;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("region")
private String region;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("bucket_name")
private String bucketName;

@Valid
@Size(max = 0, message = "Schema from file is not supported.")
@JsonProperty("file_key")
private String fileKey;

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.schema = schema;
}

public String getFileLocation() {
return fileLocation;
}

public String getSchemaRegistryUrl() {
return schemaRegistryUrl;
}

public String getRegion() {
return region;
}

public String getBucketName() {
return bucketName;
}

public String getFileKey() {
return fileKey;
}


}
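
The deleted fields each carried a @Size(max = 0) constraint, a Jakarta Bean Validation idiom that rejects any non-empty value with a custom message, so setting one of the unsupported options failed fast at configuration time. A rough, self-contained sketch of the idiom (assuming a Bean Validation implementation such as Hibernate Validator on the classpath; the class and field names here are illustrative):

import jakarta.validation.Validation;
import jakarta.validation.Validator;
import jakarta.validation.constraints.Size;

public class SizeZeroDemo {
    static class LegacyConfig {
        // max = 0 means any non-empty value violates the constraint.
        @Size(max = 0, message = "Schema from file is not supported.")
        String fileLocation;
    }

    public static void main(String[] args) {
        final LegacyConfig config = new LegacyConfig();
        config.fileLocation = "/tmp/schema.json";
        final Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
        // Prints: Schema from file is not supported.
        validator.validate(config).forEach(v -> System.out.println(v.getMessage()));
    }
}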

This file was deleted.

This file was deleted.

This file was deleted.

@@ -354,7 +354,6 @@ void verify_flushed_records_into_s3_bucket_Parquet() throws IOException {
private void configureParquetCodec() {
parquetOutputCodecConfig = new ParquetOutputCodecConfig();
parquetOutputCodecConfig.setSchema(parseSchema().toString());
parquetOutputCodecConfig.setPathPrefix(PATH_PREFIX);
codec = new ParquetOutputCodec(parquetOutputCodecConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE));
}
@@ -19,29 +19,23 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.plugins.fs.LocalInputFile;
import org.opensearch.dataprepper.plugins.s3keyindex.S3ObjectIndexUtility;
import org.opensearch.dataprepper.plugins.sink.s3.S3OutputCodecContext;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

@DataPrepperPlugin(name = "parquet", pluginType = OutputCodec.class, pluginConfigurationType = ParquetOutputCodecConfig.class)
public class ParquetOutputCodec implements OutputCodec {
private static final String PARQUET = "parquet";
private final ParquetOutputCodecConfig config;
private static Schema schema;
private final AvroEventConverter avroEventConverter;
private final AvroAutoSchemaGenerator avroAutoSchemaGenerator;
private ParquetWriter<GenericRecord> writer;
private OutputCodecContext codecContext;
private static final String PARQUET = "parquet";

private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}";
private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(TIME_PATTERN_REGULAR_EXPRESSION);
private String key;


@DataPrepperPluginConstructor
@@ -79,16 +73,9 @@ public boolean isCompressionInternal() {
void buildSchemaAndKey(final Event event) throws IOException {
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getFileLocation() != null) {
schema = ParquetSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
} else if (config.getSchemaRegistryUrl() != null) {
schema = parseSchema(ParquetSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl()));
} else if (checkS3SchemaValidity()) {
schema = ParquetSchemaParserFromS3.parseSchema(config);
} else {
schema = buildInlineSchemaFromEvent(event);
}
key = generateKey();
}

public Schema buildInlineSchemaFromEvent(final Event event) throws IOException {
@@ -141,42 +128,4 @@ public String getExtension() {
static Schema parseSchema(final String schemaString) {
return new Schema.Parser().parse(schemaString);
}

/**
* Generate the s3 object path prefix and object file name.
*
* @return object key path.
*/
protected String generateKey() {
final String pathPrefix = buildObjectPath(config.getPathPrefix());
final String namePattern = buildObjectFileName(config.getNamePattern());
return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern;
}

private static String buildObjectPath(final String pathPrefix) {
final StringBuilder s3ObjectPath = new StringBuilder();
if (pathPrefix != null && !pathPrefix.isEmpty()) {
String[] pathPrefixList = pathPrefix.split("\\/");
for (final String prefixPath : pathPrefixList) {
if (SIMPLE_DURATION_PATTERN.matcher(prefixPath).find()) {
s3ObjectPath.append(S3ObjectIndexUtility.getObjectPathPrefix(prefixPath)).append("/");
} else {
s3ObjectPath.append(prefixPath).append("/");
}
}
}
return s3ObjectPath.toString();
}

private String buildObjectFileName(final String configNamePattern) {
return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern) + "." + getExtension();
}

boolean checkS3SchemaValidity() {
if (config.getSchemaBucket() != null && config.getFileKey() != null && config.getSchemaRegion() != null) {
return true;
} else {
return false;
}
}
}
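
For reference, the removed generateKey and buildObjectPath logic expanded %{...} time patterns in each path-prefix segment via S3ObjectIndexUtility before joining the segments into an S3 object key. A rough approximation of that behavior, with java.time standing in for the utility class (this helper is illustrative, not the project's API):

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PathPrefixDemo {
    // Matches placeholders such as %{yyyy-MM-dd} inside a path segment.
    private static final Pattern TIME_PATTERN = Pattern.compile("\\%\\{(.*?)\\}");

    static String expandPathPrefix(final String pathPrefix) {
        final StringBuilder objectPath = new StringBuilder();
        for (final String segment : pathPrefix.split("/")) {
            final Matcher matcher = TIME_PATTERN.matcher(segment);
            if (matcher.find()) {
                // Stand-in for S3ObjectIndexUtility: format the current UTC time
                // with the pattern found inside %{...}.
                final String now = ZonedDateTime.now(ZoneOffset.UTC)
                        .format(DateTimeFormatter.ofPattern(matcher.group(1)));
                objectPath.append(matcher.replaceFirst(Matcher.quoteReplacement(now))).append("/");
            } else {
                objectPath.append(segment).append("/");
            }
        }
        return objectPath.toString();
    }

    public static void main(String[] args) {
        // e.g. logs/2023-08-22/app/
        System.out.println(expandPathPrefix("logs/%{yyyy-MM-dd}/app"));
    }
}

Object-key generation now appears to live with the S3 sink's own KeyGenerator, as seen in the updated integration test above, rather than inside the codec.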