diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index b9048a6a29..02c01a7c4c 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -49,9 +49,9 @@ When run, the processor will parse the message into the following output: * Default: `{}` * Example: `default_values` is `{"defaultkey": "defaultvalue"}`. `key1=value1` will parse into `{"key1": "value1", "defaultkey": "defaultvalue"}` * If the default key already exists in the message, the value is not changed. - * Example: `default_values` is `{"value1": "abc"}`. `key1=value1` will parse into `{"key1": "value1"}` + * Example: `default_values` is `{"key1": "abc"}`. `key1=value1` will parse into `{"key1": "value1"}` * It should be noted that the include_keys filter will be applied to the message first, and then default keys. - * Example: `include_keys` is `["key1"]`, and `default_keys` is `{"key2": "value2"}`. `key1=value1&key2=abc` will parse into `{"key1": "value1", "key2": "value2"}` + * Example: `include_keys` is `["key1"]`, and `default_values` is `{"key2": "value2"}`. `key1=value1&key2=abc` will parse into `{"key1": "value1", "key2": "value2"}` * `key_value_delimiter_regex` - A regex specifying the delimiter between a key and a value. Special regex characters such as `[` and `]` must be escaped using `\\`. * There is no default. * Note: This cannot be defined at the same time as `value_split_characters` @@ -74,8 +74,8 @@ When run, the processor will parse the message into the following output: * `transform_key` - Change keys to lowercase, uppercase, or all capitals. * Default is an empty string (no transformation) * Example: `transform_key` is `lowercase`. `{"Key1=value1"}` will parse into `{"key1": "value1"}` - * Example: `transform_key` is `uppercase`. `{"key1=value1"}` will parse into `{"Key1": "value1"}` - * Example: `transform_key` is `capitalize`. 
`{"key1=value1"}` will parse into `{"KEY1": "value1"}` + * Example: `transform_key` is `capitalize`. `{"key1=value1"}` will parse into `{"Key1": "value1"}` + * Example: `transform_key` is `uppercase`. `{"key1=value1"}` will parse into `{"KEY1": "value1"}` * `whitespace` - Specify whether to be lenient or strict with the acceptance of unnecessary whitespace surrounding the configured value-split sequence. * Default: `lenient` * Example: `whitespace` is `"lenient"`. `{"key1 = value1"}` will parse into `{"key1 ": " value1"}` @@ -88,7 +88,16 @@ When run, the processor will parse the message into the following output: * Default: `false` * Example: `remove_brackets` is `true`. `{"key1=(value1)"}` will parse into `{"key1": value1}` * Example: `remove_brackets` is `false`. `{"key1=(value1)"}` will parse into `{"key1": "(value1)"}` - * In the case of a key-value pair with a brackets and a split character, the splitting will take priority over `remove_brackets=true`. `{key1=(value1&value2)}` will parse into `{"key1":"value1","value2)":null}` + * In the case of a key-value pair with brackets and a split character, the splitting will take priority over `remove_brackets=true`. `{"key1=(value1&value2)"}` will parse into `{"key1":"value1","value2)":null}` +* `recursive` - Specify whether to drill down into values and recursively extract more key-value pairs from them. The extra key-value pairs will be stored as subkeys of the root key. + * Default: `false` + * The levels of recursive parsing must be defined by different brackets for each level: `[]`, `()`, and `<>` in this order. + * Example: `recursive` is `true`. 
`{"item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=(item1-subitem2-subitem2A=item1-subitem2-subitem2A-value&item1-subitem2-subitem2B=item1-subitem2-subitem2B-value)]&item2=item2-value"}` will parse into `{"item1": {"item1-subitem1": "item1-subitem1-value", "item1-subitem2": {"item1-subitem2-subitem2A": "item1-subitem2-subitem2A-value", "item1-subitem2-subitem2B": "item1-subitem2-subitem2B-value"}}, "item2": "item2-value"}` + * Example: `recursive` is `false`. `{"item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=(item1-subitem2-subitem2A=item1-subitem2-subitem2A-value&item1-subitem2-subitem2B=item1-subitem2-subitem2B-value)]&item2=item2-value"}` will parse into `{"item1-subitem2": "(item1-subitem2-subitem2A=item1-subitem2-subitem2A-value", "item2": "item2-value", "item1": "[item1-subitem1=item1-subitem1-value", "item1-subitem2-subitem2B": "item1-subitem2-subitem2B-value)]"}` + * Any other configuration options will be applied only to the outermost (top-level) keys. + * While `recursive` is `true`, `remove_brackets` cannot also be `true`. + * While `recursive` is `true`, `skip_duplicate_values` will always be `true`. + * While `recursive` is `true`, `whitespace` will always be `"strict"`. ## Developer Guide This plugin is compatible with Java 14. See 
See diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index bca9b11540..fb11a3386d 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -15,17 +15,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.core.type.TypeReference; + import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.LinkedHashMap; import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import java.util.regex.Matcher; +import java.util.Stack; +import java.util.ArrayList; @DataPrepperPlugin(name = "key_value", pluginType = Processor.class, pluginConfigurationType = KeyValueProcessorConfig.class) public class KeyValueProcessor extends AbstractProcessor, Record> { @@ -46,36 +54,53 @@ public class KeyValueProcessor extends AbstractProcessor, Record validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict); + final String delimiterBracketCheck = "[\\[\\]()<>]"; + private final Set bracketSet = Set.of('[', ']', '(', ')', '<', '>'); @DataPrepperPluginConstructor public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) { super(pluginMetrics); this.keyValueProcessorConfig = keyValueProcessorConfig; - 
if(keyValueProcessorConfig.getFieldDelimiterRegex() != null + if (keyValueProcessorConfig.getFieldDelimiterRegex() != null && !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) { - if(keyValueProcessorConfig.getFieldSplitCharacters() != null + if (keyValueProcessorConfig.getFieldSplitCharacters() != null && !keyValueProcessorConfig.getFieldSplitCharacters().isEmpty()) { throw new IllegalArgumentException("field_delimiter_regex and field_split_characters cannot both be defined."); - } else if(!validateRegex(keyValueProcessorConfig.getFieldDelimiterRegex())) { + } else if (!validateRegex(keyValueProcessorConfig.getFieldDelimiterRegex())) { throw new PatternSyntaxException("field_delimiter_regex is not a valid regex string", keyValueProcessorConfig.getFieldDelimiterRegex(), -1); } fieldDelimiterPattern = Pattern.compile(keyValueProcessorConfig.getFieldDelimiterRegex()); + + /* Brackets mark recursion levels, so in recursive mode the field delimiter must not match any bracket character. */ + if (keyValueProcessorConfig.getRecursive() + && bracketSet.stream().anyMatch(bracket -> fieldDelimiterPattern.matcher(String.valueOf(bracket)).matches())) { + throw new IllegalArgumentException("While recursive is true, the set field delimiter regex cannot contain brackets while you are trying to recurse."); + } } else { String regex; if (keyValueProcessorConfig.getFieldSplitCharacters().isEmpty()) { regex = KeyValueProcessorConfig.DEFAULT_FIELD_SPLIT_CHARACTERS; } else { + if (keyValueProcessorConfig.getRecursive() + && keyValueProcessorConfig.getFieldSplitCharacters().length() != 1) { + throw new IllegalArgumentException("While recursive is true, the set field split characters is limited to one character only."); + } regex = buildRegexFromCharacters(keyValueProcessorConfig.getFieldSplitCharacters()); } fieldDelimiterPattern = Pattern.compile(regex); + + /* Same rule for split characters: test each bracket against the compiled delimiter instead of matching the delimiter against the bracket regex text. */ + if (keyValueProcessorConfig.getRecursive() && bracketSet.stream().anyMatch(bracket -> fieldDelimiterPattern.matcher(String.valueOf(bracket)).matches())) { + throw new IllegalArgumentException("While recursive is true, the set field split characters cannot contain brackets while you are trying to recurse."); + } 
} - if(keyValueProcessorConfig.getKeyValueDelimiterRegex() != null + if (keyValueProcessorConfig.getKeyValueDelimiterRegex() != null && !keyValueProcessorConfig.getKeyValueDelimiterRegex().isEmpty()) { - if(keyValueProcessorConfig.getValueSplitCharacters() != null + if (keyValueProcessorConfig.getValueSplitCharacters() != null && !keyValueProcessorConfig.getValueSplitCharacters().isEmpty()) { throw new IllegalArgumentException("key_value_delimiter_regex and value_split_characters cannot both be defined."); } else if (!validateRegex(keyValueProcessorConfig.getKeyValueDelimiterRegex())) { @@ -83,15 +108,30 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces } keyValueDelimiterPattern = Pattern.compile(keyValueProcessorConfig.getKeyValueDelimiterRegex()); + + /* Brackets mark recursion levels, so in recursive mode the key/value delimiter must not match any bracket character. */ + if (keyValueProcessorConfig.getRecursive() && bracketSet.stream().anyMatch(bracket -> keyValueDelimiterPattern.matcher(String.valueOf(bracket)).matches())) { + throw new IllegalArgumentException("While recursive is true, the set key value delimiter regex cannot contain brackets while you are trying to recurse."); + } } else { String regex; - if(keyValueProcessorConfig.getValueSplitCharacters().isEmpty()) { + if (keyValueProcessorConfig.getValueSplitCharacters().isEmpty()) { regex = KeyValueProcessorConfig.DEFAULT_VALUE_SPLIT_CHARACTERS; } else { + if (keyValueProcessorConfig.getRecursive() + && keyValueProcessorConfig.getValueSplitCharacters().length() != 1) { + throw new IllegalArgumentException("While recursive is true, the set value split characters is limited to one character only."); + } + regex = buildRegexFromCharacters(keyValueProcessorConfig.getValueSplitCharacters()); } keyValueDelimiterPattern = Pattern.compile(regex); + + /* Same rule for split characters: test each bracket against the compiled delimiter instead of matching the delimiter against the bracket regex text. */ + if (keyValueProcessorConfig.getRecursive() && bracketSet.stream().anyMatch(bracket -> keyValueDelimiterPattern.matcher(String.valueOf(bracket)).matches())) { + throw new IllegalArgumentException("While recursive is true, the set value split characters cannot contain brackets while you are trying to recurse."); + } } if 
(!validateRegex(keyValueProcessorConfig.getDeleteKeyRegex())) { @@ -122,6 +162,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces final Pattern boolCheck = Pattern.compile("true|false", Pattern.CASE_INSENSITIVE); final Matcher duplicateValueBoolMatch = boolCheck.matcher(String.valueOf(keyValueProcessorConfig.getSkipDuplicateValues())); final Matcher removeBracketsBoolMatch = boolCheck.matcher(String.valueOf(keyValueProcessorConfig.getRemoveBrackets())); + final Matcher recursiveBoolMatch = boolCheck.matcher(String.valueOf(keyValueProcessorConfig.getRecursive())); if (!duplicateValueBoolMatch.matches()) { throw new IllegalArgumentException(String.format("The skip_duplicate_values value must be either true or false", keyValueProcessorConfig.getSkipDuplicateValues())); @@ -130,13 +171,21 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces if (!removeBracketsBoolMatch.matches()) { throw new IllegalArgumentException(String.format("The remove_brackets value must be either true or false", keyValueProcessorConfig.getRemoveBrackets())); } + + if (!recursiveBoolMatch.matches()) { /* report the recursive setting itself, not remove_brackets */ + throw new IllegalArgumentException(String.format("The recursive value must be either true or false", keyValueProcessorConfig.getRecursive())); + } + + if (keyValueProcessorConfig.getRemoveBrackets() && keyValueProcessorConfig.getRecursive()) { + throw new IllegalArgumentException("Cannot remove brackets needed for determining levels of recursion"); + } } private String buildRegexFromCharacters(String s) { char[] splitters = s.toCharArray(); StringBuilder regexedFieldSplitCharacters = new StringBuilder(); for(char c : splitters) { - if(Objects.equals(c, '\\')) { + if (Objects.equals(c, '\\')) { regexedFieldSplitCharacters.append(c); } else { regexedFieldSplitCharacters.append(c).append('|'); @@ -150,7 +199,7 @@ private String buildRegexFromCharacters(String s) { private boolean validateRegex(final String pattern) { - if(pattern 
!= null && !Objects.equals(pattern, "")) { + if (pattern != null && !Objects.equals(pattern, "")) { try { Pattern.compile(pattern); } catch (PatternSyntaxException e) { @@ -178,80 +227,218 @@ private void validateKeySets(final Set includeSet, final Set exc @Override public Collection> doExecute(final Collection> records) { + final ObjectMapper mapper = new ObjectMapper(); + for(final Record record : records) { - final Map parsedMap = new HashMap<>(); + final Map outputMap = new HashMap<>(); final Event recordEvent = record.getData(); - final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0); - for(final String group : groups) { - final String[] terms = keyValueDelimiterPattern.split(group, 2); - String key = terms[0]; - Object value; - - if (!includeKeysSet.isEmpty() && !includeKeysSet.contains(key)) { - LOG.debug("Skipping not included key: '{}'", key); - continue; + if (keyValueProcessorConfig.getRecursive()) { + try { + outputMap.putAll(createRecursedMap(recurse(groupsRaw, mapper), mapper)); + } catch (Exception e) { /* best effort: keep the cause in the log and do the flat parse that the message announces */ + LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e); + outputMap.putAll(createNonRecursedMap(groups)); } + } else { + outputMap.putAll(createNonRecursedMap(groups)); + } - if (excludeKeysSet.contains(key)) { - LOG.debug("Key is being excluded: '{}'", key); - continue; - } + final Map processedMap = executeConfigs(outputMap); - if(keyValueProcessorConfig.getDeleteKeyRegex() != null && !Objects.equals(keyValueProcessorConfig.getDeleteKeyRegex(), "")) { - key = key.replaceAll(keyValueProcessorConfig.getDeleteKeyRegex(), ""); - } - key = keyValueProcessorConfig.getPrefix() + key; + recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap); + } - if (terms.length == 2) { - value = terms[1]; - } else { - LOG.debug("Unsuccessful match: '{}'", terms[0]); - value = keyValueProcessorConfig.getNonMatchValue(); 
- } + return records; + } + + /** Parses one nesting level of {@code input} into a JSON object; a value wrapped in a matching bracket pair is parsed recursively as a nested object. */ + private ObjectNode recurse(final String input, final ObjectMapper mapper) { + Stack bracketStack = new Stack(); + Map bracketMap = initBracketMap(); + int pairStart = 0; + + ArrayList pairs = new ArrayList(); + ObjectNode root = mapper.createObjectNode(); + + /* Split on the field delimiter only at bracket depth zero, so each nested level stays inside a single pair. */ + for (int i = 0; i < input.length(); i++) { + if (bracketMap.containsKey(input.charAt(i))) { + bracketStack.push(input.charAt(i)); + } - if(value != null - && value instanceof String - && keyValueProcessorConfig.getDeleteValueRegex() != null - && !Objects.equals(keyValueProcessorConfig.getDeleteValueRegex(), "")) { - value = ((String)value).replaceAll(keyValueProcessorConfig.getDeleteValueRegex(), ""); + if (bracketMap.containsValue(input.charAt(i)) && !bracketStack.isEmpty()) { + if (bracketMap.get(bracketStack.peek()) == input.charAt(i)) { + bracketStack.pop(); } + } - if (keyValueProcessorConfig.getWhitespace().equals(whitespaceStrict)) { - String[] whitespace_arr = trimWhitespace(key, value); - key = whitespace_arr[0]; - value = whitespace_arr[1]; + if (bracketStack.isEmpty() && fieldDelimiterPattern.matcher(String.valueOf(input.charAt(i))).matches()) { + String pair = input.substring(pairStart, i); + pairs.add(pair); + pairStart = i + 1; + } + } + + pairs.add(input.substring(pairStart)); + + for (final String pair : pairs) { + int keyStart = 0; + int valueStart = -1; + String keyString = ""; + String valueString; + final Character whitespaceChar = ' '; + + bracketStack.clear(); + + for (int i = 0; i < pair.length(); i++) { + if (bracketStack.isEmpty() && keyValueDelimiterPattern.matcher(String.valueOf(pair.charAt(i))).matches()) { + keyString = pair.substring(keyStart, i).stripTrailing(); + valueStart = i + 1; + while (valueStart < pair.length() && pair.charAt(valueStart) == whitespaceChar) { + valueStart++; + } + break; } + } + + if (keyString.isBlank()) { + /* No usable key: record the whole pair under the configured non-match value (which may be null). */ + LOG.debug("Unsuccessful match: '{}'", pair); + final Object nonMatchValue = keyValueProcessorConfig.getNonMatchValue(); + if (!pair.isEmpty()) { + root.put(pair, nonMatchValue == null ? null : nonMatchValue.toString().stripLeading()); + } + } else if (valueStart < pair.length() && bracketMap.containsKey(pair.charAt(valueStart)) && pair.charAt(pair.length() - 1) == bracketMap.get(pair.charAt(valueStart))) { + /* Value is wrapped in a matching bracket pair: strip the brackets and parse the inside as a nested level. */ + valueString = pair.substring(valueStart + 1, pair.length() - 1).stripLeading(); + root.set(keyString, recurse(valueString, mapper)); + } else { + /* Plain value, empty value, or unbalanced bracket: keep the text verbatim rather than silently dropping the pair. */ + root.put(keyString, pair.substring(valueStart).stripLeading()); + } + } - if (keyValueProcessorConfig.getTransformKey() != null - && !keyValueProcessorConfig.getTransformKey().isEmpty()) { - key = transformKey(key); + return root; + } + + private static Map initBracketMap() { + Map bracketMap = new HashMap<>(); + + bracketMap.put('[', ']'); + bracketMap.put('(', ')'); + bracketMap.put('<', '>'); + + return bracketMap; + } + + private Map createRecursedMap(JsonNode node, ObjectMapper mapper) { + return mapper.convertValue(node, new TypeReference>() {}); + } + + private Map createNonRecursedMap(String[] groups) { + Map nonRecursedMap = new LinkedHashMap<>(); + List valueList; + + for(final String group : groups) { + final String[] terms = keyValueDelimiterPattern.split(group, 2); + String key = terms[0]; + Object value; + + if (terms.length == 2) { + value = terms[1]; + } else { + LOG.debug("Unsuccessful match: '{}'", terms[0]); + value = keyValueProcessorConfig.getNonMatchValue(); + } + + if (nonRecursedMap.containsKey(key)) { + Object existingValue = nonRecursedMap.get(key); + + if (existingValue instanceof List) { + valueList = (List) existingValue; + } else { + valueList = new ArrayList(); + valueList.add(existingValue); + nonRecursedMap.put(key, valueList); } - if (keyValueProcessorConfig.getRemoveBrackets()) { - final String bracketRegex = "[\\[\\]()<>]"; - if (value != null) { - value = value.toString().replaceAll(bracketRegex,""); + if 
(keyValueProcessorConfig.getSkipDuplicateValues()) { + if (!valueList.contains(value)) { + valueList.add(value); } + } else { + valueList.add(value); } + } else { + nonRecursedMap.put(key, value); + } + } - addKeyValueToMap(parsedMap, key, value); + return nonRecursedMap; + } + + private Map executeConfigs(Map map) { + Map processed = new HashMap<>(); + + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + if (!includeKeysSet.isEmpty() && !includeKeysSet.contains(key)) { + LOG.debug("Skipping not included key: '{}'", key); + continue; } - for (Map.Entry pair : defaultValuesMap.entrySet()) { - if (parsedMap.containsKey(pair.getKey())) { - LOG.debug("Skipping already included default key: '{}'", pair.getKey()); - continue; + if (excludeKeysSet.contains(key)) { + LOG.debug("Key is being excluded: '{}'", key); + continue; + } + + if (keyValueProcessorConfig.getDeleteKeyRegex() != null && !Objects.equals(keyValueProcessorConfig.getDeleteKeyRegex(), "")) { + key = key.replaceAll(keyValueProcessorConfig.getDeleteKeyRegex(), ""); + } + key = keyValueProcessorConfig.getPrefix() + key; + + if (value != null + && value instanceof String + && keyValueProcessorConfig.getDeleteValueRegex() != null + && !Objects.equals(keyValueProcessorConfig.getDeleteValueRegex(), "")) { + value = ((String)value).replaceAll(keyValueProcessorConfig.getDeleteValueRegex(), ""); + } + + if (keyValueProcessorConfig.getWhitespace().equals(whitespaceStrict)) { + String[] whitespace_arr = trimWhitespace(key, value); + key = whitespace_arr[0]; + value = whitespace_arr[1]; + } + + if (keyValueProcessorConfig.getTransformKey() != null + && !keyValueProcessorConfig.getTransformKey().isEmpty()) { + key = transformKey(key); + } + + if (keyValueProcessorConfig.getRemoveBrackets()) { + final String bracketRegex = "[\\[\\]()<>]"; + if (value != null) { + value = value.toString().replaceAll(bracketRegex,""); } - parsedMap.put(pair.getKey(), 
pair.getValue()); } - recordEvent.put(keyValueProcessorConfig.getDestination(), parsedMap); + addKeyValueToMap(processed, key, value); } - return records; + for (Map.Entry pair : defaultValuesMap.entrySet()) { + if (processed.containsKey(pair.getKey())) { + LOG.debug("Skipping already included default key: '{}'", pair.getKey()); + continue; + } + processed.put(pair.getKey(), pair.getValue()); + } + + return processed; } private String[] trimWhitespace(String key, Object value) { @@ -262,38 +449,47 @@ private String[] trimWhitespace(String key, Object value) { private String transformKey(String key) { if (keyValueProcessorConfig.getTransformKey().equals(lowercaseKey)) { key = key.toLowerCase(); - } else if (keyValueProcessorConfig.getTransformKey().equals(uppercaseKey)) { - key = key.substring(0, 1).toUpperCase() + key.substring(1); } else if (keyValueProcessorConfig.getTransformKey().equals(capitalizeKey)) { + key = key.substring(0, 1).toUpperCase() + key.substring(1); + } else if (keyValueProcessorConfig.getTransformKey().equals(uppercaseKey)) { key = key.toUpperCase(); } return key; } - private void addKeyValueToMap(final Map parsedMap, final String key, final Object value) { - if(!parsedMap.containsKey(key)) { - parsedMap.put(key, value); + private void addKeyValueToMap(final Map parsedMap, final String key, Object value) { + Object processedValue = value; + + if (value instanceof List) { + List valueAsList = (List) value; + if (valueAsList.size() == 1) { + processedValue = valueAsList.get(0); + } + } + + if (!parsedMap.containsKey(key)) { + parsedMap.put(key, processedValue); return; } if (parsedMap.get(key) instanceof List) { if (keyValueProcessorConfig.getSkipDuplicateValues()) { - if (((List) parsedMap.get(key)).contains(value)) { + if (((List) parsedMap.get(key)).contains(processedValue)) { return; } } - ((List) parsedMap.get(key)).add(value); + ((List) parsedMap.get(key)).add(processedValue); } else { if (keyValueProcessorConfig.getSkipDuplicateValues()) { 
- if (parsedMap.containsValue(value)) { + if (Objects.equals(parsedMap.get(key), processedValue)) { /* a duplicate only if THIS key already holds the value; containsValue also matched other keys' values */ return; } } final LinkedList combinedList = new LinkedList<>(); combinedList.add(parsedMap.get(key)); - combinedList.add(value); + combinedList.add(processedValue); parsedMap.replace(key, combinedList); } diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index 62a2b718f6..9688a38441 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -29,6 +29,7 @@ public class KeyValueProcessorConfig { static final String DEFAULT_WHITESPACE = "lenient"; static final boolean DEFAULT_SKIP_DUPLICATE_VALUES = false; static final boolean DEFAULT_REMOVE_BRACKETS = false; + static final boolean DEFAULT_RECURSIVE = false; @NotEmpty private String source = DEFAULT_SOURCE; @@ -91,6 +92,10 @@ public class KeyValueProcessorConfig { @NotNull private boolean removeBrackets = DEFAULT_REMOVE_BRACKETS; + @JsonProperty("recursive") + @NotNull + private boolean recursive = DEFAULT_RECURSIVE; + public String getSource() { return source; } @@ -158,4 +163,8 @@ public boolean getSkipDuplicateValues() { public boolean getRemoveBrackets() { return removeBrackets; } + + public boolean getRecursive() { + return recursive; + } } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index db50b7a403..38da5beebf 
100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -71,6 +71,7 @@ void setup() { lenient().when(mockConfig.getWhitespace()).thenReturn(defaultConfig.getWhitespace()); lenient().when(mockConfig.getSkipDuplicateValues()).thenReturn(defaultConfig.getSkipDuplicateValues()); lenient().when(mockConfig.getRemoveBrackets()).thenReturn(defaultConfig.getRemoveBrackets()); + lenient().when(mockConfig.getRecursive()).thenReturn(defaultConfig.getRecursive()); keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); } @@ -535,7 +536,7 @@ void testUppercaseTransformKvProcessor() { final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); assertThat(parsed_message.size(), equalTo(1)); - assertThatKeyEquals(parsed_message, "Key1", "value1"); + assertThatKeyEquals(parsed_message, "KEY1", "value1"); } @Test @@ -547,7 +548,7 @@ void testCapitalizeTransformKvProcessor() { final LinkedHashMap parsedMessage = getLinkedHashMap(editedRecords); assertThat(parsedMessage.size(), equalTo(1)); - assertThatKeyEquals(parsedMessage, "KEY1", "value1"); + assertThatKeyEquals(parsedMessage, "Key1", "value1"); } @Test @@ -564,8 +565,6 @@ void testStrictWhitespaceKvProcessor() { @Test void testFalseSkipDuplicateValuesKvProcessor() { - when(mockConfig.getSkipDuplicateValues()).thenReturn(false); - final Record record = getMessage("key1=value1&key1=value1"); final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); @@ -633,6 +632,117 @@ void testTrueRemoveMultipleBracketsKvProcessor() { assertThatKeyEquals(parsed_message, "key2", "value1value2"); } + @Test + void testBasicRecursiveKvProcessor() { + 
when(mockConfig.getRecursive()).thenReturn(true); + + final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + final Map expectedValueMap = new HashMap<>(); + expectedValueMap.put("item1-subitem1", "item1-subitem1-value"); + expectedValueMap.put("item1-subitem2", "item1-subitem2-value"); + + assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "item1", expectedValueMap); + assertThatKeyEquals(parsed_message, "item2", "item2-value"); + } + + @Test + void testMultiRecursiveKvProcessor() { + when(mockConfig.getRecursive()).thenReturn(true); + + final Record record = getMessage("item1=[item1-subitem1=(inner1=abc&inner2=xyz)&item1-subitem2=item1-subitem2-value]&item2=item2-value"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + final Map expectedValueMap = new HashMap<>(); + final Map nestedInnerMap = new HashMap<>(); + + nestedInnerMap.put("inner1", "abc"); + nestedInnerMap.put("inner2", "xyz"); + expectedValueMap.put("item1-subitem1", nestedInnerMap); + expectedValueMap.put("item1-subitem2", "item1-subitem2-value"); + + assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "item1", expectedValueMap); + assertThatKeyEquals(parsed_message, "item2", "item2-value"); + } + + @Test + void testTransformKeyRecursiveKvProcessor() { + when(mockConfig.getRecursive()).thenReturn(true); + when(mockConfig.getTransformKey()).thenReturn("capitalize"); + + final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); + final List> editedRecords = (List>) 
keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + final Map expectedValueMap = new HashMap<>(); + expectedValueMap.put("item1-subitem1", "item1-subitem1-value"); + expectedValueMap.put("item1-subitem2", "item1-subitem2-value"); + + assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "Item1", expectedValueMap); + assertThatKeyEquals(parsed_message, "Item2", "item2-value"); + } + + @Test + void testIncludeInnerKeyRecursiveKvProcessor() { + final List includeKeys = List.of("item1-subitem1"); + when(mockConfig.getRecursive()).thenReturn(true); + when(mockConfig.getIncludeKeys()).thenReturn(includeKeys); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(0)); + } + + @Test + void testExcludeInnerKeyRecursiveKvProcessor() { + final List excludeKeys = List.of("item1-subitem1"); + when(mockConfig.getRecursive()).thenReturn(true); + when(mockConfig.getExcludeKeys()).thenReturn(excludeKeys); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + final Map expectedValueMap = new HashMap<>(); + expectedValueMap.put("item1-subitem1", "item1-subitem1-value"); + expectedValueMap.put("item1-subitem2", "item1-subitem2-value"); + + 
assertThat(parsed_message.size(), equalTo(2)); + assertThatKeyEquals(parsed_message, "item1", expectedValueMap); + assertThatKeyEquals(parsed_message, "item2", "item2-value"); + } + + @Test + void testDefaultInnerKeyRecursiveKvProcessor() { + final Map defaultMap = Map.of("item1-subitem1", "default"); + when(mockConfig.getRecursive()).thenReturn(true); + when(mockConfig.getDefaultValues()).thenReturn(defaultMap); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("item1=[item1-subitem1=item1-subitem1-value&item1-subitem2=item1-subitem2-value]&item2=item2-value"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + final Map expectedValueMap = new HashMap<>(); + expectedValueMap.put("item1-subitem1", "item1-subitem1-value"); + expectedValueMap.put("item1-subitem2", "item1-subitem2-value"); + + assertThat(parsed_message.size(), equalTo(3)); + assertThatKeyEquals(parsed_message, "item1", expectedValueMap); + assertThatKeyEquals(parsed_message, "item2", "item2-value"); + assertThatKeyEquals(parsed_message, "item1-subitem1", "default"); + } + @Test void testShutdownIsReady() { assertThat(keyValueProcessor.isReadyForShutdown(), is(true));