diff --git a/data-prepper-plugins/avro-codecs/README.md b/data-prepper-plugins/avro-codecs/README.md
index 9423b5380c..1b9f3707c0 100644
--- a/data-prepper-plugins/avro-codecs/README.md
+++ b/data-prepper-plugins/avro-codecs/README.md
@@ -34,6 +34,11 @@ pipeline:
               " {\"name\": \"name\", \"type\": \"string\"}," +
               " {\"name\": \"age\", \"type\": \"int\"}]" +
               "}";
+          tabular_schema: |
+            TABLE Person (colname datatype,
+            colname2 datatype,
+            colname3 datatype,
+            colname4 datatype)
           exclude_keys:
             - s3
         buffer_type: in_memory
@@ -45,6 +50,7 @@ pipeline:
 
 1) `schema`: A json string that user can provide in the yaml file itself. The codec parses schema object from this schema string.
 2) `exclude_keys`: Those keys of the events that the user wants to exclude while converting them to avro records.
+3) `tabular_schema`: A multiline schema string, similar to an AWS Glue schema string, that the user can provide in the yaml file itself. The codec builds the schema object from this schema string.
 
 ### Note:
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
index 5d9cb870e4..df9180d4da 100644
--- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
@@ -57,6 +57,8 @@ public void start(final OutputStream outputStream, final Event event, final Stri
         Objects.requireNonNull(outputStream);
         if (config.getSchema() != null) {
             schema = parseSchema(config.getSchema());
+        } else if (config.getTabularSchemaString() != null) {
+            schema = AvroSchemaParserFromTabularFormat.generateSchemaFromTabularString(config.getTabularSchemaString());
         } else if (config.getFileLocation() != null) {
             schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
         } else if (config.getSchemaRegistryUrl() != null) {
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java
index 6d28b74190..3e227fb9b0 100644
--- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java
@@ -45,6 +45,9 @@ public class AvroOutputCodecConfig {
     @JsonProperty("file_key")
     private String fileKey;
 
+    @JsonProperty("tabular_schema")
+    private String tabularSchemaString;
+
     public List<String> getExcludeKeys() {
         return excludeKeys;
     }
@@ -80,4 +83,7 @@ public void setExcludeKeys(List<String> excludeKeys) {
         this.excludeKeys = excludeKeys;
     }
 
+    public String getTabularSchemaString() {
+        return tabularSchemaString;
+    }
 }
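Context for the wiring above: `tabular_schema` is resolved ahead of `file_location` and `schema_registry_url`, and the parser introduced below ultimately hands Avro's stock JSON `Schema.Parser` an equivalent record definition. A minimal, self-contained sketch of that equivalence (illustrative only, not from the diff; the `Person` declaration and its JSON counterpart are invented here, and only the standard `org.apache.avro.Schema` API is assumed):

```java
import org.apache.avro.Schema;

public class TabularEquivalenceSketch {
    public static void main(String[] args) {
        // What a declaration like "TABLE Person (name string, age int)" is
        // expected to turn into: a plain Avro record schema in JSON form.
        final String avroJson = "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"}]}";
        final Schema schema = new Schema.Parser().parse(avroJson);
        System.out.println(schema.getField("age").schema().getType()); // INT
    }
}
```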
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroSchemaParserFromTabularFormat.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroSchemaParserFromTabularFormat.java
new file mode 100644
index 0000000000..a0877a2b6e
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroSchemaParserFromTabularFormat.java
@@ -0,0 +1,194 @@
+package org.opensearch.dataprepper.plugins.codec.avro;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class AvroSchemaParserFromTabularFormat {
+
+    private static final String END_SCHEMA_STRING = "]}";
+    private static final Pattern pattern = Pattern.compile("\\(([^()]*|)*\\)");
+
+    static Schema generateSchemaFromTabularString(final String inputString) throws IOException {
+        String recordSchemaOutputString = inputString;
+        recordSchemaOutputString = recordSchemaOutputString.trim();
+
+        final String tableName = extractTableName(recordSchemaOutputString.split("\\s+"));
+        recordSchemaOutputString = getStringFromParentheses(recordSchemaOutputString);
+        recordSchemaOutputString = removeSpaceAndReplaceParentheses(recordSchemaOutputString);
+
+        final StringBuilder mainSchemaBuilder = new StringBuilder();
+        final String baseSchemaStr = "{\"type\":\"record\",\"name\":\"" + tableName + "\",\"fields\":[";
+        mainSchemaBuilder.append(baseSchemaStr);
+
+        iterateRecursively(mainSchemaBuilder, recordSchemaOutputString, false, tableName, 0);
+
+        mainSchemaBuilder.append(END_SCHEMA_STRING);
+
+        return new org.apache.avro.Schema.Parser().parse(mainSchemaBuilder.toString());
+    }
+
+    private static String removeSpaceAndReplaceParentheses(String recordSchemaOutputString) {
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\(", "");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\)", "");
+
+        recordSchemaOutputString = recordSchemaOutputString.trim();
+
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\n", "");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll(",\\s+", ",");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll(">\\s+", ">");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("<\\s+", "<");
+        return recordSchemaOutputString;
+    }
+
+    private static String getStringFromParentheses(String recordSchemaOutputString) {
+        final Matcher matcher = pattern.matcher(recordSchemaOutputString);
+        if (matcher.find()) {
+            recordSchemaOutputString = matcher.group(0);
+        }
+        return recordSchemaOutputString;
+    }
+
+    private static String extractTableName(final String[] words) throws IOException {
+        String tableName = null;
+        if (words.length >= 2) {
+            tableName = words[1];
+        } else {
+            throw new IOException("Invalid schema string.");
+        }
+        return tableName;
+    }
+
+    private static String buildBaseSchemaString(final String part) {
+        return "{\"type\":\"record\",\"name\":\"" + part + "\",\"fields\":[";
+    }
+
+    private static String buildSchemaStringForArr() {
+        return "{\"type\":\"array\", \"items\":\"string\"}";
+    }
+
+    public static void iterateRecursively(final StringBuilder mainSchemaBuilder, final String recordSchema,
+                                          final boolean isStructString, final String tableName, int innerRecCounter) {
+        boolean isNameStringFormed = false;
+        StringBuilder fieldNameBuilder = new StringBuilder();
+        StringBuilder fieldTypeBuilder = new StringBuilder();
+        boolean isFirstRecordForName = true;
+
+        final char[] schemaStrCharArr = recordSchema.toCharArray();
+        int curPosInSchemaStrCharArr = 0;
+
+        while (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
+            final char currentCharFromArr = schemaStrCharArr[curPosInSchemaStrCharArr];
+            curPosInSchemaStrCharArr++;
+
+            if (!isNameStringFormed) {
+                if (isStructString && currentCharFromArr == ':') {
+                    if (isFirstRecordForName) {
+                        mainSchemaBuilder.append("{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
+                    } else {
+                        mainSchemaBuilder.append(",{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
+                    }
+                    isNameStringFormed = true;
+                    fieldNameBuilder = new StringBuilder();
+                    isFirstRecordForName = false;
+                    continue;
+                } else if (currentCharFromArr == ' ') {
+                    if (isFirstRecordForName) {
+                        mainSchemaBuilder.append("{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
+                    } else {
+                        mainSchemaBuilder.append(",{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
+                    }
+                    isNameStringFormed = true;
+                    fieldNameBuilder = new StringBuilder();
+                    isFirstRecordForName = false;
+                    continue;
+                }
+                fieldNameBuilder.append(currentCharFromArr);
+            }
+
+            if (isNameStringFormed) {
+
+                if (currentCharFromArr == ',' || curPosInSchemaStrCharArr == schemaStrCharArr.length) {
+                    if (curPosInSchemaStrCharArr == schemaStrCharArr.length) {
+                        fieldTypeBuilder.append(currentCharFromArr);
+                    }
+                    final String type = fieldTypeBuilder.toString().trim() + "\"}";
+
+                    mainSchemaBuilder.append(type);
+                    isNameStringFormed = false;
+                    fieldTypeBuilder = new StringBuilder();
+                    continue;
+                }
+
+                fieldTypeBuilder.append(currentCharFromArr);
+                if ("struct".equals(fieldTypeBuilder.toString())) {
+                    mainSchemaBuilder.deleteCharAt(mainSchemaBuilder.length() - 1);
+                    mainSchemaBuilder.append(buildBaseSchemaString(tableName + "_" + innerRecCounter));
+                    final String structSchemaStr = recordSchema.substring(curPosInSchemaStrCharArr);
+                    final StringBuilder structString = new StringBuilder();
+                    int openClosedCounter = 0;
+                    int structSchemaStrEndBracketPos = 0;
+                    for (final char innerChar : structSchemaStr.toCharArray()) {
+                        structSchemaStrEndBracketPos++;
+                        if (innerChar == '<') {
+                            openClosedCounter++;
+                        } else if (innerChar == '>') {
+                            openClosedCounter--;
+                        }
+                        structString.append(innerChar);
+                        if (openClosedCounter == 0) {
+                            break;
+                        }
+                    }
+
+                    final String innerRecord = structString.toString().substring(1, structSchemaStrEndBracketPos - 1);
+                    iterateRecursively(mainSchemaBuilder, innerRecord, true,
+                            tableName, innerRecCounter + 1);
+                    mainSchemaBuilder.append("}");
+                    curPosInSchemaStrCharArr = curPosInSchemaStrCharArr + structSchemaStrEndBracketPos;
+                    if (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
+                        // Skip the comma that follows the struct's closing '>'
+                        curPosInSchemaStrCharArr++;
+                    }
+                    isNameStringFormed = false;
+                    fieldTypeBuilder = new StringBuilder();
+                } else if ("array".equals(fieldTypeBuilder.toString())) {
+                    mainSchemaBuilder.deleteCharAt(mainSchemaBuilder.length() - 1);
+                    mainSchemaBuilder.append(buildSchemaStringForArr());
+                    final String structSchemaStr = recordSchema.substring(curPosInSchemaStrCharArr);
+                    int openClosedCounter = 0;
+                    int structSchemaStrEndBracketPos = 0;
+
+                    for (final char innerChar : structSchemaStr.toCharArray()) {
+                        structSchemaStrEndBracketPos++;
+                        if (innerChar == '<') {
+                            openClosedCounter++;
+                        } else if (innerChar == '>') {
+                            openClosedCounter--;
+                        }
+                        if (openClosedCounter == 0) {
+                            break;
+                        }
+                    }
+
+                    mainSchemaBuilder.append("}");
+                    curPosInSchemaStrCharArr = curPosInSchemaStrCharArr + structSchemaStrEndBracketPos;
+                    if (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
+                        // Skip the comma that follows the array's closing '>'
+                        curPosInSchemaStrCharArr++;
+                    }
+
+                    isNameStringFormed = false;
+                    fieldTypeBuilder = new StringBuilder();
+                }
+            }
+        }
+
+        if (isStructString) {
+            mainSchemaBuilder.append(END_SCHEMA_STRING);
+        }
+    }
+
+}
diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java
index a5b08fa9f2..40989c1f81 100644
--- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java
+++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java
@@ -35,6 +35,18 @@ public class AvroOutputCodecTest {
             ":\"name\",\"type\":\"string\"},{\"name\":\"nestedRecord\",\"type\":{\"type\":\"record\",\"name\":" +
             "\"NestedRecord1\",\"fields\":[{\"name\":\"secondFieldInNestedRecord\",\"type\":\"int\"},{\"name\":\"" +
             "firstFieldInNestedRecord\",\"type\":\"string\"}]}},{\"name\":\"age\",\"type\":\"int\"}]}";
+    private static final String expectedSchemaStringTabular = "{\"type\":\"record\",\"name\":\"sesblog\",\"fields\":[{\"name\"" +
+            ":\"eventType\",\"type\":\"string\"},{\"name\":\"ews\",\"type\":\"string\"},{\"name\":\"mail\",\"type\":" +
+            "{\"type\":\"record\",\"name\":\"sesblog_0\",\"fields\":[{\"name\":\"col1\",\"type\":\"string\"},{\"name\":\"" +
+            "innercolName\",\"type\":{\"type\":\"record\",\"name\":\"sesblog_1\",\"fields\":[{\"name\":\"colInner\"," +
+            "\"type\":\"string\"}]}}]}},{\"name\":\"collumn2\",\"type\":\"string\"}]}";
+    private static final String inputString = "TABLE sesblog (\n" +
+            "  eventType string,\n" +
+            "  ews string,\n" +
+            "  mail struct<col1 string,innercolName struct<colInner string>\n" +
+            "  >,\n" +
+            "  collumn2 string) ";
     private AvroOutputCodecConfig config;
 
     private ByteArrayOutputStream outputStream;
@@ -174,4 +186,11 @@ private static Map<String, Object> convertRecordToMap(GenericRecord nestedRecord
         }
         return eventData;
     }
+
+    @Test
+    public void testTabularSchemaParser() throws IOException {
+        final Schema expectedSchema = new Schema.Parser().parse(expectedSchemaStringTabular);
+        final Schema actualSchema = AvroSchemaParserFromTabularFormat.generateSchemaFromTabularString(inputString);
+        assertThat(actualSchema, Matchers.equalTo(expectedSchema));
+    }
 }
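The expected-schema constant in the test above spells out the parser's synthetic record names. A self-contained illustration (not from the diff; the JSON below is a reduced version of that constant) of why they must match exactly, given that Avro schema equality is structural and includes record names:

```java
import org.apache.avro.Schema;

public class NestedNamingSketch {
    public static void main(String[] args) {
        // Reduced version of the test's expected schema: the nested structs
        // carry the synthetic names sesblog_0 and sesblog_1.
        final String json = "{\"type\":\"record\",\"name\":\"sesblog\",\"fields\":["
                + "{\"name\":\"mail\",\"type\":{\"type\":\"record\",\"name\":\"sesblog_0\",\"fields\":["
                + "{\"name\":\"innercolName\",\"type\":{\"type\":\"record\",\"name\":\"sesblog_1\",\"fields\":["
                + "{\"name\":\"colInner\",\"type\":\"string\"}]}}]}}]}";
        final Schema schema = new Schema.Parser().parse(json);
        System.out.println(schema.getField("mail").schema().getName()); // sesblog_0
    }
}
```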
diff --git a/data-prepper-plugins/parquet-codecs/README.md b/data-prepper-plugins/parquet-codecs/README.md
index e67af5e3af..17488ecd18 100644
--- a/data-prepper-plugins/parquet-codecs/README.md
+++ b/data-prepper-plugins/parquet-codecs/README.md
@@ -27,6 +27,11 @@ pipeline:
         event_collect_timeout: 15s
         codec:
           parquet:
+            tabular_schema: |
+              TABLE Person (colname1 datatype,
+              colname2 datatype,
+              colname3 datatype,
+              colname4 datatype)
             schema: "{\"namespace\": \"org.example.test\"," +
                     " \"type\": \"record\"," +
                     " \"name\": \"TestMessage\"," +
@@ -58,7 +63,7 @@ pipeline:
 8) `schema_bucket`: Name of the S3 bucket in which `schema.json` file is kept.
 9) `file_key`: File key of `schema.json` file kept in S3 bucket.
 10) `schema_region`: AWS Region of the S3 bucket in which `schema.json` file is kept.
-
+11) `tabular_schema`: A multiline schema string, similar to an AWS Glue schema string, that the user can provide in the yaml file itself. The codec builds the schema object from this schema string.
 ### Note:
 1) User can provide only one schema at a time i.e. through either of the ways provided in codec config.
diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java
index 71e6fae4b5..968a3c6515 100644
--- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java
+++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java
@@ -84,6 +84,8 @@ public synchronized void start(File file) throws IOException {
     void buildSchemaAndKey(final Event event, final String tagsTargetKey) throws IOException {
         if (config.getSchema() != null) {
             schema = parseSchema(config.getSchema());
+        } else if (config.getTabularSchema() != null) {
+            schema = ParquetSchemaParserFromTabularFormat.generateSchemaFromTabularString(config.getTabularSchema());
         } else if(config.getFileLocation()!=null){
             schema = ParquetSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
         }else if(config.getSchemaRegistryUrl()!=null){
@@ -284,7 +286,7 @@ private static Object schemaMapper(final Schema.Field field , final Object rawVa
                     LOG.error("Unrecognised Field name : '{}' & type : '{}'", field.name(), fieldType);
                     break;
             }
-        }else{
+        }else if(field.schema().getLogicalType() != null) {
             final String logicalTypeName = field.schema().getLogicalType().getName();
             switch (logicalTypeName){
                 case "date":
@@ -309,12 +311,11 @@ private static Object schemaMapper(final Schema.Field field , final Object rawVa
         }
         return finalValue;
     }
-    boolean checkS3SchemaValidity() throws IOException {
+    boolean checkS3SchemaValidity() {
         if (config.getSchemaBucket() != null && config.getFileKey() != null && config.getSchemaRegion() != null) {
             return true;
         } else {
-            LOG.error("Invalid S3 credentials, can't reach the schema file.");
-            throw new IOException("Can't proceed without schema.");
+            return false;
         }
     }
 }
diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java
index e1016496d4..666a35e5e2 100644
--- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java
+++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java
@@ -57,6 +57,9 @@ public class ParquetOutputCodecConfig {
     @JsonProperty("exclude_keys")
     private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;
 
+    @JsonProperty("tabular_schema")
+    private String tabularSchema;
+
     @Valid
     @Size(max = 0, message = "Schema from Schema Registry is not supported.")
     @JsonProperty("schema_registry_url")
@@ -145,5 +148,8 @@ public void setSchemaRegistryUrl(String schemaRegistryUrl) {
 
     public void setExcludeKeys(List<String> excludeKeys) {
         this.excludeKeys = excludeKeys;
    }
+    public String getTabularSchema() {
+        return tabularSchema;
+    }
 }
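One behavioral note on the `schemaMapper` change above: the old `else` branch dereferenced `field.schema().getLogicalType()` unconditionally, which throws a NullPointerException for schemas that carry no logical type; the new `else if` guards that. A self-contained sketch of the two cases (not from the diff; only the standard Avro `Schema` and `LogicalTypes` APIs are assumed):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class LogicalTypeGuardSketch {
    public static void main(String[] args) {
        final Schema plainInt = Schema.create(Schema.Type.INT);
        final Schema dateInt = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
        // getLogicalType() is null for the plain schema, hence the null check
        // before calling .getName() on it.
        System.out.println(plainInt.getLogicalType());           // null
        System.out.println(dateInt.getLogicalType().getName());  // date
    }
}
```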
diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromTabularFormat.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromTabularFormat.java
new file mode 100644
index 0000000000..c14b60961a
--- /dev/null
+++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetSchemaParserFromTabularFormat.java
@@ -0,0 +1,186 @@
+package org.opensearch.dataprepper.plugins.codec.parquet;
+
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ParquetSchemaParserFromTabularFormat {
+
+    private static final Pattern pattern = Pattern.compile("\\(([^()]*|)*\\)");
+
+    public static Schema generateSchemaFromTabularString(final String inputString) throws IOException {
+        String recordSchemaOutputString = inputString.trim();
+
+        final String tableName = extractTableName(recordSchemaOutputString.split("\\s+"));
+        recordSchemaOutputString = getStringFromParentheses(recordSchemaOutputString);
+        recordSchemaOutputString = removeSpaceAndReplaceParentheses(recordSchemaOutputString);
+
+        final StringBuilder mainSchemaBuilder = new StringBuilder();
+        final String baseSchemaStr = "{\"type\":\"record\",\"name\":\"" + tableName + "\",\"fields\":[";
+        mainSchemaBuilder.append(baseSchemaStr);
+        iterateOverString(mainSchemaBuilder, recordSchemaOutputString);
+        mainSchemaBuilder.append("]}");
+
+        return new Schema.Parser().parse(mainSchemaBuilder.toString());
+    }
+
+    private static String removeSpaceAndReplaceParentheses(String recordSchemaOutputString) {
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\(", "");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\)", "");
+
+        recordSchemaOutputString = recordSchemaOutputString.trim();
+
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("\\n", "");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll(",\\s+", ",");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll(">\\s+", ">");
+        recordSchemaOutputString = recordSchemaOutputString.replaceAll("<\\s+", "<");
+        return recordSchemaOutputString;
+    }
+
+    private static String getStringFromParentheses(String recordSchemaOutputString) {
+        final Matcher matcher = pattern.matcher(recordSchemaOutputString);
+        if (matcher.find()) {
+            recordSchemaOutputString = matcher.group(0);
+        }
+        return recordSchemaOutputString;
+    }
+
+    private static String extractTableName(final String[] words) throws IOException {
+        String tableName = null;
+        if (words.length >= 2) {
+            tableName = words[1];
+        } else {
+            throw new IOException("Invalid schema string.");
+        }
+        return tableName;
+    }
+
+    private static String buildSchemaStringForArr() {
+        return "{\"type\":\"array\", \"items\":\"string\"}";
+    }
+
+    private static void iterateOverString(final StringBuilder mainSchemaBuilder, final String recordSchema) {
+        boolean isNameStringFormed = false;
+        StringBuilder fieldNameBuilder = new StringBuilder();
+        StringBuilder fieldTypeBuilder = new StringBuilder();
+        boolean isFirstRecordForName = true;
+
+        final char[] schemaStrCharArr = recordSchema.toCharArray();
+        int curPosInSchemaStrCharArr = 0;
+
+        while (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
+            final char currentCharFromArr = schemaStrCharArr[curPosInSchemaStrCharArr];
+            curPosInSchemaStrCharArr++;
+
+            if (!isNameStringFormed) {
+                if (currentCharFromArr == ' ') {
+                    if (isFirstRecordForName) {
+                        mainSchemaBuilder.append("{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
+                    } else {
+                        mainSchemaBuilder.append(",{\"name\":\"" + fieldNameBuilder.toString() + "\",\"type\":\"");
+                    }
+                    isNameStringFormed = true;
+                    fieldNameBuilder = new StringBuilder();
+                    isFirstRecordForName = false;
+                    continue;
+                }
+                fieldNameBuilder.append(currentCharFromArr);
+            }
+
+            if (isNameStringFormed) {
+
+                if (currentCharFromArr == ',' || curPosInSchemaStrCharArr == schemaStrCharArr.length) {
+                    if (curPosInSchemaStrCharArr == schemaStrCharArr.length) {
+                        fieldTypeBuilder.append(currentCharFromArr);
+                    }
+                    final String type = fieldTypeBuilder.toString().trim() + "\"}";
+
+                    mainSchemaBuilder.append(type);
+                    isNameStringFormed = false;
+                    fieldTypeBuilder = new StringBuilder();
+                    continue;
+                }
+
+                fieldTypeBuilder.append(currentCharFromArr);
+                // The two blocks below handle array types and inline
+                // <key:value,...> logical-type declarations.
+                if ("array".equals(fieldTypeBuilder.toString())) {
+                    mainSchemaBuilder.deleteCharAt(mainSchemaBuilder.length() - 1);
+                    mainSchemaBuilder.append(buildSchemaStringForArr());
+                    String structSchemaStr = recordSchema.substring(curPosInSchemaStrCharArr);
+                    int openClosedCounter = 0;
+                    int structSchemaStrEndBracketPos = 0;
+
+                    for (final char innerChar : structSchemaStr.toCharArray()) {
+                        structSchemaStrEndBracketPos++;
+                        if (innerChar == '<') {
+                            openClosedCounter++;
+                        } else if (innerChar == '>') {
+                            openClosedCounter--;
+                        }
+                        if (openClosedCounter == 0) {
+                            break;
+                        }
+                    }
+
+                    mainSchemaBuilder.append("}");
+                    curPosInSchemaStrCharArr = curPosInSchemaStrCharArr + structSchemaStrEndBracketPos;
+                    if (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
+                        // Skip the comma that follows the array's closing '>'
+                        curPosInSchemaStrCharArr++;
+                    }
+
+                    isNameStringFormed = false;
+                    fieldTypeBuilder = new StringBuilder();
+                } else if ("<".equals(fieldTypeBuilder.toString())) {
+                    mainSchemaBuilder.deleteCharAt(mainSchemaBuilder.length() - 1);
+                    curPosInSchemaStrCharArr--;
+                    String structSchemaStr = recordSchema.substring(curPosInSchemaStrCharArr);
+                    StringBuilder logicalTypeStringBuilder = new StringBuilder();
+                    int openClosedCounter = 0;
+                    int structSchemaStrEndBracketPos = 0;
+                    for (final char innerChar : structSchemaStr.toCharArray()) {
+                        structSchemaStrEndBracketPos++;
+                        if (innerChar == '<') {
+                            openClosedCounter++;
+                        } else if (innerChar == '>') {
+                            openClosedCounter--;
+                        } else {
+                            logicalTypeStringBuilder.append(innerChar);
+                        }
+
+                        if (openClosedCounter == 0) {
+                            break;
+                        }
+                    }
+
+                    final String[] logicalTypeStr = logicalTypeStringBuilder.toString().split(",");
+
+                    mainSchemaBuilder.append("{");
+                    boolean isFirstRec = true;
+                    for (final String str : logicalTypeStr) {
+                        final String[] innerStr = str.split(":");
+                        if (isFirstRec) {
+                            mainSchemaBuilder.append("\"" + innerStr[0] + "\": \"" + innerStr[1] + "\"");
+                            isFirstRec = false;
+                        } else {
+                            mainSchemaBuilder.append(",\"" + innerStr[0] + "\": \"" + innerStr[1] + "\"");
+                        }
+                    }
+                    mainSchemaBuilder.append("}}");
+
+                    curPosInSchemaStrCharArr = curPosInSchemaStrCharArr + structSchemaStrEndBracketPos;
+                    if (curPosInSchemaStrCharArr < schemaStrCharArr.length) {
+                        // Skip the comma that follows the closing '>'
+                        curPosInSchemaStrCharArr++;
+                    }
+                    isNameStringFormed = false;
+                    fieldTypeBuilder = new StringBuilder();
+                }
+            }
+        }
+    }
+
+}
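Not part of the diff: a usage sketch for the parquet-side parser above, using the same inline `<key:value,...>` declaration style the new test below exercises. The `sale` table is invented for illustration; since `generateSchemaFromTabularString` is public here, the sketch can live in any package:

```java
import java.io.IOException;
import org.apache.avro.Schema;
import org.opensearch.dataprepper.plugins.codec.parquet.ParquetSchemaParserFromTabularFormat;

public class ParquetTabularUsageSketch {
    public static void main(String[] args) throws IOException {
        // A column followed by <key:value,...> is copied verbatim into the
        // generated type object, e.g. an int carrying the "date" logical type.
        final String tabular = "TABLE sale (id string,\n"
                + "  sale_date <type:int,logicalType:date>,\n"
                + "  tags array<string>)";
        final Schema schema = ParquetSchemaParserFromTabularFormat.generateSchemaFromTabularString(tabular);
        System.out.println(schema.getField("sale_date").schema().getProp("logicalType")); // date
    }
}
```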
diff --git a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java
index abba1c5429..c48bab36bd 100644
--- a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java
+++ b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java
@@ -44,10 +44,19 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ParquetOutputCodecTest {
     private static final String FILE_NAME = "parquet-data";
+    private static final String expectedSchemaString = "{\"type\":\"record\",\"name\":\"sesblog\",\"fields\":[{\"name\":" +
+            "\"eventType\",\"type\":\"string\"},{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+            "\"date\",\"precision\":\"10\",\"scale\":\"2\"}},{\"name\":\"col2\",\"type\":\"string\"},{\"name\":\"tags\"," +
+            "\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"col3\",\"type\":\"string\"}]}";
+    private static final String inputString = "TABLE sesblog (\n" +
+            "  eventType string,\n" +
+            "  event_date <type:int,logicalType:date,precision:10,scale:2>,\n" +
+            "  col2 string,\n" +
+            "  tags array<string>,\n" +
+            "  col3 string) ";
     private static final String FILE_SUFFIX = ".parquet";
     private static int numberOfRecords;
     private ParquetOutputCodecConfig config;
@@ -129,6 +138,13 @@ public void test_getExtension() {
         assertThat(extension, equalTo("parquet"));
     }
 
+
+    @Test
+    public void testSchemaGenerationFromTabular() throws IOException {
+        final Schema expectedSchema = new Schema.Parser().parse(expectedSchemaString);
+        final Schema actualSchema = ParquetSchemaParserFromTabularFormat.generateSchemaFromTabularString(inputString);
+        assertThat(actualSchema, Matchers.equalTo(expectedSchema));
+    }
     @Test
     public void whenNoSchemaProvided_thenThrowsException() {
         config = new ParquetOutputCodecConfig();
@@ -136,12 +152,11 @@ public void whenNoSchemaProvided_thenThrowsException() {
         config.setFileLocation(null);
         config.setSchemaRegistryUrl(null);
         ParquetOutputCodec parquetOutputCodec = new ParquetOutputCodec(config);
-        assertThrows(IOException.class,()->
-                parquetOutputCodec.buildSchemaAndKey(null, null));
+        assertThat(parquetOutputCodec.checkS3SchemaValidity(), equalTo(Boolean.FALSE));
     }
 
     @Test
-    public void test_s3SchemaValidity() throws IOException {
+    public void test_s3SchemaValidity() {
         config = new ParquetOutputCodecConfig();
         config.setSchema(parseSchema().toString());
         config.setSchemaBucket("test");
@@ -150,8 +165,7 @@ public void test_s3SchemaValidity() {
         ParquetOutputCodec parquetOutputCodec = new ParquetOutputCodec(config);
         assertThat(parquetOutputCodec.checkS3SchemaValidity(), equalTo(Boolean.TRUE));
         ParquetOutputCodec parquetOutputCodecFalse = createObjectUnderTest();
-        assertThrows(IOException.class,()->
-                parquetOutputCodecFalse.checkS3SchemaValidity());
+        assertThat(parquetOutputCodecFalse.checkS3SchemaValidity(), equalTo(Boolean.FALSE));
     }
 
     private List<Map<String, Object>> createParquetRecordsList(final InputStream inputStream) throws IOException {