diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index c1c65fda49..daf820c6fe 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -2,6 +2,14 @@ name: Release Artifacts
on:
workflow_dispatch:
+ inputs:
+ release-major-tag:
+ description: 'Whether to create major tag of docker image or not. This will create a tag such as 2.3 which points to this version.'
+ required: true
+ release-latest-tag:
+ description: >
+ 'Whether to create latest tag of docker image or not. This will update the latest tag to point to this version. You should set this when releasing the latest version, but not patches to old versions.'
+ required: true
permissions:
id-token: write
@@ -95,3 +103,68 @@ jobs:
- name: Smoke Test Tarball Files
run: ./release/smoke-tests/run-tarball-files-smoke-tests.sh -v ${{ env.version }} -u ${{ secrets.ARCHIVES_PUBLIC_URL }} -n ${{ github.run_number }} -i ${{ matrix.image }} -t ${{ matrix.archive }}
+
+ promote:
+ runs-on: ubuntu-latest
+ if: success() || failure()
+ needs: [build, validate-docker, validate-archive]
+ permissions:
+ contents: write
+ issues: write
+
+ steps:
+ - name: Checkout Data Prepper
+ uses: actions/checkout@v3
+ - name: Get Version
+ run: grep '^version=' gradle.properties >> $GITHUB_ENV
+
+ - name: Get Approvers
+ id: get_approvers
+ run: |
+ echo "approvers=$(cat .github/CODEOWNERS | grep @ | tr -d '* ' | sed 's/@/,/g' | sed 's/,//1')" >> $GITHUB_OUTPUT
+ - uses: trstringer/manual-approval@v1
+ with:
+ secret: ${{ github.TOKEN }}
+ approvers: ${{ steps.get_approvers.outputs.approvers }}
+ minimum-approvals: 2
+ issue-title: 'Release Data Prepper : ${{ env.version }}'
+ issue-body: >
+ Please approve or deny the release of Data Prepper.
+
+ **VERSION**: ${{ env.version }}
+
+ **BUILD NUMBER**: ${{ github.run_number }}
+
+ **RELEASE MAJOR TAG**: ${{ github.event.inputs.release-major-tag }}
+
+ **RELEASE LATEST TAG**: ${{ github.event.inputs.release-latest-tag }}
+
+ exclude-workflow-initiator-as-approver: false
+
+ - name: Create Release Description
+ run: |
+ echo 'version: ${{ env.version }}' > release-description.yaml
+ echo 'build_number: ${{ github.run_number }}' >> release-description.yaml
+ echo 'release_major_tag: ${{ github.event.inputs.release-major-tag }}' >> release-description.yaml
+ echo 'release_latest_tag: ${{ github.event.inputs.release-latest-tag }}' >> release-description.yaml
+
+ - name: Create tag
+ uses: actions/github-script@v6
+ with:
+ github-token: ${{ github.TOKEN }}
+ script: |
+ github.rest.git.createRef({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ ref: 'refs/tags/${{ env.version }}',
+ sha: context.sha
+ })
+
+ - name: Draft release
+ uses: softprops/action-gh-release@v1
+ with:
+ draft: true
+ name: '${{ env.version }}'
+ tag_name: 'refs/tags/${{ env.version }}'
+ files: |
+ release-description.yaml
diff --git a/build.gradle b/build.gradle
index 006e641354..efc059810d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -196,7 +196,7 @@ subprojects {
configure(subprojects.findAll {it.name != 'data-prepper-api'}) {
dependencies {
- implementation platform('software.amazon.awssdk:bom:2.17.264')
+ implementation platform('software.amazon.awssdk:bom:2.20.67')
implementation 'jakarta.validation:jakarta.validation-api:3.0.2'
}
}
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java
index 22280b7464..afb9b975fe 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java
@@ -58,6 +58,22 @@ public interface OutputCodec {
*/
String getExtension();
+ /**
+ * Returns true if this codec has internal compression. That is, the entire
+ * {@link OutputStream} should not be compressed.
+ *
+ * When this value is true, sinks should not attempt to compress the final {@link OutputStream}
+ * at all.
+ *
+ * For example, Parquet compression happens within the file. Each column chunk
+ * is compressed independently.
+ *
+ * @return True if the compression is internal to the codec; false if whole-file compression is ok.
+ */
+ default boolean isCompressionInternal() {
+ return false;
+ }
+
default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
Map eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
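The new `isCompressionInternal()` default lets a sink skip whole-stream compression when the codec already compresses its own output, as Parquet does per column chunk. A minimal sketch of how a sink might consult it; the `CompressionAwareSink` wrapper below is a hypothetical illustration, not part of this change:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import org.opensearch.dataprepper.model.codec.OutputCodec;

class CompressionAwareSink {
    private final OutputCodec codec;

    CompressionAwareSink(final OutputCodec codec) {
        this.codec = codec;
    }

    OutputStream wrapForWriting(final OutputStream rawStream) throws IOException {
        // Codecs with internal compression (e.g. Parquet) must receive the raw stream;
        // wrapping it again would corrupt the file or waste CPU at best.
        if (codec.isCompressionInternal()) {
            return rawStream;
        }
        // Otherwise whole-file compression is fine; gzip is used here only as an example.
        return new GZIPOutputStream(rawStream);
    }
}
```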
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java
index 328a84e586..936a5445e8 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java
@@ -12,13 +12,11 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/**
* Represents an extension of the {@link PluginModel} which is specific to Sink
@@ -120,10 +118,11 @@ private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes,
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
super(pluginSettings);
this.routes = routes != null ? routes : Collections.emptyList();
- this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : Collections.emptyList();
- this.excludeKeys = excludeKeys != null ? preprocessingKeys(excludeKeys) : Collections.emptyList();
+ this.includeKeys = includeKeys != null ? includeKeys : Collections.emptyList();
+ this.excludeKeys = excludeKeys != null ? excludeKeys : Collections.emptyList();
this.tagsTargetKey = tagsTargetKey;
validateConfiguration();
+ validateKeys();
}
void validateConfiguration() {
@@ -132,24 +131,18 @@ void validateConfiguration() {
}
}
-
/**
- * Pre-processes a list of Keys and returns a sorted list
- * The keys must start with `/` and not end with `/`
- *
- * @param keys a list of raw keys
- * @return a sorted processed keys
+ * Validates the include and exclude keys, throwing if any key contains '/'
*/
- private List<String> preprocessingKeys(final List<String> keys) {
- if (keys.contains("/")) {
- return new ArrayList<>();
- }
- List<String> result = keys.stream()
- .map(k -> k.startsWith("/") ? k : "/" + k)
- .map(k -> k.endsWith("/") ? k.substring(0, k.length() - 1) : k)
- .collect(Collectors.toList());
- Collections.sort(result);
- return result;
+ private void validateKeys() {
+ includeKeys.forEach(key -> {
+ if(key.contains("/"))
+ throw new InvalidPluginConfigurationException("include_keys cannot contain /");
+ });
+ excludeKeys.forEach(key -> {
+ if(key.contains("/"))
+ throw new InvalidPluginConfigurationException("exclude_keys cannot contain /");
+ });
}
}
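The keys are now validated rather than normalized: a key containing `/` fails fast with `InvalidPluginConfigurationException` instead of being rewritten into a `/`-prefixed path. A small sketch of the new behavior, assuming access to the six-argument constructor exercised in `SinkModelTest` (it may be package-private, so the sketch sits in the same package):

```java
package org.opensearch.dataprepper.model.configuration;

import java.util.List;
import java.util.Map;

import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

class SinkModelKeyValidationSketch {
    public static void main(String[] args) {
        // Accepted: plain key names are stored unchanged (previously they were
        // normalized to "/abc", "/bcd" and sorted).
        SinkModel valid = new SinkModel("customSinkPlugin", List.of(), null,
                List.of("abc", "bcd"), List.of(), Map.of());
        System.out.println(valid.getIncludeKeys());

        // Rejected: keys containing '/' now throw at configuration time.
        try {
            new SinkModel("customSinkPlugin", List.of(), null,
                    List.of("/abc"), List.of(), Map.of());
        } catch (final InvalidPluginConfigurationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```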
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/OutputCodecContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/OutputCodecContext.java
index d9cc83c9cb..f784233d49 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/OutputCodecContext.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/OutputCodecContext.java
@@ -7,6 +7,7 @@
import java.util.Collections;
import java.util.List;
+import java.util.function.Predicate;
/**
* Data Prepper Output Codec Context class.
@@ -18,6 +19,7 @@ public class OutputCodecContext {
private final List<String> includeKeys;
private final List<String> excludeKeys;
+ private final Predicate<String> inclusionPredicate;
public OutputCodecContext() {
this(null, Collections.emptyList(), Collections.emptyList());
@@ -28,6 +30,14 @@ public OutputCodecContext(String tagsTargetKey, List<String> includeKeys, List<String> excludeKeys) {
+
+ if (includeKeys != null && !includeKeys.isEmpty()) {
+ inclusionPredicate = k -> includeKeys.contains(k);
+ } else if (excludeKeys != null && !excludeKeys.isEmpty()) {
+ inclusionPredicate = k -> !excludeKeys.contains(k);
+ } else {
+ inclusionPredicate = k -> true;
+ }
}
@@ -49,4 +59,8 @@ public List<String> getIncludeKeys() {
public List<String> getExcludeKeys() {
return excludeKeys;
}
+
+ public boolean shouldIncludeKey(String key) {
+ return inclusionPredicate.test(key);
+ }
}
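With the predicate built once in the constructor, codecs and sinks can ask the context per key instead of re-scanning the include/exclude lists themselves. A minimal sketch (the event data here is illustrative):

```java
import java.util.List;
import java.util.Map;

import org.opensearch.dataprepper.model.sink.OutputCodecContext;

class ShouldIncludeKeySketch {
    public static void main(String[] args) {
        // Only keys in the include list pass; with an exclude list the logic inverts,
        // and with neither list every key passes.
        OutputCodecContext context =
                new OutputCodecContext(null, List.of("status", "latency"), null);

        Map<String, Object> eventData = Map.of(
                "status", 200,
                "latency", 0.42,
                "internalDebugField", "drop me");

        eventData.forEach((key, value) -> {
            if (context.shouldIncludeKey(key)) {
                System.out.println(key + "=" + value);
            }
        });
    }
}
```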
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java
index 4c70ef1b18..193f0ff196 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java
@@ -1,8 +1,8 @@
package org.opensearch.dataprepper.model.codec;
import com.fasterxml.jackson.core.JsonProcessingException;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
@@ -19,12 +19,17 @@
import java.util.Set;
import java.util.UUID;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
public class OutputCodecTest {
+ @Test
+ void isCompressionInternal_returns_false() {
+ OutputCodec objectUnderTest = mock(OutputCodec.class, InvocationOnMock::callRealMethod);
- @BeforeEach
- public void setUp() {
+ assertThat(objectUnderTest.isCompressionInternal(), equalTo(false));
}
@Test
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java
index a5fc6363cb..9d895ecd32 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java
@@ -144,27 +144,41 @@ void serialize_with_just_pluginModel() throws IOException {
}
@Test
- void sinkModel_with_include_keys() throws IOException {
+ void sinkModel_with_include_keys() {
final Map pluginSettings = new LinkedHashMap<>();
- final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "/abc", "efg/"), null, pluginSettings);
+ final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "abc", "efg"), null, pluginSettings);
assertThat(sinkModel.getExcludeKeys(), equalTo(new ArrayList()));
- assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg")));
+ assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("bcd", "abc", "efg")));
}
@Test
- void sinkModel_with_exclude_keys() throws IOException {
+ void sinkModel_with_invalid_include_keys() {
final Map pluginSettings = new LinkedHashMap<>();
- final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings);
+ assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/bcd"), List.of(), pluginSettings));
+ }
+
+ @Test
+ void sinkModel_with_exclude_keys() {
+ final Map pluginSettings = new LinkedHashMap<>();
+ final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), pluginSettings);
assertThat(sinkModel.getIncludeKeys(), equalTo(new ArrayList()));
- assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg")));
+ assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("abc", "bcd", "efg")));
+
+ }
+ @Test
+ void sinkModel_with_invalid_exclude_keys() {
+ final Map pluginSettings = new LinkedHashMap<>();
+ assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), List.of("/bcd"), pluginSettings));
}
+
+
@Test
- void sinkModel_with_both_include_and_exclude_keys() throws IOException {
+ void sinkModel_with_both_include_and_exclude_keys() {
final Map pluginSettings = new LinkedHashMap<>();
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("abc"), List.of("bcd"), pluginSettings));
}
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/OutputCodecContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/OutputCodecContextTest.java
index db1ec8cf86..964559afff 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/OutputCodecContextTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/OutputCodecContextTest.java
@@ -10,14 +10,13 @@
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNull;
public class OutputCodecContextTest {
-
-
@Test
public void testOutputCodecContextBasic() {
final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
@@ -32,8 +31,6 @@ public void testOutputCodecContextBasic() {
assertNull(emptyContext.getTagsTargetKey());
assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys));
assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys));
-
-
}
@Test
@@ -53,7 +50,43 @@ public void testOutputCodecContextAdapter() {
assertNull(emptyContext.getTagsTargetKey());
assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys));
assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys));
+ }
+
+ @Test
+ void shouldIncludeKey_returns_expected_when_no_include_exclude() {
+ OutputCodecContext objectUnderTest = new OutputCodecContext(null, null, null);
+ assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
+ }
+
+ @Test
+ void shouldIncludeKey_returns_expected_when_empty_lists_for_include_exclude() {
+ OutputCodecContext objectUnderTest = new OutputCodecContext(null, Collections.emptyList(), Collections.emptyList());
+ assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
+ }
+
+ @Test
+ void shouldIncludeKey_returns_expected_when_includeKey() {
+ String includeKey1 = UUID.randomUUID().toString();
+ String includeKey2 = UUID.randomUUID().toString();
+ final List includeKeys = List.of(includeKey1, includeKey2);
+
+ OutputCodecContext objectUnderTest = new OutputCodecContext(null, includeKeys, null);
+
+ assertThat(objectUnderTest.shouldIncludeKey(includeKey1), equalTo(true));
+ assertThat(objectUnderTest.shouldIncludeKey(includeKey2), equalTo(true));
+ assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(false));
+ }
+
+ @Test
+ void shouldIncludeKey_returns_expected_when_excludeKey() {
+ String excludeKey1 = UUID.randomUUID().toString();
+ String excludeKey2 = UUID.randomUUID().toString();
+ final List excludeKeys = List.of(excludeKey1, excludeKey2);
+ OutputCodecContext objectUnderTest = new OutputCodecContext(null, null, excludeKeys);
+ assertThat(objectUnderTest.shouldIncludeKey(excludeKey1), equalTo(false));
+ assertThat(objectUnderTest.shouldIncludeKey(excludeKey2), equalTo(false));
+ assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
}
}
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java
index 827f3c9567..dc34bd5cd0 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/breaker/HeapCircuitBreaker.java
@@ -66,7 +66,7 @@ class HeapCircuitBreaker implements InnerCircuitBreaker, AutoCloseable {
scheduledExecutorService
.scheduleAtFixedRate(this::checkMemory, 0L, checkInterval.toMillis(), TimeUnit.MILLISECONDS);
- LOG.info("Heap circuit breaker with usage of {} bytes.", usageBytes);
+ LOG.info("Circuit breaker heap limit is set to {} bytes.", usageBytes);
}
@Override
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java
index f47a4da2ca..f41ab33d7c 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java
@@ -274,7 +274,7 @@ public synchronized void shutdown() {
stopRequested.set(true);
} catch (Exception ex) {
LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " +
- "proceeding with termination of process workers", name);
+ "proceeding with termination of process workers", name, ex);
}
shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis(), "processor");
diff --git a/data-prepper-plugins/aggregate-processor/README.md b/data-prepper-plugins/aggregate-processor/README.md
index a932c9a28d..2876aaa4dd 100644
--- a/data-prepper-plugins/aggregate-processor/README.md
+++ b/data-prepper-plugins/aggregate-processor/README.md
@@ -98,7 +98,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
The values in a list are merely appended, so there can be duplicates.
###
-* `count`: Count Events belonging to the same group and generate a new event with values of the identification keys and the count, indicating the number of events. All Events that make up the combined Event will be dropped.
+* `count`: Count Events belonging to the same group and generate a new event with values of the identification keys and the count, indicating the number of events. All Events that make up the combined Event will be dropped. One of the events is added as an exemplar. If the aggregation is done on traces, the exemplar includes the traceId and spanId; otherwise both are null.
* It supports the following config options
* `count_key`: key name to use for storing the count, default name is `aggr._count`
* `start_time_key`: key name to use for storing the start time, default name is `aggr._start_time`
@@ -114,7 +114,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```
The following Event will be created and processed by the rest of the pipeline when the group is concluded:
```json
- {"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1"}
+ {"isMonotonic":true,"unit":"1","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","kind":"SUM","name":"count","description":"Number of events","startTime":"2022-12-02T19:29:51.245358486Z","time":"2022-12-02T19:30:15.247799684Z","value":3.0,"sourceIp":"127.0.0.1","destinationIp":"192.168.0.1", "exemplars":[{"time":"2022-12-02T19:29:51.245358486Z", "value": 1.0, "attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html"}, "spanId":null, "traceId":null}]}
```
If raw output format is used, the following Event will be created and processed by the rest of the pipeline when the group is concluded:
```json
@@ -130,7 +130,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```
###
-* `histogram`: Aggreates events belonging to the same group and generate a new event with values of the identification keys and histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, sum, buckets, bucket counts, and optionally min and max of the values corresponding to the `key`. All events that make up the combined Event will be dropped.
+* `histogram`: Aggregates events belonging to the same group and generates a new event with values of the identification keys and a histogram of the aggregated events based on a configured `key`. The histogram contains the number of events, sum, buckets, bucket counts, and optionally min and max of the values corresponding to the `key`. All events that make up the combined Event will be dropped. Events corresponding to the min and max values are added as exemplars. If the aggregation is done on traces, the exemplars include the traceId and spanId; otherwise both are null.
* It supports the following config options
* `key`: name of the field in the events for which histogram needs to be generated
* `generated_key_prefix`: key prefix to be used for all the fields created in the aggregated event. This allows the user to make sure that the names of the histogram event does not conflict with the field names in the event
@@ -149,7 +149,7 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
```
The following Event will be created and processed by the rest of the pipeline when the group is concluded:
```json
- {"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency"}
+ {"max":0.55,"kind":"HISTOGRAM","buckets":[{"min":-3.4028234663852886E38,"max":0.0,"count":0},{"min":0.0,"max":0.25,"count":2},{"min":0.25,"max":0.50,"count":1},{"min":0.50,"max":3.4028234663852886E38,"count":1}],"count":4,"bucketCountsList":[0,2,1,1],"description":"Histogram of latency in the events","sum":1.15,"unit":"seconds","aggregationTemporality":"AGGREGATION_TEMPORALITY_DELTA","min":0.15,"bucketCounts":4,"name":"histogram","startTime":"2022-12-14T06:43:40.848762215Z","explicitBoundsCount":3,"time":"2022-12-14T06:44:04.852564623Z","explicitBounds":[0.0,0.25,0.5],"request":"/index.html","sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "key": "latency", "exemplars": [{"time":"2023-12-14T06:43:43.840684487Z","value":0.50,"attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html", "exemplar_id":"min"},"spanId":null,"traceId":null},{"time":"2022-12-14T06:43:43.844339030Z","value":0.55,"attributes":{"sourceIp":"127.0.0.1","destinationIp": "192.168.0.1", "request" : "/index.html","exemplar_id":"max"},"spanId":null,"traceId":null}]}
```
If raw output format is used, the following event will be created and processed by the rest of the pipeline when the group is concluded:
```json
diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java
index 61a4ecce90..382d1cc99b 100644
--- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java
+++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java
@@ -6,11 +6,14 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;
import org.opensearch.dataprepper.model.metric.JacksonSum;
+import org.opensearch.dataprepper.model.metric.Exemplar;
+import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.trace.Span;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
@@ -19,6 +22,7 @@
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@@ -42,12 +46,14 @@ public class CountAggregateAction implements AggregateAction {
public final String startTimeKey;
public final String outputFormat;
private long startTimeNanos;
+ private List<Exemplar> exemplarList;
@DataPrepperPluginConstructor
public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) {
this.countKey = countAggregateActionConfig.getCountKey();
this.startTimeKey = countAggregateActionConfig.getStartTimeKey();
this.outputFormat = countAggregateActionConfig.getOutputFormat();
+ this.exemplarList = new ArrayList<>();
}
private long getTimeNanos(Instant time) {
@@ -56,6 +62,24 @@ private long getTimeNanos(Instant time) {
return currentTimeNanos;
}
+ public Exemplar createExemplar(final Event event) {
+ long curTimeNanos = getTimeNanos(Instant.now());
+ Map<String, Object> attributes = event.toMap();
+ String spanId = null;
+ String traceId = null;
+ if (event instanceof Span) {
+ Span span = (Span)event;
+ spanId = span.getSpanId();
+ traceId = span.getTraceId();
+ }
+ return new DefaultExemplar(
+ OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos),
+ 1.0,
+ spanId,
+ traceId,
+ attributes);
+ }
+
@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
final GroupState groupState = aggregateActionInput.getGroupState();
@@ -63,6 +87,7 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
groupState.put(startTimeKey, Instant.now());
groupState.putAll(aggregateActionInput.getIdentificationKeys());
groupState.put(countKey, 1);
+ exemplarList.add(createExemplar(event));
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
groupState.put(countKey, v);
@@ -98,6 +123,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withUnit(SUM_METRIC_UNIT)
.withAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA.name())
.withValue((double)countValue)
+ .withExemplars(exemplarList)
.withAttributes(attr)
.build(false);
event = (Event)sum;
diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java
index f535a10e7c..6db82130ea 100644
--- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java
+++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java
@@ -7,6 +7,9 @@
import org.opensearch.dataprepper.model.metric.JacksonHistogram;
import org.opensearch.dataprepper.model.metric.Bucket;
+import org.opensearch.dataprepper.model.metric.Exemplar;
+import org.opensearch.dataprepper.model.trace.Span;
+import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
@@ -25,6 +28,7 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
+import java.util.Objects;
import java.util.List;
import java.util.HashMap;
import java.util.Arrays;
@@ -53,6 +57,11 @@ public class HistogramAggregateAction implements AggregateAction {
private final String key;
private final String units;
private final boolean recordMinMax;
+ private List<Exemplar> exemplarList;
+ private Event minEvent;
+ private Event maxEvent;
+ private double minValue;
+ private double maxValue;
private long startTimeNanos;
private double[] buckets;
@@ -62,6 +71,7 @@ public HistogramAggregateAction(final HistogramAggregateActionConfig histogramAg
this.key = histogramAggregateActionConfig.getKey();
List bucketList = histogramAggregateActionConfig.getBuckets();
this.buckets = new double[bucketList.size()+2];
+ this.exemplarList = new ArrayList<>();
int bucketIdx = 0;
this.buckets[bucketIdx++] = -Float.MAX_VALUE;
for (int i = 0; i < bucketList.size(); i++) {
@@ -101,6 +111,26 @@ private double convertToDouble(Number value) {
return doubleValue;
}
+ public Exemplar createExemplar(final String id, final Event event, double value) {
+ long curTimeNanos = getTimeNanos(Instant.now());
+ Map<String, Object> attributes = event.toMap();
+ if (Objects.nonNull(id)) {
+ attributes.put("exemplarId", id);
+ }
+ String spanId = null;
+ String traceId = null;
+ if (event instanceof Span) {
+ Span span = (Span)event;
+ spanId = span.getSpanId();
+ traceId = span.getTraceId();
+ }
+ return new DefaultExemplar(OTelProtoCodec.convertUnixNanosToISO8601(curTimeNanos),
+ value,
+ spanId,
+ traceId,
+ attributes);
+ }
+
@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
final GroupState groupState = aggregateActionInput.getGroupState();
@@ -126,6 +156,10 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
if (this.recordMinMax) {
groupState.put(minKey, doubleValue);
groupState.put(maxKey, doubleValue);
+ minEvent = event;
+ maxEvent = event;
+ minValue = doubleValue;
+ maxValue = doubleValue;
}
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
@@ -138,10 +172,14 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
double min = (double)groupState.get(minKey);
if (doubleValue < min) {
groupState.put(minKey, doubleValue);
+ minEvent = event;
+ minValue = doubleValue;
}
double max = (double)groupState.get(maxKey);
if (doubleValue > max) {
groupState.put(maxKey, doubleValue);
+ maxEvent = event;
+ maxValue = doubleValue;
}
}
}
@@ -159,6 +197,8 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
long startTimeNanos = getTimeNanos(startTime);
long endTimeNanos = getTimeNanos(endTime);
String histogramKey = HISTOGRAM_METRIC_NAME + "_key";
+ exemplarList.add(createExemplar("min", minEvent, minValue));
+ exemplarList.add(createExemplar("max", maxEvent, maxValue));
if (outputFormat.equals(OutputFormat.RAW.toString())) {
groupState.put(histogramKey, key);
groupState.put(durationKey, endTimeNanos-startTimeNanos);
@@ -203,6 +243,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withBuckets(buckets)
.withBucketCountsList(bucketCounts)
.withExplicitBoundsList(explicitBoundsList)
+ .withExemplars(exemplarList)
.withAttributes(attr)
.build(false);
event = (Event)histogram;
diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java
index 9e35507a2a..0f96a83c41 100644
--- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java
+++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java
@@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
+import org.opensearch.dataprepper.model.metric.Exemplar;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -108,5 +109,7 @@ void testCountAggregateOTelFormat(int testCount) {
assertThat(metric.toJsonString().indexOf("attributes"), not(-1));
assertThat(result.get(0).toMap(), hasKey("startTime"));
assertThat(result.get(0).toMap(), hasKey("time"));
+ List<Exemplar> exemplars = (List<Exemplar>) result.get(0).toMap().get("exemplars");
+ assertThat(exemplars.size(), equalTo(1));
}
}
diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java
index 6e9d783998..b2b498306b 100644
--- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java
+++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java
@@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
+import org.opensearch.dataprepper.model.metric.Exemplar;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -207,6 +208,8 @@ void testHistogramAggregateOTelFormat(int testCount) throws NoSuchFieldException
assertThat(expectedBucketCounts[i], equalTo(bucketCountsFromResult.get(i)));
}
assertThat(((Map)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateAction.HISTOGRAM_METRIC_NAME+"_key", testKey));
+ List<Exemplar> exemplars = (List<Exemplar>) result.get(0).toMap().get("exemplars");
+ assertThat(exemplars.size(), equalTo(2));
assertThat(((Map)result.get(0).toMap().get("attributes")), hasEntry(dataKey, dataValue));
final String expectedDurationKey = histogramAggregateActionConfig.getDurationKey();
assertThat(((Map)result.get(0).toMap().get("attributes")), hasKey(expectedDurationKey));
diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java
index 0eb33f979f..28bba890b1 100644
--- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java
+++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.anomalydetector;
+import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
@@ -17,7 +18,11 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -30,15 +35,19 @@ public class AnomalyDetectorProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
public static final String DEVIATION_KEY = "deviation_from_expected";
public static final String GRADE_KEY = "grade";
static final String NUMBER_RCF_INSTANCES = "RCFInstances";
+ static final String CARDINALITY_OVERFLOW = "cardinalityOverflow";
private final Boolean verbose;
+ private final int cardinalityLimit;
private final IdentificationKeysHasher identificationKeysHasher;
private final List<String> keys;
private final PluginFactory pluginFactory;
private final HashMap forestMap;
private final AtomicInteger cardinality;
private final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig;
-
+ private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorProcessor.class);
+ private final Counter cardinalityOverflowCounter;
+ Instant nextWarnTime = Instant.MIN;
@DataPrepperPluginConstructor
public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) {
super(pluginMetrics);
@@ -48,6 +57,8 @@ public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDete
this.keys = anomalyDetectorProcessorConfig.getKeys();
this.verbose = anomalyDetectorProcessorConfig.getVerbose();
this.cardinality = pluginMetrics.gauge(NUMBER_RCF_INSTANCES, new AtomicInteger());
+ this.cardinalityLimit = anomalyDetectorProcessorConfig.getCardinalityLimit();
+ this.cardinalityOverflowCounter = pluginMetrics.counter(CARDINALITY_OVERFLOW);
forestMap = new HashMap<>();
}
@@ -68,12 +79,20 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
AnomalyDetectorMode forest = forestMap.get(identificationKeysMap.hashCode());
- if (Objects.isNull(forest)) {
+ if (Objects.nonNull(forest)) {
+ recordsOut.addAll(forest.handleEvents(List.of(record)));
+ } else if (forestMap.size() < cardinalityLimit) {
forest = loadAnomalyDetectorMode(pluginFactory);
forest.initialize(keys, verbose);
forestMap.put(identificationKeysMap.hashCode(), forest);
+ recordsOut.addAll(forest.handleEvents(List.of(record)));
+ } else {
+ if (Instant.now().isAfter(nextWarnTime)) {
+ LOG.warn("Cardinality limit reached, see cardinalityOverflow metric for count of skipped records");
+ nextWarnTime = Instant.now().plus(5, ChronoUnit.MINUTES);
+ }
+ cardinalityOverflowCounter.increment();
}
- recordsOut.addAll(forestMap.get(identificationKeysMap.hashCode()).handleEvents(List.of(record)));
}
cardinality.set(forestMap.size());
return recordsOut;
diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java
index 7e796e660a..6331ee1f21 100644
--- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java
+++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java
@@ -28,6 +28,9 @@ public class AnomalyDetectorProcessorConfig {
@JsonProperty("verbose")
private Boolean verbose = false;
+ @JsonProperty("cardinality_limit")
+ private int cardinalityLimit = 5000;
+
public PluginModel getDetectorMode() {
return detectorMode;
}
@@ -47,6 +50,9 @@ public List getIdentificationKeys() {
public boolean getVerbose() {
return verbose;
}
+ public int getCardinalityLimit() {
+ return cardinalityLimit;
+ }
}
diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java
index 302a692dd7..c7ab98a4d4 100644
--- a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java
+++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java
@@ -55,6 +55,8 @@ public class AnomalyDetectorProcessorTests {
@Mock
private Counter recordsOut;
+ @Mock
+ private Counter cardinalityOverflow;
@Mock
private Timer timeElapsed;
@@ -77,6 +79,7 @@ void setUp() {
RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig();
when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration);
+ when(mockConfig.getCardinalityLimit()).thenReturn(5000);
when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString());
when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap());
when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class)))
@@ -215,6 +218,43 @@ void testAnomalyDetectorCardinality() {
}
+ @Test
+ void testAnomalyDetectorMaxCardinality() {
+ List identificationKeyList = new ArrayList();
+ identificationKeyList.add("ip");
+ when(mockConfig.getIdentificationKeys()).thenReturn(identificationKeyList);
+ when(mockConfig.getCardinalityLimit()).thenReturn(2);
+ when(pluginMetrics.counter(AnomalyDetectorProcessor.CARDINALITY_OVERFLOW)).thenReturn(cardinalityOverflow);
+
+ anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory);
+ final int numSamples = 1024;
+ final List<Record<Event>> records = new ArrayList<Record<Event>>();
+ for (int i = 0; i < numSamples; i++) {
+ if (i % 2 == 0) {
+ records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(0.5, 0.6), ThreadLocalRandom.current().nextLong(100, 110), "1.1.1.1"));
+ } else {
+ records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(15.5, 15.6), ThreadLocalRandom.current().nextLong(1000, 1110), "255.255.255.255"));
+ }
+ }
+ // Since limit is 2, the next two IPs will not have anomaly detection
+ for (int i = 0; i < numSamples; i++) {
+ if (i % 2 == 0) {
+ records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(0.5, 0.6), ThreadLocalRandom.current().nextLong(100, 110), "2.2.2.2"));
+ } else {
+ records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(15.5, 15.6), ThreadLocalRandom.current().nextLong(1000, 1110), "254.254.254.254"));
+ }
+ }
+
+ anomalyDetectorProcessor.doExecute(records);
+
+ final List<Record<Event>> newIpOne = (List<Record<Event>>) anomalyDetectorProcessor.doExecute(Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(1000.5, 1000.6), ThreadLocalRandom.current().nextLong(1000, 1110), "3.3.3.3")));
+ assertThat(newIpOne.size(), equalTo(0));
+
+ final List<Record<Event>> newIpTwo = (List<Record<Event>>) anomalyDetectorProcessor.doExecute(Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(1500.5, 1500.6), ThreadLocalRandom.current().nextLong(1000, 1110), "144.123.412.123")));
+ assertThat(newIpTwo.size(), equalTo(0));
+
+ }
+
static Record buildRecordWithEvent(final Map data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGenerator.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGenerator.java
new file mode 100644
index 0000000000..533fd60178
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGenerator.java
@@ -0,0 +1,91 @@
+package org.opensearch.dataprepper.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.opensearch.dataprepper.model.sink.OutputCodecContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generates an Avro schema from Event data.
+ */
+public class AvroAutoSchemaGenerator {
+ public Schema autoDetermineSchema(final Map<String, Object> data,
+ final OutputCodecContext outputCodecContext) {
+ return autoGenerateRecordSchema(data, outputCodecContext, "Event");
+ }
+
+ private Schema autoGenerateRecordSchema(final Map<String, Object> eventData, OutputCodecContext codecContext, String typeName) {
+ SchemaBuilder.FieldAssembler<Schema> fieldsAssembler = SchemaBuilder.record(typeName).fields();
+ for (final String key : eventData.keySet()) {
+ if (codecContext != null && codecContext.getExcludeKeys().contains(key)) {
+ continue;
+ }
+ Schema schemaForValue = createSchemaForValue(key, eventData.get(key), codecContext);
+ fieldsAssembler = fieldsAssembler.name(key).type().unionOf()
+ .nullType()
+ .and()
+ .type(schemaForValue)
+ .endUnion()
+ .nullDefault();
+ }
+ return fieldsAssembler.endRecord();
+ }
+
+
+ private Schema createSchemaForValue(String key, final Object value, OutputCodecContext codecContext) {
+ if(value == null)
+ throw new SchemaGenerationException("Unable to auto-generate a schema because a provided value is null. key='" + key + "'.");
+ if (value instanceof String) {
+ return SchemaBuilder.builder().stringType();
+ } else if (value instanceof Long) {
+ return SchemaBuilder.builder().longType();
+ } else if (value instanceof Integer) {
+ return SchemaBuilder.builder().intType();
+ } else if (value instanceof Float) {
+ return SchemaBuilder.builder().floatType();
+ } else if (value instanceof Double) {
+ return SchemaBuilder.builder().doubleType();
+ } else if (value instanceof Boolean) {
+ return SchemaBuilder.builder().booleanType();
+ } else if (value instanceof Byte[] || value instanceof byte[]) {
+ return SchemaBuilder.builder().bytesType();
+ } else if (value instanceof Map) {
+ return autoGenerateRecordSchema((Map<String, Object>) value, codecContext, convertFieldNameToTypeName(key));
+ } else if (value instanceof List) {
+ List<?> listProvided = (List<?>) value;
+ if(listProvided.isEmpty()) {
+ throw new SchemaGenerationException("Cannot determine the element type for the Avro array because a provided list is empty and has no type information. key='" + key + "'.");
+ }
+ Object sampleElement = listProvided.get(0);
+ return SchemaBuilder.builder()
+ .array()
+ .items(nullableType(createSchemaForValue(null, sampleElement, codecContext)));
+ }
+ throw new SchemaGenerationException("Unable to auto-generate a schema for field '" +
+ key +
+ "' because the the type '" + value.getClass() + "' is not a recognized type for auto-generation.");
+ }
+
+ private Schema nullableType(Schema schema) {
+ return SchemaBuilder.unionOf()
+ .nullType()
+ .and()
+ .type(schema)
+ .endUnion();
+ }
+
+ private String convertFieldNameToTypeName(String fieldName) {
+ char startCharacter = fieldName.charAt(0);
+ if(Character.isAlphabetic(startCharacter)) {
+ startCharacter = Character.toUpperCase(startCharacter);
+ }
+
+ char[] typeChars = new char[fieldName.length()];
+ typeChars[0] = startCharacter;
+ fieldName.getChars(1, fieldName.length(), typeChars, 1);
+ return new String(typeChars);
+ }
+
+}
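The generator walks an event's map and produces a record schema whose fields are nullable unions of the detected types; nested maps become nested records and lists become arrays typed from their first element. A short sketch of the intended usage (the event data is illustrative):

```java
import java.util.List;
import java.util.Map;

import org.apache.avro.Schema;
import org.opensearch.dataprepper.avro.AvroAutoSchemaGenerator;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

class AvroAutoSchemaGeneratorSketch {
    public static void main(String[] args) {
        AvroAutoSchemaGenerator generator = new AvroAutoSchemaGenerator();

        Map<String, Object> eventData = Map.of(
                "message", "hello",                    // -> nullable string
                "status", 200,                         // -> nullable int
                "latency", 0.42,                       // -> nullable double
                "tags", List.of("a", "b"),             // -> nullable array of nullable strings
                "client", Map.of("ip", "127.0.0.1"));  // -> nested "Client" record

        Schema schema = generator.autoDetermineSchema(eventData, new OutputCodecContext());
        System.out.println(schema.toString(true));
    }
}
```

Null values and empty lists are rejected with `SchemaGenerationException`, since they carry no type information to infer a field type from.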
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java
new file mode 100644
index 0000000000..6366bda3ef
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/AvroEventConverter.java
@@ -0,0 +1,58 @@
+package org.opensearch.dataprepper.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.opensearch.dataprepper.model.sink.OutputCodecContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts an Event into an Avro record.
+ *
+ * It might be a good idea to consolidate similar logic for input.
+ */
+public class AvroEventConverter {
+
+ private final SchemaChooser schemaChooser;
+
+ public AvroEventConverter() {
+ schemaChooser = new SchemaChooser();
+ }
+
+ public GenericRecord convertEventDataToAvro(final Schema schema,
+ final Map eventData,
+ OutputCodecContext codecContext) {
+ final GenericRecord avroRecord = new GenericData.Record(schema);
+ final boolean isExcludeKeyAvailable = !codecContext.getExcludeKeys().isEmpty();
+ for (final String key : eventData.keySet()) {
+ if (isExcludeKeyAvailable && codecContext.getExcludeKeys().contains(key)) {
+ continue;
+ }
+ final Schema.Field field = schema.getField(key);
+ if (field == null) {
+ throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema.");
+ }
+ final Object value = schemaMapper(field, eventData.get(key), codecContext);
+ avroRecord.put(key, value);
+ }
+ return avroRecord;
+ }
+
+ private Object schemaMapper(final Schema.Field field, final Object rawValue, OutputCodecContext codecContext) {
+ Schema providedSchema = schemaChooser.chooseSchema(field.schema());
+
+ if (providedSchema.getType() == Schema.Type.RECORD && rawValue instanceof Map) {
+ return convertEventDataToAvro(providedSchema, (Map<String, Object>) rawValue, codecContext);
+ } else if (providedSchema.getType() == Schema.Type.ARRAY && rawValue instanceof List) {
+ GenericData.Array avroArray =
+ new GenericData.Array<>(((List) rawValue).size(), providedSchema);
+ for (String element : ((List) rawValue)) {
+ avroArray.add(element);
+ }
+ return avroArray;
+ }
+ return rawValue;
+ }
+}
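The converter maps event data onto a given schema, recursing into nested records and arrays and resolving nullable unions through `SchemaChooser`. A minimal sketch pairing it with the auto schema generator above (in the codec itself the schema may instead come from configuration or S3):

```java
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.opensearch.dataprepper.avro.AvroAutoSchemaGenerator;
import org.opensearch.dataprepper.avro.AvroEventConverter;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

class AvroEventConverterSketch {
    public static void main(String[] args) {
        Map<String, Object> eventData = Map.of("message", "hello", "status", 200);
        OutputCodecContext context = new OutputCodecContext();

        // Derive a schema from the event data, then project the same data onto it.
        Schema schema = new AvroAutoSchemaGenerator().autoDetermineSchema(eventData, context);
        GenericRecord avroRecord = new AvroEventConverter()
                .convertEventDataToAvro(schema, eventData, context);

        System.out.println(avroRecord);
    }
}
```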
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaChooser.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaChooser.java
new file mode 100644
index 0000000000..4ce0c99437
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaChooser.java
@@ -0,0 +1,21 @@
+package org.opensearch.dataprepper.avro;
+
+import org.apache.avro.Schema;
+
+class SchemaChooser {
+ Schema chooseSchema(Schema providedSchema) {
+ if(providedSchema.isNullable()) {
+ return getFirstNonNullable(providedSchema);
+ }
+
+ return providedSchema;
+ }
+
+ private Schema getFirstNonNullable(Schema providedSchema) {
+ return providedSchema.getTypes()
+ .stream()
+ .filter(s -> s.getType() != Schema.Type.NULL)
+ .findFirst()
+ .orElse(providedSchema);
+ }
+}
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaGenerationException.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaGenerationException.java
new file mode 100644
index 0000000000..1940370ab7
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/avro/SchemaGenerationException.java
@@ -0,0 +1,7 @@
+package org.opensearch.dataprepper.avro;
+
+public class SchemaGenerationException extends RuntimeException {
+ SchemaGenerationException(String message) {
+ super(message);
+ }
+}
diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
index fc5e298be1..f9ce073c2a 100644
--- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
+++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
@@ -6,10 +6,11 @@
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
+import org.opensearch.dataprepper.avro.AvroAutoSchemaGenerator;
+import org.opensearch.dataprepper.avro.AvroEventConverter;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
@@ -20,9 +21,6 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -32,14 +30,12 @@
*/
@DataPrepperPlugin(name = "avro", pluginType = OutputCodec.class, pluginConfigurationType = AvroOutputCodecConfig.class)
public class AvroOutputCodec implements OutputCodec {
-
- private static final List PRIMITIVE_TYPES = Arrays.asList("int", "long", "string", "float", "double", "bytes");
private static final Logger LOG = LoggerFactory.getLogger(AvroOutputCodec.class);
private static final String AVRO = "avro";
- private static final String BASE_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"AvroRecords\",\"fields\":[";
- private static final String END_SCHEMA_STRING = "]}";
private final AvroOutputCodecConfig config;
+ private final AvroEventConverter avroEventConverter;
+ private final AvroAutoSchemaGenerator avroAutoSchemaGenerator;
private DataFileWriter dataFileWriter;
private Schema schema;
@@ -51,6 +47,9 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;
+ avroEventConverter = new AvroEventConverter();
+ avroAutoSchemaGenerator = new AvroAutoSchemaGenerator();
+
if (config.getSchema() != null) {
schema = parseSchema(config.getSchema());
} else if (config.getFileLocation() != null) {
@@ -76,64 +75,14 @@ public void start(final OutputStream outputStream, final Event event, final Outp
}
public Schema buildInlineSchemaFromEvent(final Event event) throws IOException {
+ final Map<String, Object> data;
if (codecContext != null && codecContext.getTagsTargetKey() != null) {
- return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap(), false));
+ data = addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap();
} else {
- return parseSchema(buildSchemaStringFromEventMap(event.toMap(), false));
+ data = event.toMap();
}
- }
- private String buildSchemaStringFromEventMap(final Map eventData, boolean nestedRecordFlag) {
- final StringBuilder builder = new StringBuilder();
- int nestedRecordIndex = 1;
- if (!nestedRecordFlag) {
- builder.append(BASE_SCHEMA_STRING);
- } else {
- builder.append("{\"type\":\"record\",\"name\":\"" + "NestedRecord" + nestedRecordIndex + "\",\"fields\":[");
- nestedRecordIndex++;
- }
- String fields;
- int index = 0;
- for (final String key : eventData.keySet()) {
- if (codecContext != null && codecContext.getExcludeKeys().contains(key)) {
- continue;
- }
- if (index == 0) {
- if (!(eventData.get(key) instanceof Map)) {
- fields = "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}";
- } else {
- fields = "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}";
- }
- } else {
- if (!(eventData.get(key) instanceof Map)) {
- fields = "," + "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}";
- } else {
- fields = "," + "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}";
- }
- }
- builder.append(fields);
- index++;
- }
- builder.append(END_SCHEMA_STRING);
- return builder.toString();
- }
-
- private String typeMapper(final Object value) {
- if (value instanceof Integer || value.getClass().equals(int.class)) {
- return "int";
- } else if (value instanceof Float || value.getClass().equals(float.class)) {
- return "float";
- } else if (value instanceof Double || value.getClass().equals(double.class)) {
- return "double";
- } else if (value instanceof Long || value.getClass().equals(long.class)) {
- return "long";
- } else if (value instanceof Byte[]) {
- return "bytes";
- } else if (value instanceof Map) {
- return buildSchemaStringFromEventMap((Map) value, true);
- } else {
- return "string";
- }
+ return avroAutoSchemaGenerator.autoDetermineSchema(data, codecContext);
}
@Override
@@ -145,13 +94,14 @@ public void complete(final OutputStream outputStream) throws IOException {
@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
Objects.requireNonNull(event);
+ final Map<String, Object> data;
if (codecContext.getTagsTargetKey() != null) {
- final GenericRecord avroRecord = buildAvroRecord(schema, addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap());
- dataFileWriter.append(avroRecord);
+ data = addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap();
} else {
- final GenericRecord avroRecord = buildAvroRecord(schema, event.toMap());
- dataFileWriter.append(avroRecord);
+ data = event.toMap();
}
+ final GenericRecord avroRecord = avroEventConverter.convertEventDataToAvro(schema, data, codecContext);
+ dataFileWriter.append(avroRecord);
}
@Override
@@ -169,65 +119,6 @@ Schema parseSchema(final String schemaString) {
}
}
- private GenericRecord buildAvroRecord(final Schema schema, final Map<String, Object> eventData) {
- final GenericRecord avroRecord = new GenericData.Record(schema);
- final boolean isExcludeKeyAvailable = !codecContext.getExcludeKeys().isEmpty();
- for (final String key : eventData.keySet()) {
- if (isExcludeKeyAvailable && codecContext.getExcludeKeys().contains(key)) {
- continue;
- }
- final Schema.Field field = schema.getField(key);
- if (field == null) {
- throw new RuntimeException("The event has a key ('" + key + "') which is not included in the schema.");
- }
- final Object value = schemaMapper(field, eventData.get(key));
- avroRecord.put(key, value);
- }
- return avroRecord;
- }
-
- private Object schemaMapper(final Schema.Field field, final Object rawValue) {
- Object finalValue = null;
- final String fieldType = field.schema().getType().name().toLowerCase();
- if (PRIMITIVE_TYPES.contains(fieldType)) {
- switch (fieldType) {
- case "string":
- finalValue = rawValue.toString();
- break;
- case "int":
- finalValue = Integer.parseInt(rawValue.toString());
- break;
- case "float":
- finalValue = Float.parseFloat(rawValue.toString());
- break;
- case "double":
- finalValue = Double.parseDouble(rawValue.toString());
- break;
- case "long":
- finalValue = Long.parseLong(rawValue.toString());
- break;
- case "bytes":
- finalValue = rawValue.toString().getBytes(StandardCharsets.UTF_8);
- break;
- default:
- LOG.error("Unrecognised Field name : '{}' & type : '{}'", field.name(), fieldType);
- break;
- }
- } else {
- if (fieldType.equals("record") && rawValue instanceof Map) {
- finalValue = buildAvroRecord(field.schema(), (Map) rawValue);
- } else if (fieldType.equals("array") && rawValue instanceof List) {
- GenericData.Array<String> avroArray =
- new GenericData.Array<>(((List) rawValue).size(), field.schema());
- for (String element : ((List) rawValue)) {
- avroArray.add(element);
- }
- finalValue = avroArray;
- }
- }
- return finalValue;
- }
-
private boolean checkS3SchemaValidity() {
return config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null;
}
diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java
new file mode 100644
index 0000000000..9cdb5c9d69
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java
@@ -0,0 +1,201 @@
+package org.opensearch.dataprepper.avro;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.mockito.Mock;
+import org.opensearch.dataprepper.model.sink.OutputCodecContext;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.mock;
+
+class AvroAutoSchemaGeneratorTest {
+ @Mock
+ private OutputCodecContext outputCodecContext;
+
+ private AvroAutoSchemaGenerator createObjectUnderTest() {
+ return new AvroAutoSchemaGenerator();
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class)
+ void autoDetermineSchema_with_primitive_type_returns_nullable_of_that_type(Object value, Schema.Type expectedType) {
+ String key = randomAvroName();
+
+ Schema schema = createObjectUnderTest().autoDetermineSchema(Map.of(key, value), outputCodecContext);
+
+ assertThat(schema, notNullValue());
+ assertThat(schema.getName(), equalTo("Event"));
+ assertThat(schema.getFields(), notNullValue());
+ assertThat(schema.getFields().size(), equalTo(1));
+ Schema.Field field = schema.getField(key);
+ assertThat(field, notNullValue());
+ assertThat(field.defaultVal(), equalTo(Schema.NULL_VALUE));
+ assertThat(field.schema(), notNullValue());
+ assertThat(field.schema().isNullable(), equalTo(true));
+ assertThat(field.schema().isUnion(), equalTo(true));
+ assertThat(field.schema().getTypes(), notNullValue());
+ assertThat(field.schema().getTypes().size(), equalTo(2));
+ assertThat(field.schema().getTypes().get(0), notNullValue());
+ assertThat(field.schema().getTypes().get(0).getType(), equalTo(Schema.Type.NULL));
+ assertThat(field.schema().getTypes().get(1), notNullValue());
+ assertThat(field.schema().getTypes().get(1).getType(), equalTo(expectedType));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class)
+ void autoDetermineSchema_with_map_returns_nullable_record_with_nullable_primitives(Object primitiveType, Schema.Type expectedType) {
+ String recordKey = randomAvroName();
+ String innerKey = randomAvroName();
+
+ Map<String, Object> inputMap = Map.of(recordKey, Map.of(innerKey, primitiveType));
+ Schema actualEventSchema = createObjectUnderTest().autoDetermineSchema(inputMap, outputCodecContext);
+
+ assertThat(actualEventSchema, notNullValue());
+ assertThat(actualEventSchema.getName(), equalTo("Event"));
+ assertThat(actualEventSchema.getFields(), notNullValue());
+ assertThat(actualEventSchema.getFields().size(), equalTo(1));
+ Schema.Field field = actualEventSchema.getField(recordKey);
+ assertThat(field, notNullValue());
+ assertThat(field.defaultVal(), equalTo(Schema.NULL_VALUE));
+ assertThat(field.schema(), notNullValue());
+ assertThat(field.schema().isNullable(), equalTo(true));
+ assertThat(field.schema().isUnion(), equalTo(true));
+ assertThat(field.schema().getTypes(), notNullValue());
+ assertThat(field.schema().getTypes().size(), equalTo(2));
+ assertThat(field.schema().getTypes().get(0), notNullValue());
+ assertThat(field.schema().getTypes().get(0).getType(), equalTo(Schema.Type.NULL));
+ Schema actualRecordSchema = field.schema().getTypes().get(1);
+ assertThat(actualRecordSchema, notNullValue());
+ assertThat(actualRecordSchema.getType(), equalTo(Schema.Type.RECORD));
+
+ assertThat(actualRecordSchema, notNullValue());
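+ // The generator appears to derive the nested record's name from the map key, capitalizing its first letter.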
+ assertThat(actualRecordSchema.getName(), equalTo(recordKey.replaceFirst("a", "A")));
+ assertThat(actualRecordSchema.getFields(), notNullValue());
+ assertThat(actualRecordSchema.getFields().size(), equalTo(1));
+ Schema.Field actualInnerField = actualRecordSchema.getField(innerKey);
+ assertThat(actualInnerField, notNullValue());
+ assertThat(actualInnerField.defaultVal(), equalTo(Schema.NULL_VALUE));
+ assertThat(actualInnerField.schema(), notNullValue());
+ assertThat(actualInnerField.schema().isNullable(), equalTo(true));
+ assertThat(actualInnerField.schema().isUnion(), equalTo(true));
+ assertThat(actualInnerField.schema().getTypes(), notNullValue());
+ assertThat(actualInnerField.schema().getTypes().size(), equalTo(2));
+ assertThat(actualInnerField.schema().getTypes().get(0), notNullValue());
+ assertThat(actualInnerField.schema().getTypes().get(0).getType(), equalTo(Schema.Type.NULL));
+ assertThat(actualInnerField.schema().getTypes().get(1), notNullValue());
+ assertThat(actualInnerField.schema().getTypes().get(1).getType(), equalTo(expectedType));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class)
+ void autoDetermineSchema_with_list_returns_nullable_array_with_nullable_primitives(Object primitiveType, Schema.Type expectedType) {
+ String arrayKey = randomAvroName();
+
+ Map<String, Object> inputMap = Map.of(arrayKey, List.of(primitiveType));
+ Schema actualEventSchema = createObjectUnderTest().autoDetermineSchema(inputMap, outputCodecContext);
+
+ assertThat(actualEventSchema, notNullValue());
+ assertThat(actualEventSchema.getName(), equalTo("Event"));
+ assertThat(actualEventSchema.getFields(), notNullValue());
+ assertThat(actualEventSchema.getFields().size(), equalTo(1));
+ Schema.Field field = actualEventSchema.getField(arrayKey);
+ assertThat(field, notNullValue());
+ assertThat(field.defaultVal(), equalTo(Schema.NULL_VALUE));
+ assertThat(field.schema(), notNullValue());
+ assertThat(field.schema().isNullable(), equalTo(true));
+ assertThat(field.schema().isUnion(), equalTo(true));
+ assertThat(field.schema().getTypes(), notNullValue());
+ assertThat(field.schema().getTypes().size(), equalTo(2));
+ assertThat(field.schema().getTypes().get(0), notNullValue());
+ assertThat(field.schema().getTypes().get(0).getType(), equalTo(Schema.Type.NULL));
+ Schema actualArraySchema = field.schema().getTypes().get(1);
+ assertThat(actualArraySchema, notNullValue());
+ assertThat(actualArraySchema.getType(), equalTo(Schema.Type.ARRAY));
+
+ assertThat(actualArraySchema, notNullValue());
+ Schema actualElementType = actualArraySchema.getElementType();
+ assertThat(actualElementType, notNullValue());
+ assertThat(actualElementType.isNullable(), equalTo(true));
+ assertThat(actualElementType.isUnion(), equalTo(true));
+ assertThat(actualElementType.getTypes(), notNullValue());
+ assertThat(actualElementType.getTypes().size(), equalTo(2));
+ assertThat(actualElementType.getTypes().get(0), notNullValue());
+ assertThat(actualElementType.getTypes().get(0).getType(), equalTo(Schema.Type.NULL));
+ assertThat(actualElementType.getTypes().get(1), notNullValue());
+ assertThat(actualElementType.getTypes().get(1).getType(), equalTo(expectedType));
+ }
+
+ @Test
+ void autoDetermineSchema_with_empty_list_throws() {
+ String arrayKey = randomAvroName();
+
+ Map<String, Object> inputMap = Map.of(arrayKey, List.of());
+ AvroAutoSchemaGenerator objectUnderTest = createObjectUnderTest();
+ SchemaGenerationException actualException = assertThrows(SchemaGenerationException.class, () -> objectUnderTest.autoDetermineSchema(inputMap, outputCodecContext));
+
+ assertThat(actualException.getMessage(), notNullValue());
+ assertThat(actualException.getMessage(), containsString(arrayKey));
+ }
+
+ @Test
+ void autoDetermineSchema_with_null_value_throws() {
+ String fieldKey = randomAvroName();
+
+ Map<String, Object> inputMap = Collections.singletonMap(fieldKey, null);
+ AvroAutoSchemaGenerator objectUnderTest = createObjectUnderTest();
+ SchemaGenerationException actualException = assertThrows(SchemaGenerationException.class, () -> objectUnderTest.autoDetermineSchema(inputMap, outputCodecContext));
+
+ assertThat(actualException.getMessage(), notNullValue());
+ assertThat(actualException.getMessage(), containsString(fieldKey));
+ assertThat(actualException.getMessage(), containsString("null"));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(SomeUnknownTypesArgumentsProvider.class)
+ void autoDetermineSchema_with_unknown_type_throws(Class<?> unknownType) {
+ Object value = mock(unknownType);
+ String fieldKey = randomAvroName();
+
+ AvroAutoSchemaGenerator objectUnderTest = createObjectUnderTest();
+ Map<String, Object> inputMap = Map.of(fieldKey, value);
+ SchemaGenerationException actualException = assertThrows(SchemaGenerationException.class, () -> objectUnderTest.autoDetermineSchema(inputMap, outputCodecContext));
+
+ assertThat(actualException.getMessage(), notNullValue());
+ assertThat(actualException.getMessage(), containsString(fieldKey));
+ assertThat(actualException.getMessage(), containsString(value.getClass().toString()));
+ }
+
+ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider {
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+ return Stream.of(
+ arguments(Random.class),
+ arguments(InputStream.class),
+ arguments(File.class)
+ );
+ }
+ }
+
+ private static String randomAvroName() {
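+ // Avro names must start with a letter or underscore, so prefix the random hex string with 'a'.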
+ return "a" + UUID.randomUUID().toString().replaceAll("-", "");
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroEventConverterTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroEventConverterTest.java
new file mode 100644
index 0000000000..bbe5c20991
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroEventConverterTest.java
@@ -0,0 +1,53 @@
+package org.opensearch.dataprepper.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.model.sink.OutputCodecContext;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AvroEventConverterTest {
+ @Mock
+ private Schema schema;
+
+ @Mock
+ private OutputCodecContext codecContext;
+
+ @BeforeEach
+ void setUp() {
+ when(schema.getType()).thenReturn(Schema.Type.RECORD);
+ }
+
+ private AvroEventConverter createObjectUnderTest() {
+ return new AvroEventConverter();
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveClassesToTypesArgumentsProvider.class)
+ void convertEventDataToAvro_does_not_need_to_getField_on_empty_map() {
+ Map<String, Object> data = Collections.emptyMap();
+ GenericRecord actualRecord = createObjectUnderTest().convertEventDataToAvro(schema, data, codecContext);
+
+ assertThat(actualRecord, notNullValue());
+ assertThat(actualRecord.getSchema(), equalTo(schema));
+
+ verify(schema, never()).getField(anyString());
+ }
+
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/PrimitiveClassesToTypesArgumentsProvider.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/PrimitiveClassesToTypesArgumentsProvider.java
new file mode 100644
index 0000000000..ad996c6a2b
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/PrimitiveClassesToTypesArgumentsProvider.java
@@ -0,0 +1,31 @@
+package org.opensearch.dataprepper.avro;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+class PrimitiveClassesToTypesArgumentsProvider implements ArgumentsProvider {
+ private static final Random RANDOM = new Random();
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+ byte[] bytes = new byte[10];
+ RANDOM.nextBytes(bytes);
+ return Stream.of(
+ arguments(UUID.randomUUID().toString(), Schema.Type.STRING),
+ arguments(RANDOM.nextInt(10_000), Schema.Type.INT),
+ arguments(RANDOM.nextLong(), Schema.Type.LONG),
+ arguments(RANDOM.nextFloat(), Schema.Type.FLOAT),
+ arguments(RANDOM.nextDouble(), Schema.Type.DOUBLE),
+ arguments(RANDOM.nextBoolean(), Schema.Type.BOOLEAN),
+ arguments(bytes, Schema.Type.BYTES)
+ );
+ }
+}
diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/SchemaChooserTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/SchemaChooserTest.java
new file mode 100644
index 0000000000..bc2971e5bb
--- /dev/null
+++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/SchemaChooserTest.java
@@ -0,0 +1,130 @@
+package org.opensearch.dataprepper.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaChooserTest {
+ private SchemaChooser createObjectUnderTest() {
+ return new SchemaChooser();
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveTypesArgumentsProvider.class)
+ void chooseSchema_should_return_non_nullable_when_nullable_with_primitives(Schema.Type primitiveType) {
+ Schema innerSchema = SchemaBuilder.builder().type(primitiveType.getName());
+ Schema schema = SchemaBuilder
+ .nullable()
+ .type(innerSchema);
+
+ Schema actualSchema = createObjectUnderTest().chooseSchema(schema);
+ assertThat(actualSchema, notNullValue());
+ assertThat(actualSchema.getType(), equalTo(primitiveType));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveTypesArgumentsProvider.class)
+ void chooseSchema_should_return_non_nullable_when_nullable_with_record(Schema.Type fieldType) {
+ Schema innerSchema = SchemaBuilder.builder().record(randomAvroName())
+ .fields()
+ .name(randomAvroName()).type(fieldType.getName()).noDefault()
+ .endRecord();
+ Schema schema = SchemaBuilder
+ .nullable()
+ .type(innerSchema);
+
+ Schema actualSchema = createObjectUnderTest().chooseSchema(schema);
+ assertThat(actualSchema, notNullValue());
+ assertThat(actualSchema.getType(), equalTo(Schema.Type.RECORD));
+ assertThat(actualSchema.getName(), equalTo(innerSchema.getName()));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveTypesArgumentsProvider.class)
+ void chooseSchema_should_return_non_nullable_when_nullable_with_array(Schema.Type itemType) {
+ Schema innerSchema = SchemaBuilder.builder()
+ .array()
+ .items(SchemaBuilder.builder().type(itemType.getName()));
+
+ Schema schema = SchemaBuilder
+ .nullable()
+ .type(innerSchema);
+
+ Schema actualSchema = createObjectUnderTest().chooseSchema(schema);
+ assertThat(actualSchema, notNullValue());
+ assertThat(actualSchema.getType(), equalTo(Schema.Type.ARRAY));
+ assertThat(actualSchema.getElementType(), notNullValue());
+ assertThat(actualSchema.getElementType().getType(), equalTo(itemType));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveTypesArgumentsProvider.class)
+ void chooseSchema_with_non_nullable_returns_input_with_non_nullable_primitive_types(Schema.Type primitiveType) {
+ Schema inputSchema = SchemaBuilder.builder().type(primitiveType.getName());
+
+ Schema actualSchema = createObjectUnderTest().chooseSchema(inputSchema);
+ assertThat(actualSchema, notNullValue());
+ assertThat(actualSchema.getType(), equalTo(primitiveType));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveTypesArgumentsProvider.class)
+ void chooseSchema_with_non_nullable_returns_input_with_non_nullable_record(Schema.Type fieldType) {
+ Schema schema = SchemaBuilder.builder().record(randomAvroName())
+ .fields()
+ .name(randomAvroName()).type(fieldType.getName()).noDefault()
+ .endRecord();
+
+ Schema actualSchema = createObjectUnderTest().chooseSchema(schema);
+ assertThat(actualSchema, equalTo(schema));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(PrimitiveTypesArgumentsProvider.class)
+ void chooseSchema_with_non_nullable_returns_input_with_non_nullable_array(Schema.Type itemType) {
+ Schema schema = SchemaBuilder.builder()
+ .array()
+ .items(SchemaBuilder.builder().type(itemType.getName()));
+
+
+ Schema actualSchema = createObjectUnderTest().chooseSchema(schema);
+ assertThat(actualSchema, notNullValue());
+ assertThat(actualSchema.getType(), equalTo(Schema.Type.ARRAY));
+ assertThat(actualSchema.getElementType(), notNullValue());
+ assertThat(actualSchema.getElementType().getType(), equalTo(itemType));
+ }
+
+ private static String randomAvroName() {
+ return "a" + UUID.randomUUID().toString().replaceAll("-", "");
+ }
+
+ static class PrimitiveTypesArgumentsProvider implements ArgumentsProvider {
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+ return Stream.of(
+ arguments(Schema.Type.STRING),
+ arguments(Schema.Type.INT),
+ arguments(Schema.Type.LONG),
+ arguments(Schema.Type.FLOAT),
+ arguments(Schema.Type.DOUBLE),
+ arguments(Schema.Type.BOOLEAN),
+ arguments(Schema.Type.BYTES)
+ );
+ }
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java
index f632bb88dc..6e297eba6c 100644
--- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java
+++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java
@@ -7,23 +7,21 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
-import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
-import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,29 +29,35 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AvroOutputCodecTest {
- private static final String EXPECTED_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"AvroRecords\",\"fields\":[{\"name\"" +
- ":\"name\",\"type\":\"string\"},{\"name\":\"nestedRecord\",\"type\":{\"type\":\"record\",\"name\":" +
- "\"NestedRecord1\",\"fields\":[{\"name\":\"secondFieldInNestedRecord\",\"type\":\"int\"},{\"name\":\"" +
- "firstFieldInNestedRecord\",\"type\":\"string\"}]}},{\"name\":\"age\",\"type\":\"int\"}]}";
+ private static final String EXPECTED_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Event\",\"fields\":" +
+ "[{\"name\":\"myDouble\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"myLong\",\"type\":[\"null\",\"long\"],\"default\":null}," +
+ "{\"name\":\"myArray\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"string\"]}],\"default\":null}," +
+ "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," +
+ "{\"name\":\"nestedRecord\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"NestedRecord\",\"fields\":[{\"name\":\"secondFieldInNestedRecord\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"firstFieldInNestedRecord\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null}," +
+ "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"myFloat\",\"type\":[\"null\",\"float\"],\"default\":null}]}";
+ public static final int TOTAL_TOP_LEVEL_FIELDS = 7;
private AvroOutputCodecConfig config;
private ByteArrayOutputStream outputStream;
- private static int numberOfRecords;
-
@BeforeEach
void setUp() {
config = new AvroOutputCodecConfig();
- config.setSchema(parseSchema().toString());
+ config.setSchema(createStandardSchema().toString());
}
private AvroOutputCodec createObjectUnderTest() {
@@ -62,7 +66,7 @@ private AvroOutputCodec createObjectUnderTest() {
@Test
void constructor_throws_if_schema_is_invalid() {
- String invalidSchema = parseSchema().toString().replaceAll(",", ";");
+ String invalidSchema = createStandardSchema().toString().replaceAll(",", ";");
config.setSchema(invalidSchema);
RuntimeException actualException = assertThrows(RuntimeException.class, this::createObjectUnderTest);
@@ -75,39 +79,132 @@ void constructor_throws_if_schema_is_invalid() {
@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 100})
void test_happy_case(final int numberOfRecords) throws Exception {
- AvroOutputCodecTest.numberOfRecords = numberOfRecords;
AvroOutputCodec avroOutputCodec = createObjectUnderTest();
outputStream = new ByteArrayOutputStream();
OutputCodecContext codecContext = new OutputCodecContext();
avroOutputCodec.start(outputStream, null, codecContext);
- for (int index = 0; index < numberOfRecords; index++) {
- final Event event = getRecord(index).getData();
+ List