diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index 4af8684729..9f1cd56f20 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -65,6 +65,7 @@ public class KeyValueProcessor extends AbstractProcessor, Record bracketSet = Set.of('[', ']', '(', ')', '<', '>'); private final List tagsOnFailure; + private final Character stringLiteralCharacter; @DataPrepperPluginConstructor public KeyValueProcessor(final PluginMetrics pluginMetrics, @@ -73,6 +74,8 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, super(pluginMetrics); this.keyValueProcessorConfig = keyValueProcessorConfig; + this.stringLiteralCharacter = keyValueProcessorConfig.getStringLiteralCharacter(); + tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure(); if (keyValueProcessorConfig.getFieldDelimiterRegex() != null @@ -163,7 +166,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, } validateKeySets(includeKeysSet, excludeKeysSet, defaultValuesSet); - + if (!validTransformOptionSet.contains(keyValueProcessorConfig.getTransformKey())) { throw new IllegalArgumentException(String.format("The transform_key value: %s is not a valid option", keyValueProcessorConfig.getTransformKey())); } @@ -261,11 +264,14 @@ public int skipGroup(final String str, int idx, final Character endChar) { i++; continue; } else if (str.charAt(i) == endChar) { - return i-1; + return i; } else i++; } - throw new RuntimeException("Bad Input, no end character found in "+str+" after index " + idx +", expected end char = "+endChar); + if (keyValueProcessorConfig.isStrictGroupingEnabled()) { + throw new RuntimeException("Bad Input, no end character found in "+str+" after index " + idx +", expected end char = "+endChar); + } + return i-1; } private void addPart(List parts, final String str, final int start, final int end) { @@ -279,7 +285,8 @@ public int findInStartGroup(final String str, int idx) { for (int j = 0; j < startGroupStrings.length; j++) { try { if (startGroupStrings[j].equals(str.substring(idx, idx+startGroupStrings[j].length()))) { - if (j <= 1 && idx > 0 && str.charAt(idx-1) != '\\') { + // For " and ', make sure, it's not escaped + if (j <= 1 && (idx == 0 || str.charAt(idx-1) != '\\')) { return j; } else if (j > 1) { return j; @@ -291,7 +298,7 @@ public int findInStartGroup(final String str, int idx) { } return -1; } - + private List parseWithValueGrouping(String str) { String fieldDelimiter = keyValueProcessorConfig.getFieldSplitCharacters(); Set fieldDelimiterSet = new HashSet<>(); @@ -308,10 +315,22 @@ private List parseWithValueGrouping(String str) { i++; continue; } + int groupIndex = findInStartGroup(str, i); + boolean skippedGroup = false; if (groupIndex >= 0) { - i = skipGroup(str, i+1, endGroupChars[groupIndex])+2; - } else if (fieldDelimiterSet.contains(str.charAt(i))) { + String[] s = keyValueDelimiterPattern.split(str.substring(start,i+1)); + // Only handle Grouping patterns in the values, not keys + if (s.length > 1 || startGroupStrings[groupIndex].charAt(0) == stringLiteralCharacter) { + i = skipGroup(str, i+1, endGroupChars[groupIndex]); + skippedGroup = true; + } + } + if (fieldDelimiterSet.contains(str.charAt(i))) { + // If end of group character is same as field delimiter, then include that in the value if value grouping is done + if (skippedGroup) { + i++; + } addPart(parts, str, start, i); i++; start = i; @@ -322,7 +341,7 @@ private List parseWithValueGrouping(String str) { if (start != i) { addPart(parts, str, start, i); } - + return parts; } @@ -460,7 +479,7 @@ private ObjectNode recurse(final String input, final ObjectMapper mapper) { valueEnd = pair.length() - 1; valueString = pair.substring(valueStart, valueEnd).stripLeading(); JsonNode child = ((ObjectNode) root).put(keyString, recurse(valueString, mapper)); - } + } } else { valueString = pair.substring(valueStart).stripLeading(); ObjectNode child = ((ObjectNode)root).put(keyString, valueString); @@ -484,15 +503,30 @@ private Map createRecursedMap(JsonNode node, ObjectMapper mapper return mapper.convertValue(node, new TypeReference>() {}); } + private boolean isIgnoredGroup(String group) { + // If a group starts and ends with stringLiteralCharacter, + // treat the entire group as key with null as the value + return stringLiteralCharacter != null && + group.charAt(0) == stringLiteralCharacter && + group.charAt(group.length()-1) == stringLiteralCharacter; + } + private Map createNonRecursedMap(String[] groups) { Map nonRecursedMap = new LinkedHashMap<>(); List valueList; for(final String group : groups) { + if (isIgnoredGroup(group)) { + if (validKeyAndValue(group, null)) { + nonRecursedMap.put(group, null); + } + continue; + } + final String[] terms = keyValueDelimiterPattern.split(group, 2); String key = terms[0]; Object value; - + if (terms.length == 2) { value = terms[1]; } else { @@ -508,7 +542,9 @@ private Map createNonRecursedMap(String[] groups) { } else { valueList = new ArrayList(); valueList.add(existingValue); - nonRecursedMap.put(key, valueList); + if (validKeyAndValue(key, valueList)) { + nonRecursedMap.put(key, valueList); + } } if (keyValueProcessorConfig.getSkipDuplicateValues()) { @@ -519,7 +555,9 @@ private Map createNonRecursedMap(String[] groups) { valueList.add(value); } } else { - nonRecursedMap.put(key, value); + if (validKeyAndValue(key, value)) { + nonRecursedMap.put(key, value); + } } } @@ -581,12 +619,11 @@ private Map executeConfigs(Map map) { LOG.debug("Skipping already included default key: '{}'", pair.getKey()); continue; } - processed.put(pair.getKey(), pair.getValue()); + if (validKeyAndValue(pair.getKey(), pair.getValue())) { + processed.put(pair.getKey(), pair.getValue()); + } } - if (keyValueProcessorConfig.getDropKeysWithNoValue()) { - processed.entrySet().removeIf(entry -> entry.getValue() == null); - } return processed; } @@ -594,7 +631,7 @@ private String[] trimWhitespace(String key, Object value) { String[] arr = {key.stripTrailing(), value.toString().stripLeading()}; return arr; } - + private String transformKey(String key) { if (keyValueProcessorConfig.getTransformKey().equals(lowercaseKey)) { key = key.toLowerCase(); @@ -606,9 +643,24 @@ private String transformKey(String key) { return key; } + private boolean validKeyAndValue(String key, Object value) { + if (key == null || key.isEmpty()) { + return false; + } + + if (keyValueProcessorConfig.getDropKeysWithNoValue() && value == null) { + return false; + } + return true; + } + private void addKeyValueToMap(final Map parsedMap, final String key, Object value) { Object processedValue = value; + if (!validKeyAndValue(key, value)) { + return; + } + if (value instanceof List) { List valueAsList = (List) value; if (valueAsList.size() == 1) { @@ -646,8 +698,12 @@ private void addKeyValueToMap(final Map parsedMap, final String private void writeToRoot(final Event event, final Map parsedJson) { for (Map.Entry entry : parsedJson.entrySet()) { - if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) { - event.put(entry.getKey(), entry.getValue()); + try { + if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) { + event.put(entry.getKey(), entry.getValue()); + } + } catch (IllegalArgumentException e) { + LOG.warn("Failed to put key: "+entry.getKey()+" value : "+entry.getValue()+" into event. ", e); } } } diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index 68d09408f3..84cdb868e9 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -9,6 +9,7 @@ import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.Size; import java.util.ArrayList; import java.util.List; @@ -111,15 +112,40 @@ public class KeyValueProcessorConfig { @JsonProperty("key_value_when") private String keyValueWhen; + @JsonProperty("strict_grouping") + private boolean strictGrouping = false; + + @JsonProperty("string_literal_character") + @Size(min = 0, max = 1, message = "string_literal_character may only have character") + private String stringLiteralCharacter = null; + @AssertTrue(message = "Invalid Configuration. value_grouping option and field_delimiter_regex are mutually exclusive") boolean isValidValueGroupingAndFieldDelimiterRegex() { return (!valueGrouping || fieldDelimiterRegex == null); } + @AssertTrue(message = "Invalid Configuration. String literal character config is valid only when value_grouping is enabled, and only double quote (\") and single quote are (') are valid string literal characters.") + boolean isValidStringLiteralConfig() { + if (stringLiteralCharacter == null) + return true; + if ((!stringLiteralCharacter.equals("\"") && + (!stringLiteralCharacter.equals("'")))) + return false; + return valueGrouping; + } + public String getSource() { return source; } + public Character getStringLiteralCharacter() { + return stringLiteralCharacter == null ? null : stringLiteralCharacter.charAt(0); + } + + public boolean isStrictGroupingEnabled() { + return strictGrouping; + } + public String getDestination() { return destination; } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 019bbb7fab..2adfc37884 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -70,6 +70,7 @@ static Record buildRecordWithEvent(final Map data) { void setup() { final KeyValueProcessorConfig defaultConfig = new KeyValueProcessorConfig(); lenient().when(mockConfig.getSource()).thenReturn(defaultConfig.getSource()); + lenient().when(mockConfig.getStringLiteralCharacter()).thenReturn(null); lenient().when(mockConfig.getDestination()).thenReturn(defaultConfig.getDestination()); lenient().when(mockConfig.getFieldDelimiterRegex()).thenReturn(defaultConfig.getFieldDelimiterRegex()); lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(defaultConfig.getFieldSplitCharacters()); @@ -187,6 +188,7 @@ void testDropKeysWithNoValue() { @MethodSource("getKeyValueGroupingTestdata") void testMultipleKvToObjectKeyValueProcessorWithValueGrouping(String fieldDelimiters, String input, Map expectedResultMap) { lenient().when(mockConfig.getValueGrouping()).thenReturn(true); + lenient().when(mockConfig.getStringLiteralCharacter()).thenReturn('\"'); lenient().when(mockConfig.getDropKeysWithNoValue()).thenReturn(true); lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(fieldDelimiters); final KeyValueProcessor objectUnderTest = createObjectUnderTest(); @@ -208,17 +210,66 @@ private static Stream getKeyValueGroupingTestdata() { Arguments.of(", ", "key1=value1, key2=value2", Map.of("key1", "value1", "key2", "value2")), Arguments.of(", ", "key1=It\\'sValue1, key2=value2", Map.of("key1", "It\\'sValue1", "key2", "value2")), Arguments.of(", ", "text1 text2 key1=value1, key2=value2 text3 text4", Map.of("key1", "value1", "key2", "value2")), - Arguments.of(", ", "text1 text2 foo key1=value1 url=http://foo.com?bar=text,text&foo=zoo bar k2=\"http://bar.com?a=b&c=foo bar\" barr", Map.of("key1", "value1", "url", "http://foo.com?bar=text,text&foo=zoo", "k2", "\"http://bar.com?a=b&c=foo bar\"")), + Arguments.of(", ", "text1 text2 foo key1=value1 url=http://foo.com?bar=text,text&foo=zoo bar k2=\"http://bar.com?a=b&c=foo bar\" barr", Map.of("key1", "value1", "url", "http://foo.com?bar=text,text&foo=zoo", "k2", "\"http://bar.com?a=b&c=foo bar\"")), Arguments.of(", ", "vendorMessage=VendorMessage(uid=1847060493-1712778523223, feedValue=https://syosetu.org/novel/147705/15.html, bundleId=, linkType=URL, vendor=DOUBLEVERIFY, platform=DESKTOP, deviceTypeId=1, bidCount=6, appStoreTld=, feedSource=DSP, regions=[APAC], timestamp=1712778523223, externalId=)", Map.of("vendorMessage", "VendorMessage(uid=1847060493-1712778523223, feedValue=https://syosetu.org/novel/147705/15.html, bundleId=, linkType=URL, vendor=DOUBLEVERIFY, platform=DESKTOP, deviceTypeId=1, bidCount=6, appStoreTld=, feedSource=DSP, regions=[APAC], timestamp=1712778523223, externalId=)")), + Arguments.of(", ()", "foo bar(key1=value1, key2=value2, key3=)", Map.of("key1", "value1", "key2", "value2", "key3","")), + Arguments.of(", ", "foo bar(key1=value1, key2=value2, key3=)", Map.of("bar(key1", "value1", "key2", "value2", "key3",")")), + Arguments.of(", ", "foo bar[key1=value1, key2=value2, key3=]", Map.of("bar[key1", "value1", "key2", "value2", "key3","]")), + Arguments.of(", ", "foo bar{key1=value1, key2=value2, key3=}", Map.of("bar{key1", "value1", "key2", "value2", "key3","}")), + Arguments.of(", ", "key1 \"key2=val2\" key3=\"value3,value4\"", Map.of("key3", "\"value3,value4\"")), Arguments.of(", ", "key1=[value1,value2], key3=value3", Map.of("key1", "[value1,value2]", "key3", "value3")), Arguments.of(", ", "key1=(value1, value2), key3=value3", Map.of("key1", "(value1, value2)", "key3", "value3")), Arguments.of(", ", "key1=, key3=value3", Map.of("key1", "", "key3", "value3")), Arguments.of(", ", "key1={value1,value2}, key3=value3", Map.of("key1", "{value1,value2}", "key3", "value3")), Arguments.of(", ", "key1='value1,value2', key3=value3", Map.of("key1", "'value1,value2'", "key3", "value3")), + Arguments.of(", ", "foo key1=val1, key2=val2,key3=val3 bar", Map.of("key1", "val1", "key2", "val2", "key3", "val3")), + Arguments.of(", ", "foo,key1=(val1,key2=val2,val3),key4=val4 bar", Map.of("key1", "(val1,key2=val2,val3)", "key4", "val4")), + Arguments.of(", ", "foo,key1=(val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "(val1,key2=val2,val3,key4=val4 bar")), + + Arguments.of(", ", "foo,key1=[val1,key2=val2,val3],key4=val4 bar", Map.of("key1", "[val1,key2=val2,val3]", "key4", "val4")), + Arguments.of(", ", "foo,key1=[val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "[val1,key2=val2,val3,key4=val4 bar")), + + Arguments.of(", ", "foo,key1={val1,key2=val2,val3},key4=val4 bar", Map.of("key1", "{val1,key2=val2,val3}", "key4", "val4")), + Arguments.of(", ", "foo,key1={val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "{val1,key2=val2,val3,key4=val4 bar")), + + Arguments.of(", ", "foo,key1=,key4=val4 bar", Map.of("key1", "", "key4", "val4")), + Arguments.of(", ", "foo,key1= record = getMessage(message); + keyValueProcessor = createObjectUnderTest(); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + + final Event event = editedRecords.get(0).getData(); + assertThat(event.containsKey("parsed_message"), is(false)); + + assertThat(event.containsKey("key1"), is(true)); + assertThat(event.containsKey("key2"), is(true)); + assertThat(event.get("key1", Object.class), is("value1")); + } + @Test void testWriteToRoot() { when(mockConfig.getDestination()).thenReturn(null);