diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index a2b984d070..878316c183 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -36,6 +36,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor tagsOnFailure; private final boolean overwriteIfDestinationExists; + private final boolean deleteSourceRequested; private final ExpressionEvaluator expressionEvaluator; @@ -50,6 +51,7 @@ protected AbstractParseProcessor(PluginMetrics pluginMetrics, parseWhen = commonParseConfig.getParseWhen(); tagsOnFailure = commonParseConfig.getTagsOnFailure(); overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists(); + deleteSourceRequested = commonParseConfig.isDeleteSourceRequested(); this.expressionEvaluator = expressionEvaluator; } @@ -93,6 +95,10 @@ public Collection> doExecute(final Collection> recor } else if (overwriteIfDestinationExists || !event.containsKey(destination)) { event.put(destination, parsedValue); } + + if(deleteSourceRequested) { + event.delete(this.source); + } } catch (Exception e) { LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java index 193631bea9..5fd5050b3d 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java @@ -27,7 +27,6 @@ public interface CommonParseConfig { * An optional setting used to specify a JSON Pointer. Pointer points to the JSON key that will be parsed into the destination. * There is no pointer by default, meaning that the entirety of source will be parsed. If the target key would overwrite an existing * key in the Event then the absolute path of the target key will be placed into destination - * * Note: (should this be configurable/what about double conflicts?) * @return String representing JSON Pointer */ @@ -54,4 +53,10 @@ public interface CommonParseConfig { * Defaults to true. */ boolean getOverwriteIfDestinationExists(); + + /** + * An optional setting used to request dropping the original raw message after successfully parsing the input event. + * Defaults to false. + */ + boolean isDeleteSourceRequested(); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java index 67a2f464ad..fcc2950477 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java @@ -35,6 +35,9 @@ public class ParseIonProcessorConfig implements CommonParseConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; + @JsonProperty + private boolean deleteSource = false; + @Override public String getSource() { return source; @@ -68,6 +71,11 @@ boolean isValidDestination() { if (Objects.isNull(destination)) return true; final String trimmedDestination = destination.trim(); - return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); + } + + @Override + public boolean isDeleteSourceRequested() { + return deleteSource; } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java index e0a2e91c1d..49ff2a5969 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java @@ -35,6 +35,9 @@ public class ParseJsonProcessorConfig implements CommonParseConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; + @JsonProperty + private boolean deleteSource = false; + @Override public String getSource() { return source; @@ -63,11 +66,16 @@ public boolean getOverwriteIfDestinationExists() { return overwriteIfDestinationExists; } + @Override + public boolean isDeleteSourceRequested() { + return deleteSource; + } + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") boolean isValidDestination() { if (Objects.isNull(destination)) return true; final String trimmedDestination = destination.trim(); - return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java index df4fabc397..c90173dc43 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java @@ -30,6 +30,9 @@ public class ParseXmlProcessorConfig implements CommonParseConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; + @JsonProperty + private boolean deleteSource = false; + @Override public String getSource() { return source; @@ -65,6 +68,11 @@ boolean isValidDestination() { if (Objects.isNull(destination)) return true; final String trimmedDestination = destination.trim(); - return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); + } + + @Override + public boolean isDeleteSourceRequested() { + return deleteSource; } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java index 0fb274ba13..8c47650c05 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java @@ -57,6 +57,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseIonProcessorConfig.class, config, "tagsOnFailure", tagsList); assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + + setField(ParseIonProcessorConfig.class, config, "deleteSource", true); + assertThat(config.isDeleteSourceRequested(), equalTo(true)); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java index 62873866d7..c9a8fdf4e5 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java @@ -47,6 +47,23 @@ void test_when_using_ion_features_then_processorParsesCorrectly() { final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }"; final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); + assertThat(parsedEvent.get(processorConfig.getSource(), Object.class), equalTo(serializedMessage)); + assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1)); + assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); + assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); + assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0)); + } + + @Test + void test_when_deleteSourceFlagEnabled() { + when(processorConfig.isDeleteSourceRequested()).thenReturn(true); + parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator); + + final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1)); assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java index 459fab6ea5..aa138a0e7e 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + assertThat(objectUnderTest.isDeleteSourceRequested(), equalTo(false)); } @Nested @@ -57,6 +58,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseJsonProcessorConfig.class, config, "tagsOnFailure", tagsList); assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + + setField(ParseJsonProcessorConfig.class, config, "deleteSource", true); + assertThat(config.isDeleteSourceRequested(), equalTo(true)); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java index 4594cbe2f5..1416d6cf35 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java @@ -194,6 +194,22 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() { assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); } + @Test + void test_when_deleteSourceFlagEnabled() { + when(processorConfig.isDeleteSourceRequested()).thenReturn(true); + parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator); + + final String key = "key"; + final ArrayList value = new ArrayList<>(List.of("Element0","Element1","Element2")); + final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}"; + final Event parsedEvent = createAndParseMessageEvent(jsonArray); + + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); + assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); + final String pointerToFirstElement = key + "/0"; + assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); + } + @Test void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() { parseJsonProcessor = createObjectUnderTest(); @@ -373,23 +389,21 @@ private String constructDeeplyNestedJsonPointer(final int numberOfLayers) { /** * Naive serialization that converts every = to : and wraps every word with double quotes (no error handling or input validation). - * @param messageMap - * @return + * @param messageMap source key value map + * @return serialized string representation of the map */ private String convertMapToJSONString(final Map messageMap) { final String replaceEquals = messageMap.toString().replace("=",":"); - final String addQuotes = replaceEquals.replaceAll("(\\w+)", "\"$1\""); // wrap every word in quotes - return addQuotes; + return replaceEquals.replaceAll("(\\w+)", "\"$1\""); } /** * Creates a Map that maps a single key to a value nested numberOfLayers layers deep. - * @param numberOfLayers - * @return + * @param numberOfLayers indicates the depth of layers count + * @return a Map representing the nested structure */ private Map constructArbitrarilyDeepJsonMap(final int numberOfLayers) { - final Map result = Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers)); - return result; + return Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers)); } private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayers) { diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java index d5e7e1ec43..bab6d6e919 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java @@ -52,6 +52,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseXmlProcessorConfig.class, config, "tagsOnFailure", tagsList); assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + + setField(ParseXmlProcessorConfig.class, config, "deleteSource", true); + assertThat(config.isDeleteSourceRequested(), equalTo(true)); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java index 51de35ca70..8d9bc4cde3 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java @@ -60,6 +60,22 @@ void test_when_using_xml_features_then_processorParsesCorrectly() { assertThat(parsedEvent.get("age", String.class), equalTo("30")); } + @Test + void test_when_deleteSourceFlagEnabled() { + + final String tagOnFailure = UUID.randomUUID().toString(); + when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure)); + when(processorConfig.isDeleteSourceRequested()).thenReturn(true); + + parseXmlProcessor = createObjectUnderTest(); + + final String serializedMessage = "John Doe30"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); + assertThat(parsedEvent.get("name", String.class), equalTo("John Doe")); + assertThat(parsedEvent.get("age", String.class), equalTo("30")); + } + @Test void test_when_using_invalid_xml_tags_correctly() {