From d674f12649057f18192382dd747fa0e0edc9074d Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Wed, 20 Sep 2023 11:52:08 -0500 Subject: [PATCH] Add tagging on failure Signed-off-by: Hai Yan --- .../key-value-processor/README.md | 5 ++-- .../processor/keyvalue/KeyValueProcessor.java | 26 ++++++++++++------- .../keyvalue/KeyValueProcessorConfig.java | 10 +++---- .../keyvalue/KeyValueProcessorTests.java | 14 ++++++++++ 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index 84254e1b00..4cb53c84bc 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -99,9 +99,8 @@ When run, the processor will parse the message into the following output: * While `recursive` is `true`, `skip_duplicate_values` will always be `true`. * While `recursive` is `true`, `whitespace` will always be `"strict"`. -* `tag_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided value. - * Default: `["keyvalueprocessor_failure"]` - * Example: in the case of a runtime exception, the output will be `{"message": "some input message", "tags": ["keyvalueprocessor_failure"]}` +* `tags_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided tags. + * Example: if `tags_on_failure` is set to `["keyvalueprocessor_failure"]`, in the case of a runtime exception, `{"tags": ["keyvalueprocessor_failure"]}` will be added to the event's metadata. ## Developer Guide This plugin is compatible with Java 14. See diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index 7f24bf21e0..927dff3daa 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -56,14 +56,14 @@ public class KeyValueProcessor extends AbstractProcessor, Record validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict); final String delimiterBracketCheck = "[\\[\\]()<>]"; private final Set bracketSet = Set.of('[', ']', '(', ')', '<', '>'); - private final List tagOnFailure; + private final List tagsOnFailure; @DataPrepperPluginConstructor public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) { super(pluginMetrics); this.keyValueProcessorConfig = keyValueProcessorConfig; - tagOnFailure = keyValueProcessorConfig.getTagOnFailure(); + tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure(); if (keyValueProcessorConfig.getFieldDelimiterRegex() != null && !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) { @@ -97,7 +97,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces if (keyValueProcessorConfig.getRecursive() && fieldDelimiterPattern.matcher(delimiterBracketCheck).matches()) { - throw new IllegalArgumentException("While recursive is true, the set field split characters cannot contain brackets while you are trying to recurse."); + throw new IllegalArgumentException("While recursive is true, the set field delimiter cannot contain brackets while you are trying to recurse."); } } @@ -214,8 +214,8 @@ private boolean validateRegex(final String pattern) } private void validateKeySets(final Set includeSet, final Set excludeSet, final Set defaultSet) { - final Set includeIntersectionSet = new HashSet(includeSet); - final Set defaultIntersectionSet = new HashSet(defaultSet); + final Set includeIntersectionSet = new HashSet<>(includeSet); + final Set defaultIntersectionSet = new HashSet<>(defaultSet); includeIntersectionSet.retainAll(excludeSet); if (!includeIntersectionSet.isEmpty()) { @@ -232,7 +232,7 @@ private void validateKeySets(final Set includeSet, final Set exc public Collection> doExecute(final Collection> records) { final ObjectMapper mapper = new ObjectMapper(); - for(final Record record : records) { + for (final Record record : records) { final Map outputMap = new HashMap<>(); final Event recordEvent = record.getData(); final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); @@ -243,10 +243,16 @@ public Collection> doExecute(final Collection> recor JsonNode recursedTree = recurse(groupsRaw, mapper); outputMap.putAll(createRecursedMap(recursedTree, mapper)); } catch (Exception e) { - LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive"); + LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e); + recordEvent.getMetadata().addTags(tagsOnFailure); } } else { - outputMap.putAll(createNonRecursedMap(groups)); + try { + outputMap.putAll(createNonRecursedMap(groups)); + } catch (Exception e) { + LOG.error("Non-recursive parsing ran into an unexpected error", e); + recordEvent.getMetadata().addTags(tagsOnFailure); + } } final Map processedMap = executeConfigs(outputMap); @@ -258,11 +264,11 @@ public Collection> doExecute(final Collection> recor } private ObjectNode recurse(final String input, final ObjectMapper mapper) { - Stack bracketStack = new Stack(); + Stack bracketStack = new Stack<>(); Map bracketMap = initBracketMap(); int pairStart = 0; - ArrayList pairs = new ArrayList(); + ArrayList pairs = new ArrayList<>(); ObjectNode root = mapper.createObjectNode(); for (int i = 0; i < input.length(); i++) { diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index b8e2077771..a211bd375f 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -10,7 +10,6 @@ import jakarta.validation.constraints.NotNull; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -31,7 +30,6 @@ public class KeyValueProcessorConfig { static final boolean DEFAULT_SKIP_DUPLICATE_VALUES = false; static final boolean DEFAULT_REMOVE_BRACKETS = false; static final boolean DEFAULT_RECURSIVE = false; - static final List DEFAULT_TAG_ON_FAILURE = new ArrayList<>(Arrays.asList("keyvalueprocessor_failure")); @NotEmpty private String source = DEFAULT_SOURCE; @@ -98,8 +96,8 @@ public class KeyValueProcessorConfig { @NotNull private boolean recursive = DEFAULT_RECURSIVE; - @JsonProperty("tag_on_failure") - private List tagOnFailure = DEFAULT_TAG_ON_FAILURE; + @JsonProperty("tags_on_failure") + private List tagsOnFailure; public String getSource() { return source; @@ -173,7 +171,7 @@ public boolean getRecursive() { return recursive; } - public List getTagOnFailure() { - return tagOnFailure; + public List getTagsOnFailure() { + return tagsOnFailure; } } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 38da5beebf..1d738af4c9 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -743,6 +743,20 @@ void testDefaultInnerKeyRecursiveKvProcessor() { assertThatKeyEquals(parsed_message, "item1-subitem1", "default"); } + @Test + void testTagsAddedWhenParsingFails() { + when(mockConfig.getRecursive()).thenReturn(true); + when(mockConfig.getTagsOnFailure()).thenReturn(List.of("tag1", "tag2")); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("item1=[]"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(0)); + assertThat(record.getData().getMetadata().hasTags(List.of("tag1", "tag2")), is(true)); + } + @Test void testShutdownIsReady() { assertThat(keyValueProcessor.isReadyForShutdown(), is(true));