Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tagging on failure for KeyValue processor #3368

Merged
merged 4 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data-prepper-plugins/key-value-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ When run, the processor will parse the message into the following output:
* `overwrite_if_destination_exists` - Specify whether to overwrite existing fields if there are key conflicts when writing parsed fields to the event.
* Default: `true`

* `tags_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided tags.
* Example: if `tags_on_failure` is set to `["keyvalueprocessor_failure"]`, in the case of a runtime exception, `{"tags": ["keyvalueprocessor_failure"]}` will be added to the event's metadata.

## Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
private final Set<String> validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict);
final String delimiterBracketCheck = "[\\[\\]()<>]";
private final Set<Character> bracketSet = Set.of('[', ']', '(', ')', '<', '>');
private final List<String> tagsOnFailure;

@DataPrepperPluginConstructor
public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) {
super(pluginMetrics);
this.keyValueProcessorConfig = keyValueProcessorConfig;

tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure();

if (keyValueProcessorConfig.getFieldDelimiterRegex() != null
&& !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) {
if (keyValueProcessorConfig.getFieldSplitCharacters() != null
Expand Down Expand Up @@ -94,7 +97,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces

if (keyValueProcessorConfig.getRecursive()
&& fieldDelimiterPattern.matcher(delimiterBracketCheck).matches()) {
throw new IllegalArgumentException("While recursive is true, the set field split characters cannot contain brackets while you are trying to recurse.");
throw new IllegalArgumentException("While recursive is true, the set field delimiter cannot contain brackets while you are trying to recurse.");
}
}

Expand Down Expand Up @@ -211,8 +214,8 @@ private boolean validateRegex(final String pattern)
}

private void validateKeySets(final Set<String> includeSet, final Set<String> excludeSet, final Set<String> defaultSet) {
final Set<String> includeIntersectionSet = new HashSet<String>(includeSet);
final Set<String> defaultIntersectionSet = new HashSet<String>(defaultSet);
final Set<String> includeIntersectionSet = new HashSet<>(includeSet);
final Set<String> defaultIntersectionSet = new HashSet<>(defaultSet);

includeIntersectionSet.retainAll(excludeSet);
if (!includeIntersectionSet.isEmpty()) {
Expand All @@ -229,7 +232,7 @@ private void validateKeySets(final Set<String> includeSet, final Set<String> exc
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
final ObjectMapper mapper = new ObjectMapper();

for(final Record<Event> record : records) {
for (final Record<Event> record : records) {
final Map<String, Object> outputMap = new HashMap<>();
final Event recordEvent = record.getData();
final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
Expand All @@ -240,10 +243,16 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
JsonNode recursedTree = recurse(groupsRaw, mapper);
outputMap.putAll(createRecursedMap(recursedTree, mapper));
} catch (Exception e) {
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive");
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
} else {
outputMap.putAll(createNonRecursedMap(groups));
try {
outputMap.putAll(createNonRecursedMap(groups));
} catch (Exception e) {
LOG.error("Non-recursive parsing ran into an unexpected error", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
}

final Map<String, Object> processedMap = executeConfigs(outputMap);
Expand Down Expand Up @@ -275,11 +284,11 @@ public void shutdown() {
}

private ObjectNode recurse(final String input, final ObjectMapper mapper) {
Stack<Character> bracketStack = new Stack<Character>();
Stack<Character> bracketStack = new Stack<>();
Map<Character, Character> bracketMap = initBracketMap();
int pairStart = 0;

ArrayList<String> pairs = new ArrayList<String>();
ArrayList<String> pairs = new ArrayList<>();
ObjectNode root = mapper.createObjectNode();

for (int i = 0; i < input.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class KeyValueProcessorConfig {
@NotNull
private boolean recursive = DEFAULT_RECURSIVE;

@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

Expand Down Expand Up @@ -170,6 +173,10 @@ public boolean getRecursive() {
return recursive;
}

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,20 @@ void testDefaultInnerKeyRecursiveKvProcessor() {
assertThatKeyEquals(parsed_message, "item1-subitem1", "default");
}

@Test
void testTagsAddedWhenParsingFails() {
when(mockConfig.getRecursive()).thenReturn(true);
when(mockConfig.getTagsOnFailure()).thenReturn(List.of("tag1", "tag2"));
keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

final Record<Event> record = getMessage("item1=[]");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);

assertThat(parsed_message.size(), equalTo(0));
assertThat(record.getData().getMetadata().hasTags(List.of("tag1", "tag2")), is(true));
}

@Test
void testShutdownIsReady() {
assertThat(keyValueProcessor.isReadyForShutdown(), is(true));
Expand Down
Loading