Skip to content

Commit

Permalink
Add include_keys and exclude_keys to S3 sink (#3122)
Browse files Browse the repository at this point in the history
* Add validation and update document

Signed-off-by: Aiden Dai <[email protected]>

* Add OutputCodecContext for output codecs.

Signed-off-by: Aiden Dai <[email protected]>

* Add OutputCodecContext for output codecs.

Signed-off-by: Aiden Dai <[email protected]>

---------

Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba authored Aug 9, 2023
1 parent 956e5ad commit 94d7b10
Show file tree
Hide file tree
Showing 29 changed files with 484 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.sink.Sink;

import java.io.IOException;
Expand All @@ -26,21 +27,20 @@ public interface OutputCodec {
*
* @param outputStream outputStream param for wrapping
* @param event Event to auto-generate schema
* @param tagsTargetKey to add tags to the record to create schema
* @param context Extra Context used in Codec.
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
*/
void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException;
void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException;

/**
* This method is called from {@link Sink} to write an event to the {@link OutputStream}.
* Implementors should get the data from the event and write it to the {@link OutputStream}.
*
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param tagsTargetKey to add tags to the record
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
*/
void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException;
void writeEvent(Event event, OutputStream outputStream) throws IOException;

/**
* This method is called from {@link Sink} to do the final wrapping in the {@link OutputStream}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -122,6 +123,13 @@ private SinkInternalJsonModel(final List<String> routes, final String tagsTarget
this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : Collections.emptyList();
this.excludeKeys = excludeKeys != null ? preprocessingKeys(excludeKeys) : Collections.emptyList();
this.tagsTargetKey = tagsTargetKey;
validateConfiguration();
}

/**
 * Rejects a configuration in which include keys and exclude keys are both present.
 *
 * @throws InvalidPluginConfigurationException if neither {@code includeKeys} nor {@code excludeKeys} is empty
 */
void validateConfiguration() {
    // A configuration is acceptable as soon as at least one of the two lists is empty.
    if (includeKeys.isEmpty() || excludeKeys.isEmpty()) {
        return;
    }
    throw new InvalidPluginConfigurationException("include_keys and exclude_keys cannot both exist in the configuration at the same time.");
}


Expand Down Expand Up @@ -151,4 +159,4 @@ static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkM
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import java.util.Collections;
import java.util.List;

/**
 * Data Prepper Output Codec Context class.
 * The context contains information that is shared and may be used among
 * {@link org.opensearch.dataprepper.model.codec.OutputCodec} implementations.
 */
public class OutputCodecContext {

    private final String tagsTargetKey;

    private final List<String> includeKeys;
    private final List<String> excludeKeys;

    /**
     * Creates a context with a {@code null} tags target key and empty include/exclude key lists.
     */
    public OutputCodecContext() {
        this(null, Collections.emptyList(), Collections.emptyList());
    }

    /**
     * Creates a context from the given values.
     *
     * @param tagsTargetKey the tags target key; may be {@code null}
     * @param includeKeys   the include keys; {@code null} is treated as an empty list
     * @param excludeKeys   the exclude keys; {@code null} is treated as an empty list
     */
    public OutputCodecContext(String tagsTargetKey, List<String> includeKeys, List<String> excludeKeys) {
        this.tagsTargetKey = tagsTargetKey;
        // Normalize null to an empty list so the getters never return null, matching the no-arg constructor.
        this.includeKeys = includeKeys != null ? includeKeys : Collections.emptyList();
        this.excludeKeys = excludeKeys != null ? excludeKeys : Collections.emptyList();
    }

    /**
     * Builds a codec context from the tags target key, include keys and exclude keys of a sink context.
     *
     * @param sinkContext the sink context to copy values from; may be {@code null}
     * @return a context carrying the sink context's values, or a default (empty) context when
     *         {@code sinkContext} is {@code null}
     */
    public static OutputCodecContext fromSinkContext(SinkContext sinkContext) {
        if (sinkContext == null) {
            return new OutputCodecContext();
        }
        return new OutputCodecContext(sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys());
    }

    /**
     * @return the tags target key, or {@code null} if none was provided
     */
    public String getTagsTargetKey() {
        return tagsTargetKey;
    }

    /**
     * @return the include keys; never {@code null}
     */
    public List<String> getIncludeKeys() {
        return includeKeys;
    }

    /**
     * @return the exclude keys; never {@code null}
     */
    public List<String> getExcludeKeys() {
        return excludeKeys;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -30,11 +31,11 @@ public void setUp() {
public void testWriteMetrics() throws JsonProcessingException {
OutputCodec outputCodec = new OutputCodec() {
@Override
public void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException {
public void start(OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
}

@Override
public void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException {
public void writeEvent(Event event, OutputStream outputStream) throws IOException {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -21,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

Expand All @@ -32,6 +34,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -90,7 +93,6 @@ void serialize_into_known_SinkModel() throws IOException {
}



@Test
void deserialize_with_any_pluginModel() throws IOException {
final InputStream inputStream = this.getClass().getResourceAsStream("/serialized_with_plugin_settings.yaml");
Expand Down Expand Up @@ -144,13 +146,29 @@ void serialize_with_just_pluginModel() throws IOException {
@Test
void sinkModel_with_include_keys() throws IOException {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings);
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "/abc", "efg/"), null, pluginSettings);

assertThat(sinkModel.getExcludeKeys(), equalTo(new ArrayList<String>()));
assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg")));

}

/**
 * Builds a SinkModel with "/" as the only include key and three un-normalized exclude keys, then checks
 * that the include keys come back empty and the exclude keys come back as "/abc", "/bcd", "/efg".
 */
@Test
void sinkModel_with_exclude_keys() throws IOException {
    final Map<String, Object> settings = new LinkedHashMap<>();
    final List<String> routes = Arrays.asList("routeA", "routeB");
    final List<String> rawIncludeKeys = List.of("/");
    final List<String> rawExcludeKeys = Arrays.asList("bcd", "/abc", "efg/");

    final SinkModel model = new SinkModel("customSinkPlugin", routes, null, rawIncludeKeys, rawExcludeKeys, settings);

    final List<String> expectedIncludeKeys = new ArrayList<String>();
    final List<String> expectedExcludeKeys = Arrays.asList("/abc", "/bcd", "/efg");
    assertThat(model.getIncludeKeys(), equalTo(expectedIncludeKeys));
    assertThat(model.getExcludeKeys(), equalTo(expectedExcludeKeys));
}

/**
 * Constructing a SinkModel with both a non-empty include list and a non-empty exclude list
 * must fail with an InvalidPluginConfigurationException.
 */
@Test
void sinkModel_with_both_include_and_exclude_keys() throws IOException {
    final Map<String, Object> settings = new LinkedHashMap<>();
    final List<String> routes = Arrays.asList("routeA", "routeB");
    final List<String> includeKeys = List.of("abc");
    final List<String> excludeKeys = List.of("bcd");

    assertThrows(
            InvalidPluginConfigurationException.class,
            () -> new SinkModel("customSinkPlugin", routes, null, includeKeys, excludeKeys, settings));
}

@Nested
class BuilderTest {
private PluginModel pluginModel;
Expand Down Expand Up @@ -181,10 +199,11 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
assertThat(actualSinkModel.getExcludeKeys(), empty());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());

}
}

/**
 * Reads every remaining byte of the given stream and decodes the bytes as UTF-8.
 *
 * @param inputStream the stream to read to its end
 * @return the stream content as a String
 * @throws IOException if reading from the stream fails
 */
private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
    final byte[] content = inputStream.readAllBytes();
    return new String(content, StandardCharsets.UTF_8);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNull;

public class OutputCodecContextTest {

    /**
     * Verifies that the three-argument constructor exposes exactly the values it was given, and that the
     * no-argument constructor yields a null tags target key together with empty key lists.
     */
    @Test
    public void testOutputCodecContextBasic() {
        final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
        // Distinct, non-empty lists: with two empty lists a swapped or dropped argument would go unnoticed.
        final List<String> testIncludeKeys = List.of("include-" + RandomStringUtils.randomAlphabetic(6));
        final List<String> testExcludeKeys = List.of("exclude-" + RandomStringUtils.randomAlphabetic(6));
        final List<String> noKeys = Collections.emptyList();

        final OutputCodecContext codecContext = new OutputCodecContext(testTagsTargetKey, testIncludeKeys, testExcludeKeys);
        assertThat(codecContext.getTagsTargetKey(), equalTo(testTagsTargetKey));
        assertThat(codecContext.getIncludeKeys(), equalTo(testIncludeKeys));
        assertThat(codecContext.getExcludeKeys(), equalTo(testExcludeKeys));

        final OutputCodecContext emptyContext = new OutputCodecContext();
        assertNull(emptyContext.getTagsTargetKey());
        assertThat(emptyContext.getIncludeKeys(), equalTo(noKeys));
        assertThat(emptyContext.getExcludeKeys(), equalTo(noKeys));
    }

    /**
     * Verifies that {@code fromSinkContext} copies the tags target key and both key lists from a
     * SinkContext, and that a null SinkContext yields a null tags target key with empty key lists.
     */
    @Test
    public void testOutputCodecContextAdapter() {
        final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
        // A non-empty include list paired with an empty exclude list is enough to detect swapped lists.
        final List<String> testIncludeKeys = List.of("include-" + RandomStringUtils.randomAlphabetic(6));
        final List<String> testExcludeKeys = Collections.emptyList();
        final List<String> noKeys = Collections.emptyList();

        final SinkContext sinkContext = new SinkContext(testTagsTargetKey, null, testIncludeKeys, testExcludeKeys);

        final OutputCodecContext codecContext = OutputCodecContext.fromSinkContext(sinkContext);
        assertThat(codecContext.getTagsTargetKey(), equalTo(testTagsTargetKey));
        assertThat(codecContext.getIncludeKeys(), equalTo(testIncludeKeys));
        assertThat(codecContext.getExcludeKeys(), equalTo(testExcludeKeys));

        final OutputCodecContext emptyContext = OutputCodecContext.fromSinkContext(null);
        assertNull(emptyContext.getTagsTargetKey());
        assertThat(emptyContext.getIncludeKeys(), equalTo(noKeys));
        assertThat(emptyContext.getExcludeKeys(), equalTo(noKeys));
    }
}
Loading

0 comments on commit 94d7b10

Please sign in to comment.