From 3ee162f65ceb929fb20c301113adef2e9f4cc93b Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Thu, 17 Aug 2023 15:52:43 -0500 Subject: [PATCH] Add overwrite option to parse-json processor Signed-off-by: Hai Yan --- .../parsejson/ParseJsonProcessor.java | 8 +++- .../parsejson/ParseJsonProcessorConfig.java | 7 ++++ .../ParseJsonProcessorConfigTest.java | 1 + .../parsejson/ParseJsonProcessorTest.java | 37 +++++++++++++++++++ 4 files changed, 51 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java index e076b7de10..a2f5ad2ea4 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java @@ -40,6 +40,7 @@ public class ParseJsonProcessor extends AbstractProcessor, Record< private final String pointer; private final String parseWhen; private final List tagsOnFailure; + private final boolean overwriteIfKeyExists; private final ExpressionEvaluator expressionEvaluator; @@ -54,6 +55,7 @@ public ParseJsonProcessor(final PluginMetrics pluginMetrics, pointer = parseJsonProcessorConfig.getPointer(); parseWhen = parseJsonProcessorConfig.getParseWhen(); tagsOnFailure = parseJsonProcessorConfig.getTagsOnFailure(); + overwriteIfKeyExists = parseJsonProcessorConfig.getOverwriteIfKeyExists(); this.expressionEvaluator = expressionEvaluator; } @@ -85,7 +87,7 @@ public Collection> doExecute(final Collection> recor if (doWriteToRoot) { writeToRoot(event, parsedJson); - } else { + } else if (overwriteIfKeyExists || !event.containsKey(destination)) { event.put(destination, parsedJson); } } catch (final JsonProcessingException jsonException) { @@ -169,7 +171,9 @@ private String trimPointer(String pointer) { private void writeToRoot(final Event event, final Map parsedJson) { for (Map.Entry entry : parsedJson.entrySet()) { - event.put(entry.getKey(), entry.getValue()); + if (overwriteIfKeyExists || !event.containsKey(entry.getKey())) { + event.put(entry.getKey(), entry.getValue()); + } } } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java index fa6dbe199b..ae49c07596 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java @@ -31,6 +31,9 @@ public class ParseJsonProcessorConfig { @JsonProperty("tags_on_failure") private List tagsOnFailure; + @JsonProperty("overwrite_if_key_exists") + private boolean overwriteIfKeyExists = true; + /** * The field of the Event that contains the JSON data. * @@ -68,6 +71,10 @@ public List getTagsOnFailure() { public String getParseWhen() { return parseWhen; } + public boolean getOverwriteIfKeyExists() { + return overwriteIfKeyExists; + } + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") boolean isValidDestination() { if (Objects.isNull(destination)) return true; diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java index ea3ef5a7f7..4012f7a848 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value assertThat(objectUnderTest.getDestination(), equalTo(null)); assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); + assertThat(objectUnderTest.getOverwriteIfKeyExists(), equalTo(true)); } @Nested diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java index 7fce6ecbe5..123f1f44bc 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java @@ -52,6 +52,7 @@ void setup() { when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination()); when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); when(processorConfig.getParseWhen()).thenReturn(null); + when(processorConfig.getOverwriteIfKeyExists()).thenReturn(true); } private ParseJsonProcessor createObjectUnderTest() { @@ -94,6 +95,42 @@ void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() { assertThatKeyEquals(parsedEvent, "key", "value"); } + @Test + void test_when_dataFieldEqualToRootField_then_notOverwritesOriginalFields() { + final String source = "root_source"; + when(processorConfig.getSource()).thenReturn(source); + when(processorConfig.getOverwriteIfKeyExists()).thenReturn(false); + parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used + + final Map data = Map.ofEntries( + entry(source,"value_that_will_overwrite_source"), + entry("key","value") + ); + + final String serializedMessage = convertMapToJSONString(data); + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThatKeyEquals(parsedEvent, source, "{\"root_source\":\"value_that_will_overwrite_source\", \"key\":\"value\"}"); + assertThatKeyEquals(parsedEvent, "key", "value"); + } + + @Test + void test_when_dataFieldEqualToDestinationField_then_notOverwritesOriginalFields() { + final String source = "root_source"; + when(processorConfig.getSource()).thenReturn(source); + when(processorConfig.getDestination()).thenReturn(source); // write back to source + when(processorConfig.getOverwriteIfKeyExists()).thenReturn(false); + parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used + + final Map data = Map.of("key","value"); + + final String serializedMessage = convertMapToJSONString(data); + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThatKeyEquals(parsedEvent, source, "{\"key\":\"value\"}"); + assertThat(parsedEvent.containsKey("key"), equalTo(false)); + } + @Test void test_when_valueIsEmpty_then_notParsed() { parseJsonProcessor = createObjectUnderTest();