Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
kkondaka authored Jul 7, 2024
2 parents deaf11a + 253e592 commit 932c549
Show file tree
Hide file tree
Showing 47 changed files with 1,665 additions and 56 deletions.
6 changes: 3 additions & 3 deletions CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
This code of conduct applies to all spaces provided by the OpenSource project including in code, documentation, issue trackers, mailing lists, chat channels, wikis, blogs, social media and any other communication channels used by the project.

This code of conduct applies to all spaces provided by the OpenSource project including in code, documentation, issue trackers, mailing lists, chat channels, wikis, blogs, social media, events, conferences, meetings, and any other communication channels used by the project.

**Our open source communities endeavor to:**

Expand All @@ -8,7 +8,6 @@ This code of conduct applies to all spaces provided by the OpenSource project in
* Be Respectful: We are committed to encouraging differing viewpoints, accepting constructive criticism and work collaboratively towards decisions that help the project grow. Disrespectful and unacceptable behavior will not be tolerated.
* Be Collaborative: We are committed to supporting what is best for our community and users. When we build anything for the benefit of the project, we should document the work we do and communicate to others on how this affects their work.


**Our Responsibility. As contributors, members, or bystanders we each individually have the responsibility to behave professionally and respectfully at all times. Disrespectful and unacceptable behaviors include, but are not limited to:**

* The use of violent threats, abusive, discriminatory, or derogatory language;
Expand All @@ -19,6 +18,7 @@ This code of conduct applies to all spaces provided by the OpenSource project in
* Publishing private information, such as physical or electronic address, without permission;
* Other conduct which could reasonably be considered inappropriate in a professional setting;
* Advocating for or encouraging any of the above behaviors.
* Enforcement and Reporting Code of Conduct Issues:

**Enforcement and Reporting Code of Conduct Issues:**

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported. [Contact us](mailto:[email protected]). All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances.
2 changes: 1 addition & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation libs.parquet.common
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation libs.commons.lang3
testImplementation project(':data-prepper-test-common')
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/avro-codecs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
dependencies {
implementation project(path: ':data-prepper-api')
implementation libs.avro.core
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation libs.parquet.common
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
testImplementation 'org.json:json:20240205'
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies {
implementation libs.bouncycastle.bcpkix
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation libs.parquet.common
implementation 'org.xerial.snappy:snappy-java:1.1.10.5'
testImplementation project(':data-prepper-plugins:blocking-buffer')
testImplementation project(':data-prepper-test-event')
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/csv-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'io.micrometer:micrometer-core'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation libs.parquet.common
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
testImplementation project(':data-prepper-plugins:log-generator-source')
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/event-json-codecs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation libs.parquet.common
testImplementation project(':data-prepper-test-common')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,19 +281,22 @@ private void addPart(List<String> parts, final String str, final int start, fina
}
}

public int findInStartGroup(final String str, int idx) {
private int findInStartGroup(final String str, final int idx) {
if (idx < 0 || idx >= str.length()) {
return -1; // Invalid starting index
}

for (int j = 0; j < startGroupStrings.length; j++) {
try {
if (startGroupStrings[j].equals(str.substring(idx, idx+startGroupStrings[j].length()))) {
// For " and ', make sure, it's not escaped
if (j <= 1 && (idx == 0 || str.charAt(idx-1) != '\\')) {
return j;
} else if (j > 1) {
return j;
}
String startGroup = startGroupStrings[j];
int startGroupLen = startGroup.length();

if (idx + startGroupLen <= str.length() && str.startsWith(startGroup, idx)) {
// For the first two elements, check for escape characters
if (j <= 1 && (idx == 0 || str.charAt(idx - 1) != '\\')) {
return j;
} else if (j > 1) {
return j;
}
} catch (Exception e) {
return -1;
}
}
return -1;
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/newline-codecs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation libs.parquet.common
testImplementation project(':data-prepper-plugins:common')
testImplementation project(':data-prepper-test-event')
}
Expand Down
4 changes: 2 additions & 2 deletions data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ dependencies {
}
testImplementation testLibs.junit.vintage
testImplementation libs.commons.io
testImplementation 'net.bytebuddy:byte-buddy:1.14.12'
testImplementation 'net.bytebuddy:byte-buddy-agent:1.14.12'
testImplementation 'net.bytebuddy:byte-buddy:1.14.17'
testImplementation 'net.bytebuddy:byte-buddy-agent:1.14.17'
testImplementation testLibs.slf4j.simple
testImplementation testLibs.mockito.inline
}
Expand Down
8 changes: 4 additions & 4 deletions data-prepper-plugins/parquet-codecs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ dependencies {
implementation project(':data-prepper-plugins:common')
implementation libs.avro.core
implementation 'org.apache.commons:commons-text:1.11.0'
implementation 'org.apache.parquet:parquet-avro:1.14.0'
implementation 'org.apache.parquet:parquet-column:1.14.0'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation 'org.apache.parquet:parquet-hadoop:1.14.0'
implementation libs.parquet.avro
implementation libs.parquet.column
implementation libs.parquet.common
implementation libs.parquet.hadoop
runtimeOnly(libs.hadoop.common) {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation libs.parquet.common
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-test-event')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Ev
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -50,6 +51,7 @@ protected AbstractParseProcessor(PluginMetrics pluginMetrics,
parseWhen = commonParseConfig.getParseWhen();
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
this.expressionEvaluator = expressionEvaluator;
}

Expand Down Expand Up @@ -93,6 +95,10 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
} else if (overwriteIfDestinationExists || !event.containsKey(destination)) {
event.put(destination, parsedValue);
}

if(deleteSourceRequested) {
event.delete(this.source);
}
} catch (Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public interface CommonParseConfig {
* An optional setting used to specify a JSON Pointer. Pointer points to the JSON key that will be parsed into the destination.
* There is no pointer by default, meaning that the entirety of source will be parsed. If the target key would overwrite an existing
* key in the Event then the absolute path of the target key will be placed into destination
*
* Note: (should this be configurable/what about double conflicts?)
* @return String representing JSON Pointer
*/
Expand All @@ -54,4 +53,10 @@ public interface CommonParseConfig {
* Defaults to true.
*/
boolean getOverwriteIfDestinationExists();

/**
* An optional setting used to request dropping the original raw message after successfully parsing the input event.
* Defaults to false.
*/
boolean isDeleteSourceRequested();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ParseIonProcessorConfig implements CommonParseConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty
private boolean deleteSource = false;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -68,6 +71,11 @@ boolean isValidDestination() {
if (Objects.isNull(destination)) return true;

final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/"));
}

@Override
public boolean isDeleteSourceRequested() {
return deleteSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class ParseJsonProcessorConfig implements CommonParseConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty
private boolean deleteSource = false;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -63,11 +66,16 @@ public boolean getOverwriteIfDestinationExists() {
return overwriteIfDestinationExists;
}

@Override
public boolean isDeleteSourceRequested() {
return deleteSource;
}

@AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
boolean isValidDestination() {
if (Objects.isNull(destination)) return true;

final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class ParseXmlProcessorConfig implements CommonParseConfig {
@JsonProperty("overwrite_if_destination_exists")
private boolean overwriteIfDestinationExists = true;

@JsonProperty
private boolean deleteSource = false;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -65,6 +68,11 @@ boolean isValidDestination() {
if (Objects.isNull(destination)) return true;

final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/"));
}

@Override
public boolean isDeleteSourceRequested() {
return deleteSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseIonProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));

setField(ParseIonProcessorConfig.class, config, "deleteSource", true);
assertThat(config.isDeleteSourceRequested(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ void test_when_using_ion_features_then_processorParsesCorrectly() {
final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true));
assertThat(parsedEvent.get(processorConfig.getSource(), Object.class), equalTo(serializedMessage));
assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1));
assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL"));
assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z"));
assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0));
}

@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator);

final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1));
assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL"));
assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value
assertThat(objectUnderTest.getPointer(), equalTo(null));
assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null));
assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true));
assertThat(objectUnderTest.isDeleteSourceRequested(), equalTo(false));
}

@Nested
Expand Down Expand Up @@ -57,6 +58,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseJsonProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));

setField(ParseJsonProcessorConfig.class, config, "deleteSource", true);
assertThat(config.isDeleteSourceRequested(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() {
assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0)));
}

@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator);

final String key = "key";
final ArrayList<String> value = new ArrayList<>(List.of("Element0","Element1","Element2"));
final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}";
final Event parsedEvent = createAndParseMessageEvent(jsonArray);

assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false));
assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value));
final String pointerToFirstElement = key + "/0";
assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0)));
}

@Test
void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() {
parseJsonProcessor = createObjectUnderTest();
Expand Down Expand Up @@ -373,23 +389,21 @@ private String constructDeeplyNestedJsonPointer(final int numberOfLayers) {

/**
* Naive serialization that converts every = to : and wraps every word with double quotes (no error handling or input validation).
* @param messageMap
* @return
* @param messageMap source key value map
* @return serialized string representation of the map
*/
private String convertMapToJSONString(final Map<String, Object> messageMap) {
final String replaceEquals = messageMap.toString().replace("=",":");
final String addQuotes = replaceEquals.replaceAll("(\\w+)", "\"$1\""); // wrap every word in quotes
return addQuotes;
return replaceEquals.replaceAll("(\\w+)", "\"$1\"");
}

/**
* Creates a Map that maps a single key to a value nested numberOfLayers layers deep.
* @param numberOfLayers
* @return
* @param numberOfLayers indicates the depth of layers count
* @return a Map representing the nested structure
*/
private Map<String, Object> constructArbitrarilyDeepJsonMap(final int numberOfLayers) {
final Map<String, Object> result = Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers));
return result;
return Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers));
}

private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseXmlProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));

setField(ParseXmlProcessorConfig.class, config, "deleteSource", true);
assertThat(config.isDeleteSourceRequested(), equalTo(true));
}
}
}
Loading

0 comments on commit 932c549

Please sign in to comment.