Skip to content

Commit

Permalink
Add overwrite option to parse-json processor
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Aug 17, 2023
1 parent 7c3b1d5 commit 3ee162f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<
private final String pointer;
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfKeyExists;

private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -54,6 +55,7 @@ public ParseJsonProcessor(final PluginMetrics pluginMetrics,
pointer = parseJsonProcessorConfig.getPointer();
parseWhen = parseJsonProcessorConfig.getParseWhen();
tagsOnFailure = parseJsonProcessorConfig.getTagsOnFailure();
overwriteIfKeyExists = parseJsonProcessorConfig.getOverwriteIfKeyExists();
this.expressionEvaluator = expressionEvaluator;
}

Expand Down Expand Up @@ -85,7 +87,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

if (doWriteToRoot) {
writeToRoot(event, parsedJson);
} else {
} else if (overwriteIfKeyExists || !event.containsKey(destination)) {
event.put(destination, parsedJson);
}
} catch (final JsonProcessingException jsonException) {
Expand Down Expand Up @@ -169,7 +171,9 @@ private String trimPointer(String pointer) {

private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
event.put(entry.getKey(), entry.getValue());
if (overwriteIfKeyExists || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class ParseJsonProcessorConfig {
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

@JsonProperty("overwrite_if_key_exists")
private boolean overwriteIfKeyExists = true;

/**
* The field of the Event that contains the JSON data.
*
Expand Down Expand Up @@ -68,6 +71,10 @@ public List<String> getTagsOnFailure() {

public String getParseWhen() { return parseWhen; }

public boolean getOverwriteIfKeyExists() {
return overwriteIfKeyExists;
}

@AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
boolean isValidDestination() {
if (Objects.isNull(destination)) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value
assertThat(objectUnderTest.getDestination(), equalTo(null));
assertThat(objectUnderTest.getPointer(), equalTo(null));
assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null));
assertThat(objectUnderTest.getOverwriteIfKeyExists(), equalTo(true));
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void setup() {
when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination());
when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer());
when(processorConfig.getParseWhen()).thenReturn(null);
when(processorConfig.getOverwriteIfKeyExists()).thenReturn(true);
}

private ParseJsonProcessor createObjectUnderTest() {
Expand Down Expand Up @@ -94,6 +95,42 @@ void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() {
assertThatKeyEquals(parsedEvent, "key", "value");
}

@Test
void test_when_dataFieldEqualToRootField_then_notOverwritesOriginalFields() {
final String source = "root_source";
when(processorConfig.getSource()).thenReturn(source);
when(processorConfig.getOverwriteIfKeyExists()).thenReturn(false);
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used

final Map<String, Object> data = Map.ofEntries(
entry(source,"value_that_will_overwrite_source"),
entry("key","value")
);

final String serializedMessage = convertMapToJSONString(data);
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThatKeyEquals(parsedEvent, source, "{\"root_source\":\"value_that_will_overwrite_source\", \"key\":\"value\"}");
assertThatKeyEquals(parsedEvent, "key", "value");
}

@Test
void test_when_dataFieldEqualToDestinationField_then_notOverwritesOriginalFields() {
final String source = "root_source";
when(processorConfig.getSource()).thenReturn(source);
when(processorConfig.getDestination()).thenReturn(source); // write back to source
when(processorConfig.getOverwriteIfKeyExists()).thenReturn(false);
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used

final Map<String, Object> data = Map.of("key","value");

final String serializedMessage = convertMapToJSONString(data);
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThatKeyEquals(parsedEvent, source, "{\"key\":\"value\"}");
assertThat(parsedEvent.containsKey("key"), equalTo(false));
}

@Test
void test_when_valueIsEmpty_then_notParsed() {
parseJsonProcessor = createObjectUnderTest();
Expand Down

0 comments on commit 3ee162f

Please sign in to comment.