Add include_keys and exclude_keys to S3 sink.
Signed-off-by: Aiden Dai <[email protected]>
daixba committed Aug 2, 2023
1 parent 9cef0d5 commit 0ab3010
Showing 5 changed files with 135 additions and 28 deletions.
@@ -10,6 +10,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import java.util.ArrayList;
import java.util.Collection;
@@ -122,6 +123,13 @@ private SinkInternalJsonModel(final List<String> routes, final String tagsTarget
this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : Collections.emptyList();
this.excludeKeys = excludeKeys != null ? preprocessingKeys(excludeKeys) : Collections.emptyList();
this.tagsTargetKey = tagsTargetKey;
validateConfiguration();
}

void validateConfiguration() {
if (!includeKeys.isEmpty() && !excludeKeys.isEmpty()) {
throw new InvalidPluginConfigurationException("include_keys and exclude_keys cannot both exist in the configuration at the same time.");
}
}


@@ -13,6 +13,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import java.io.IOException;
import java.io.InputStream;
@@ -21,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@@ -32,6 +34,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -90,7 +93,6 @@ void serialize_into_known_SinkModel() throws IOException {
}



@Test
void deserialize_with_any_pluginModel() throws IOException {
final InputStream inputStream = this.getClass().getResourceAsStream("/serialized_with_plugin_settings.yaml");
@@ -144,13 +146,29 @@ void serialize_with_just_pluginModel() throws IOException {
@Test
void sinkModel_with_include_keys() throws IOException {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings);
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "/abc", "efg/"), null, pluginSettings);

assertThat(sinkModel.getExcludeKeys(), equalTo(new ArrayList<String>()));
assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg")));

}

@Test
void sinkModel_with_exclude_keys() throws IOException {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings);

assertThat(sinkModel.getIncludeKeys(), equalTo(new ArrayList<String>()));
assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg")));

}

@Test
void sinkModel_with_both_include_and_exclude_keys() throws IOException {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("abc"), List.of("bcd"), pluginSettings));
}

@Nested
class BuilderTest {
private PluginModel pluginModel;
@@ -181,6 +199,7 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
assertThat(actualSinkModel.getExcludeKeys(), empty());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());

}
}

59 changes: 59 additions & 0 deletions data-prepper-plugins/opensearch/README.md
@@ -209,6 +209,65 @@ With the `document_root_key` set to `status`. The document structure would be `{
duration: "15 ms"
}
```
- `include_keys`: A list of keys to be included (retained) in the document. Each key in the list can be a valid JSON path, such as `request/status`. This option works together with `document_root_key`.

For example, if we have the following sample event:
```
{
status: 200,
message: null,
metadata: {
sourceIp: "123.212.49.58",
destinationIp: "79.54.67.231",
bytes: 3545,
duration: "15 ms"
}
}
```
If `include_keys` is set to `["status", "metadata/sourceIp"]`, the document written to OpenSearch would be:
```
{
status: 200,
metadata: {
sourceIp: "123.212.49.58"
}
}
```
If you have also set `document_root_key` to `metadata` and `include_keys` to `["sourceIp", "bytes"]`, the document written to OpenSearch would be:
```
{
sourceIp: "123.212.49.58",
bytes: 3545
}
```
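
  To make the retention semantics concrete, here is a minimal, self-contained Java sketch of include-key filtering over a nested map. It is illustrative only and is not Data Prepper's implementation; the class and method names (`IncludeKeysSketch`, `retain`) are invented for this example, and only the `parent/child` path syntax is taken from the option described above.
  ```java
  // Illustrative sketch only — not Data Prepper's implementation. It shows how
  // include-key retention over nested "parent/child" paths behaves; the class and
  // method names here are invented for this example.
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class IncludeKeysSketch {

      @SuppressWarnings("unchecked")
      static Map<String, Object> retain(final Map<String, Object> source, final List<String> includeKeys) {
          final Map<String, Object> result = new LinkedHashMap<>();
          for (final String path : includeKeys) {
              final String[] parts = path.split("/", 2);   // "metadata/sourceIp" -> ["metadata", "sourceIp"]
              final Object value = source.get(parts[0]);
              if (value == null) {
                  continue;                                // key absent (or null): nothing to retain
              }
              if (parts.length == 1) {
                  result.put(parts[0], value);             // leaf key: copy as-is
              } else if (value instanceof Map) {           // nested path: recurse into the sub-map
                  final Map<String, Object> child = (Map<String, Object>) result
                          .computeIfAbsent(parts[0], k -> new LinkedHashMap<String, Object>());
                  child.putAll(retain((Map<String, Object>) value, List.of(parts[1])));
              }
          }
          return result;
      }

      public static void main(String[] args) {
          final Map<String, Object> metadata = new LinkedHashMap<>();
          metadata.put("sourceIp", "123.212.49.58");
          metadata.put("destinationIp", "79.54.67.231");
          metadata.put("bytes", 3545);
          metadata.put("duration", "15 ms");

          final Map<String, Object> event = new LinkedHashMap<>();
          event.put("status", 200);
          event.put("message", null);
          event.put("metadata", metadata);

          // Prints {status=200, metadata={sourceIp=123.212.49.58}}
          System.out.println(retain(event, List.of("status", "metadata/sourceIp")));
      }
  }
  ```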

- `exclude_keys`: Similar to `include_keys`, except that the keys in the list are excluded (dropped) from the document. Note that `include_keys` and `exclude_keys` cannot both be set in the same configuration.

For example, if we have the following sample event:
```
{
status: 200,
message: null,
metadata: {
sourceIp: "123.212.49.58",
destinationIp: "79.54.67.231",
bytes: 3545,
duration: "15 ms"
}
}
```
If `exclude_keys` is set to `["status", "metadata/sourceIp"]`, the document written to OpenSearch would be:
```
{
message: null,
metadata: {
destinationIp: "79.54.67.231",
bytes: 3545,
duration: "15 ms"
}
}
```
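
  For symmetry, a similarly hedged sketch of exclusion, which removes the listed paths in place instead of retaining them (again illustrative, with an invented `ExcludeKeysSketch` class; not the actual implementation). Applied to the sample event above with `["status", "metadata/sourceIp"]`, it leaves exactly the document shown.
  ```java
  // Illustrative sketch only — not Data Prepper's implementation. Exclusion removes
  // the listed paths in place instead of retaining them; the class name is invented.
  import java.util.List;
  import java.util.Map;

  public class ExcludeKeysSketch {

      @SuppressWarnings("unchecked")
      static void exclude(final Map<String, Object> event, final List<String> excludeKeys) {
          for (final String path : excludeKeys) {
              final String[] parts = path.split("/", 2);
              if (parts.length == 1) {
                  event.remove(parts[0]);                      // leaf key: drop it
              } else if (event.get(parts[0]) instanceof Map) { // nested path: recurse into the sub-map
                  exclude((Map<String, Object>) event.get(parts[0]), List.of(parts[1]));
              }
          }
      }
  }
  ```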

- `distribution_version`: A string indicating whether the sink backend is Elasticsearch 6 or a later version (Elasticsearch 7.x or OpenSearch). `es6` represents Elasticsearch 6; `default` represents the latest compatible backend version (Elasticsearch 7.x, OpenSearch 1.x, OpenSearch 2.x). Defaults to `default`.

### <a name="aws_configuration">AWS Configuration</a>
@@ -12,8 +12,11 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* An implementation of {@link OutputCodec} which deserializes Data-Prepper events
@@ -25,10 +28,17 @@ public class NdjsonOutputCodec implements OutputCodec {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final NdjsonOutputConfig config;

private final List<String> includeKeys;
private final List<String> excludeKeys;


@DataPrepperPluginConstructor
public NdjsonOutputCodec(final NdjsonOutputConfig config) {
Objects.requireNonNull(config);
this.config = config;

this.includeKeys = preprocessingKeys(config.getIncludeKeys());
this.excludeKeys = preprocessingKeys(config.getExcludeKeys());
}

@Override
@@ -39,40 +49,39 @@ public void start(final OutputStream outputStream, Event event, String tagsTarge
@Override
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
Objects.requireNonNull(event);
Map<String, Object> eventMap;
if (tagsTargetKey != null) {
eventMap = addTagsToEvent(event, tagsTargetKey).toMap();
} else {
eventMap = event.toMap();
}
writeToOutputStream(outputStream, eventMap);
String jsonString = event.jsonBuilder().includeKeys(includeKeys).excludeKeys(excludeKeys).includeTags(tagsTargetKey).toJsonString();

outputStream.write(jsonString.getBytes());
outputStream.write(System.lineSeparator().getBytes());
}

@Override
public void complete(final OutputStream outputStream) throws IOException {
outputStream.close();
}

private void writeToOutputStream(final OutputStream outputStream, final Object object) throws IOException {
byte[] byteArr = null;
if (object instanceof Map) {
Map<Object, Object> map = objectMapper.convertValue(object, Map.class);
for (String key : config.getExcludeKeys()) {
if (map.containsKey(key)) {
map.remove(key);
}
}
String json = objectMapper.writeValueAsString(map);
byteArr = json.getBytes();
} else {
byteArr = object.toString().getBytes();
}
outputStream.write(byteArr);
outputStream.write(System.lineSeparator().getBytes());
}

@Override
public String getExtension() {
return NDJSON;
}

/**
* Pre-processes a list of keys and returns a sorted list.
* Each returned key starts with `/` and does not end with `/`.
*
* @param keys a list of raw keys
* @return a sorted list of processed keys
*/
private List<String> preprocessingKeys(final List<String> keys) {
if (keys.contains("/")) {
return new ArrayList<>();
}
List<String> result = keys.stream()
.map(k -> k.startsWith("/") ? k : "/" + k)
.map(k -> k.endsWith("/") ? k.substring(0, k.length() - 1) : k)
.collect(Collectors.toList());
Collections.sort(result);
return result;
}

}
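
A small standalone demo, not part of this commit, of how the key pre-processing above normalizes user-supplied keys; the `KeyNormalizationDemo` class name is invented for this sketch, and the method body mirrors `preprocessingKeys()` shown in the diff.
```java
// Standalone demo, not part of this commit: the normalization applied by
// preprocessingKeys() above. The KeyNormalizationDemo class name is invented.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class KeyNormalizationDemo {

    static List<String> preprocessingKeys(final List<String> keys) {
        if (keys.contains("/")) {
            return new ArrayList<>();   // "/" selects the whole event, i.e. no filtering
        }
        final List<String> result = keys.stream()
                .map(k -> k.startsWith("/") ? k : "/" + k)                       // ensure a leading slash
                .map(k -> k.endsWith("/") ? k.substring(0, k.length() - 1) : k)  // strip a trailing slash
                .collect(Collectors.toList());
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        // Prints [/abc, /bcd, /efg] — the same normalization the SinkModel tests assert above.
        System.out.println(preprocessingKeys(List.of("bcd", "/abc", "efg/")));
    }
}
```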
@@ -19,11 +19,23 @@ public class NdjsonOutputConfig {
@JsonProperty("exclude_keys")
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;


@JsonProperty("include_keys")
private List<String> includeKeys = DEFAULT_EXCLUDE_KEYS;

public List<String> getExcludeKeys() {
return excludeKeys;
}

public List<String> getIncludeKeys() {
return includeKeys;
}

public void setExcludeKeys(List<String> excludeKeys) {
this.excludeKeys = excludeKeys;
}

public void setIncludeKeys(List<String> includeKeys) {
this.includeKeys = includeKeys;
}
}
