Skip to content

Commit

Permalink
Merge branch 'main' into fix/missed-exception-in-plugin-error
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 committed Oct 24, 2024
2 parents 6e1edaa + e9bffee commit 466014a
Show file tree
Hide file tree
Showing 694 changed files with 51,874 additions and 42,007 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
* @sb2k16 @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
1 change: 1 addition & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje

| Maintainer | GitHub ID | Affiliation |
| -------------------- | --------------------------------------------------------- | ----------- |
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
Expand Down
70,369 changes: 31,336 additions & 39,033 deletions THIRD-PARTY

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ subprojects {
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.current()
}
testLogging {
exceptionFormat "full"
showStackTraces false
}
reports {
junitXml.required
html.required
Expand All @@ -265,7 +269,7 @@ subprojects {

configure(subprojects.findAll {it.name != 'data-prepper-api'}) {
dependencies {
implementation platform('software.amazon.awssdk:bom:2.21.23')
implementation platform('software.amazon.awssdk:bom:2.25.11')
implementation 'jakarta.validation:jakarta.validation-api:3.0.2'
}
}
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ dependencies {
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
implementation libs.parquet.common
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation libs.commons.lang3
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test-common')
testImplementation 'org.skyscreamer:jsonassert:1.5.3'
testImplementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
public final class DataPrepperMarkers {
public static final Marker EVENT = MarkerFactory.getMarker("EVENT");
public static final Marker SENSITIVE = MarkerFactory.getMarker("SENSITIVE");
public static final Marker NOISY = MarkerFactory.getMarker("NOISY");

static {
EVENT.add(SENSITIVE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
* dropped, etc)
*/
public interface AcknowledgementSet {
/**
* Adds an event handle to the acknowledgement set. Assigns initial reference
* count of 1.
*
* @param eventHandle event handle to be added
* @since 2.11
*/
void add(EventHandle eventHandle);

/**
* Adds an event to the acknowledgement set. Assigns initial reference
Expand All @@ -29,7 +37,9 @@ public interface AcknowledgementSet {
* @param event event to be added
* @since 2.2
*/
public void add(Event event);
default void add(Event event) {
add(event.getEventHandle());
}

/**
* Aquires a reference to the event by incrementing the reference
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation used in schema generation to define the names and corresponding values of other required
* configurations if the configuration represented by the annotated field/method takes non-null or true value.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
public @interface AlsoRequired {
/**
* Array of Required annotations, each representing a required property with its allowed values.
*/
Required[] values();

/**
* Annotation to represent a required property and its allowed values.
*/
@interface Required {
/**
* Name of the required property.
*/
String name();

/**
* Allowed values for the required property. The default value of {} means any non-null value is allowed.
*/
String[] allowedValues() default {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
public @interface DataPrepperPlugin {
String DEFAULT_DEPRECATED_NAME = "";

String DEFAULT_ALTERNATE_NAME = "";

/**
*
* @return Name of the plugin which should be unique for the type
Expand All @@ -46,6 +48,12 @@
*/
String deprecatedName() default DEFAULT_DEPRECATED_NAME;

/**
*
* @return Alternate name of the plugin which should be unique for the type
*/
String[] alternateNames() default {};

/**
* The class type for this plugin.
*
Expand All @@ -64,4 +72,16 @@
* @since 1.2
*/
Class<?> pluginConfigurationType() default PluginSetting.class;

/**
* Optional Packages to scan for Data Prepper DI components.
* Plugins provide this list if they want to use Dependency Injection in its module.
* Providing this value, implicitly assumes and initiates plugin specific isolated ApplicationContext.
* <p>
* The package names that spring context scans will be picked up by these marker classes.
*
* @return Array of classes to use for package scan.
* @since 2.2
*/
Class[] packagesToScan() default {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* Use this annotation to provide example values for plugin configuration.
*
* @since 2.11
*/
@Documented
@Retention(RUNTIME)
@Target({FIELD})
public @interface ExampleValues {
/**
* One or more examples.
* @return the examples.
* @since 2.11
*/
Example[] value();

/**
* A single example.
*
* @since 2.11
*/
@interface Example {
/**
* The example value
* @return The example value
*
* @since 2.11
*/
String value();

/**
* A description of the example value.
*
* @since 2.11
*/
String description() default "";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotates a field that uses Data Prepper plugin config as its value.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface UsesDataPrepperPlugin {
/**
* The class type for this plugin.
*
* @return The Java class
* @since 1.2
*/
Class<?> pluginType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,81 @@
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class JsonDecoder implements ByteDecoder {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();
private String keyName;
private Collection<String> includeKeys;
private Collection<String> includeKeysMetadata;

public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata) {
this.keyName = keyName;
this.includeKeys = includeKeys;
this.includeKeysMetadata = includeKeysMetadata;
}

public JsonDecoder() {
this.keyName = null;
this.includeKeys = null;
this.includeKeysMetadata = null;
}

public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

final JsonParser jsonParser = jsonFactory.createParser(inputStream);

Map<String, Object> includeKeysMap = new HashMap<>();
Map<String, Object> includeMetadataKeysMap = new HashMap<>();
while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
final String nodeName = jsonParser.currentName();

if (includeKeys != null && includeKeys.contains(nodeName) ||
(includeKeysMetadata != null && includeKeysMetadata.contains(nodeName))) {
jsonParser.nextToken();
if (includeKeys != null && includeKeys.contains(nodeName)) {
includeKeysMap.put(nodeName, jsonParser.getValueAsString());
}
if (includeKeysMetadata != null && includeKeysMetadata.contains(nodeName)) {
includeMetadataKeysMap.put(nodeName, jsonParser.getValueAsString());
}
continue;
}

if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
parseRecordsArray(jsonParser, timeReceived, eventConsumer);
if (keyName != null && !nodeName.equals(keyName)) {
continue;
}
parseRecordsArray(jsonParser, timeReceived, eventConsumer, includeKeysMap, includeMetadataKeysMap);
}
}
}

private void parseRecordsArray(final JsonParser jsonParser, final Instant timeReceived, final Consumer<Record<Event>> eventConsumer) throws IOException {
private void parseRecordsArray(final JsonParser jsonParser,
final Instant timeReceived,
final Consumer<Record<Event>> eventConsumer,
final Map<String, Object> includeKeysMap,
final Map<String, Object> includeMetadataKeysMap
) throws IOException {
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);

final Record<Event> record = createRecord(innerJson, timeReceived);
for (final Map.Entry<String, Object> entry : includeKeysMap.entrySet()) {
record.getData().put(entry.getKey(), entry.getValue());
}

for (final Map.Entry<String, Object> entry : includeMetadataKeysMap.entrySet()) {
record.getData().getMetadata().setAttribute(entry.getKey(), entry.getValue());
}

eventConsumer.accept(record);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
@JsonPropertyOrder
@JsonClassDescription("The key-value pair defines routing condition, where the key is the name of a route and the " +
"value is a Data Prepper expression representing the routing condition.")
"value is an expression representing the routing condition.")
@JsonSerialize(using = ConditionalRoute.ConditionalRouteSerializer.class)
@JsonDeserialize(using = ConditionalRoute.ConditionalRouteDeserializer.class)
public class ConditionalRoute {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.regex.Pattern;

public class DataPrepperVersion {
private static final String CURRENT_VERSION = "2.10";
private static final String CURRENT_VERSION = "2.11";

private static final String FULL_FORMAT = "%d.%d";
private static final String SHORTHAND_FORMAT = "%d";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public boolean release(boolean result) {
return returnValue;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef : acknowledgementSetRefList) {
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}
}
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public String getTypeName() {
}

@JsonCreator
static DataType fromTypeName(final String option) {
public static DataType fromTypeName(final String option) {
return TYPES_MAP.get(option);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() {
return acknowledgementSet != null;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}

@Override
public void acquireReference() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.event;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -45,4 +46,9 @@ public boolean shouldLog() {
static HandleFailedEventsOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
}

@JsonValue
public String toOptionValue() {
return option;
}
}
Loading

0 comments on commit 466014a

Please sign in to comment.