Skip to content

Commit

Permalink
Introducing delete input configuration option for some parsers (#4702)
Browse files Browse the repository at this point in the history
* Introduced delete_source configuration option to give flexibility for the user to drop the raw source record if they don't want to propagate it downstream

Signed-off-by: Santhosh Gandhe <[email protected]>

* addressing review comments

Signed-off-by: Santhosh Gandhe <[email protected]>

* added delete_source option to other similar parser classes

Signed-off-by: Santhosh Gandhe <[email protected]>

---------

Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 authored Jul 4, 2024
1 parent 3e8513b commit 253e592
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Ev
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -50,6 +51,7 @@ protected AbstractParseProcessor(PluginMetrics pluginMetrics,
parseWhen = commonParseConfig.getParseWhen();
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
this.expressionEvaluator = expressionEvaluator;
}

Expand Down Expand Up @@ -93,6 +95,10 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
} else if (overwriteIfDestinationExists || !event.containsKey(destination)) {
event.put(destination, parsedValue);
}

if(deleteSourceRequested) {
event.delete(this.source);
}
} catch (Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public interface CommonParseConfig {
* An optional setting used to specify a JSON Pointer. Pointer points to the JSON key that will be parsed into the destination.
* There is no pointer by default, meaning that the entirety of source will be parsed. If the target key would overwrite an existing
* key in the Event then the absolute path of the target key will be placed into destination
*
* Note: (should this be configurable/what about double conflicts?)
* @return String representing JSON Pointer
*/
Expand All @@ -54,4 +53,10 @@ public interface CommonParseConfig {
* Defaults to true.
*/
boolean getOverwriteIfDestinationExists();

/**
* An optional setting used to request dropping the original raw message after successfully parsing the input event.
* Defaults to false.
*/
boolean isDeleteSourceRequested();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ParseIonProcessorConfig implements CommonParseConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty
private boolean deleteSource = false;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -68,6 +71,11 @@ boolean isValidDestination() {
if (Objects.isNull(destination)) return true;

final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/"));
}

@Override
public boolean isDeleteSourceRequested() {
return deleteSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ParseJsonProcessorConfig implements CommonParseConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty
private boolean deleteSource = false;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -63,11 +66,16 @@ public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}

@Override
public boolean isDeleteSourceRequested() {
return deleteSource;
}

@AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
boolean isValidDestination() {
if (Objects.isNull(destination)) return true;

final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class ParseXmlProcessorConfig implements CommonParseConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty
private boolean deleteSource = false;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -65,6 +68,11 @@ boolean isValidDestination() {
if (Objects.isNull(destination)) return true;

final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/"));
}

@Override
public boolean isDeleteSourceRequested() {
return deleteSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseIonProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));

setField(ParseIonProcessorConfig.class, config, "deleteSource", true);
assertThat(config.isDeleteSourceRequested(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ void test_when_using_ion_features_then_processorParsesCorrectly() {
final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true));
assertThat(parsedEvent.get(processorConfig.getSource(), Object.class), equalTo(serializedMessage));
assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1));
assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL"));
assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z"));
assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0));
}

@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator);

final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1));
assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL"));
assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value
assertThat(objectUnderTest.getPointer(), equalTo(null));
assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null));
assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true));
assertThat(objectUnderTest.isDeleteSourceRequested(), equalTo(false));
}

@Nested
Expand Down Expand Up @@ -57,6 +58,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseJsonProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));

setField(ParseJsonProcessorConfig.class, config, "deleteSource", true);
assertThat(config.isDeleteSourceRequested(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() {
assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0)));
}

@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator);

final String key = "key";
final ArrayList<String> value = new ArrayList<>(List.of("Element0","Element1","Element2"));
final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}";
final Event parsedEvent = createAndParseMessageEvent(jsonArray);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value));
final String pointerToFirstElement = key + "/0";
assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0)));
}

@Test
void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() {
parseJsonProcessor = createObjectUnderTest();
Expand Down Expand Up @@ -373,23 +389,21 @@ private String constructDeeplyNestedJsonPointer(final int numberOfLayers) {

/**
* Naive serialization that converts every = to : and wraps every word with double quotes (no error handling or input validation).
* @param messageMap
* @return
* @param messageMap source key value map
* @return serialized string representation of the map
*/
private String convertMapToJSONString(final Map<String, Object> messageMap) {
final String replaceEquals = messageMap.toString().replace("=",":");
final String addQuotes = replaceEquals.replaceAll("(\\w+)", "\"$1\""); // wrap every word in quotes
return addQuotes;
return replaceEquals.replaceAll("(\\w+)", "\"$1\"");
}

/**
* Creates a Map that maps a single key to a value nested numberOfLayers layers deep.
* @param numberOfLayers
* @return
* @param numberOfLayers indicates the depth of layers count
* @return a Map representing the nested structure
*/
private Map<String, Object> constructArbitrarilyDeepJsonMap(final int numberOfLayers) {
final Map<String, Object> result = Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers));
return result;
return Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers));
}

private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseXmlProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));

setField(ParseXmlProcessorConfig.class, config, "deleteSource", true);
assertThat(config.isDeleteSourceRequested(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ void test_when_using_xml_features_then_processorParsesCorrectly() {
assertThat(parsedEvent.get("age", String.class), equalTo("30"));
}

@Test
void test_when_deleteSourceFlagEnabled() {

final String tagOnFailure = UUID.randomUUID().toString();
when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure));
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);

parseXmlProcessor = createObjectUnderTest();

final String serializedMessage = "<Person><name>John Doe</name><age>30</age></Person>";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);
assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get("name", String.class), equalTo("John Doe"));
assertThat(parsedEvent.get("age", String.class), equalTo("30"));
}

@Test
void test_when_using_invalid_xml_tags_correctly() {

Expand Down

0 comments on commit 253e592

Please sign in to comment.