Skip to content

Commit

Permalink
Add tagging on failure for KeyValue processor (#3368)
Browse files Browse the repository at this point in the history
* readme, config done, main code integration in progress

Signed-off-by: Kat Shen <[email protected]>

* clarify readme with example output

Signed-off-by: Kat Shen <[email protected]>

* add import statement

Signed-off-by: Kat Shen <[email protected]>

* Add tagging on failure

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Kat Shen <[email protected]>
Signed-off-by: Hai Yan <[email protected]>
Co-authored-by: Kat Shen <[email protected]>
  • Loading branch information
oeyh and shenkw1 authored Sep 25, 2023
1 parent bcaaf1e commit f2593a9
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 8 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/key-value-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ When run, the processor will parse the message into the following output:
* `overwrite_if_destination_exists` - Specify whether to overwrite existing fields if there are key conflicts when writing parsed fields to the event.
* Default: `true`

* `tags_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided tags.
* Example: if `tags_on_failure` is set to `["keyvalueprocessor_failure"]`, in the case of a runtime exception, `{"tags": ["keyvalueprocessor_failure"]}` will be added to the event's metadata.

## Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
private final Set<String> validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict);
final String delimiterBracketCheck = "[\\[\\]()<>]";
private final Set<Character> bracketSet = Set.of('[', ']', '(', ')', '<', '>');
private final List<String> tagsOnFailure;

@DataPrepperPluginConstructor
public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) {
super(pluginMetrics);
this.keyValueProcessorConfig = keyValueProcessorConfig;

tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure();

if (keyValueProcessorConfig.getFieldDelimiterRegex() != null
&& !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) {
if (keyValueProcessorConfig.getFieldSplitCharacters() != null
Expand Down Expand Up @@ -94,7 +97,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces

if (keyValueProcessorConfig.getRecursive()
&& fieldDelimiterPattern.matcher(delimiterBracketCheck).matches()) {
throw new IllegalArgumentException("While recursive is true, the set field split characters cannot contain brackets while you are trying to recurse.");
throw new IllegalArgumentException("While recursive is true, the set field delimiter cannot contain brackets while you are trying to recurse.");
}
}

Expand Down Expand Up @@ -211,8 +214,8 @@ private boolean validateRegex(final String pattern)
}

private void validateKeySets(final Set<String> includeSet, final Set<String> excludeSet, final Set<String> defaultSet) {
final Set<String> includeIntersectionSet = new HashSet<String>(includeSet);
final Set<String> defaultIntersectionSet = new HashSet<String>(defaultSet);
final Set<String> includeIntersectionSet = new HashSet<>(includeSet);
final Set<String> defaultIntersectionSet = new HashSet<>(defaultSet);

includeIntersectionSet.retainAll(excludeSet);
if (!includeIntersectionSet.isEmpty()) {
Expand All @@ -229,7 +232,7 @@ private void validateKeySets(final Set<String> includeSet, final Set<String> exc
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
final ObjectMapper mapper = new ObjectMapper();

for(final Record<Event> record : records) {
for (final Record<Event> record : records) {
final Map<String, Object> outputMap = new HashMap<>();
final Event recordEvent = record.getData();
final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
Expand All @@ -240,10 +243,16 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
JsonNode recursedTree = recurse(groupsRaw, mapper);
outputMap.putAll(createRecursedMap(recursedTree, mapper));
} catch (Exception e) {
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive");
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
} else {
outputMap.putAll(createNonRecursedMap(groups));
try {
outputMap.putAll(createNonRecursedMap(groups));
} catch (Exception e) {
LOG.error("Non-recursive parsing ran into an unexpected error", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
}

final Map<String, Object> processedMap = executeConfigs(outputMap);
Expand Down Expand Up @@ -275,11 +284,11 @@ public void shutdown() {
}

private ObjectNode recurse(final String input, final ObjectMapper mapper) {
Stack<Character> bracketStack = new Stack<Character>();
Stack<Character> bracketStack = new Stack<>();
Map<Character, Character> bracketMap = initBracketMap();
int pairStart = 0;

ArrayList<String> pairs = new ArrayList<String>();
ArrayList<String> pairs = new ArrayList<>();
ObjectNode root = mapper.createObjectNode();

for (int i = 0; i < input.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class KeyValueProcessorConfig {
@NotNull
private boolean recursive = DEFAULT_RECURSIVE;

@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

Expand Down Expand Up @@ -170,6 +173,10 @@ public boolean getRecursive() {
return recursive;
}

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,20 @@ void testDefaultInnerKeyRecursiveKvProcessor() {
assertThatKeyEquals(parsed_message, "item1-subitem1", "default");
}

@Test
void testTagsAddedWhenParsingFails() {
when(mockConfig.getRecursive()).thenReturn(true);
when(mockConfig.getTagsOnFailure()).thenReturn(List.of("tag1", "tag2"));
keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

final Record<Event> record = getMessage("item1=[]");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);

assertThat(parsed_message.size(), equalTo(0));
assertThat(record.getData().getMetadata().hasTags(List.of("tag1", "tag2")), is(true));
}

@Test
void testShutdownIsReady() {
assertThat(keyValueProcessor.isReadyForShutdown(), is(true));
Expand Down

0 comments on commit f2593a9

Please sign in to comment.