diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 2e863762fd..cb23547755 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -1,5 +1,5 @@ -This code of conduct applies to all spaces provided by the OpenSource project including in code, documentation, issue trackers, mailing lists, chat channels, wikis, blogs, social media and any other communication channels used by the project. +This code of conduct applies to all spaces provided by the OpenSource project including in code, documentation, issue trackers, mailing lists, chat channels, wikis, blogs, social media, events, conferences, meetings, and any other communication channels used by the project. **Our open source communities endeavor to:** @@ -8,7 +8,6 @@ This code of conduct applies to all spaces provided by the OpenSource project in * Be Respectful: We are committed to encouraging differing viewpoints, accepting constructive criticism and work collaboratively towards decisions that help the project grow. Disrespectful and unacceptable behavior will not be tolerated. * Be Collaborative: We are committed to supporting what is best for our community and users. When we build anything for the benefit of the project, we should document the work we do and communicate to others on how this affects their work. - **Our Responsibility. As contributors, members, or bystanders we each individually have the responsibility to behave professionally and respectfully at all times. Disrespectful and unacceptable behaviors include, but are not limited to:** * The use of violent threats, abusive, discriminatory, or derogatory language; @@ -19,6 +18,7 @@ This code of conduct applies to all spaces provided by the OpenSource project in * Publishing private information, such as physical or electronic address, without permission; * Other conduct which could reasonably be considered inappropriate in a professional setting; * Advocating for or encouraging any of the above behaviors. -* Enforcement and Reporting Code of Conduct Issues: + +**Enforcement and Reporting Code of Conduct Issues:** Instances of abusive, harassing, or otherwise unacceptable behavior may be reported. [Contact us](mailto:opensource-codeofconduct@amazon.com). All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. 
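The build-script hunks below swap the hard-coded `org.apache.parquet:parquet-common:1.14.0` coordinate for the `libs.parquet.common` version-catalog alias, so the Parquet version is pinned in one place. The catalog definition itself is not part of this diff; as a minimal sketch only, assuming a settings-based catalog (the project could equally declare it in `gradle/libs.versions.toml`) and assuming the catalog pins the same version that the build scripts previously hard-coded, the aliases would look roughly like:

```groovy
// settings.gradle -- illustrative sketch, not part of this change.
// Assumes the catalog keeps the previously hard-coded 1.14.0;
// the real catalog may pin a newer Parquet version.
dependencyResolutionManagement {
    versionCatalogs {
        libs {
            version('parquet', '1.14.0')
            // Dash-separated aliases are referenced as libs.parquet.common, etc.
            library('parquet-avro', 'org.apache.parquet', 'parquet-avro').versionRef('parquet')
            library('parquet-column', 'org.apache.parquet', 'parquet-column').versionRef('parquet')
            library('parquet-common', 'org.apache.parquet', 'parquet-common').versionRef('parquet')
            library('parquet-hadoop', 'org.apache.parquet', 'parquet-hadoop').versionRef('parquet')
        }
    }
}
```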
diff --git a/data-prepper-api/build.gradle b/data-prepper-api/build.gradle index 0ad43ff470..045d331704 100644 --- a/data-prepper-api/build.gradle +++ b/data-prepper-api/build.gradle @@ -12,7 +12,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8' - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation libs.commons.lang3 testImplementation project(':data-prepper-test-common') diff --git a/data-prepper-plugins/avro-codecs/build.gradle b/data-prepper-plugins/avro-codecs/build.gradle index e6c5ea5e54..2bce28bbe0 100644 --- a/data-prepper-plugins/avro-codecs/build.gradle +++ b/data-prepper-plugins/avro-codecs/build.gradle @@ -6,7 +6,7 @@ dependencies { implementation project(path: ':data-prepper-api') implementation libs.avro.core - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:apache-client' testImplementation 'org.json:json:20240205' diff --git a/data-prepper-plugins/common/build.gradle b/data-prepper-plugins/common/build.gradle index 947d1234d4..aec7d7bddc 100644 --- a/data-prepper-plugins/common/build.gradle +++ b/data-prepper-plugins/common/build.gradle @@ -19,7 +19,7 @@ dependencies { implementation libs.bouncycastle.bcpkix implementation libs.reflections.core implementation 'io.micrometer:micrometer-core' - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common implementation 'org.xerial.snappy:snappy-java:1.1.10.5' testImplementation project(':data-prepper-plugins:blocking-buffer') testImplementation project(':data-prepper-test-event') diff --git a/data-prepper-plugins/csv-processor/build.gradle b/data-prepper-plugins/csv-processor/build.gradle index 56c02daf83..cda0694a66 100644 --- a/data-prepper-plugins/csv-processor/build.gradle +++ b/data-prepper-plugins/csv-processor/build.gradle @@ -12,7 +12,7 @@ dependencies { implementation project(':data-prepper-api') implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' implementation 'io.micrometer:micrometer-core' - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:apache-client' testImplementation project(':data-prepper-plugins:log-generator-source') diff --git a/data-prepper-plugins/event-json-codecs/build.gradle b/data-prepper-plugins/event-json-codecs/build.gradle index aad563d19d..2278bf6033 100644 --- a/data-prepper-plugins/event-json-codecs/build.gradle +++ b/data-prepper-plugins/event-json-codecs/build.gradle @@ -15,7 +15,7 @@ dependencies { implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common testImplementation project(':data-prepper-test-common') } diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java 
b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index ea3a7accdb..c42e015829 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -281,19 +281,22 @@ private void addPart(List parts, final String str, final int start, fina } } - public int findInStartGroup(final String str, int idx) { + private int findInStartGroup(final String str, final int idx) { + if (idx < 0 || idx >= str.length()) { + return -1; // Invalid starting index + } + for (int j = 0; j < startGroupStrings.length; j++) { - try { - if (startGroupStrings[j].equals(str.substring(idx, idx+startGroupStrings[j].length()))) { - // For " and ', make sure, it's not escaped - if (j <= 1 && (idx == 0 || str.charAt(idx-1) != '\\')) { - return j; - } else if (j > 1) { - return j; - } + String startGroup = startGroupStrings[j]; + int startGroupLen = startGroup.length(); + + if (idx + startGroupLen <= str.length() && str.startsWith(startGroup, idx)) { + // For the first two elements, check for escape characters + if (j <= 1 && (idx == 0 || str.charAt(idx - 1) != '\\')) { + return j; + } else if (j > 1) { + return j; } - } catch (Exception e) { - return -1; } } return -1; diff --git a/data-prepper-plugins/newline-codecs/build.gradle b/data-prepper-plugins/newline-codecs/build.gradle index b504ed30ee..c71e8755ef 100644 --- a/data-prepper-plugins/newline-codecs/build.gradle +++ b/data-prepper-plugins/newline-codecs/build.gradle @@ -5,7 +5,7 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation 'com.fasterxml.jackson.core:jackson-annotations' - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common testImplementation project(':data-prepper-plugins:common') testImplementation project(':data-prepper-test-event') } diff --git a/data-prepper-plugins/opensearch/build.gradle b/data-prepper-plugins/opensearch/build.gradle index b87e533afe..bece32eaae 100644 --- a/data-prepper-plugins/opensearch/build.gradle +++ b/data-prepper-plugins/opensearch/build.gradle @@ -41,8 +41,8 @@ dependencies { } testImplementation testLibs.junit.vintage testImplementation libs.commons.io - testImplementation 'net.bytebuddy:byte-buddy:1.14.12' - testImplementation 'net.bytebuddy:byte-buddy-agent:1.14.12' + testImplementation 'net.bytebuddy:byte-buddy:1.14.17' + testImplementation 'net.bytebuddy:byte-buddy-agent:1.14.17' testImplementation testLibs.slf4j.simple testImplementation testLibs.mockito.inline } diff --git a/data-prepper-plugins/parquet-codecs/build.gradle b/data-prepper-plugins/parquet-codecs/build.gradle index dd59e28068..fbc8f4a209 100644 --- a/data-prepper-plugins/parquet-codecs/build.gradle +++ b/data-prepper-plugins/parquet-codecs/build.gradle @@ -8,10 +8,10 @@ dependencies { implementation project(':data-prepper-plugins:common') implementation libs.avro.core implementation 'org.apache.commons:commons-text:1.11.0' - implementation 'org.apache.parquet:parquet-avro:1.14.0' - implementation 'org.apache.parquet:parquet-column:1.14.0' - implementation 'org.apache.parquet:parquet-common:1.14.0' - implementation 'org.apache.parquet:parquet-hadoop:1.14.0' + implementation libs.parquet.avro + implementation libs.parquet.column + implementation libs.parquet.common + implementation 
libs.parquet.hadoop runtimeOnly(libs.hadoop.common) { exclude group: 'org.eclipse.jetty' exclude group: 'org.apache.hadoop', module: 'hadoop-auth' diff --git a/data-prepper-plugins/parse-json-processor/build.gradle b/data-prepper-plugins/parse-json-processor/build.gradle index 44959173ba..488dbf7d86 100644 --- a/data-prepper-plugins/parse-json-processor/build.gradle +++ b/data-prepper-plugins/parse-json-processor/build.gradle @@ -13,7 +13,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common testImplementation project(':data-prepper-test-common') testImplementation project(':data-prepper-test-event') } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index a2b984d070..878316c183 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -36,6 +36,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor tagsOnFailure; private final boolean overwriteIfDestinationExists; + private final boolean deleteSourceRequested; private final ExpressionEvaluator expressionEvaluator; @@ -50,6 +51,7 @@ protected AbstractParseProcessor(PluginMetrics pluginMetrics, parseWhen = commonParseConfig.getParseWhen(); tagsOnFailure = commonParseConfig.getTagsOnFailure(); overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists(); + deleteSourceRequested = commonParseConfig.isDeleteSourceRequested(); this.expressionEvaluator = expressionEvaluator; } @@ -93,6 +95,10 @@ public Collection> doExecute(final Collection> recor } else if (overwriteIfDestinationExists || !event.containsKey(destination)) { event.put(destination, parsedValue); } + + if(deleteSourceRequested) { + event.delete(this.source); + } } catch (Exception e) { LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java index 193631bea9..5fd5050b3d 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java @@ -27,7 +27,6 @@ public interface CommonParseConfig { * An optional setting used to specify a JSON Pointer. Pointer points to the JSON key that will be parsed into the destination. * There is no pointer by default, meaning that the entirety of source will be parsed. 
If the target key would overwrite an existing * key in the Event then the absolute path of the target key will be placed into destination - * * Note: (should this be configurable/what about double conflicts?) * @return String representing JSON Pointer */ @@ -54,4 +53,10 @@ public interface CommonParseConfig { * Defaults to true. */ boolean getOverwriteIfDestinationExists(); + + /** + * An optional setting used to request dropping the original raw message after successfully parsing the input event. + * Defaults to false. + */ + boolean isDeleteSourceRequested(); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java index 67a2f464ad..fcc2950477 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java @@ -35,6 +35,9 @@ public class ParseIonProcessorConfig implements CommonParseConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; + @JsonProperty + private boolean deleteSource = false; + @Override public String getSource() { return source; @@ -68,6 +71,11 @@ boolean isValidDestination() { if (Objects.isNull(destination)) return true; final String trimmedDestination = destination.trim(); - return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); + } + + @Override + public boolean isDeleteSourceRequested() { + return deleteSource; } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java index e0a2e91c1d..49ff2a5969 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java @@ -35,6 +35,9 @@ public class ParseJsonProcessorConfig implements CommonParseConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; + @JsonProperty + private boolean deleteSource = false; + @Override public String getSource() { return source; @@ -63,11 +66,16 @@ public boolean getOverwriteIfDestinationExists() { return overwriteIfDestinationExists; } + @Override + public boolean isDeleteSourceRequested() { + return deleteSource; + } + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") boolean isValidDestination() { if (Objects.isNull(destination)) return true; final String trimmedDestination = destination.trim(); - return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java 
b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java index df4fabc397..c90173dc43 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java @@ -30,6 +30,9 @@ public class ParseXmlProcessorConfig implements CommonParseConfig { @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; + @JsonProperty + private boolean deleteSource = false; + @Override public String getSource() { return source; @@ -65,6 +68,11 @@ boolean isValidDestination() { if (Objects.isNull(destination)) return true; final String trimmedDestination = destination.trim(); - return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); + } + + @Override + public boolean isDeleteSourceRequested() { + return deleteSource; } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java index 0fb274ba13..8c47650c05 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java @@ -57,6 +57,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseIonProcessorConfig.class, config, "tagsOnFailure", tagsList); assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + + setField(ParseIonProcessorConfig.class, config, "deleteSource", true); + assertThat(config.isDeleteSourceRequested(), equalTo(true)); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java index 62873866d7..c9a8fdf4e5 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java @@ -47,6 +47,23 @@ void test_when_using_ion_features_then_processorParsesCorrectly() { final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }"; final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); + assertThat(parsedEvent.get(processorConfig.getSource(), Object.class), equalTo(serializedMessage)); + assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1)); + assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); + assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); + assertThat(parsedEvent.get("attribute", 
Double.class), equalTo(100.0)); + } + + @Test + void test_when_deleteSourceFlagEnabled() { + when(processorConfig.isDeleteSourceRequested()).thenReturn(true); + parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator); + + final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); assertThat(parsedEvent.get("bareKey", Integer.class), equalTo(1)); assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java index 459fab6ea5..aa138a0e7e 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java @@ -29,6 +29,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + assertThat(objectUnderTest.isDeleteSourceRequested(), equalTo(false)); } @Nested @@ -57,6 +58,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseJsonProcessorConfig.class, config, "tagsOnFailure", tagsList); assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + + setField(ParseJsonProcessorConfig.class, config, "deleteSource", true); + assertThat(config.isDeleteSourceRequested(), equalTo(true)); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java index 4594cbe2f5..1416d6cf35 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java @@ -194,6 +194,22 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() { assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); } + @Test + void test_when_deleteSourceFlagEnabled() { + when(processorConfig.isDeleteSourceRequested()).thenReturn(true); + parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator); + + final String key = "key"; + final ArrayList value = new ArrayList<>(List.of("Element0","Element1","Element2")); + final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}"; + final Event parsedEvent = createAndParseMessageEvent(jsonArray); + + 
assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); + assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); + final String pointerToFirstElement = key + "/0"; + assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); + } + @Test void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() { parseJsonProcessor = createObjectUnderTest(); @@ -373,23 +389,21 @@ private String constructDeeplyNestedJsonPointer(final int numberOfLayers) { /** * Naive serialization that converts every = to : and wraps every word with double quotes (no error handling or input validation). - * @param messageMap - * @return + * @param messageMap source key value map + * @return serialized string representation of the map */ private String convertMapToJSONString(final Map messageMap) { final String replaceEquals = messageMap.toString().replace("=",":"); - final String addQuotes = replaceEquals.replaceAll("(\\w+)", "\"$1\""); // wrap every word in quotes - return addQuotes; + return replaceEquals.replaceAll("(\\w+)", "\"$1\""); } /** * Creates a Map that maps a single key to a value nested numberOfLayers layers deep. - * @param numberOfLayers - * @return + * @param numberOfLayers indicates the depth of layers count + * @return a Map representing the nested structure */ private Map constructArbitrarilyDeepJsonMap(final int numberOfLayers) { - final Map result = Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers)); - return result; + return Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers)); } private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayers) { diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java index d5e7e1ec43..bab6d6e919 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java @@ -52,6 +52,9 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseXmlProcessorConfig.class, config, "tagsOnFailure", tagsList); assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + + setField(ParseXmlProcessorConfig.class, config, "deleteSource", true); + assertThat(config.isDeleteSourceRequested(), equalTo(true)); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java index 51de35ca70..8d9bc4cde3 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java @@ -60,6 +60,22 @@ void test_when_using_xml_features_then_processorParsesCorrectly() { assertThat(parsedEvent.get("age", String.class), equalTo("30")); } + @Test + void 
test_when_deleteSourceFlagEnabled() { + + final String tagOnFailure = UUID.randomUUID().toString(); + when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure)); + when(processorConfig.isDeleteSourceRequested()).thenReturn(true); + + parseXmlProcessor = createObjectUnderTest(); + + final String serializedMessage = "<root><name>John Doe</name><age>30</age></root>"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); + assertThat(parsedEvent.get("name", String.class), equalTo("John Doe")); + assertThat(parsedEvent.get("age", String.class), equalTo("30")); + } + + @Test void test_when_using_invalid_xml_tags_correctly() { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 0e8a92e31d..f059dd52bf 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -29,6 +29,8 @@ public class RdsService { private final PluginMetrics pluginMetrics; private final RdsSourceConfig sourceConfig; private ExecutorService executor; + private LeaderScheduler leaderScheduler; + private ExportScheduler exportScheduler; public RdsService(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, @@ -51,8 +53,10 @@ public RdsService(final EnhancedSourceCoordinator sourceCoordinator, public void start(Buffer<Record<Event>> buffer) { LOG.info("Start running RDS service"); final List<Runnable> runnableList = new ArrayList<>(); - runnableList.add(new LeaderScheduler(sourceCoordinator, sourceConfig)); - runnableList.add(new ExportScheduler(sourceCoordinator, rdsClient, pluginMetrics)); + leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig); + exportScheduler = new ExportScheduler(sourceCoordinator, rdsClient, pluginMetrics); + runnableList.add(leaderScheduler); + runnableList.add(exportScheduler); executor = Executors.newFixedThreadPool(runnableList.size()); runnableList.forEach(executor::submit); @@ -65,6 +69,8 @@ public void start(Buffer<Record<Event>> buffer) { public void shutdown() { if (executor != null) { LOG.info("shutdown RDS schedulers"); + exportScheduler.shutdown(); + leaderScheduler.shutdown(); executor.shutdownNow(); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java index cc4bd23ca0..a9fe983572 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java @@ -17,6 +17,8 @@ import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; +import org.opensearch.dataprepper.plugins.source.rds.coordination.PartitionFactory; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +49,7 @@ public
RdsSource(final PluginMetrics pluginMetrics, @Override public void start(Buffer<Record<Event>> buffer) { Objects.requireNonNull(sourceCoordinator); + sourceCoordinator.createPartition(new LeaderPartition()); rdsService = new RdsService(sourceCoordinator, sourceConfig, clientFactory, pluginMetrics); @@ -70,6 +73,6 @@ public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordin @Override public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() { - return null; + return new PartitionFactory(); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java new file mode 100644 index 0000000000..db35f5076b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; + +import java.util.function.Function; + +/** + * Partition factory to map a {@link SourcePartitionStoreItem} to a {@link EnhancedSourcePartition}. + */ +public class PartitionFactory implements Function<SourcePartitionStoreItem, EnhancedSourcePartition> { + + @Override + public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem) { + String sourceIdentifier = partitionStoreItem.getSourceIdentifier(); + String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1); + + if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) { + return new LeaderPartition(partitionStoreItem); + } else if (ExportPartition.PARTITION_TYPE.equals(partitionType)) { + return new ExportPartition(partitionStoreItem); + } else { + // Unable to acquire other partitions. + return new GlobalState(partitionStoreItem); + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ExportPartition.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ExportPartition.java new file mode 100644 index 0000000000..5d79378dec --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/ExportPartition.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; + +import java.util.Optional; + +/** + * An ExportPartition represents an export job that needs to be run for tables. + * Each export job has an export time associated with it.
+ * Each job independently maintains state such as total files/records. + * The source identifier contains the keyword 'EXPORT' + */ +public class ExportPartition extends EnhancedSourcePartition<ExportProgressState> { + public static final String PARTITION_TYPE = "EXPORT"; + + private static final String DB_CLUSTER = "cluster"; + private static final String DB_INSTANCE = "instance"; + + private final String dbIdentifier; + + private final boolean isCluster; + + private final ExportProgressState progressState; + + public ExportPartition(String dbIdentifier, boolean isCluster, ExportProgressState progressState) { + this.dbIdentifier = dbIdentifier; + this.isCluster = isCluster; + this.progressState = progressState; + } + + public ExportPartition(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|"); + dbIdentifier = keySplits[0]; + isCluster = DB_CLUSTER.equals(keySplits[1]); + progressState = convertStringToPartitionProgressState(ExportProgressState.class, sourcePartitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + final String dbType = isCluster ? DB_CLUSTER : DB_INSTANCE; + return dbIdentifier + "|" + dbType; + } + + @Override + public Optional<ExportProgressState> getProgressState() { + if (progressState != null) { + return Optional.of(progressState); + } + return Optional.empty(); + } + + public String getDbIdentifier() { + return dbIdentifier; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/GlobalState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/GlobalState.java new file mode 100644 index 0000000000..c6f1d394a2 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/GlobalState.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; + +import java.util.Map; +import java.util.Optional; + +public class GlobalState extends EnhancedSourcePartition<Map<String, Object>> { + + private final String stateName; + + private Map<String, Object> state; + + public GlobalState(String stateName, Map<String, Object> state) { + this.stateName = stateName; + this.state = state; + } + + public GlobalState(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + stateName = sourcePartitionStoreItem.getSourcePartitionKey(); + state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { + return null; + } + + @Override + public String getPartitionKey() { + return stateName; + } + + @Override + public Optional<Map<String, Object>> getProgressState() { + if (state != null) { + return Optional.of(state); + } + return Optional.empty(); + } + + public void setProgressState(Map<String, Object> state) { + this.state = state; + } +} diff --git
a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/LeaderPartition.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/LeaderPartition.java new file mode 100644 index 0000000000..806b199998 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/LeaderPartition.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; + +import java.util.Optional; + +/** + *
<p>A LeaderPartition is for some tasks that should be done on a single node only.</p> + * <p>Hence, whatever node owns the lease of this partition will act as the 'leader'.</p> + * <p>In this RDS source design, a leader node will be responsible for:</p> + * <ul> + * <li>Initialization process (create EXPORT and STREAM partitions)</li> + * <li>Triggering RDS export task</li> + * <li>Reading stream data</li> + * </ul>
+ */ +public class LeaderPartition extends EnhancedSourcePartition<LeaderProgressState> { + public static final String PARTITION_TYPE = "LEADER"; + + // identifier for the partition + private static final String DEFAULT_PARTITION_KEY = "GLOBAL"; + + private final LeaderProgressState state; + + public LeaderPartition() { + this.state = new LeaderProgressState(); + } + + public LeaderPartition(SourcePartitionStoreItem partitionStoreItem) { + setSourcePartitionStoreItem(partitionStoreItem); + this.state = convertStringToPartitionProgressState(LeaderProgressState.class, partitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + return DEFAULT_PARTITION_KEY; + } + + @Override + public Optional<LeaderProgressState> getProgressState() { + return Optional.of(state); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/ExportProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/ExportProgressState.java new file mode 100644 index 0000000000..cde2be6dd8 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/ExportProgressState.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * Progress state for an EXPORT partition + */ +public class ExportProgressState { + + @JsonProperty("snapshotId") + private String snapshotId; + + @JsonProperty("exportTaskId") + private String exportTaskId; + + @JsonProperty("iamRoleArn") + private String iamRoleArn; + + @JsonProperty("bucket") + private String bucket; + + @JsonProperty("prefix") + private String prefix; + + @JsonProperty("tables") + private List<String> tables; + + @JsonProperty("kmsKeyId") + private String kmsKeyId; + + @JsonProperty("exportTime") + private String exportTime; + + @JsonProperty("status") + private String status; + + public String getSnapshotId() { + return snapshotId; + } + + public void setSnapshotId(String snapshotId) { + this.snapshotId = snapshotId; + } + + public String getExportTaskId() { + return exportTaskId; + } + + public void setExportTaskId(String exportTaskId) { + this.exportTaskId = exportTaskId; + } + + public String getIamRoleArn() { + return iamRoleArn; + } + + public void setIamRoleArn(String iamRoleArn) { + this.iamRoleArn = iamRoleArn; + } + + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + + public List<String> getTables() { + return tables; + } + + public void setTables(List<String> tables) { + this.tables = tables; + } + + public String getKmsKeyId() { + return kmsKeyId; + } + + public void setKmsKeyId(String kmsKeyId) { + this.kmsKeyId = kmsKeyId; + } + + public String getExportTime() { + return exportTime; + } + + public void setExportTime(String exportTime) { + this.exportTime = exportTime; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } +} diff --git
a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/LeaderProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/LeaderProgressState.java new file mode 100644 index 0000000000..216fb64fae --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/LeaderProgressState.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Progress state for a LEADER partition + */ +public class LeaderProgressState { + + @JsonProperty("initialized") + private boolean initialized = false; + + public boolean isInitialized() { + return initialized; + } + + public void setInitialized(boolean initialized) { + this.initialized = initialized; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java index 9c83643c68..51db82248b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java @@ -7,13 +7,23 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.ExportStatus; +import org.opensearch.dataprepper.plugins.source.rds.model.SnapshotInfo; +import org.opensearch.dataprepper.plugins.source.rds.model.SnapshotStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.rds.RdsClient; import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.BiConsumer; public class ExportScheduler implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class); @@ -23,14 +33,16 @@ public class ExportScheduler implements Runnable { private static final int DEFAULT_MAX_CLOSE_COUNT = 36; private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000; private static final int DEFAULT_CHECK_STATUS_INTERVAL_MILLS = 30 * 1000; + private static final Duration DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT = Duration.ofMinutes(60); private final RdsClient rdsClient; - private final PluginMetrics pluginMetrics; - private final EnhancedSourceCoordinator sourceCoordinator; - private final ExecutorService executor; + private final ExportTaskManager exportTaskManager; + private final SnapshotManager snapshotManager; + + private volatile boolean shutdownRequested = false; public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator, final 
RdsClient rdsClient, @@ -39,10 +51,182 @@ public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator, this.sourceCoordinator = sourceCoordinator; this.rdsClient = rdsClient; this.executor = Executors.newCachedThreadPool(); + this.exportTaskManager = new ExportTaskManager(rdsClient); + this.snapshotManager = new SnapshotManager(rdsClient); } @Override public void run() { + LOG.debug("Start running Export Scheduler"); + while (!shutdownRequested && !Thread.currentThread().isInterrupted()) { + try { + final Optional sourcePartition = sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE); + + if (sourcePartition.isPresent()) { + ExportPartition exportPartition = (ExportPartition) sourcePartition.get(); + LOG.debug("Acquired an export partition: {}", exportPartition.getPartitionKey()); + + String exportTaskId = getOrCreateExportTaskId(exportPartition); + + if (exportTaskId == null) { + LOG.error("The export to S3 failed, it will be retried"); + closeExportPartitionWithError(exportPartition); + } else { + CompletableFuture checkStatus = CompletableFuture.supplyAsync(() -> checkExportStatus(exportPartition), executor); + checkStatus.whenComplete(completeExport(exportPartition)); + } + } + + try { + Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException e) { + LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing"); + break; + } + } catch (final Exception e) { + LOG.error("Received an exception during export, backing off and retrying", e); + try { + Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException ex) { + LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing"); + break; + } + } + } + LOG.warn("Export scheduler interrupted, looks like shutdown has triggered"); + executor.shutdownNow(); + } + + public void shutdown() { + shutdownRequested = true; + } + + private String getOrCreateExportTaskId(ExportPartition exportPartition) { + ExportProgressState progressState = exportPartition.getProgressState().get(); + + if (progressState.getExportTaskId() != null) { + LOG.info("Export task has already created for db {}", exportPartition.getDbIdentifier()); + return progressState.getExportTaskId(); + } + + LOG.info("Creating a new snapshot for db {}", exportPartition.getDbIdentifier()); + SnapshotInfo snapshotInfo = snapshotManager.createSnapshot(exportPartition.getDbIdentifier()); + if (snapshotInfo != null) { + LOG.info("Snapshot id is {}", snapshotInfo.getSnapshotId()); + progressState.setSnapshotId(snapshotInfo.getSnapshotId()); + sourceCoordinator.saveProgressStateForPartition(exportPartition, null); + } else { + LOG.error("The snapshot failed to create, it will be retried"); + closeExportPartitionWithError(exportPartition); + return null; + } + + final String snapshotId = snapshotInfo.getSnapshotId(); + try { + checkSnapshotStatus(snapshotId, DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT); + } catch (Exception e) { + LOG.warn("Check snapshot status for {} failed", snapshotId, e); + sourceCoordinator.giveUpPartition(exportPartition); + return null; + } + + LOG.info("Creating an export task for db {} from snapshot {}", exportPartition.getDbIdentifier(), snapshotId); + String exportTaskId = exportTaskManager.startExportTask( + snapshotInfo.getSnapshotArn(), progressState.getIamRoleArn(), progressState.getBucket(), + progressState.getPrefix(), progressState.getKmsKeyId(), progressState.getTables()); + + if (exportTaskId != null) { + 
LOG.info("Export task id is {}", exportTaskId); + progressState.setExportTaskId(exportTaskId); + sourceCoordinator.saveProgressStateForPartition(exportPartition, null); + } else { + LOG.error("The export task failed to create, it will be retried"); + closeExportPartitionWithError(exportPartition); + return null; + } + + return exportTaskId; + } + + private void closeExportPartitionWithError(ExportPartition exportPartition) { + ExportProgressState exportProgressState = exportPartition.getProgressState().get(); + // Clear current task id, so that a new export can be submitted. + exportProgressState.setExportTaskId(null); + sourceCoordinator.closePartition(exportPartition, DEFAULT_CLOSE_DURATION, DEFAULT_MAX_CLOSE_COUNT); + } + + private String checkSnapshotStatus(String snapshotId, Duration timeout) { + final Instant endTime = Instant.now().plus(timeout); + + LOG.debug("Start checking status of snapshot {}", snapshotId); + while (Instant.now().isBefore(endTime)) { + SnapshotInfo snapshotInfo = snapshotManager.checkSnapshotStatus(snapshotId); + String status = snapshotInfo.getStatus(); + // Valid snapshot statuses are: available, copying, creating + // The status should never be "copying" here + if (SnapshotStatus.AVAILABLE.getStatusName().equals(status)) { + LOG.info("Snapshot {} is available.", snapshotId); + return status; + } + + LOG.debug("Snapshot {} is still creating. Wait and check later", snapshotId); + try { + Thread.sleep(DEFAULT_CHECK_STATUS_INTERVAL_MILLS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + throw new RuntimeException("Snapshot status check timed out."); + } + + private String checkExportStatus(ExportPartition exportPartition) { + long lastCheckpointTime = System.currentTimeMillis(); + String exportTaskId = exportPartition.getProgressState().get().getExportTaskId(); + + LOG.debug("Start checking the status of export {}", exportTaskId); + while (true) { + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + sourceCoordinator.saveProgressStateForPartition(exportPartition, null); + lastCheckpointTime = System.currentTimeMillis(); + } + + // Valid statuses are: CANCELED, CANCELING, COMPLETE, FAILED, IN_PROGRESS, STARTING + String status = exportTaskManager.checkExportStatus(exportTaskId); + LOG.debug("Current export status is {}.", status); + if (ExportStatus.isTerminal(status)) { + LOG.info("Export {} is completed with final status {}", exportTaskId, status); + return status; + } + LOG.debug("Export {} is still running in progress. 
Wait and check later", exportTaskId); + try { + Thread.sleep(DEFAULT_CHECK_STATUS_INTERVAL_MILLS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private BiConsumer completeExport(ExportPartition exportPartition) { + return (status, ex) -> { + if (ex != null) { + LOG.warn("Check export status for {} failed", exportPartition.getPartitionKey(), ex); + sourceCoordinator.giveUpPartition(exportPartition); + } else { + if (!ExportStatus.COMPLETE.name().equals(status)) { + LOG.error("Export failed with status {}", status); + closeExportPartitionWithError(exportPartition); + return; + } + LOG.info("Export for {} completed successfully", exportPartition.getPartitionKey()); + + completeExportPartition(exportPartition); + } + }; + } + private void completeExportPartition(ExportPartition exportPartition) { + ExportProgressState progressState = exportPartition.getProgressState().get(); + progressState.setStatus("Completed"); + sourceCoordinator.completePartition(exportPartition); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManager.java new file mode 100644 index 0000000000..dc447c2f42 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManager.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.rds.model.DescribeExportTasksRequest; +import software.amazon.awssdk.services.rds.model.DescribeExportTasksResponse; +import software.amazon.awssdk.services.rds.model.StartExportTaskRequest; +import software.amazon.awssdk.services.rds.model.StartExportTaskResponse; + +import java.util.Collection; +import java.util.UUID; + +public class ExportTaskManager { + + private static final Logger LOG = LoggerFactory.getLogger(ExportTaskManager.class); + + // Export identifier cannot be longer than 60 characters + private static final int EXPORT_TASK_ID_MAX_LENGTH = 60; + + private final RdsClient rdsClient; + + public ExportTaskManager(final RdsClient rdsClient) { + this.rdsClient = rdsClient; + } + + public String startExportTask(String snapshotArn, String iamRoleArn, String bucket, String prefix, String kmsKeyId, Collection includeTables) { + final String exportTaskId = generateExportTaskId(snapshotArn); + StartExportTaskRequest.Builder requestBuilder = StartExportTaskRequest.builder() + .exportTaskIdentifier(exportTaskId) + .sourceArn(snapshotArn) + .iamRoleArn(iamRoleArn) + .s3BucketName(bucket) + .s3Prefix(prefix) + .kmsKeyId(kmsKeyId); + + if (includeTables != null && !includeTables.isEmpty()) { + requestBuilder.exportOnly(includeTables); + } + + try { + StartExportTaskResponse response = rdsClient.startExportTask(requestBuilder.build()); + LOG.info("Export task submitted with id {} and status {}", exportTaskId, response.status()); + return exportTaskId; + + } catch (Exception e) { + LOG.error("Failed to start an export task", e); + return null; + } + } + + public String checkExportStatus(String exportTaskId) { + DescribeExportTasksRequest request = DescribeExportTasksRequest.builder() + 
.exportTaskIdentifier(exportTaskId) + .build(); + + DescribeExportTasksResponse response = rdsClient.describeExportTasks(request); + + return response.exportTasks().get(0).status(); + } + + private String generateExportTaskId(String snapshotArn) { + String snapshotId = Arn.fromString(snapshotArn).resource().resource(); + return truncateString(snapshotId, EXPORT_TASK_ID_MAX_LENGTH - 16) + "-export-" + UUID.randomUUID().toString().substring(0, 8); + } + + private String truncateString(String originalString, int maxLength) { + if (originalString.length() <= maxLength) { + return originalString; + } + return originalString.substring(0, maxLength); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/SnapshotManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/SnapshotManager.java new file mode 100644 index 0000000000..7b8da8717c --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/SnapshotManager.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.opensearch.dataprepper.plugins.source.rds.model.SnapshotInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.rds.model.CreateDbSnapshotRequest; +import software.amazon.awssdk.services.rds.model.CreateDbSnapshotResponse; +import software.amazon.awssdk.services.rds.model.DescribeDbSnapshotsRequest; +import software.amazon.awssdk.services.rds.model.DescribeDbSnapshotsResponse; + +import java.time.Instant; +import java.util.UUID; + +public class SnapshotManager { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); + + private final RdsClient rdsClient; + + public SnapshotManager(final RdsClient rdsClient) { + this.rdsClient = rdsClient; + } + + public SnapshotInfo createSnapshot(String dbInstanceId) { + final String snapshotId = generateSnapshotId(dbInstanceId); + CreateDbSnapshotRequest request = CreateDbSnapshotRequest.builder() + .dbInstanceIdentifier(dbInstanceId) + .dbSnapshotIdentifier(snapshotId) + .build(); + + try { + CreateDbSnapshotResponse response = rdsClient.createDBSnapshot(request); + String snapshotArn = response.dbSnapshot().dbSnapshotArn(); + String status = response.dbSnapshot().status(); + Instant createTime = response.dbSnapshot().snapshotCreateTime(); + LOG.info("Creating snapshot with id {} and status {}", snapshotId, status); + + return new SnapshotInfo(snapshotId, snapshotArn, createTime, status); + } catch (Exception e) { + LOG.error("Failed to create snapshot for {}", dbInstanceId, e); + return null; + } + } + + public SnapshotInfo checkSnapshotStatus(String snapshotId) { + DescribeDbSnapshotsRequest request = DescribeDbSnapshotsRequest.builder() + .dbSnapshotIdentifier(snapshotId) + .build(); + + DescribeDbSnapshotsResponse response = rdsClient.describeDBSnapshots(request); + String snapshotArn = response.dbSnapshots().get(0).dbSnapshotArn(); + String status = response.dbSnapshots().get(0).status(); + Instant createTime = response.dbSnapshots().get(0).snapshotCreateTime(); + + return new SnapshotInfo(snapshotId, snapshotArn, createTime, status); + } + + private String generateSnapshotId(String dbClusterId) { + return dbClusterId + "-snapshot-" + 
UUID.randomUUID().toString().substring(0, 8); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index ca99a7c8f1..4831f1e91a 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -6,11 +6,19 @@ package org.opensearch.dataprepper.plugins.source.rds.leader; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.time.Instant; +import java.util.Optional; public class LeaderScheduler implements Runnable { @@ -20,6 +28,9 @@ public class LeaderScheduler implements Runnable { private final EnhancedSourceCoordinator sourceCoordinator; private final RdsSourceConfig sourceConfig; + private LeaderPartition leaderPartition; + private volatile boolean shutdownRequested = false; + public LeaderScheduler(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; @@ -27,6 +38,84 @@ public LeaderScheduler(final EnhancedSourceCoordinator sourceCoordinator, final @Override public void run() { + LOG.info("Starting Leader Scheduler for initialization."); + + while (!shutdownRequested && !Thread.currentThread().isInterrupted()) { + try { + // Try to acquire the lease if not owned + if (leaderPartition == null) { + final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE); + if (sourcePartition.isPresent()) { + LOG.info("Running as a LEADER node."); + leaderPartition = (LeaderPartition) sourcePartition.get(); + } + } + + // Once owned, run the normal LEADER node process + if (leaderPartition != null) { + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + if (!leaderProgressState.isInitialized()) { + init(); + } + } + } catch (final Exception e) { + LOG.error("Exception occurred in primary leader scheduling loop", e); + } finally { + if (leaderPartition != null) { + // Extend the timeout + // will always be a leader until shutdown + sourceCoordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); + } + + try { + Thread.sleep(DEFAULT_LEASE_INTERVAL.toMillis()); + } catch (final InterruptedException e) { + LOG.info("InterruptedException occurred while waiting in leader scheduling loop."); + break; + } + } + } + + // Should stop + LOG.warn("Quitting Leader Scheduler"); + if (leaderPartition != null) { +
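For orientation, a hedged sketch of how a caller drives this scheduler's lifecycle. The actual wiring lives in `RdsService`, which is not part of this hunk, so the executor setup below is illustrative rather than the plugin's literal code:

```java
final ExecutorService executor = Executors.newSingleThreadExecutor();
final LeaderScheduler leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig);
executor.submit(leaderScheduler);

// On pipeline shutdown: shutdown() flips the volatile flag so the loop exits
// after its current pass; shutdownNow() interrupts the lease-interval sleep.
leaderScheduler.shutdown();
executor.shutdownNow();
```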
sourceCoordinator.giveUpPartition(leaderPartition); + } + } + + public void shutdown() { + shutdownRequested = true; + } + + private void init() { + LOG.info("Initializing RDS source service..."); + + // Create a Global state in the coordination table for the configuration. + // Global State here is designed to be readable whenever needed, + // so that the jobs can refer to the configuration. + sourceCoordinator.createPartition(new GlobalState(sourceConfig.getDbIdentifier(), null)); + + if (sourceConfig.isExportEnabled()) { + Instant startTime = Instant.now(); + LOG.debug("Export is enabled. Creating export partition in the source coordination store."); + createExportPartition(sourceConfig, startTime); + } + LOG.debug("Update initialization state"); + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + leaderProgressState.setInitialized(true); } + + private void createExportPartition(RdsSourceConfig sourceConfig, Instant exportTime) { + ExportProgressState progressState = new ExportProgressState(); + progressState.setIamRoleArn(sourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn()); + progressState.setBucket(sourceConfig.getS3Bucket()); + progressState.setPrefix(sourceConfig.getS3Prefix()); + progressState.setTables(sourceConfig.getTableNames()); + progressState.setKmsKeyId(sourceConfig.getExport().getKmsKeyId()); + progressState.setExportTime(exportTime.toString()); + ExportPartition exportPartition = new ExportPartition(sourceConfig.getDbIdentifier(), sourceConfig.isCluster(), progressState); + sourceCoordinator.createPartition(exportPartition); + } + } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportStatus.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportStatus.java new file mode 100644 index 0000000000..16fb91b7f4 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportStatus.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public enum ExportStatus { + CANCELED, + CANCELING, + COMPLETE, + FAILED, + IN_PROGRESS, + STARTING; + + private static final Map<String, ExportStatus> TYPES_MAP = Arrays.stream(ExportStatus.values()) + .collect(Collectors.toMap( + Enum::name, + value -> value + )); + private static final Set<ExportStatus> TERMINAL_STATUSES = Set.of(CANCELED, COMPLETE, FAILED); + + public static ExportStatus fromString(final String name) { + return TYPES_MAP.get(name); + } + + public static boolean isTerminal(final String name) { + ExportStatus status = fromString(name); + return status != null && TERMINAL_STATUSES.contains(status); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/SnapshotInfo.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/SnapshotInfo.java new file mode 100644 index 0000000000..11bd452497 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/SnapshotInfo.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
org.opensearch.dataprepper.plugins.source.rds.model; + +import java.time.Instant; + +public class SnapshotInfo { + + private final String snapshotId; + private final String snapshotArn; + private final Instant createTime; + private String status; + + public SnapshotInfo(String snapshotId, String snapshotArn, Instant createTime, String status) { + this.snapshotId = snapshotId; + this.snapshotArn = snapshotArn; + this.createTime = createTime; + this.status = status; + } + + public String getSnapshotId() { + return snapshotId; + } + + public String getSnapshotArn() { + return snapshotArn; + } + + public Instant getCreateTime() { + return createTime; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/SnapshotStatus.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/SnapshotStatus.java new file mode 100644 index 0000000000..a2d18f70f9 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/SnapshotStatus.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +public enum SnapshotStatus { + AVAILABLE("available"), + COPYING("copying"), + CREATING("creating"); + + private final String statusName; + + SnapshotStatus(final String statusName) { + this.statusName = statusName; + } + + public String getStatusName() { + return statusName; + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java index 218c23d121..6aaa0b0bd5 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java @@ -56,7 +56,6 @@ class RdsServiceTest { @BeforeEach void setUp() { when(clientFactory.buildRdsClient()).thenReturn(rdsClient); - } @Test diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactoryTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactoryTest.java new file mode 100644 index 0000000000..c092a8b48c --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactoryTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; + +import java.util.UUID; + +import 
static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class PartitionFactoryTest { + + @Mock + private SourcePartitionStoreItem partitionStoreItem; + + @Test + void given_leader_partition_item_then_create_leader_partition() { + PartitionFactory objectUnderTest = createObjectUnderTest(); + when(partitionStoreItem.getSourceIdentifier()).thenReturn(UUID.randomUUID() + "|" + LeaderPartition.PARTITION_TYPE); + when(partitionStoreItem.getPartitionProgressState()).thenReturn(null); + + assertThat(objectUnderTest.apply(partitionStoreItem), instanceOf(LeaderPartition.class)); + } + + @Test + void given_export_partition_item_then_create_export_partition() { + PartitionFactory objectUnderTest = createObjectUnderTest(); + when(partitionStoreItem.getSourceIdentifier()).thenReturn(UUID.randomUUID() + "|" + ExportPartition.PARTITION_TYPE); + when(partitionStoreItem.getSourcePartitionKey()).thenReturn(UUID.randomUUID() + "|" + UUID.randomUUID()); + when(partitionStoreItem.getPartitionProgressState()).thenReturn(null); + + assertThat(objectUnderTest.apply(partitionStoreItem), instanceOf(ExportPartition.class)); + } + + @Test + void given_store_item_of_undefined_type_then_create_global_state() { + PartitionFactory objectUnderTest = createObjectUnderTest(); + when(partitionStoreItem.getSourceIdentifier()).thenReturn(UUID.randomUUID() + "|" + UUID.randomUUID()); + when(partitionStoreItem.getSourcePartitionKey()).thenReturn(UUID.randomUUID().toString()); + when(partitionStoreItem.getPartitionProgressState()).thenReturn(null); + + assertThat(objectUnderTest.apply(partitionStoreItem), instanceOf(GlobalState.class)); + } + + private PartitionFactory createObjectUnderTest() { + return new PartitionFactory(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java new file mode 100644 index 0000000000..d0560ab30d --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java @@ -0,0 +1,171 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; +import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.rds.model.CreateDbSnapshotRequest; +import software.amazon.awssdk.services.rds.model.CreateDbSnapshotResponse; +import software.amazon.awssdk.services.rds.model.DBSnapshot; +import software.amazon.awssdk.services.rds.model.DescribeDbSnapshotsRequest; +import software.amazon.awssdk.services.rds.model.DescribeDbSnapshotsResponse; +import 
software.amazon.awssdk.services.rds.model.DescribeExportTasksRequest; +import software.amazon.awssdk.services.rds.model.DescribeExportTasksResponse; +import software.amazon.awssdk.services.rds.model.StartExportTaskRequest; +import software.amazon.awssdk.services.rds.model.StartExportTaskResponse; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +class ExportSchedulerTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private RdsClient rdsClient; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private ExportPartition exportPartition; + + @Mock(answer = Answers.RETURNS_DEFAULTS) + private ExportProgressState exportProgressState; + + private ExportScheduler exportScheduler; + + @BeforeEach + void setUp() { + exportScheduler = createObjectUnderTest(); + } + + @Test + void test_given_no_export_partition_then_not_export() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(exportScheduler); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(ExportPartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verifyNoInteractions(rdsClient); + } + + @Test + void test_given_export_partition_and_task_id_then_complete_export() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.of(exportPartition)); + when(exportPartition.getPartitionKey()).thenReturn(UUID.randomUUID().toString()); + when(exportProgressState.getExportTaskId()).thenReturn(UUID.randomUUID().toString()); + when(exportPartition.getProgressState()).thenReturn(Optional.of(exportProgressState)); + + DescribeExportTasksResponse describeExportTasksResponse = mock(DescribeExportTasksResponse.class, Mockito.RETURNS_DEEP_STUBS); + when(describeExportTasksResponse.exportTasks().get(0).status()).thenReturn("COMPLETE"); + when(rdsClient.describeExportTasks(any(DescribeExportTasksRequest.class))).thenReturn(describeExportTasksResponse); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(exportScheduler); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(ExportPartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verify(sourceCoordinator).completePartition(exportPartition); + verify(rdsClient, never()).startExportTask(any(StartExportTaskRequest.class)); + verify(rdsClient, never()).createDBSnapshot(any(CreateDbSnapshotRequest.class)); + } + + + @Test + void test_given_export_partition_and_no_task_id_then_start_and_complete_export() throws InterruptedException { + 
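The tests in this file all follow one idiom for exercising a blocking `Runnable`: submit it to a single-thread executor, use Awaitility to wait for the first observable interaction with a mock, then interrupt the worker so the test exits promptly. Condensed, with the same names used in the tests here:

```java
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(exportScheduler); // runs the scheduler's blocking poll loop
await().atMost(Duration.ofSeconds(1))    // wait until the first coordinator call is visible
        .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(ExportPartition.PARTITION_TYPE));
executorService.shutdownNow();           // interrupt the loop rather than wait for it
```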
when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.of(exportPartition)); + when(exportPartition.getPartitionKey()).thenReturn(UUID.randomUUID().toString()); + when(exportProgressState.getExportTaskId()).thenReturn(null).thenReturn(UUID.randomUUID().toString()); + when(exportPartition.getProgressState()).thenReturn(Optional.of(exportProgressState)); + final String dbIdentifier = UUID.randomUUID().toString(); + when(exportPartition.getDbIdentifier()).thenReturn(dbIdentifier); + + // Mock snapshot response + CreateDbSnapshotResponse createDbSnapshotResponse = mock(CreateDbSnapshotResponse.class); + DBSnapshot dbSnapshot = mock(DBSnapshot.class); + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:snapshot-0b5ae174"; + when(dbSnapshot.dbSnapshotArn()).thenReturn(snapshotArn); + when(dbSnapshot.status()).thenReturn("creating").thenReturn("available"); + when(dbSnapshot.snapshotCreateTime()).thenReturn(Instant.now()); + when(createDbSnapshotResponse.dbSnapshot()).thenReturn(dbSnapshot); + when(rdsClient.createDBSnapshot(any(CreateDbSnapshotRequest.class))).thenReturn(createDbSnapshotResponse); + + DescribeDbSnapshotsResponse describeDbSnapshotsResponse = DescribeDbSnapshotsResponse.builder() + .dbSnapshots(dbSnapshot) + .build(); + when(rdsClient.describeDBSnapshots(any(DescribeDbSnapshotsRequest.class))).thenReturn(describeDbSnapshotsResponse); + + // Mock export response + StartExportTaskResponse startExportTaskResponse = mock(StartExportTaskResponse.class); + when(startExportTaskResponse.status()).thenReturn("STARTING"); + when(rdsClient.startExportTask(any(StartExportTaskRequest.class))).thenReturn(startExportTaskResponse); + + DescribeExportTasksResponse describeExportTasksResponse = mock(DescribeExportTasksResponse.class, Mockito.RETURNS_DEEP_STUBS); + when(describeExportTasksResponse.exportTasks().get(0).status()).thenReturn("COMPLETE"); + when(rdsClient.describeExportTasks(any(DescribeExportTasksRequest.class))).thenReturn(describeExportTasksResponse); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(exportScheduler); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(ExportPartition.PARTITION_TYPE)); + Thread.sleep(200); + executorService.shutdownNow(); + + verify(rdsClient).createDBSnapshot(any(CreateDbSnapshotRequest.class)); + verify(rdsClient).startExportTask(any(StartExportTaskRequest.class)); + verify(sourceCoordinator).completePartition(exportPartition); + } + + @Test + void test_shutDown() { + lenient().when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(exportScheduler); + exportScheduler.shutdown(); + verifyNoMoreInteractions(sourceCoordinator, rdsClient); + executorService.shutdownNow(); + } + + private ExportScheduler createObjectUnderTest() { + return new ExportScheduler(sourceCoordinator, rdsClient, pluginMetrics); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManagerTest.java new file mode 100644 index 0000000000..15a23277c7 --- /dev/null +++ 
b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManagerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.rds.model.DescribeExportTasksRequest; +import software.amazon.awssdk.services.rds.model.DescribeExportTasksResponse; +import software.amazon.awssdk.services.rds.model.ExportTask; +import software.amazon.awssdk.services.rds.model.StartExportTaskRequest; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +class ExportTaskManagerTest { + + @Mock + private RdsClient rdsClient; + + private ExportTaskManager exportTaskManager; + + @BeforeEach + void setUp() { + exportTaskManager = createObjectUnderTest(); + } + + @ParameterizedTest + @MethodSource("provideStartExportTaskTestParameters") + void test_start_export_task(List<String> exportOnly) { + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:" + UUID.randomUUID(); + final String iamRoleArn = "arn:aws:iam:us-east-1:123456789012:role:" + UUID.randomUUID(); + final String bucket = UUID.randomUUID().toString(); + final String prefix = UUID.randomUUID().toString(); + final String kmsKey = UUID.randomUUID().toString(); + + exportTaskManager.startExportTask(snapshotArn, iamRoleArn, bucket, prefix, kmsKey, exportOnly); + + final ArgumentCaptor<StartExportTaskRequest> exportTaskRequestArgumentCaptor = + ArgumentCaptor.forClass(StartExportTaskRequest.class); + + verify(rdsClient).startExportTask(exportTaskRequestArgumentCaptor.capture()); + + final StartExportTaskRequest actualRequest = exportTaskRequestArgumentCaptor.getValue(); + assertThat(actualRequest.sourceArn(), equalTo(snapshotArn)); + assertThat(actualRequest.iamRoleArn(), equalTo(iamRoleArn)); + assertThat(actualRequest.s3BucketName(), equalTo(bucket)); + assertThat(actualRequest.s3Prefix(), equalTo(prefix)); + assertThat(actualRequest.kmsKeyId(), equalTo(kmsKey)); + assertThat(actualRequest.exportOnly(), equalTo(exportOnly)); + } + + @Test + void test_check_export_status() { + final String exportTaskId = UUID.randomUUID().toString(); + DescribeExportTasksResponse describeExportTasksResponse = mock(DescribeExportTasksResponse.class); + when(describeExportTasksResponse.exportTasks()).thenReturn(List.of(ExportTask.builder().status("COMPLETE").build())); + when(rdsClient.describeExportTasks(any(DescribeExportTasksRequest.class))).thenReturn(describeExportTasksResponse); + + exportTaskManager.checkExportStatus(exportTaskId); + + final ArgumentCaptor<DescribeExportTasksRequest> exportTaskRequestArgumentCaptor = + ArgumentCaptor.forClass(DescribeExportTasksRequest.class); + + 
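One detail worth noting in these assertions: with the type parameter restored, `ArgumentCaptor<T>` gives typed access to the captured request, so `getValue()` needs no cast. The essence of the idiom as these tests use it:

```java
final ArgumentCaptor<StartExportTaskRequest> captor = ArgumentCaptor.forClass(StartExportTaskRequest.class);
verify(rdsClient).startExportTask(captor.capture());     // grab the request the manager built
final StartExportTaskRequest actual = captor.getValue(); // typed; no cast required
assertThat(actual.sourceArn(), equalTo(snapshotArn));
```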
verify(rdsClient).describeExportTasks(exportTaskRequestArgumentCaptor.capture()); + + final DescribeExportTasksRequest actualRequest = exportTaskRequestArgumentCaptor.getValue(); + assertThat(actualRequest.exportTaskIdentifier(), equalTo(exportTaskId)); + } + + private static Stream<Arguments> provideStartExportTaskTestParameters() { + final String tableName1 = UUID.randomUUID().toString(); + final String tableName2 = UUID.randomUUID().toString(); + return Stream.of( + Arguments.of(List.of()), + Arguments.of(List.of(tableName1)), + Arguments.of(List.of(tableName1, tableName2)) + ); + } + + private ExportTaskManager createObjectUnderTest() { + return new ExportTaskManager(rdsClient); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/SnapshotManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/SnapshotManagerTest.java new file mode 100644 index 0000000000..bca52a5fdd --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/SnapshotManagerTest.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.source.rds.model.SnapshotInfo; +import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.rds.model.CreateDbSnapshotRequest; +import software.amazon.awssdk.services.rds.model.CreateDbSnapshotResponse; +import software.amazon.awssdk.services.rds.model.DBSnapshot; +import software.amazon.awssdk.services.rds.model.DescribeDbSnapshotsRequest; +import software.amazon.awssdk.services.rds.model.DescribeDbSnapshotsResponse; + +import java.time.Instant; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class SnapshotManagerTest { + + @Mock + private RdsClient rdsClient; + + private SnapshotManager snapshotManager; + + @BeforeEach + void setUp() { + snapshotManager = createObjectUnderTest(); + } + + @Test + void test_create_snapshot_with_success() { + String dbInstanceId = UUID.randomUUID().toString(); + CreateDbSnapshotResponse createDbSnapshotResponse = mock(CreateDbSnapshotResponse.class); + DBSnapshot dbSnapshot = mock(DBSnapshot.class); + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:snapshot-0b5ae174"; + final String status = "creating"; + final Instant createTime = Instant.now(); + when(dbSnapshot.dbSnapshotArn()).thenReturn(snapshotArn); + when(dbSnapshot.status()).thenReturn(status); + when(dbSnapshot.snapshotCreateTime()).thenReturn(createTime); + when(createDbSnapshotResponse.dbSnapshot()).thenReturn(dbSnapshot); + when(rdsClient.createDBSnapshot(any(CreateDbSnapshotRequest.class))).thenReturn(createDbSnapshotResponse); + + SnapshotInfo snapshotInfo = 
snapshotManager.createSnapshot(dbInstanceId); + + ArgumentCaptor<CreateDbSnapshotRequest> argumentCaptor = ArgumentCaptor.forClass(CreateDbSnapshotRequest.class); + verify(rdsClient).createDBSnapshot(argumentCaptor.capture()); + + CreateDbSnapshotRequest request = argumentCaptor.getValue(); + assertThat(request.dbInstanceIdentifier(), equalTo(dbInstanceId)); + + assertThat(snapshotInfo, notNullValue()); + assertThat(snapshotInfo.getSnapshotArn(), equalTo(snapshotArn)); + assertThat(snapshotInfo.getStatus(), equalTo(status)); + assertThat(snapshotInfo.getCreateTime(), equalTo(createTime)); + } + + @Test + void test_create_snapshot_throws_exception_then_returns_null() { + String dbInstanceId = UUID.randomUUID().toString(); + when(rdsClient.createDBSnapshot(any(CreateDbSnapshotRequest.class))).thenThrow(new RuntimeException("Error")); + + SnapshotInfo snapshotInfo = snapshotManager.createSnapshot(dbInstanceId); + + assertThat(snapshotInfo, equalTo(null)); + } + + @Test + void test_check_snapshot_status_returns_correct_result() { + DBSnapshot dbSnapshot = mock(DBSnapshot.class); + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:snapshot-0b5ae174"; + final String status = "creating"; + final Instant createTime = Instant.now(); + when(dbSnapshot.dbSnapshotArn()).thenReturn(snapshotArn); + when(dbSnapshot.status()).thenReturn(status); + when(dbSnapshot.snapshotCreateTime()).thenReturn(createTime); + DescribeDbSnapshotsResponse describeDbSnapshotsResponse = mock(DescribeDbSnapshotsResponse.class); + when(describeDbSnapshotsResponse.dbSnapshots()).thenReturn(List.of(dbSnapshot)); + + final String snapshotId = UUID.randomUUID().toString(); + DescribeDbSnapshotsRequest describeDbSnapshotsRequest = DescribeDbSnapshotsRequest.builder() + .dbSnapshotIdentifier(snapshotId) + .build(); + when(rdsClient.describeDBSnapshots(describeDbSnapshotsRequest)).thenReturn(describeDbSnapshotsResponse); + + SnapshotInfo snapshotInfo = snapshotManager.checkSnapshotStatus(snapshotId); + + assertThat(snapshotInfo, notNullValue()); + assertThat(snapshotInfo.getSnapshotId(), equalTo(snapshotId)); + assertThat(snapshotInfo.getSnapshotArn(), equalTo(snapshotArn)); + assertThat(snapshotInfo.getStatus(), equalTo(status)); + assertThat(snapshotInfo.getCreateTime(), equalTo(createTime)); + } + + private SnapshotManager createObjectUnderTest() { + return new SnapshotManager(rdsClient); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java new file mode 100644 index 0000000000..e844cc0ff4 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.leader; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig; 
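A brief note on the `lenient()` stubbing used in the `setUp` method just below: `MockitoExtension` enables strict stubs, so a stub that a given test never touches would fail that test with `UnnecessaryStubbingException`. Marking the shared fixture stubs lenient opts only those stubs out, for example:

```java
// Shared fixture stub: only the init-path tests consume it, so it is marked
// lenient; strict stubbing stays in force for everything else.
lenient().when(sourceConfig.getAwsAuthenticationConfig()).thenReturn(awsAuthenticationConfig);
```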
+import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; + +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +class LeaderSchedulerTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock(answer = Answers.RETURNS_DEFAULTS) + private RdsSourceConfig sourceConfig; + + @Mock + private LeaderPartition leaderPartition; + + @Mock + private LeaderProgressState leaderProgressState; + + private LeaderScheduler leaderScheduler; + + @BeforeEach + void setUp() { + leaderScheduler = createObjectUnderTest(); + + AwsAuthenticationConfig awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); + lenient().when(awsAuthenticationConfig.getAwsStsRoleArn()).thenReturn(UUID.randomUUID().toString()); + lenient().when(sourceConfig.getAwsAuthenticationConfig()).thenReturn(awsAuthenticationConfig); + ExportConfig exportConfig = mock(ExportConfig.class); + lenient().when(exportConfig.getKmsKeyId()).thenReturn(UUID.randomUUID().toString()); + lenient().when(sourceConfig.getExport()).thenReturn(exportConfig); + } + + @Test + void non_leader_node_should_not_perform_init() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verify(sourceCoordinator, never()).createPartition(any(GlobalState.class)); + verify(sourceCoordinator, never()).createPartition(any(ExportPartition.class)); + } + + @Test + void leader_node_should_perform_init_if_not_initialized() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).thenReturn(Optional.of(leaderPartition)); + when(leaderPartition.getProgressState()).thenReturn(Optional.of(leaderProgressState)); + when(leaderProgressState.isInitialized()).thenReturn(false); + when(sourceConfig.isExportEnabled()).thenReturn(true); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verify(sourceCoordinator).createPartition(any(GlobalState.class)); + 
verify(sourceCoordinator).createPartition(any(ExportPartition.class)); + verify(sourceCoordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); + } + + @Test + void leader_node_should_skip_init_if_initialized() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).thenReturn(Optional.of(leaderPartition)); + when(leaderPartition.getProgressState()).thenReturn(Optional.of(leaderProgressState)); + when(leaderProgressState.isInitialized()).thenReturn(true); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verify(sourceCoordinator, never()).createPartition(any(GlobalState.class)); + verify(sourceCoordinator, never()).createPartition(any(ExportPartition.class)); + verify(sourceCoordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); + } + + @Test + void test_shutDown() { + lenient().when(sourceCoordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + leaderScheduler.shutdown(); + verifyNoMoreInteractions(sourceCoordinator); + executorService.shutdownNow(); + } + + private LeaderScheduler createObjectUnderTest() { + return new LeaderScheduler(sourceCoordinator, sourceConfig); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportStatusTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportStatusTest.java new file mode 100644 index 0000000000..16ef0c0a1b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportStatusTest.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class ExportStatusTest { + + @ParameterizedTest + @EnumSource(ExportStatus.class) + void fromString_returns_expected_value(final ExportStatus status) { + assertThat(ExportStatus.fromString(status.name()), equalTo(status)); + } + + @ParameterizedTest + @ArgumentsSource(ProvideTerminalStatusTestData.class) + void test_is_terminal_returns_expected_result(final String status, final boolean expected_result) { + assertThat(ExportStatus.isTerminal(status), equalTo(expected_result)); + } + + static class ProvideTerminalStatusTestData implements ArgumentsProvider { + @Override + public Stream<? extends Arguments> provideArguments(ExtensionContext context) { + return Stream.of( + Arguments.of("COMPLETE", true), + Arguments.of("CANCELED", true), + Arguments.of("FAILED", true), + 
Arguments.of("CANCELING", false), + Arguments.of("IN_PROGRESS", false), + Arguments.of("STARTING", false), + Arguments.of("INVALID_STATUS", false), + Arguments.of(null, false) + ); + } + } +} diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle index 638b8246a5..d8ca855b13 100644 --- a/data-prepper-plugins/s3-sink/build.gradle +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -23,7 +23,7 @@ dependencies { exclude group: 'org.eclipse.jetty' exclude group: 'org.apache.hadoop', module: 'hadoop-auth' } - implementation 'org.apache.parquet:parquet-avro:1.14.0' + implementation libs.parquet.avro implementation 'software.amazon.awssdk:apache-client' implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.9.22' implementation libs.commons.lang3 diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle index 2a09ce3d90..b0209a5d08 100644 --- a/data-prepper-plugins/s3-source/build.gradle +++ b/data-prepper-plugins/s3-source/build.gradle @@ -27,7 +27,7 @@ dependencies { implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' implementation 'org.xerial.snappy:snappy-java:1.1.10.5' - implementation 'org.apache.parquet:parquet-common:1.14.0' + implementation libs.parquet.common implementation 'dev.failsafe:failsafe:3.3.2' implementation 'org.apache.httpcomponents:httpcore:4.4.16' testImplementation libs.commons.lang3 @@ -45,11 +45,10 @@ dependencies { testImplementation project(':data-prepper-plugins:parquet-codecs') testImplementation project(':data-prepper-test-event') testImplementation libs.avro.core - testImplementation testLibs.hadoop.common - testImplementation 'org.apache.parquet:parquet-avro:1.14.0' - testImplementation 'org.apache.parquet:parquet-column:1.14.0' - testImplementation 'org.apache.parquet:parquet-common:1.14.0' - testImplementation 'org.apache.parquet:parquet-hadoop:1.14.0' + testImplementation libs.hadoop.common + testImplementation libs.parquet.avro + testImplementation libs.parquet.column + testImplementation libs.parquet.hadoop } test { diff --git a/settings.gradle b/settings.gradle index 64d86219ea..ca9fcfbdfb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -60,7 +60,12 @@ dependencyResolutionManagement { library('commons-io', 'commons-io', 'commons-io').version('2.15.1') library('commons-codec', 'commons-codec', 'commons-codec').version('1.16.0') library('commons-compress', 'org.apache.commons', 'commons-compress').version('1.24.0') - version('hadoop', '3.3.6') + version('parquet', '1.14.1') + library('parquet-common', 'org.apache.parquet', 'parquet-common').versionRef('parquet') + library('parquet-avro', 'org.apache.parquet', 'parquet-avro').versionRef('parquet') + library('parquet-column', 'org.apache.parquet', 'parquet-column').versionRef('parquet') + library('parquet-hadoop', 'org.apache.parquet', 'parquet-hadoop').versionRef('parquet') + version('hadoop', '3.4.0') library('hadoop-common', 'org.apache.hadoop', 'hadoop-common').versionRef('hadoop') library('hadoop-mapreduce', 'org.apache.hadoop', 'hadoop-mapreduce-client-core').versionRef('hadoop') version('avro', '1.11.3') @@ -74,7 +79,6 @@ dependencyResolutionManagement { version('awaitility', '4.2.0') version('spring', '5.3.28') version('slf4j', '2.0.6') - version('hadoop', '3.3.6') library('junit-core', 'org.junit.jupiter', 'junit-jupiter').versionRef('junit') library('junit-params', 'org.junit.jupiter', 
'junit-jupiter-params').versionRef('junit') library('junit-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit') @@ -88,7 +92,6 @@ dependencyResolutionManagement { library('awaitility', 'org.awaitility', 'awaitility').versionRef('awaitility') library('spring-test', 'org.springframework', 'spring-test').versionRef('spring') library('slf4j-simple', 'org.slf4j', 'slf4j-simple').versionRef('slf4j') - library('hadoop-common', 'org.apache.hadoop', 'hadoop-common').versionRef('hadoop') } } }
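Finally, the `settings.gradle` changes consolidate the Parquet artifacts under a single `version('parquet', '1.14.1')` entry and drop the duplicate `version('hadoop', ...)` declaration while moving Hadoop to 3.4.0. One Gradle convention to keep in mind when reading the module diffs above (shown in Groovy since it annotates the build scripts rather than the Java sources): dashes in catalog library names become dots in the generated type-safe accessors.

```groovy
// settings.gradle declares:
//   library('parquet-common', 'org.apache.parquet', 'parquet-common').versionRef('parquet')
// A consuming module's build.gradle then references the alias with dots:
dependencies {
    implementation libs.parquet.common      // org.apache.parquet:parquet-common:1.14.1
    testImplementation libs.parquet.hadoop  // same version via versionRef('parquet')
}
```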