Skip to content

Commit

Permalink
Add support for Update/Upsert/Delete operations in OpenSearch Sink (o…
Browse files Browse the repository at this point in the history
…pensearch-project#3424)

* Add support for Update/Upsert/Delete operations in OpenSearch Sink

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed tests and removed unused imports

Signed-off-by: Krishna Kondaka <[email protected]>

* Updated documentation

Signed-off-by: Krishna Kondaka <[email protected]>

* Added test cases to improve code coverage

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed check style errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Added another test for upsert action without prior create action

Signed-off-by: Krishna Kondaka <[email protected]>

* Added check for valid action strings at config time

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Oct 6, 2023
1 parent b0d253c commit 36b0b9c
Show file tree
Hide file tree
Showing 19 changed files with 831 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,34 @@ public <T> List<T> getTypedList(final String attribute, final Class<T> type) {
return (List<T>) object;
}

/**
* Returns the value of the specified {@literal List<Map<String, String>>}, or {@code defaultValue} if this settings contains no value for
* the attribute.
*
* @param keyType key type of the Map
* @param valueType value type stored in the Map
* @param <K> The key type
* @param <V> The value type
* @return the value of the specified attribute, or {@code defaultValue} if this settings contains no value for
* the attribute
*/
public <K, V> List<Map<K, V>> getTypedListOfMaps(final String attribute, final Class<K> keyType, final Class<V> valueType) {
Object object = getAttributeOrDefault(attribute, null);
if (object == null) {
return null;
}

checkObjectType(attribute, object, List.class);

for (final Map<K, V> listItem: (List<Map<K, V>>) object) {
((Map<?, ?>) listItem).forEach((key, value) -> {
checkObjectType(attribute, key, keyType);
checkObjectType(attribute, value, valueType);
});
}
return (List<Map<K, V>>) object;
}

/**
* Returns the value of the specified {@literal Map<String, String> object}, or {@code defaultValue} if this settings contains no value for
* the attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import com.fasterxml.jackson.databind.JsonNode;

import java.io.Serializable;
import java.util.List;
Expand Down Expand Up @@ -72,6 +73,14 @@ public interface Event extends Serializable {
*/
String toJsonString();

/**
* Returns the JsonNode containing the internal representation of the event
*
* @return JsonNode
* @since 2.5
*/
JsonNode getJsonNode();

/**
* Gets a serialized Json string of the specific key in the Event
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ private JsonNode getInitialJsonNode(final Object data) {
return mapper.valueToTree(data);
}

protected JsonNode getJsonNode() {
@Override
public JsonNode getJsonNode() {
return jsonNode;
}

Expand Down Expand Up @@ -319,6 +320,28 @@ public String formatString(final String format, final ExpressionEvaluator expres
return formatStringInternal(format, expressionEvaluator);
}

public static boolean isValidFormatExpressions(final String format, final ExpressionEvaluator expressionEvaluator) {
if (Objects.isNull(expressionEvaluator)) {
return false;
}
int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
return false;
}
String name = format.substring(position + 2, endPosition);

Object val;
if (!expressionEvaluator.isValidExpressionStatement(name)) {
return false;
}
fromIndex = endPosition + 1;
}
return true;
}

private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) {
int fromIndex = 0;
String result = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class PluginSettingsTests {
private static final String TEST_INT_ATTRIBUTE = "int-attribute";
private static final String TEST_STRING_ATTRIBUTE = "string-attribute";
private static final String TEST_STRINGLIST_ATTRIBUTE = "list-attribute";
private static final String TEST_LIST_OF_MAPS_ATTRIBUTE = "map-list-attribute";
private static final String TEST_STRINGMAP_ATTRIBUTE = "map-attribute";
private static final String TEST_STRINGLISTMAP_ATTRIBUTE = "list-map-attribute";
private static final String TEST_BOOL_ATTRIBUTE = "bool-attribute";
Expand Down Expand Up @@ -148,6 +149,16 @@ public void testGetTypedMap() {
assertThat(pluginSetting.getTypedMap(TEST_STRINGMAP_ATTRIBUTE, String.class, String.class), is(equalTo(TEST_STRINGMAP_VALUE)));
}

@Test
public void testGetTypedListOfMaps() {
final Map<String, String> TEST_SETTINGS_MAP = ImmutableMap.of(TEST_STRING_ATTRIBUTE, TEST_STRING_VALUE);
final List<Map<String, String>> TEST_SETTINGS_LIST = List.of(TEST_SETTINGS_MAP);
final Map<String, Object> TEST_SETTINGS = ImmutableMap.of(TEST_LIST_OF_MAPS_ATTRIBUTE, TEST_SETTINGS_LIST);
final PluginSetting pluginSetting = new PluginSetting(TEST_PLUGIN_NAME, TEST_SETTINGS);

assertThat(pluginSetting.getTypedListOfMaps(TEST_LIST_OF_MAPS_ATTRIBUTE, String.class, String.class), is(equalTo(List.of(TEST_SETTINGS_MAP))));
}

@Test
public void testGetTypedListMap() {
final Map<String, Object> TEST_SETTINGS = ImmutableMap.of(TEST_STRINGLISTMAP_ATTRIBUTE, TEST_STRINGLISTMAP_VALUE);
Expand Down Expand Up @@ -270,6 +281,20 @@ public void testGetTypedListMap_AsNull() {
assertThat(pluginSetting.getTypedListMap(TEST_STRINGLISTMAP_NULL_ATTRIBUTE, String.class, String.class), nullValue());
}

/**
* Request attributes are present with null values, expect nulls to be returned
*/
@Test
public void testGetTypedListOfMaps_AsNull() {
final String TEST_STRINGLISTOFMAPS_NULL_ATTRIBUTE = "typedlistofmaps-null-attribute";
final Map<String, Object> TEST_SETTINGS_AS_NULL = new HashMap<>();

TEST_SETTINGS_AS_NULL.put(TEST_STRINGLISTOFMAPS_NULL_ATTRIBUTE, null);
final PluginSetting pluginSetting = new PluginSetting(TEST_PLUGIN_NAME, TEST_SETTINGS_AS_NULL);

assertThat(pluginSetting.getTypedListOfMaps(TEST_STRINGLISTOFMAPS_NULL_ATTRIBUTE, String.class, String.class), nullValue());
}

/**
* Request attributes are present with null values, expect nulls to be returned
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,25 @@ public void testBuild_withFormatString(String formattedString, String finalStrin
assertThat(event.formatString(formattedString), is(equalTo(finalString)));
}

@ParameterizedTest
@CsvSource({
"abc-${/foo, false",
"abc-${/foo}, true",
"abc-${getMetadata(\"key\")}, true",
"abc-${getXYZ(\"key\")}, false"
})
public void testBuild_withIsValidFormatExpressions(final String format, final Boolean expectedResult) {
final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class);
when(expressionEvaluator.isValidExpressionStatement("/foo")).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"key\")")).thenReturn(true);
assertThat(JacksonEvent.isValidFormatExpressions(format, expressionEvaluator), equalTo(expectedResult));
}

@Test
public void testBuild_withIsValidFormatExpressionsWithNullEvaluator() {
assertThat(JacksonEvent.isValidFormatExpressions("${}", null), equalTo(false));
}

@Test
public void testBuild_withFormatStringWithExpressionEvaluator() {

Expand Down
22 changes: 22 additions & 0 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,28 @@ e.g. [otel-v1-apm-span-index-template.json](https://github.com/opensearch-projec
- `dlq_file`(optional): A String of absolute file path for DLQ failed output records. Defaults to null.
If not provided, failed records will be written into the default data-prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this, an error is thrown.

- `action`(optional): A string indicating the type of action to be performed. Supported values are "create", "update", "upsert", "delete" and "index". Default value is "index". It also be an expression which evaluates to one of the supported values mentioned earlier.

- `actions`(optional): This is an alternative to `action`. `actions` can have multiple actions, each with a condition. The first action for which the condition evaluates to true is picked as the action for an event. The action must be one of the supported values mentioned under `action` field above. Just like in case of `action`, the `type` mentioned in `actions` can be an expression which evaluates to one of the supported values. For example, the following configuration shows different action types for different conditions.

```
sink:
- opensearch
actions:
- type: "create"
when: "/some_key == CREATE"
- type: "index"
when: "/some_key == INDEX"
- type: "upsert"
when: "/some_key == UPSERT"
- type: "update"
when: "/some_key == UPDATE"
- type: "delete"
when: "/some_key == DELETE"
# default case
- type: "index"
```

- `dlq` (optional): DLQ configurations. See [DLQ](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is present along with this, an error is thrown.

- `max_retries`(optional): A number indicating the maximum number of times OpenSearch Sink should try to push the data to the OpenSearch server before considering it as failure. Defaults to `Integer.MAX_VALUE`.
Expand Down
Loading

0 comments on commit 36b0b9c

Please sign in to comment.