Support for Sink Codecs #3081

Closed
6 changes: 6 additions & 0 deletions data-prepper-plugins/avro-codecs/README.md
@@ -34,6 +34,11 @@ pipeline:
" {\"name\": \"name\", \"type\": \"string\"}," +
" {\"name\": \"age\", \"type\": \"int\"}]" +
"}";
tabular_schema: |
TABLE Person (colname datatype,
colname2 datatype,
colname3 datatype,
colname4 datatype)
exclude_keys:
- s3
buffer_type: in_memory
@@ -45,6 +50,7 @@ pipeline:

1) `schema`: A JSON string that the user can provide in the YAML file itself. The codec parses the schema object from this schema string.
2) `exclude_keys`: Keys of the event that the user wants to exclude when converting it to an Avro record.
3) `tabular_schema`: A multiline, Glue-like schema string that the user can provide in the YAML file itself. The codec builds the schema object from this schema string, as shown in the sketch below.
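
For example, a minimal sketch based on this PR's own test case: the Glue-like declaration

```
TABLE sesblog (eventType string,
               mail struct<col1:string>)
```

is converted into the following Avro record schema (nested `struct` fields become nested records auto-named `<table>_<counter>`):

```json
{"type":"record","name":"sesblog","fields":[
  {"name":"eventType","type":"string"},
  {"name":"mail","type":{"type":"record","name":"sesblog_0","fields":[
    {"name":"col1","type":"string"}]}}]}
```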

### Note:

AvroOutputCodec.java (data-prepper-plugins/avro-codecs)
@@ -57,6 +57,8 @@ public void start(final OutputStream outputStream, final Event event, final Stri
Objects.requireNonNull(outputStream);
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getTabularSchemaString() != null) {
schema = AvroSchemaParserFromTabularFormat.generateSchemaFromTabularString(config.getTabularSchemaString());
} else if (config.getFileLocation() != null) {
schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
} else if (config.getSchemaRegistryUrl() != null) {
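As the excerpt above shows, the codec resolves its schema from the first configured source, in this order: inline `schema`, then `tabular_schema`, then a schema file (`getFileLocation()`), then a schema registry (`getSchemaRegistryUrl()`). A minimal pipeline sketch using the new option (a hedged example assembled from the README snippets in this PR; only one schema source should be configured at a time):

```yaml
sink:
  - s3:
      codec:
        avro:
          tabular_schema: |
            TABLE Person (name string,
                          age int)
```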
AvroOutputCodecConfig.java (data-prepper-plugins/avro-codecs)
@@ -45,6 +45,9 @@ public class AvroOutputCodecConfig {
@JsonProperty("file_key")
private String fileKey;

@JsonProperty("tabular_schema")
private String tabularSchemaString;

public List<String> getExcludeKeys() {
return excludeKeys;
}
@@ -80,4 +83,7 @@ public void setExcludeKeys(List<String> excludeKeys) {
this.excludeKeys = excludeKeys;
}

public String getTabularSchemaString() {
return tabularSchemaString;
}
}
AvroSchemaParserFromTabularFormat.java (new file, data-prepper-plugins/avro-codecs)
@@ -0,0 +1,194 @@
package org.opensearch.dataprepper.plugins.codec.avro;

import org.apache.avro.Schema;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AvroSchemaParserFromTabularFormat {

private static final String END_SCHEMA_STRING = "]}";
// Matches the outermost parenthesized column list, e.g. "(col1 type1, col2 type2)".
// Simplified from "\\(([^()]*|)*\\)", which matched the same strings but was prone to catastrophic backtracking.
private static final Pattern pattern = Pattern.compile("\\(([^()]*)\\)");

static Schema generateSchemaFromTabularString(final String inputString) throws IOException {
String recordSchemaOutputString = inputString;
recordSchemaOutputString = recordSchemaOutputString.trim();

final String tableName = extractTableName(recordSchemaOutputString.split("\\s+"));
recordSchemaOutputString = getStringFromParentheses(recordSchemaOutputString);
recordSchemaOutputString = removeSpaceAndReplaceParentheses(recordSchemaOutputString);

final StringBuilder mainSchemaBuilder = new StringBuilder();
final String baseSchemaStr = "{\"type\":\"record\",\"name\":\"" + tableName + "\",\"fields\":[";
mainSchemaBuilder.append(baseSchemaStr);

iterateRecursively(mainSchemaBuilder, recordSchemaOutputString, false, tableName, 0);

mainSchemaBuilder.append(END_SCHEMA_STRING);

return new Schema.Parser().parse(mainSchemaBuilder.toString());
}

private static String removeSpaceAndReplaceParentheses(String recordSchemaOutputString) {
recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\(", "");
recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\)", "");

recordSchemaOutputString = recordSchemaOutputString.trim();

recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\n", "");
recordSchemaOutputString = recordSchemaOutputString.replaceAll(",\\s+", ",");
recordSchemaOutputString = recordSchemaOutputString.replaceAll(">\\s+", ">");
recordSchemaOutputString = recordSchemaOutputString.replaceAll("<\\s+", "<");
return recordSchemaOutputString;
}

private static String getStringFromParentheses(String recordSchemaOutputString) {
final Matcher matcher = pattern.matcher(recordSchemaOutputString);
if (matcher.find()) {
recordSchemaOutputString = matcher.group(0);
}
return recordSchemaOutputString;
}

private static String extractTableName(final String[] words) throws IOException {
String tableName = null;
if (words.length >= 2) {
tableName = words[1];
} else {
throw new IOException("Invalid schema string.");
}
return tableName;
}

private static String buildBaseSchemaString(final String part) {
return "{\"type\":\"record\",\"name\":\"" + part + "\",\"fields\":[";
}

private static String buildSchemaStringForArr() {
return "{\"type\":\"array\", \"items\":\"string\"}";
}

Review thread on `iterateRecursively`:

dlvenable (Member): This code appears to be taking a SQL-like schema and then converting it into an Avro schema. Right? Why not just accept the Avro schema in the pipeline configuration? This appears to be introducing an undefined language, and there may be many different edge cases that we are not accounting for. If this is using a well-defined language, can we perform a model mapping to make sure it is valid? Say, for example, it is a PostgreSQL DDL: can we parse the DDL using a PostgreSQL library into a Java model? Then we could perform a more accurate mapping.

omkarmmore95 (Contributor Author): Yes @dlvenable, this is Raj's suggestion to support a Glue-like schema and convert it into an Avro schema. We are already accepting an Avro schema in the pipeline YAML itself. As of now there is no library that maps a Glue-like schema structure to an Avro/Parquet schema, since Avro/Parquet also have nested and logical types.

dlvenable (Member): What is the syntax of this table structure?

omkarmmore95 (Contributor Author, Aug 2, 2023): @dlvenable, shared over mail, as I can't paste it here.

public static void iterateRecursively(final StringBuilder mainSchemaBuilder, final String recordSchema,
final boolean isStructString, final String tableName, int innerRecCounter) {
boolean isNameStringFormed = false;
StringBuilder fieldNameBuilder = new StringBuilder();
StringBuilder fieldTypeBuilder = new StringBuilder();
boolean isFirstRecordForName = true;

final char[] schemaStrCharArr = recordSchema.toCharArray();
int curPosInSchemaStrCharArr = 0;

while (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
final char currentCharFromArr = schemaStrCharArr[curPosInSchemaStrCharArr];
curPosInSchemaStrCharArr++;

if (!isNameStringFormed) {
// Inside a struct the field name ends at ':'; at the top level it ends at a space.
if ((isStructString && currentCharFromArr == ':') || currentCharFromArr == ' ') {
if (isFirstRecordForName) {
mainSchemaBuilder.append("{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
} else {
mainSchemaBuilder.append(",{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
}
isNameStringFormed = true;
fieldNameBuilder = new StringBuilder();
isFirstRecordForName = false;
continue;
}
fieldNameBuilder.append(currentCharFromArr);
}

if (isNameStringFormed) {

if (currentCharFromArr == ',' || curPosInSchemaStrCharArr == schemaStrCharArr.length) {
if (curPosInSchemaStrCharArr == schemaStrCharArr.length) {
fieldTypeBuilder.append(currentCharFromArr);
}
final String type = fieldTypeBuilder.toString().trim() + "\"}";

mainSchemaBuilder.append(type);
isNameStringFormed = false;
fieldTypeBuilder = new StringBuilder();
continue;
}

fieldTypeBuilder.append(currentCharFromArr);
if ("struct".equals(fieldTypeBuilder.toString())) {
mainSchemaBuilder.deleteCharAt(mainSchemaBuilder.length() - 1);
mainSchemaBuilder.append(buildBaseSchemaString(tableName + "_" + innerRecCounter));
final String structSchemaStr = recordSchema.substring(curPosInSchemaStrCharArr);
final StringBuilder structString = new StringBuilder();
int openClosedCounter = 0;
int structSchemaStrEndBracketPos = 0;
for (final char innerChar : structSchemaStr.toCharArray()) {
structSchemaStrEndBracketPos++;
if (innerChar == '<') {
openClosedCounter++;
} else if (innerChar == '>') {
openClosedCounter--;
}
structString.append(innerChar);
if (openClosedCounter == 0) {
break;
}
}

final String innerRecord = structString.toString().substring(1, structSchemaStrEndBracketPos - 1);
iterateRecursively(mainSchemaBuilder, innerRecord, true,
tableName, innerRecCounter + 1);
mainSchemaBuilder.append("}");
curPosInSchemaStrCharArr = curPosInSchemaStrCharArr + structSchemaStrEndBracketPos;
if (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
// Skip the comma that follows the closing '>' of the struct
curPosInSchemaStrCharArr++;
}
isNameStringFormed = false;
fieldTypeBuilder = new StringBuilder();
} else if ("array".equals(fieldTypeBuilder.toString())) {
mainSchemaBuilder.deleteCharAt(mainSchemaBuilder.length() - 1);
mainSchemaBuilder.append(buildSchemaStringForArr());
final String structSchemaStr = recordSchema.substring(curPosInSchemaStrCharArr);
int openClosedCounter = 0;
int structSchemaStrEndBracketPos = 0;

for (final char innerChar : structSchemaStr.toCharArray()) {
structSchemaStrEndBracketPos++;
if (innerChar == '<') {
openClosedCounter++;
} else if (innerChar == '>') {
openClosedCounter--;
}
if (openClosedCounter == 0) {
break;
}
}

mainSchemaBuilder.append("}");
curPosInSchemaStrCharArr = curPosInSchemaStrCharArr + structSchemaStrEndBracketPos;
if (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
// Skip the comma that follows the closing '>' of the array
curPosInSchemaStrCharArr++;
}

isNameStringFormed = false;
fieldTypeBuilder = new StringBuilder();
}
}
}

if (isStructString) {
mainSchemaBuilder.append(END_SCHEMA_STRING);
}
}

}
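
A minimal usage sketch for the parser above (not part of the PR; since `generateSchemaFromTabularString` is package-private, this hypothetical caller is assumed to live in the same package):

```java
package org.opensearch.dataprepper.plugins.codec.avro;

import org.apache.avro.Schema;

import java.io.IOException;

public class TabularSchemaExample {
    public static void main(final String[] args) throws IOException {
        final String tabular =
                "TABLE Person (name string,\n" +
                "              address struct<city:string,zip:string>,\n" +
                "              age int)";
        // Produces a record "Person" whose "address" field is the nested record "Person_0".
        final Schema schema = AvroSchemaParserFromTabularFormat.generateSchemaFromTabularString(tabular);
        System.out.println(schema.toString(true));
    }
}
```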
AvroOutputCodecTest.java (data-prepper-plugins/avro-codecs)
@@ -35,6 +35,18 @@ public class AvroOutputCodecTest {
":\"name\",\"type\":\"string\"},{\"name\":\"nestedRecord\",\"type\":{\"type\":\"record\",\"name\":" +
"\"NestedRecord1\",\"fields\":[{\"name\":\"secondFieldInNestedRecord\",\"type\":\"int\"},{\"name\":\"" +
"firstFieldInNestedRecord\",\"type\":\"string\"}]}},{\"name\":\"age\",\"type\":\"int\"}]}";
private static final String expectedSchemaStringTabular = "{\"type\":\"record\",\"name\":\"sesblog\",\"fields\":[{\"name\"" +
":\"eventType\",\"type\":\"string\"},{\"name\":\"ews\",\"type\":\"string\"},{\"name\":\"mail\",\"type\":" +
"{\"type\":\"record\",\"name\":\"sesblog_0\",\"fields\":[{\"name\":\"col1\",\"type\":\"string\"},{\"name\":\"" +
"innercolName\",\"type\":{\"type\":\"record\",\"name\":\"sesblog_1\",\"fields\":[{\"name\":\"colInner\"," +
"\"type\":\"string\"}]}}]}},{\"name\":\"column2\",\"type\":\"string\"}]}";
private static final String inputString = "TABLE sesblog (\n" +
" eventType string,\n" +
" ews string,\n" +
" mail struct<col1:string,\n" +
" innercolName struct<colInner:string> \n" +
" >,\n" +
" collumn2 string) ";
private AvroOutputCodecConfig config;

private ByteArrayOutputStream outputStream;
@@ -174,4 +186,11 @@ private static Map<String, Object> convertRecordToMap(GenericRecord nestedRecord
}
return eventData;
}

@Test
public void testTabularSchemaParser() throws IOException {
Schema expectedSchema = new Schema.Parser().parse(expectedSchemaStringTabular);
Schema actualSchema = AvroSchemaParserFromTabularFormat.generateSchemaFromTabularString(inputString);
assertThat(actualSchema, Matchers.equalTo(expectedSchema));
}
}
7 changes: 6 additions & 1 deletion data-prepper-plugins/parquet-codecs/README.md
@@ -27,6 +27,11 @@ pipeline:
event_collect_timeout: 15s
codec:
parquet:
tabular_schema: |
TABLE Person (colname1 datatype,
colname2 datatype,
colname3 datatype,
colname4 datatype)
schema: "{\"namespace\": \"org.example.test\"," +
" \"type\": \"record\"," +
" \"name\": \"TestMessage\"," +
@@ -58,7 +63,7 @@ pipeline:
8) `schema_bucket`: Name of the S3 bucket in which the `schema.json` file is kept.
9) `file_key`: File key of the `schema.json` file kept in the S3 bucket.
10) `schema_region`: AWS Region of the S3 bucket in which the `schema.json` file is kept.
11) `tabular_schema`: A multiline, Glue-like schema string that the user can provide in the YAML file itself. The codec builds the schema object from this schema string. See the sketch after this list.
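
For instance, assuming the Parquet tabular parser mirrors the Avro one shown earlier in this PR, field types may be primitives or nested `struct<name:type,...>` declarations, and `array<...>` fields are currently emitted with string items:

```
TABLE sesblog (eventType string,
               tags array<string>,
               mail struct<col1:string>)
```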
### Note:

1) The user can provide only one schema at a time, i.e., through only one of the ways provided in the codec config.
ParquetOutputCodec.java (data-prepper-plugins/parquet-codecs)
@@ -84,6 +84,8 @@ public synchronized void start(File file) throws IOException {
void buildSchemaAndKey(final Event event, final String tagsTargetKey) throws IOException {
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getTabularSchema() != null) {
schema = ParquetSchemaParserFromTabularFormat.generateSchemaFromTabularString(config.getTabularSchema());
} else if (config.getFileLocation() != null) {
schema = ParquetSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
} else if (config.getSchemaRegistryUrl() != null) {
@@ -284,7 +286,7 @@ private static Object schemaMapper(final Schema.Field field, final Object rawVa
LOG.error("Unrecognised Field name : '{}' & type : '{}'", field.name(), fieldType);
break;
}
- } else {
+ } else if (field.schema().getLogicalType() != null) {
final String logicalTypeName = field.schema().getLogicalType().getName();
switch (logicalTypeName){
case "date":
@@ ... @@
}
return finalValue;
}
- boolean checkS3SchemaValidity() throws IOException {
+ boolean checkS3SchemaValidity() {
if (config.getSchemaBucket() != null && config.getFileKey() != null && config.getSchemaRegion() != null) {
return true;
} else {
LOG.error("Invalid S3 credentials, can't reach the schema file.");
- throw new IOException("Can't proceed without schema.");
+ return false;
}
}
}
ParquetOutputCodecConfig.java (data-prepper-plugins/parquet-codecs)
@@ -57,6 +57,9 @@ public class ParquetOutputCodecConfig {
@JsonProperty("exclude_keys")
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

@JsonProperty("tabular_schema")
private String tabularSchema;

@Valid
@Size(max = 0, message = "Schema from Schema Registry is not supported.")
@JsonProperty("schema_registry_url")
@@ -145,5 +148,8 @@ public void setSchemaRegistryUrl(String schemaRegistryUrl) {
public void setExcludeKeys(List<String> excludeKeys) {
this.excludeKeys = excludeKeys;
}
public String getTabularSchema() {
return tabularSchema;
}
}
