diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java index d772ff21c6..133e0beb0e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java @@ -14,8 +14,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Represents an extension of the {@link PluginModel} which is specific to Sink @@ -28,8 +30,8 @@ @JsonDeserialize(using = SinkModel.SinkModelDeserializer.class) public class SinkModel extends PluginModel { - SinkModel(final String pluginName, final List routes, final String tagsTargetKey, final Map pluginSettings) { - this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, pluginSettings)); + SinkModel(final String pluginName, final List routes, final String tagsTargetKey, final List includeKeys, final List excludeKeys, final Map pluginSettings) { + this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings)); } private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) { @@ -46,6 +48,15 @@ public Collection getRoutes() { return this.getInternalJsonModel().routes; } + public List getIncludeKeys() { + return this.getInternalJsonModel().includeKeys; + } + + public List getExcludeKeys() { + return this.getInternalJsonModel().excludeKeys; + } + + /** * Gets the tags target key associated with this Sink. * @@ -62,14 +73,19 @@ public static class SinkModelBuilder { private final List routes; private final String tagsTargetKey; + private final List includeKeys; + private final List excludeKeys; + private SinkModelBuilder(final PluginModel pluginModel) { this.pluginModel = pluginModel; this.routes = Collections.emptyList(); this.tagsTargetKey = null; + this.includeKeys = Collections.emptyList(); + this.excludeKeys = Collections.emptyList(); } public SinkModel build() { - return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, pluginModel.getPluginSettings()); + return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings()); } } @@ -86,23 +102,53 @@ private static class SinkInternalJsonModel extends InternalJsonModel { @JsonProperty("tags_target_key") private final String tagsTargetKey; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty("include_keys") + private final List includeKeys; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty("exclude_keys") + private final List excludeKeys; + @JsonCreator - private SinkInternalJsonModel(@JsonProperty("routes") final List routes, @JsonProperty("tags_target_key") final String tagsTargetKey) { - super(); - this.routes = routes != null ? routes : new ArrayList<>(); - this.tagsTargetKey = tagsTargetKey; + private SinkInternalJsonModel(@JsonProperty("routes") final List routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List includeKeys, @JsonProperty("exclude_keys") final List excludeKeys) { + this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>()); } - private SinkInternalJsonModel(final List routes, final String tagsTargetKey, final Map pluginSettings) { + private SinkInternalJsonModel(final List routes, final String tagsTargetKey, final List includeKeys, final List excludeKeys, final Map pluginSettings) { super(pluginSettings); - this.routes = routes != null ? routes : new ArrayList<>(); + this.routes = routes != null ? routes : Collections.emptyList(); + this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : Collections.emptyList(); + this.excludeKeys = excludeKeys != null ? preprocessingKeys(excludeKeys) : Collections.emptyList(); this.tagsTargetKey = tagsTargetKey; } + + + /** + * Pre-processes a list of Keys and returns a sorted list + * The keys must start with `/` and not end with `/` + * + * @param keys a list of raw keys + * @return a sorted processed keys + */ + private List preprocessingKeys(final List keys) { + if (keys.contains("/")) { + return new ArrayList<>(); + } + List result = keys.stream() + .map(k -> k.startsWith("/") ? k : "/" + k) + .map(k -> k.endsWith("/") ? k.substring(0, k.length() - 1) : k) + .collect(Collectors.toList()); + Collections.sort(result); + return result; + } } + static class SinkModelDeserializer extends AbstractPluginModelDeserializer { SinkModelDeserializer() { - super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null)); + super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null)); } } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index 78c15f9122..db97e1a327 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -20,6 +20,7 @@ * {@link org.opensearch.dataprepper.model.sink.Sink} and {@link org.opensearch.dataprepper.model.source.Source} will be extended to support * the new internal model. The use of {@link org.opensearch.dataprepper.model.record.Record}s will be deprecated in 2.0. *

+ * * @since 1.2 */ public interface Event extends Serializable { @@ -27,7 +28,7 @@ public interface Event extends Serializable { /** * Adds or updates the key with a given value in the Event * - * @param key where the value will be set + * @param key where the value will be set * @param value value to set the key to * @since 1.2 */ @@ -36,9 +37,9 @@ public interface Event extends Serializable { /** * Retrieves the given key from the Event * - * @param key the value to retrieve from + * @param key the value to retrieve from * @param clazz the return type of the value - * @param The type + * @param The type * @return T a clazz object from the key * @since 1.2 */ @@ -47,9 +48,9 @@ public interface Event extends Serializable { /** * Retrieves the given key from the Event as a List * - * @param key the value to retrieve from + * @param key the value to retrieve from * @param clazz the return type of elements in the list - * @param The type + * @param The type * @return {@literal List} a list of clazz elements * @since 1.2 */ @@ -57,6 +58,7 @@ public interface Event extends Serializable { /** * Deletes the given key from the Event + * * @param key the field to be deleted * @since 1.2 */ @@ -64,6 +66,7 @@ public interface Event extends Serializable { /** * Generates a serialized Json string of the entire Event + * * @return Json string of the event * @since 1.2 */ @@ -71,6 +74,7 @@ public interface Event extends Serializable { /** * Gets a serialized Json string of the specific key in the Event + * * @param key the field to be returned * @return Json string of the field * @since 2.2 @@ -79,6 +83,7 @@ public interface Event extends Serializable { /** * Retrieves the EventMetadata + * * @return EventMetadata for the event * @since 1.2 */ @@ -86,6 +91,7 @@ public interface Event extends Serializable { /** * Checks if the key exists. + * * @param key name of the key to look for * @return returns true if the key exists, otherwise false * @since 1.2 @@ -94,6 +100,7 @@ public interface Event extends Serializable { /** * Checks if the value stored for the key is list + * * @param key name of the key to look for * @return returns true if the key is a list, otherwise false * @since 1.2 @@ -108,6 +115,7 @@ public interface Event extends Serializable { /** * Returns formatted parts of the input string replaced by their values in the event + * * @param format input format * @return returns a string with no formatted parts, returns null if no value is found * @throws RuntimeException if the input string is not properly formatted @@ -135,9 +143,15 @@ public interface Event extends Serializable { JsonStringBuilder jsonBuilder(); - public abstract class JsonStringBuilder { + abstract class JsonStringBuilder { private String tagsKey; + private String rootKey; + + private List includeKeys; + + private List excludeKeys; + /** * @param key key to be used for tags * @return JsonStringString with tags included @@ -148,6 +162,36 @@ public JsonStringBuilder includeTags(String key) { return this; } + /** + * @param rootKey key to be used for tags + * @return JsonStringString with tags included + * @since 2.4 + */ + public JsonStringBuilder rootKey(String rootKey) { + this.rootKey = rootKey; + return this; + } + + /** + * @param includeKeys A list of keys to be retained + * @return JsonStringString with retained keys only + * @since 2.4 + */ + public JsonStringBuilder includeKeys(List includeKeys) { + this.includeKeys = includeKeys; + return this; + } + + /** + * @param excludeKeys A list of keys to be excluded + * @return JsonStringString without excluded keys + * @since 2.4 + */ + public JsonStringBuilder excludeKeys(List excludeKeys) { + this.excludeKeys = excludeKeys; + return this; + } + /** * @return key used for tags * @since 2.3 @@ -156,6 +200,30 @@ public String getTagsKey() { return tagsKey; } + /** + * @return root key + * @since 2.4 + */ + public String getRootKey() { + return rootKey; + } + + /** + * @return a list of keys to be retrained. + * @since 2.4 + */ + public List getIncludeKeys() { + return includeKeys; + } + + /** + * @return a list of keys to be excluded + * @since 2.4 + */ + public List getExcludeKeys() { + return excludeKeys; + } + /** * @return json string * @since 2.3 diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 084ca3bc15..d0ce16de14 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -31,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.StringJoiner; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -538,21 +540,109 @@ public JacksonEvent build() { } public class JsonStringBuilder extends Event.JsonStringBuilder { - private Event event; - private JsonStringBuilder(final Event event) { + private final boolean RETAIN_ALL = true; + + private final boolean EXCLUDE_ALL = false; + + + private final JacksonEvent event; + + private JsonStringBuilder(final JacksonEvent event) { checkNotNull(event, "event cannot be null"); this.event = event; } + private JsonNode getBaseNode() { + // Get root node. + if (getRootKey() != null && !getRootKey().isEmpty() && event.containsKey(getRootKey())) { + return event.getNode(getRootKey()); + } + return event.getJsonNode(); + } + + public String toJsonString() { - final String jsonString = event.toJsonString().trim(); + + String jsonString; + if (getIncludeKeys() != null && !getIncludeKeys().isEmpty()) { + jsonString = searchAndFilter(getBaseNode(), "", getIncludeKeys(), RETAIN_ALL); + } else if (getExcludeKeys() != null && !getExcludeKeys().isEmpty()) { + jsonString = searchAndFilter(getBaseNode(), "", getExcludeKeys(), EXCLUDE_ALL); + } else if (getBaseNode() !=event.getJsonNode()) { + jsonString = event.getAsJsonString(getRootKey()); + } else { + // Some successors have its own implementation of toJsonString, such as JacksonSpan. + // In such case, it's only used when the root key is not provided. + // TODO: Need to check if such behaviour is expected. + jsonString = event.toJsonString(); + } + final String tagsKey = getTagsKey(); - if(tagsKey != null) { + if (tagsKey != null) { final JsonNode tagsNode = mapper.valueToTree(event.getMetadata().getTags()); - return jsonString.substring(0, jsonString.length()-1) + ",\""+tagsKey+"\":" + tagsNode.toString()+"}"; + return jsonString.substring(0, jsonString.length() - 1) + ",\"" + tagsKey + "\":" + tagsNode.toString() + "}"; } return jsonString; } + + /** + * Perform DFS(Depth-first search) like traversing using recursion on the Json Tree and return the json string. + * This supports filtering (to include or exclude) from a list of keys. + * + * @param node Root node to start traversing + * @param path Json path, e.g. /foo/bar + * @param filterKeys A list of filtered keys + * @param filterAction Either to include (RETAIN_ALL or true) or to exclude (EXCLUDE_ALL or false) + * @return a json string with filtered keys + */ + String searchAndFilter(JsonNode node, String path, final List filterKeys, boolean filterAction) { + + if (node.isArray()) { // for array node. + StringJoiner sj = new StringJoiner(",", "[", "]"); + node.forEach(childNode -> sj.add(searchAndFilter(childNode, path, filterKeys, filterAction))); + return sj.toString(); + } else { + StringJoiner sj = new StringJoiner(",", "{", "}"); + List valueList = new ArrayList<>(); + + node.properties().forEach(entry -> { + String keyPath = path + SEPARATOR + entry.getKey(); + // Track whether the key is found in the filter list. + // Different behaviours between include and exclude action. + boolean found = false; + for (String key : filterKeys) { + if (keyPath.equals(key)) { + found = true; + // To keep the order. + if (filterAction == RETAIN_ALL) { + valueList.add("\"" + entry.getKey() + "\":" + entry.getValue().toString()); + } + break; + } else if (key.startsWith(keyPath)) { + found = true; + valueList.add("\"" + entry.getKey() + "\":" + searchAndFilter(entry.getValue(), keyPath, filterKeys, filterAction)); + break; + } + if (key.compareTo(keyPath) > 0) { + // To save the comparing. + // This requires the filter keys to be sorted first. + // This is done in SinkModel. + break; + } + } + + if (!found && filterAction == EXCLUDE_ALL) { + valueList.add("\"" + entry.getKey() + "\":" + entry.getValue().toString()); + } + }); + + valueList.forEach(value -> sj.add(value)); + return sj.toString(); + + } + } + + } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java index 9650411bd8..85a8f49ea0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.sink; import java.util.Collection; +import java.util.List; /** * Data Prepper Sink Context class. This the class for keeping global @@ -15,13 +16,24 @@ public class SinkContext { private final String tagsTargetKey; private final Collection routes; - public SinkContext(final String tagsTargetKey, final Collection routes) { + private final List includeKeys; + private final List excludeKeys; + + + public SinkContext(String tagsTargetKey, Collection routes, List includeKeys, List excludeKeys) { this.tagsTargetKey = tagsTargetKey; this.routes = routes; + this.includeKeys = includeKeys; + this.excludeKeys = excludeKeys; } - + + public SinkContext(String tagsTargetKey) { + this(tagsTargetKey, null, null, null); + } + /** * returns the target key name for tags if configured for a given sink + * * @return tags target key */ public String getTagsTargetKey() { @@ -30,10 +42,19 @@ public String getTagsTargetKey() { /** * returns routes if configured for a given sink + * * @return routes */ public Collection getRoutes() { return routes; } + + public List getIncludeKeys() { + return includeKeys; + } + + public List getExcludeKeys() { + return excludeKeys; + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java index 162b7c8de3..a177158201 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java @@ -50,7 +50,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an final PluginModel source = new PluginModel("testSource", (Map) null); final List processors = Collections.singletonList(new PluginModel("testProcessor", (Map) null)); - final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel)); @@ -72,7 +72,7 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws JsonProcessing final DataPrepperVersion version = DataPrepperVersion.parse("2.0"); final PluginModel source = new PluginModel("testSource", (Map) null); final List processors = Collections.singletonList(new PluginModel("testProcessor", (Map) null)); - final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel)); @@ -93,7 +93,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an final PluginModel source = new PluginModel("testSource", (Map) null); final List preppers = Collections.singletonList(new PluginModel("testPrepper", (Map) null)); - final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, Collections.emptyList(), Collections.emptyList(), null)); final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, Collections.singletonList(new ConditionalRoute("my-route", "/a==b")), sinks, 8, 50); final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel)); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java index bfe5ad3e73..5a09a6c4c1 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java @@ -17,7 +17,9 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -76,7 +78,7 @@ void serialize_into_known_SinkModel() throws IOException { pluginSettings.put("key1", "value1"); pluginSettings.put("key2", "value2"); final String tagsTargetKey = "tags"; - final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, pluginSettings); + final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, Collections.emptyList(), Collections.emptyList(), pluginSettings); final String actualJson = objectMapper.writeValueAsString(sinkModel); @@ -84,8 +86,11 @@ void serialize_into_known_SinkModel() throws IOException { assertThat("---\n" + actualJson, equalTo(expectedJson)); assertThat(sinkModel.getTagsTargetKey(), equalTo(tagsTargetKey)); + } + + @Test void deserialize_with_any_pluginModel() throws IOException { final InputStream inputStream = this.getClass().getResourceAsStream("/serialized_with_plugin_settings.yaml"); @@ -127,7 +132,7 @@ void serialize_with_just_pluginModel() throws IOException { pluginSettings.put("key1", "value1"); pluginSettings.put("key2", "value2"); pluginSettings.put("key3", "value3"); - final SinkModel sinkModel = new SinkModel("customPlugin", null, null, pluginSettings); + final SinkModel sinkModel = new SinkModel("customPlugin", null, null, Collections.emptyList(), Collections.emptyList(), pluginSettings); final String actualJson = objectMapper.writeValueAsString(sinkModel); @@ -136,6 +141,16 @@ void serialize_with_just_pluginModel() throws IOException { assertThat("---\n" + actualJson, equalTo(expectedJson)); } + @Test + void sinkModel_with_include_keys() throws IOException { + final Map pluginSettings = new LinkedHashMap<>(); + final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings); + + assertThat(sinkModel.getIncludeKeys(), equalTo(new ArrayList())); + assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg"))); + + } + @Nested class BuilderTest { private PluginModel pluginModel; @@ -160,6 +175,11 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() { assertThat(actualSinkModel.getPluginSettings(), equalTo(pluginSettings)); assertThat(actualSinkModel.getRoutes(), notNullValue()); assertThat(actualSinkModel.getRoutes(), empty()); + assertThat(actualSinkModel.getIncludeKeys(), notNullValue()); + assertThat(actualSinkModel.getIncludeKeys(), empty()); + assertThat(actualSinkModel.getExcludeKeys(), notNullValue()); + assertThat(actualSinkModel.getExcludeKeys(), empty()); + assertThat(actualSinkModel.getTagsTargetKey(), nullValue()); assertThat(actualSinkModel.getTagsTargetKey(), nullValue()); } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index 8e5c70161c..9d75a2253f 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -36,11 +36,12 @@ import static org.opensearch.dataprepper.test.matcher.MapEquals.isEqualWithoutTimestamp; public class JacksonEventTest { - + class TestEventHandle implements EventHandle { @Override - public void release(boolean result) {} - }; + public void release(boolean result) { + } + } private Event event; @@ -183,7 +184,7 @@ public void testGetList_withIncorrectPojo() { final String nestedValue = UUID.randomUUID().toString(); final TestObject value = new TestObject(nestedValue); - event.put(key, Arrays.asList(value)); + event.put(key, List.of(value)); assertThrows(RuntimeException.class, () -> event.getList(key, UUID.class)); } @@ -676,9 +677,104 @@ void testJsonStringBuilder() { eventMetadata.addTags(List.of("tag1", "tag2")); final String expectedJsonString = "{\"foo\":\"bar\",\"tags\":[\"tag1\",\"tag2\"]}"; assertThat(event.jsonBuilder().includeTags("tags").toJsonString(), equalTo(expectedJsonString)); + assertThat(event.jsonBuilder().rootKey("foo").toJsonString(), equalTo("\"bar\"")); assertThat(event.jsonBuilder().toJsonString(), equalTo(jsonString)); } + @Test + void testJsonStringBuilderWithIncludeKeys() { + final String jsonString = "{\"id\":1,\"foo\":\"bar\",\"info\":{\"name\":\"hello\",\"foo\":\"bar\"},\"tags\":[{\"key\":\"a\",\"value\":\"b\"},{\"key\":\"c\",\"value\":\"d\"}]}"; + event = JacksonEvent.builder() + .withEventType(eventType) + .withData(jsonString) + .getThis() + .build(); + + // Include Keys must start with / and also ordered, This is pre-processed in SinkModel + List includeKeys1 = Arrays.asList("/foo", "/info"); + final String expectedJsonString1 = "{\"foo\":\"bar\",\"info\":{\"name\":\"hello\",\"foo\":\"bar\"}}"; + assertThat(event.jsonBuilder().rootKey(null).includeKeys(includeKeys1).toJsonString(), equalTo(expectedJsonString1)); + + // Test child node + List includeKeys2 = Arrays.asList("/foo", "/info/name"); + final String expectedJsonString2 = "{\"foo\":\"bar\",\"info\":{\"name\":\"hello\"}}"; + assertThat(event.jsonBuilder().includeKeys(includeKeys2).toJsonString(), equalTo(expectedJsonString2)); + + // Test array node. + List includeKeys3 = Arrays.asList("/foo", "/tags/key"); + final String expectedJsonString3 = "{\"foo\":\"bar\",\"tags\":[{\"key\":\"a\"},{\"key\":\"c\"}]}"; + assertThat(event.jsonBuilder().includeKeys(includeKeys3).toJsonString(), equalTo(expectedJsonString3)); + + // Test some keys not found + List includeKeys4 = Arrays.asList("/foo", "/info/age"); + final String expectedJsonString4 = "{\"foo\":\"bar\",\"info\":{}}"; + assertThat(event.jsonBuilder().includeKeys(includeKeys4).toJsonString(), equalTo(expectedJsonString4)); + + // Test all keys not found + List includeKeys5 = List.of("/hello"); + final String expectedJsonString5 = "{}"; + assertThat(event.jsonBuilder().includeKeys(includeKeys5).toJsonString(), equalTo(expectedJsonString5)); + + // Test working with root node + List includeKeys6 = List.of("/name"); + final String expectedJsonString6 = "{\"name\":\"hello\"}"; + assertThat(event.jsonBuilder().rootKey("info").includeKeys(includeKeys6).toJsonString(), equalTo(expectedJsonString6)); + + // Test working with unknown root node + List includeKeys7 = List.of("/name"); + final String expectedJsonString7 = "{}"; + assertThat(event.jsonBuilder().rootKey("hello").includeKeys(includeKeys7).toJsonString(), equalTo(expectedJsonString7)); + + + } + + @Test + void testJsonStringBuilderWithExcludeKeys() { + final String jsonString = "{\"id\":1,\"foo\":\"bar\",\"info\":{\"name\":\"hello\",\"foo\":\"bar\"},\"tags\":[{\"key\":\"a\",\"value\":\"b\"},{\"key\":\"c\",\"value\":\"d\"}]}"; + event = JacksonEvent.builder() + .withEventType(eventType) + .withData(jsonString) + .getThis() + .build(); + + // Include Keys must start with / and also ordered, This is pre-processed in SinkModel + List excludeKeys1 = Arrays.asList("/foo", "/info"); + final String expectedJsonString1 = "{\"id\":1,\"tags\":[{\"key\":\"a\",\"value\":\"b\"},{\"key\":\"c\",\"value\":\"d\"}]}"; + assertThat(event.jsonBuilder().rootKey(null).excludeKeys(excludeKeys1).toJsonString(), equalTo(expectedJsonString1)); + + // Test child node + List excludeKeys2 = Arrays.asList("/foo", "/info/name"); + final String expectedJsonString2 = "{\"id\":1,\"info\":{\"foo\":\"bar\"},\"tags\":[{\"key\":\"a\",\"value\":\"b\"},{\"key\":\"c\",\"value\":\"d\"}]}"; + assertThat(event.jsonBuilder().excludeKeys(excludeKeys2).toJsonString(), equalTo(expectedJsonString2)); + + // Test array node. + List excludeKeys3 = Arrays.asList("/foo", "/tags/key"); + final String expectedJsonString3 = "{\"id\":1,\"info\":{\"name\":\"hello\",\"foo\":\"bar\"},\"tags\":[{\"value\":\"b\"},{\"value\":\"d\"}]}"; + assertThat(event.jsonBuilder().excludeKeys(excludeKeys3).toJsonString(), equalTo(expectedJsonString3)); + + // Test some keys not found + List excludeKeys4 = Arrays.asList("/foo", "/info/age"); + final String expectedJsonString4 = "{\"id\":1,\"info\":{\"name\":\"hello\",\"foo\":\"bar\"},\"tags\":[{\"key\":\"a\",\"value\":\"b\"},{\"key\":\"c\",\"value\":\"d\"}]}"; + assertThat(event.jsonBuilder().excludeKeys(excludeKeys4).toJsonString(), equalTo(expectedJsonString4)); + + // Test all keys not found + List excludeKeys5 = List.of("/hello"); + final String expectedJsonString5 = "{\"id\":1,\"foo\":\"bar\",\"info\":{\"name\":\"hello\",\"foo\":\"bar\"},\"tags\":[{\"key\":\"a\",\"value\":\"b\"},{\"key\":\"c\",\"value\":\"d\"}]}"; + assertThat(event.jsonBuilder().excludeKeys(excludeKeys5).toJsonString(), equalTo(expectedJsonString5)); + + // Test working with root node + List excludeKeys6 = List.of("/name"); + final String expectedJsonString6 = "{\"foo\":\"bar\"}"; + assertThat(event.jsonBuilder().rootKey("info").excludeKeys(excludeKeys6).toJsonString(), equalTo(expectedJsonString6)); + + // Test working with unknown root node + List includeKeys7 = List.of("/name"); + final String expectedJsonString7 = "{}"; + assertThat(event.jsonBuilder().rootKey("hello").includeKeys(includeKeys7).toJsonString(), equalTo(expectedJsonString7)); + + } + + private static Map createComplexDataMap() { final Map dataObject = new HashMap<>(); final int fullDepth = 6; diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java index 404c3bbbf5..cb6fe54e02 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java @@ -5,15 +5,14 @@ package org.opensearch.dataprepper.model.sink; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.List; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import org.apache.commons.lang3.RandomStringUtils; - +import static org.hamcrest.Matchers.equalTo; public class SinkContextTest { @@ -23,11 +22,26 @@ public class SinkContextTest { public void testSinkContextBasic() { final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); final List testRoutes = Collections.emptyList(); - sinkContext = new SinkContext(testTagsTargetKey, testRoutes); + final List testIncludeKeys = Collections.emptyList(); + final List testExcludeKeys = Collections.emptyList(); + sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys); assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey)); assertThat(sinkContext.getRoutes(), equalTo(testRoutes)); - + assertThat(sinkContext.getIncludeKeys(), equalTo(testIncludeKeys)); + assertThat(sinkContext.getExcludeKeys(), equalTo(testExcludeKeys)); + } - + + @Test + public void testSinkContextWithTagsOnly() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + sinkContext = new SinkContext(testTagsTargetKey); + assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey)); + assertThat(sinkContext.getRoutes(), equalTo(null)); + assertThat(sinkContext.getIncludeKeys(), equalTo(null)); + assertThat(sinkContext.getExcludeKeys(), equalTo(null)); + + } + } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java index b35b05bdb5..5f0929a0fc 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java @@ -135,7 +135,7 @@ private static SinkContextPluginSetting getSinkContextPluginSettingFromSinkModel final Map settingsMap = Optional .ofNullable(sinkModel.getPluginSettings()) .orElseGet(HashMap::new); - return new SinkContextPluginSetting(sinkModel.getPluginName(), settingsMap, new SinkContext(sinkModel.getTagsTargetKey(), sinkModel.getRoutes())); + return new SinkContextPluginSetting(sinkModel.getPluginName(), settingsMap, new SinkContext(sinkModel.getTagsTargetKey(), sinkModel.getRoutes(), sinkModel.getIncludeKeys(), sinkModel.getExcludeKeys())); } private Integer getWorkersFromPipelineModel(final PipelineModel pipelineModel) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index dcfbcd1c35..1464e011e0 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -63,6 +63,7 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -130,7 +131,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, final AwsCredentialsSupplier awsCredentialsSupplier) { super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS); this.awsCredentialsSupplier = awsCredentialsSupplier; - this.sinkContext = sinkContext; + this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); this.expressionEvaluator = expressionEvaluator; bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY); bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS); @@ -343,7 +344,7 @@ private SerializedJson getDocument(final Event event) { String routing = (routingField != null) ? event.get(routingField, String.class) : null; - final String document = DocumentBuilder.build(event, documentRootKey, Objects.nonNull(sinkContext)?sinkContext.getTagsTargetKey():null); + final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys()); return SerializedJson.fromStringAndOptionals(document, docId, routing); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java index 81e484904b..928179ae5f 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java @@ -2,16 +2,25 @@ import org.opensearch.dataprepper.model.event.Event; +import java.util.List; + public final class DocumentBuilder { - public static String build(final Event event, final String documentRootKey, final String tagsTargetKey) { - if (documentRootKey != null && event.containsKey(documentRootKey)) { - final String document = event.getAsJsonString(documentRootKey); - if (document == null || !document.startsWith("{")) { - return String.format("{\"data\": %s}", document); - } - return document; + public static String build(final Event event, final String documentRootKey, final String tagsTargetKey, final List includeKeys, final List excludeKeys) { + final String document = event.jsonBuilder() + .rootKey(documentRootKey) + .includeKeys(includeKeys) + .excludeKeys(excludeKeys) + .includeTags(tagsTargetKey) + .toJsonString(); + + if (document == null || !document.startsWith("{")) { + return String.format("{\"data\": %s}", document); } - return event.jsonBuilder().includeTags(tagsTargetKey).toJsonString(); + return document; + } + + public static String build(final Event event, final String documentRootKey, final String tagsTargetKey) { + return build(event, documentRootKey, tagsTargetKey, null, null); } }