From 0aba83b7dda738e3701af6c8959261390027d252 Mon Sep 17 00:00:00 2001 From: timo-mue Date: Fri, 21 Jun 2024 17:04:04 +0200 Subject: [PATCH 1/9] add remove_brackets option to flatten-processor (#4616) (#4653) Signed-off-by: Timo Mueller --- .../processor/flatten/FlattenProcessor.java | 7 ++- .../flatten/FlattenProcessorConfig.java | 14 +++++ .../flatten/FlattenProcessorConfigTest.java | 2 +- .../flatten/FlattenProcessorTest.java | 61 +++++++++++++++++++ 4 files changed, 80 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java index 2a07fd6d99..9e3218be88 100644 --- a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java +++ b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java @@ -119,14 +119,15 @@ private Map removeListIndicesInKeys(final Map in final Map resultMap = new HashMap<>(); for (final Map.Entry entry : inputMap.entrySet()) { - final String keyWithoutIndices = removeListIndices(entry.getKey()); + final String keyWithoutIndices = removeListIndices(entry.getKey(), config.isRemoveBrackets()); addFieldsToMapWithMerge(keyWithoutIndices, entry.getValue(), resultMap); } return resultMap; } - private String removeListIndices(final String key) { - return key.replaceAll("\\[\\d+\\]", "[]"); + private String removeListIndices(final String key, final boolean removeBrackets) { + final String replacement = removeBrackets ? "" : "[]"; + return key.replaceAll("\\[\\d+\\]", replacement); } private void addFieldsToMapWithMerge(String key, Object value, Map map) { diff --git a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java index 96c9d2e024..c1208f5f40 100644 --- a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java +++ b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugins.processor.flatten; import com.fasterxml.jackson.annotation.JsonProperty; + +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotNull; import java.util.ArrayList; @@ -29,6 +31,9 @@ public class FlattenProcessorConfig { @JsonProperty("remove_list_indices") private boolean removeListIndices = false; + @JsonProperty("remove_brackets") + private boolean removeBrackets = false; + @JsonProperty("exclude_keys") private List excludeKeys = DEFAULT_EXCLUDE_KEYS; @@ -54,6 +59,10 @@ public boolean isRemoveListIndices() { return removeListIndices; } + public boolean isRemoveBrackets() { + return removeBrackets; + } + public List getExcludeKeys() { return excludeKeys; } @@ -65,4 +74,9 @@ public String getFlattenWhen() { public List getTagsOnFailure() { return tagsOnFailure; } + + @AssertTrue(message = "remove_brackets can not be true if remove_list_indices is false.") + boolean removeBracketsNotTrueWhenRemoveListIndicesFalse() { + return (!removeBrackets || removeListIndices); + } } diff --git a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java index d11860df0e..960db201d5 100644 --- a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java +++ b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java @@ -20,7 +20,7 @@ void testDefaultConfig() { assertThat(FlattenProcessorConfig.getSource(), equalTo(null)); assertThat(FlattenProcessorConfig.getTarget(), equalTo(null)); assertThat(FlattenProcessorConfig.isRemoveListIndices(), equalTo(false)); - assertThat(FlattenProcessorConfig.isRemoveListIndices(), equalTo(false)); + assertThat(FlattenProcessorConfig.isRemoveBrackets(), equalTo(false)); assertThat(FlattenProcessorConfig.getFlattenWhen(), equalTo(null)); assertThat(FlattenProcessorConfig.getTagsOnFailure(), equalTo(null)); assertThat(FlattenProcessorConfig.getExcludeKeys(), equalTo(List.of())); diff --git a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java index 737d245ff5..df693f7f6f 100644 --- a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java +++ b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java @@ -52,6 +52,7 @@ void setUp() { lenient().when(mockConfig.getTarget()).thenReturn(""); lenient().when(mockConfig.isRemoveProcessedFields()).thenReturn(false); lenient().when(mockConfig.isRemoveListIndices()).thenReturn(false); + lenient().when(mockConfig.isRemoveBrackets()).thenReturn(false); lenient().when(mockConfig.getFlattenWhen()).thenReturn(null); lenient().when(mockConfig.getTagsOnFailure()).thenReturn(new ArrayList<>()); lenient().when(mockConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); @@ -119,6 +120,35 @@ void testFlattenEntireEventDataAndRemoveListIndices() { assertThat(resultData.get("list1[].list2[].value"), is(List.of("value1", "value2"))); } + @Test + void testFlattenEntireEventDataAndRemoveListIndicesAndRemoveBrackets() { + when(mockConfig.isRemoveListIndices()).thenReturn(true); + when(mockConfig.isRemoveBrackets()).thenReturn(true); + + final FlattenProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(createTestData()); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + Map resultData = resultEvent.get("", Map.class); + + assertThat(resultData.containsKey("key1"), is(true)); + assertThat(resultData.get("key1"), is("val1")); + + assertThat(resultData.containsKey("key1"), is(true)); + assertThat(resultData.get("key2.key3.key.4"), is("val2")); + + assertThat(resultData.containsKey("list1[].list2[].name"), is(false)); + assertThat(resultData.containsKey("list1.list2.name"), is(true)); + assertThat(resultData.get("list1.list2.name"), is(List.of("name1", "name2"))); + + assertThat(resultData.containsKey("list1[].list2[].value"), is(false)); + assertThat(resultData.containsKey("list1.list2.value"), is(true)); + assertThat(resultData.get("list1.list2.value"), is(List.of("value1", "value2"))); + } + @Test void testFlattenWithSpecificFieldsAsSourceAndTarget() { when(mockConfig.getSource()).thenReturn(SOURCE_KEY); @@ -187,6 +217,37 @@ void testFlattenWithSpecificFieldsAsSourceAndTargetAndRemoveListIndices() { assertThat(resultData.get("list1[].list2[].value"), is(List.of("value1", "value2"))); } + @Test + void testFlattenWithSpecificFieldsAsSourceAndTargetAndRemoveListIndicesAndRemoveBrackets() { + when(mockConfig.getSource()).thenReturn(SOURCE_KEY); + when(mockConfig.getTarget()).thenReturn(TARGET_KEY); + when(mockConfig.isRemoveListIndices()).thenReturn(true); + when(mockConfig.isRemoveBrackets()).thenReturn(true); + + final FlattenProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(Map.of(SOURCE_KEY, createTestData())); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + Map resultData = resultEvent.get(TARGET_KEY, Map.class); + + assertThat(resultData.containsKey("key1"), is(true)); + assertThat(resultData.get("key1"), is("val1")); + + assertThat(resultData.containsKey("key1"), is(true)); + assertThat(resultData.get("key2.key3.key.4"), is("val2")); + + assertThat(resultData.containsKey("list1[].list2[].name"), is(false)); + assertThat(resultData.containsKey("list1.list2.name"), is(true)); + assertThat(resultData.get("list1.list2.name"), is(List.of("name1", "name2"))); + + assertThat(resultData.containsKey("list1[].list2[].value"), is(false)); + assertThat(resultData.containsKey("list1.list2.value"), is(true)); + assertThat(resultData.get("list1.list2.value"), is(List.of("value1", "value2"))); + } + @Test public void testEventNotProcessedWhenTheWhenConditionIsFalse() { final String whenCondition = UUID.randomUUID().toString(); From b4b71e279a8bad9310d57c3cc476bd1536a7a0d2 Mon Sep 17 00:00:00 2001 From: David Venable Date: Fri, 21 Jun 2024 10:37:03 -0500 Subject: [PATCH 2/9] Fixes performance regression with JacksonEvent put/delete operations. (#4650) With the addition of the EventKey, JacksonEvent always creates a JacksonEventKey in order to use the same code for all paths. However, when put/delete calls are made with a String key, JacksonEvent does not need the JSON Pointer. But, it is created anyway. This adds more work to the put/delete calls that have not yet migrated to the String version. This fixes regression by adding a lazy initialization option when used in JacksonEvent. We should not be lazy when used with the EventKeyFactory since we may lose some up-front validations. Signed-off-by: David Venable --- .../dataprepper/model/event/JacksonEvent.java | 14 ++--- .../model/event/JacksonEventKey.java | 57 +++++++++++++++---- .../model/event/JacksonEventKeyTest.java | 17 ++++++ 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 35e0dd863b..25ef31ec8b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -170,7 +170,7 @@ public void put(EventKey key, Object value) { */ @Override public void put(final String key, final Object value) { - final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.PUT); + final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.PUT); put(jacksonEventKey, value); } @@ -229,7 +229,7 @@ private static JacksonEventKey asJacksonEventKey(EventKey key) { */ @Override public T get(final String key, final Class clazz) { - final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET); + final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET); return get(jacksonEventKey, clazz); } @@ -274,7 +274,7 @@ public List getList(EventKey key, Class clazz) { */ @Override public List getList(final String key, final Class clazz) { - JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET); + JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET); return getList(jacksonEventKey, clazz); } @@ -330,7 +330,7 @@ public void delete(final EventKey key) { */ @Override public void delete(final String key) { - final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.DELETE); + final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.DELETE); delete(jacksonEventKey); } @@ -362,7 +362,7 @@ public String getAsJsonString(EventKey key) { @Override public String getAsJsonString(final String key) { - JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET); + JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET); return getAsJsonString(jacksonEventKey); } @@ -459,7 +459,7 @@ public boolean containsKey(EventKey key) { @Override public boolean containsKey(final String key) { - JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET); + JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET); return containsKey(jacksonEventKey); } @@ -474,7 +474,7 @@ public boolean isValueAList(EventKey key) { @Override public boolean isValueAList(final String key) { - JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET); + JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET); return isValueAList(jacksonEventKey); } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java index 2df755492a..50d59a6585 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java @@ -22,14 +22,45 @@ class JacksonEventKey implements EventKey { private final String key; private final EventKeyFactory.EventAction[] eventActions; private final String trimmedKey; - private final List keyPathList; - private final JsonPointer jsonPointer; + private List keyPathList; + private JsonPointer jsonPointer; private final Set supportedActions; + /** + * Constructor for the JacksonEventKey which should only be used by implementation + * of {@link EventKeyFactory} in Data Prepper core. + * + * @param key The key + * @param eventActions Event actions to support + */ JacksonEventKey(final String key, final EventKeyFactory.EventAction... eventActions) { + this(key, false, eventActions); + } + + /** + * Constructs a new JacksonEventKey. + *

+ * This overload should only be used by {@link JacksonEvent} directly. It allows for skipping creating + * some resources knowing they will not be needed. The {@link JacksonEvent} only needs a JSON pointer + * when performing GET event actions. So we can optimize PUT/DELETE actions when called with a string + * key instead of an EventKey by not creating the JSON Pointer at all. + *

+ * For EventKey's constructed through the factory, we should not perform lazy initialization since + * we may lose some possible validations. + * + * @param key the key + * @param lazy Use true to lazily initialize. This will not be thread-safe, however. + * @param eventActions Event actions to support + */ + JacksonEventKey(final String key, final boolean lazy, final EventKeyFactory.EventAction... eventActions) { this.key = Objects.requireNonNull(key, "Parameter key cannot be null for EventKey."); this.eventActions = eventActions.length == 0 ? new EventKeyFactory.EventAction[] { EventKeyFactory.EventAction.ALL } : eventActions; + supportedActions = EnumSet.noneOf(EventKeyFactory.EventAction.class); + for (final EventKeyFactory.EventAction eventAction : this.eventActions) { + supportedActions.addAll(eventAction.getSupportedActions()); + } + if(key.isEmpty()) { for (final EventKeyFactory.EventAction action : this.eventActions) { if (action.isMutableAction()) { @@ -40,14 +71,10 @@ class JacksonEventKey implements EventKey { trimmedKey = checkAndTrimKey(key); - keyPathList = Collections.unmodifiableList(Arrays.asList(trimmedKey.split(SEPARATOR, -1))); - jsonPointer = toJsonPointer(trimmedKey); - - supportedActions = EnumSet.noneOf(EventKeyFactory.EventAction.class); - for (final EventKeyFactory.EventAction eventAction : this.eventActions) { - supportedActions.addAll(eventAction.getSupportedActions()); + if(!lazy) { + keyPathList = toKeyPathList(); + jsonPointer = toJsonPointer(trimmedKey); } - } @Override @@ -60,10 +87,16 @@ String getTrimmedKey() { } List getKeyPathList() { + if(keyPathList == null) { + keyPathList = toKeyPathList(); + } return keyPathList; } JsonPointer getJsonPointer() { + if(jsonPointer == null) { + jsonPointer = toJsonPointer(trimmedKey); + } return jsonPointer; } @@ -136,7 +169,11 @@ private static boolean isValidKey(final String key) { return true; } - private JsonPointer toJsonPointer(final String key) { + private List toKeyPathList() { + return Collections.unmodifiableList(Arrays.asList(trimmedKey.split(SEPARATOR, -1))); + } + + private static JsonPointer toJsonPointer(final String key) { final String jsonPointerExpression; if (key.isEmpty() || key.startsWith("/")) { jsonPointerExpression = key; diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventKeyTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventKeyTest.java index 929151175b..5eb696a374 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventKeyTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventKeyTest.java @@ -115,6 +115,23 @@ void getJsonPointer_returns_the_same_instance_for_multiple_calls() { assertThat(objectUnderTest.getJsonPointer(), sameInstance(jsonPointer)); } + @ParameterizedTest + @EnumSource(value = EventKeyFactory.EventAction.class) + void getJsonPointer_returns_valid_JsonPointer_when_constructed_with_fromJacksonEvent(final EventKeyFactory.EventAction eventAction) { + final String testKey = UUID.randomUUID().toString(); + final JacksonEventKey objectUnderTest = new JacksonEventKey(testKey, true, eventAction); + + final JsonPointer jsonPointer = objectUnderTest.getJsonPointer(); + assertThat(jsonPointer, notNullValue()); + assertThat(jsonPointer.toString(), equalTo("/" + testKey)); + } + + @ParameterizedTest + @ArgumentsSource(KeyPathListArgumentsProvider.class) + void getKeyPathList_returns_expected_value_when_constructed_with_fromJacksonEvent(final String key, final List expectedKeyPathList) { + assertThat(new JacksonEventKey(key, true).getKeyPathList(), equalTo(expectedKeyPathList)); + } + @ParameterizedTest @ArgumentsSource(SupportsArgumentsProvider.class) void supports_returns_true_if_any_supports(final List eventActionsList, final EventKeyFactory.EventAction otherAction, final boolean expectedSupports) { From bbac579ba58b2399136e376d8cd016f04f241791 Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Mon, 24 Jun 2024 16:56:23 -0500 Subject: [PATCH 3/9] MAINT: change log level for consumer properties in kafka source (#4658) Signed-off-by: George Chen --- .../dataprepper/plugins/kafka/source/KafkaSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 6a01a91bf0..9a385c836f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -241,7 +241,7 @@ private Properties getConsumerProperties(final TopicConsumerConfig topicConfig, } setConsumerTopicProperties(properties, topicConfig); setSchemaRegistryProperties(properties, topicConfig); - LOG.info("Starting consumer with the properties : {}", properties); + LOG.debug("Starting consumer with the properties : {}", properties); return properties; } From ab9700a70e4ff512ad456f6b9895d66c2d3e9bdc Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Mon, 24 Jun 2024 15:46:42 -0700 Subject: [PATCH 4/9] Add an option to count unique values of specified key(s) to CountAggregateAction (#4652) Add an option to count unique values of specified key(s) to CountAggregateAction Signed-off-by: Krishna Kondaka --- .../actions/CountAggregateAction.java | 28 +++++++++- .../actions/CountAggregateActionConfig.java | 10 +++- .../CountAggregateActionConfigTests.java | 8 +++ .../actions/CountAggregateActionTest.java | 54 +++++++++++++++++++ 4 files changed, 97 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index f87cd5a7f0..c8fd772336 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -22,13 +22,17 @@ import static org.opensearch.dataprepper.plugins.processor.aggregate.AggregateProcessor.getTimeNanos; import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState; import io.opentelemetry.proto.metrics.v1.AggregationTemporality; +import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; import java.time.Instant; -import java.util.List; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.Map; + import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * An AggregateAction that combines multiple Events into a single Event. This action will count the number of events with same keys and will create a combined event @@ -38,6 +42,7 @@ @DataPrepperPlugin(name = "count", pluginType = AggregateAction.class, pluginConfigurationType = CountAggregateActionConfig.class) public class CountAggregateAction implements AggregateAction { private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; + private static final String UNIQUE_KEYS_SETKEY = "__unique_keys"; private static final String exemplarKey = "__exemplar"; static final String EVENT_TYPE = "event"; static final String SUM_METRIC_DESCRIPTION = "Number of events"; @@ -49,6 +54,7 @@ public class CountAggregateAction implements AggregateAction { public final String outputFormat; private long startTimeNanos; private final String metricName; + private final IdentificationKeysHasher uniqueKeysHasher; @DataPrepperPluginConstructor public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) { @@ -57,6 +63,11 @@ public CountAggregateAction(final CountAggregateActionConfig countAggregateActio this.endTimeKey = countAggregateActionConfig.getEndTimeKey(); this.outputFormat = countAggregateActionConfig.getOutputFormat(); this.metricName = countAggregateActionConfig.getMetricName(); + if (countAggregateActionConfig.getUniqueKeys() != null) { + this.uniqueKeysHasher = new IdentificationKeysHasher(countAggregateActionConfig.getUniqueKeys()); + } else { + this.uniqueKeysHasher = null; + } } public Exemplar createExemplar(final Event event) { @@ -93,12 +104,24 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct } if (groupState.get(countKey) == null) { groupState.putAll(aggregateActionInput.getIdentificationKeys()); + if (uniqueKeysHasher != null) { + Set uniqueKeysMapSet = new HashSet<>(); + + uniqueKeysMapSet.add(uniqueKeysHasher.createIdentificationKeysMapFromEvent(event)); + groupState.put(UNIQUE_KEYS_SETKEY, uniqueKeysMapSet); + } groupState.put(countKey, 1); groupState.put(exemplarKey, createExemplar(event)); groupState.put(startTimeKey, eventStartTime); groupState.put(endTimeKey, eventEndTime); } else { Integer v = (Integer)groupState.get(countKey) + 1; + + if (uniqueKeysHasher != null) { + Set uniqueKeysMapSet = (Set) groupState.get(UNIQUE_KEYS_SETKEY); + uniqueKeysMapSet.add(uniqueKeysHasher.createIdentificationKeysMapFromEvent(event)); + v = uniqueKeysMapSet.size(); + } groupState.put(countKey, v); Instant groupStartTime = (Instant)groupState.get(startTimeKey); Instant groupEndTime = (Instant)groupState.get(endTimeKey); @@ -117,6 +140,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA Instant startTime = (Instant)groupState.get(startTimeKey); Instant endTime = (Instant)groupState.get(endTimeKey); groupState.remove(endTimeKey); + groupState.remove(UNIQUE_KEYS_SETKEY); if (outputFormat.equals(OutputFormat.RAW.toString())) { groupState.put(startTimeKey, startTime.atZone(ZoneId.of(ZoneId.systemDefault().toString())).format(DateTimeFormatter.ofPattern(DATE_FORMAT))); event = JacksonEvent.builder() diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java index f7a2e6a48d..1144aee261 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java @@ -5,8 +5,9 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; -import java.util.Set; import java.util.HashSet; +import java.util.List; +import java.util.Set; import com.fasterxml.jackson.annotation.JsonProperty; public class CountAggregateActionConfig { @@ -22,6 +23,9 @@ public class CountAggregateActionConfig { @JsonProperty("metric_name") String metricName = SUM_METRIC_NAME; + @JsonProperty("unique_keys") + List uniqueKeys = null; + @JsonProperty("start_time_key") String startTimeKey = DEFAULT_START_TIME_KEY; @@ -35,6 +39,10 @@ public String getMetricName() { return metricName; } + public List getUniqueKeys() { + return uniqueKeys; + } + public String getCountKey() { return countKey; } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java index 53b7c846b6..1975918e37 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java @@ -13,6 +13,8 @@ import static org.opensearch.dataprepper.plugins.processor.aggregate.actions.CountAggregateActionConfig.DEFAULT_COUNT_KEY; import static org.opensearch.dataprepper.plugins.processor.aggregate.actions.CountAggregateActionConfig.DEFAULT_START_TIME_KEY; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; @@ -39,6 +41,7 @@ void testDefault() { assertThat(countAggregateActionConfig.getStartTimeKey(), equalTo(DEFAULT_START_TIME_KEY)); assertThat(countAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString())); assertThat(countAggregateActionConfig.getMetricName(), equalTo(CountAggregateActionConfig.SUM_METRIC_NAME)); + assertThat(countAggregateActionConfig.getUniqueKeys(), equalTo(null)); } @Test @@ -55,6 +58,11 @@ void testValidConfig() throws NoSuchFieldException, IllegalAccessException { final String testName = UUID.randomUUID().toString(); setField(CountAggregateActionConfig.class, countAggregateActionConfig, "metricName", testName); assertThat(countAggregateActionConfig.getMetricName(), equalTo(testName)); + final List uniqueKeys = new ArrayList<>(); + uniqueKeys.add(UUID.randomUUID().toString()); + uniqueKeys.add(UUID.randomUUID().toString()); + setField(CountAggregateActionConfig.class, countAggregateActionConfig, "uniqueKeys", uniqueKeys); + assertThat(countAggregateActionConfig.getUniqueKeys(), equalTo(uniqueKeys)); } @Test diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java index b91311c1c6..af81ca001f 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java @@ -153,6 +153,7 @@ void testCountAggregateOTelFormat(int testCount) throws NoSuchFieldException, Il void testCountAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCount) { CountAggregateActionConfig mockConfig = mock(CountAggregateActionConfig.class); when(mockConfig.getCountKey()).thenReturn(CountAggregateActionConfig.DEFAULT_COUNT_KEY); + when(mockConfig.getUniqueKeys()).thenReturn(null); final String testName = UUID.randomUUID().toString(); when(mockConfig.getMetricName()).thenReturn(testName); String startTimeKey = UUID.randomUUID().toString(); @@ -233,4 +234,57 @@ void testCountAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCount) { assertThat(attributes.get(key2), equalTo(value2)); assertTrue(attributes.containsKey(dataKey2)); } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 10, 20}) + void testCountAggregateOTelFormatUniqueKeys(int testCount) throws NoSuchFieldException, IllegalAccessException { + CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); + final String testName = UUID.randomUUID().toString(); + setField(CountAggregateActionConfig.class, countAggregateActionConfig, "metricName", testName); + final String key1 = "key-"+UUID.randomUUID().toString(); + final String value1 = UUID.randomUUID().toString(); + final String dataKey1 = "datakey-"+UUID.randomUUID().toString(); + setField(CountAggregateActionConfig.class, countAggregateActionConfig, "uniqueKeys", List.of(dataKey1)); + countAggregateAction = createObjectUnderTest(countAggregateActionConfig); + Map eventMap = Collections.singletonMap(key1, value1); + Event testEvent = JacksonEvent.builder() + .withEventType("event") + .withData(eventMap) + .build(); + AggregateActionInput aggregateActionInput = new AggregateActionTestUtils.TestAggregateActionInput(eventMap); + final String dataKey1_1 = UUID.randomUUID().toString(); + final String dataKey1_2 = UUID.randomUUID().toString(); + final String dataKey1_3 = UUID.randomUUID().toString(); + final String[] dataKeysList = {dataKey1_1, dataKey1_2, dataKey1_3}; + for (int i = 0; i < testCount; i++) { + testEvent.put(dataKey1, dataKeysList[i % 3]); + AggregateActionResponse aggregateActionResponse = countAggregateAction.handleEvent(testEvent, aggregateActionInput); + assertThat(aggregateActionResponse.getEvent(), equalTo(null)); + } + + AggregateActionOutput actionOutput = countAggregateAction.concludeGroup(aggregateActionInput); + final List result = actionOutput.getEvents(); + assertThat(result.size(), equalTo(1)); + Map expectedEventMap = new HashMap<>(); + double expectedCount = (testCount >= 3) ? 3 : testCount; + expectedEventMap.put("value", expectedCount); + expectedEventMap.put("description", "Number of events"); + expectedEventMap.put("isMonotonic", true); + expectedEventMap.put("aggregationTemporality", "AGGREGATION_TEMPORALITY_DELTA"); + expectedEventMap.put("unit", "1"); + expectedEventMap.put("name", testName); + expectedEventMap.forEach((k, v) -> assertThat(result.get(0).toMap(), hasEntry(k,v))); + assertThat(result.get(0).toMap().get("attributes"), equalTo(eventMap)); + JacksonMetric metric = (JacksonMetric) result.get(0); + assertThat(metric.toJsonString().indexOf("attributes"), not(-1)); + assertThat(result.get(0).toMap(), hasKey("startTime")); + assertThat(result.get(0).toMap(), hasKey("time")); + List> exemplars = (List >)result.get(0).toMap().get("exemplars"); + assertThat(exemplars.size(), equalTo(1)); + Map exemplar = exemplars.get(0); + Map attributes = (Map)exemplar.get("attributes"); + assertThat(attributes.get(key1), equalTo(value1)); + assertTrue(attributes.containsKey(dataKey1)); + + } } From dc77ba5519962cc0d45bf69f7363304de75ec7fb Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Tue, 25 Jun 2024 09:11:40 -0500 Subject: [PATCH 5/9] FIX: remove logging that includes credential info on kafka (#4659) * FIX: use sensitive marker Signed-off-by: George Chen --- .../plugins/kafka/consumer/KafkaCustomConsumerFactory.java | 4 +++- .../dataprepper/plugins/kafka/source/KafkaSource.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index d703538e42..0d091b8af7 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -55,6 +55,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; + public class KafkaCustomConsumerFactory { private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumerFactory.class); @@ -136,7 +138,7 @@ private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig, } setConsumerTopicProperties(properties, topicConfig, topicConfig.getGroupId()); setSchemaRegistryProperties(sourceConfig, properties, topicConfig); - LOG.debug("Starting consumer with the properties : {}", properties); + LOG.debug(SENSITIVE, "Starting consumer with the properties : {}", properties); return properties; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 9a385c836f..525c754929 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -61,6 +61,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; + /** * The starting point of the Kafka-source plugin and the Kafka consumer * properties and kafka multithreaded consumers are being handled here. @@ -241,7 +243,7 @@ private Properties getConsumerProperties(final TopicConsumerConfig topicConfig, } setConsumerTopicProperties(properties, topicConfig); setSchemaRegistryProperties(properties, topicConfig); - LOG.debug("Starting consumer with the properties : {}", properties); + LOG.debug(SENSITIVE, "Starting consumer with the properties : {}", properties); return properties; } From 970f51f6d3a08bfcd789af3a7e517eb7ea3c4ce0 Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 25 Jun 2024 10:07:40 -0500 Subject: [PATCH 6/9] Fixes the loading of peer-forwarders when using multiple workers. This fixes a bug where the service_map processor would not load in a pipeline with multiple workers. Resolves #4660. (#4661) Signed-off-by: David Venable --- .../PeerForwardingProcessorDecorator.java | 9 +-- ...PeerForwardingProcessingDecoratorTest.java | 66 ++++++++++--------- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index 097b2b6552..038bdb28c5 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -67,13 +67,10 @@ public static List decorateProcessors( "Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId); } + final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, firstInnerProcessor, pluginId, identificationKeys, pipelineWorkerThreads); - return processors.stream() - .map(processor -> { - PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads); - return new PeerForwardingProcessorDecorator(peerForwarder, processor); - }) - .collect(Collectors.toList()); + return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor)) + .collect(Collectors.toList()); } private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java index ceb424b0cb..7a85033842 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -39,6 +39,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -51,18 +52,6 @@ class PeerForwardingProcessingDecoratorTest { @Mock private Processor processor; - @Mock - private Processor processor1; - - @Mock - private Processor processor2; - - @Mock(extraInterfaces = Processor.class) - private RequiresPeerForwarding requiresPeerForwarding1; - - @Mock(extraInterfaces = Processor.class) - private RequiresPeerForwarding requiresPeerForwarding2; - @Mock(extraInterfaces = Processor.class) private RequiresPeerForwarding requiresPeerForwarding; @@ -82,13 +71,13 @@ record = mock(Record.class); pluginId = UUID.randomUUID().toString(); } - private List createObjectUnderTesDecoratedProcessors(final List processors) { + private List createObjectUnderTestDecoratedProcessors(final List processors) { return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, PIPELINE_WORKER_THREADS); } @Test void PeerForwardingProcessingDecorator_should_not_have_any_interactions_if_its_not_an_instance_of_RequiresPeerForwarding() { - assertThrows(UnsupportedPeerForwarderPluginException.class, () -> createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor))); + assertThrows(UnsupportedPeerForwarderPluginException.class, () -> createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor))); verifyNoInteractions(peerForwarderProvider); } @@ -97,7 +86,7 @@ void PeerForwardingProcessingDecorator_should_not_have_any_interactions_if_its_n void PeerForwardingProcessingDecorator_execute_with_empty_identification_keys_should_throw() { when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Collections.emptySet()); - assertThrows(EmptyPeerForwarderPluginIdentificationKeysException.class, () -> createObjectUnderTesDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding))); + assertThrows(EmptyPeerForwarderPluginIdentificationKeysException.class, () -> createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding))); } @Test @@ -109,12 +98,12 @@ void decorateProcessors_with_different_identification_key_should_throw() { when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(UUID.randomUUID().toString())); when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(Set.of(UUID.randomUUID().toString())); - assertThrows(RuntimeException.class, () -> createObjectUnderTesDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy))); + assertThrows(RuntimeException.class, () -> createObjectUnderTestDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy))); } @Test void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.emptyList()); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.emptyList()); assertThat(processors.size(), equalTo(0)); } @@ -136,9 +125,22 @@ void setUp() { @Test void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentificationKeys() { - createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); + verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); + verify(peerForwarderProvider).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS); + verifyNoMoreInteractions(peerForwarderProvider); + } + + @Test + void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentificationKeys_when_list_of_processors() { + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys); + when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(identificationKeys); + + createObjectUnderTestDecoratedProcessors(List.of((Processor) requiresPeerForwarding, (Processor) requiresPeerForwardingCopy)); + verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); verify(peerForwarderProvider).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS); + verifyNoMoreInteractions(peerForwarderProvider); } @Test @@ -148,16 +150,15 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() { processorList.add((Processor) requiresPeerForwardingCopy); LocalPeerForwarder localPeerForwarder = mock(LocalPeerForwarder.class); - lenient().when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder); - lenient().when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwardingCopy, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder); + when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder); Event event = mock(Event.class); when(record.getData()).thenReturn(event); List> testData = Collections.singletonList(record); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(false); when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(false); - processor1 = (Processor)requiresPeerForwarding; - processor2 = (Processor)requiresPeerForwardingCopy; + Processor processor1 = (Processor)requiresPeerForwarding; + Processor processor2 = (Processor)requiresPeerForwardingCopy; when(processor1.execute(testData)).thenReturn(testData); when(processor2.execute(testData)).thenReturn(testData); @@ -167,9 +168,10 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() { when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true); when(requiresPeerForwardingCopy.isForLocalProcessingOnly(any())).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(processorList); + final List processors = createObjectUnderTestDecoratedProcessors(processorList); assertThat(processors.size(), equalTo(2)); verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS); + verifyNoMoreInteractions(peerForwarderProvider); Collection> result = processors.get(0).execute(testData); assertThat(result.size(), equalTo(testData.size())); assertThat(result, equalTo(testData)); @@ -189,7 +191,7 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc when(processor.execute(testData)).thenReturn(testData); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); final Collection> records = processors.get(0).execute(testData); @@ -215,7 +217,7 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { when(((Processor) requiresPeerForwarding).execute(anyCollection())).thenReturn(expectedRecordsToProcessLocally); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding)); assertThat(processors.size(), equalTo(1)); final Collection> records = processors.get(0).execute(forwardTestData); @@ -232,7 +234,7 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut Event event = mock(Event.class); when(record.getData()).thenReturn(event); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); Collection> testData = Collections.singletonList(record); assertThat(processors.size(), equalTo(1)); @@ -248,7 +250,7 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(false); when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); Collection> testData = Collections.singletonList(record); assertThat(processors.size(), equalTo(1)); @@ -272,7 +274,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_ when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(false); when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); when(record1.getData()).thenReturn(event1); when(record2.getData()).thenReturn(event2); when(record3.getData()).thenReturn(event3); @@ -303,7 +305,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_ when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(true); when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(false); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); when(record1.getData()).thenReturn(event1); when(record2.getData()).thenReturn(event2); when(record3.getData()).thenReturn(event3); @@ -322,7 +324,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_ @Test void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_processors_prepareForShutdown() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); processors.get(0).prepareForShutdown(); @@ -331,7 +333,7 @@ void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_proces @Test void PeerForwardingProcessingDecorator_isReadyForShutdown_will_call_inner_processors_isReadyForShutdown() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); processors.get(0).isReadyForShutdown(); @@ -340,7 +342,7 @@ void PeerForwardingProcessingDecorator_isReadyForShutdown_will_call_inner_proces @Test void PeerForwardingProcessingDecorator_shutdown_will_call_inner_processors_shutdown() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); processors.get(0).shutdown(); From 980b0ec50f0e7a905fab9a79896b413d9e23230e Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Tue, 25 Jun 2024 09:51:28 -0700 Subject: [PATCH 7/9] Refactor lambda plugin (#4643) * Refactor lambda plugin Signed-off-by: Srikanth Govindarajan * Address comments Signed-off-by: Srikanth Govindarajan * Address comments 2 Signed-off-by: Srikanth Govindarajan --------- Signed-off-by: Srikanth Govindarajan Signed-off-by: Srikanth Govindarajan --- .../{lambda-sink => lambda}/README.md | 0 .../{lambda-sink => lambda}/build.gradle | 5 ++ .../sink/lambda/LambdaSinkServiceIT.java | 17 +++--- .../lambda/common}/accumlator/Buffer.java | 7 ++- .../common}/accumlator/BufferFactory.java | 2 +- .../common}/accumlator/InMemoryBuffer.java | 20 ++++++- .../accumlator/InMemoryBufferFactory.java | 2 +- .../lambda/common}/codec/LambdaJsonCodec.java | 3 +- .../config/AwsAuthenticationOptions.java | 2 +- .../lambda/common}/config/BatchOptions.java | 4 +- .../common}/config/ThresholdOptions.java | 7 ++- .../lambda/common/util}/ThresholdCheck.java | 7 +-- .../lambda/sink}/LambdaClientFactory.java | 4 +- .../plugins/lambda/sink}/LambdaSink.java | 8 +-- .../lambda/sink}/LambdaSinkConfig.java | 8 +-- .../lambda/sink}/LambdaSinkService.java | 33 ++++++----- .../lambda/sink}/dlq/DlqPushHandler.java | 2 +- .../sink}/dlq/LambdaSinkFailedDlqData.java | 2 +- .../lambda/common}/ThresholdCheckTest.java | 12 ++-- .../InMemoryBufferFactoryTest.java | 6 +- .../accumulator/InMemoryBufferTest.java | 31 +++++----- .../common}/codec/LambdaJsonCodecTest.java | 2 +- .../common}/config/ThresholdOptionsTest.java | 7 +-- .../lambda/sink}/LambdaClientFactoryTest.java | 21 ++++--- .../lambda/sink}/LambdaSinkConfigTest.java | 5 +- .../lambda/sink}/LambdaSinkServiceTest.java | 58 +++++++++---------- .../plugins/lambda/sink}/LambdaSinkTest.java | 13 ++--- .../lambda/sink}/dlq/DlqPushHandlerTest.java | 19 +++--- .../org.mockito.plugins.MockMaker | 0 settings.gradle | 2 +- 30 files changed, 163 insertions(+), 146 deletions(-) rename data-prepper-plugins/{lambda-sink => lambda}/README.md (100%) rename data-prepper-plugins/{lambda-sink => lambda}/build.gradle (89%) rename data-prepper-plugins/{lambda-sink => lambda}/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java (93%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/Buffer.java (68%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/BufferFactory.java (82%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/InMemoryBuffer.java (84%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/InMemoryBufferFactory.java (85%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/codec/LambdaJsonCodec.java (95%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/config/AwsAuthenticationOptions.java (95%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/config/BatchOptions.java (80%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/config/ThresholdOptions.java (95%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util}/ThresholdCheck.java (84%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaClientFactory.java (93%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSink.java (93%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkConfig.java (90%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkService.java (92%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/dlq/DlqPushHandler.java (98%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/dlq/LambdaSinkFailedDlqData.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/ThresholdCheckTest.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/accumulator/InMemoryBufferFactoryTest.java (78%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/accumulator/InMemoryBufferTest.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/codec/LambdaJsonCodecTest.java (98%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/config/ThresholdOptionsTest.java (93%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaClientFactoryTest.java (96%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkConfigTest.java (94%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkServiceTest.java (94%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkTest.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/dlq/DlqPushHandlerTest.java (95%) rename data-prepper-plugins/{lambda-sink => lambda}/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker (100%) diff --git a/data-prepper-plugins/lambda-sink/README.md b/data-prepper-plugins/lambda/README.md similarity index 100% rename from data-prepper-plugins/lambda-sink/README.md rename to data-prepper-plugins/lambda/README.md diff --git a/data-prepper-plugins/lambda-sink/build.gradle b/data-prepper-plugins/lambda/build.gradle similarity index 89% rename from data-prepper-plugins/lambda-sink/build.gradle rename to data-prepper-plugins/lambda/build.gradle index 429e190a6a..d0c09c9c8b 100644 --- a/data-prepper-plugins/lambda-sink/build.gradle +++ b/data-prepper-plugins/lambda/build.gradle @@ -19,6 +19,11 @@ dependencies { implementation'org.json:json' implementation libs.commons.lang3 implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + implementation 'org.projectlombok:lombok:1.18.22' + compileOnly 'org.projectlombok:lombok:1.18.20' + annotationProcessor 'org.projectlombok:lombok:1.18.20' + testCompileOnly 'org.projectlombok:lombok:1.18.20' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.20' testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' testImplementation project(':data-prepper-test-common') testImplementation project(':data-prepper-plugins:parse-json-processor') diff --git a/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java b/data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java rename to data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java index 89cf85ceac..76fb4831ce 100644 --- a/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java +++ b/data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java @@ -17,6 +17,7 @@ import org.mockito.Mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; @@ -29,12 +30,14 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkConfig; +import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -45,8 +48,6 @@ import java.util.HashMap; import java.util.List; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class LambdaSinkServiceIT { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java similarity index 68% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java index 48afbe6a01..f52a8e5de0 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; import java.io.OutputStream; import java.time.Duration; @@ -21,7 +22,9 @@ public interface Buffer { Duration getDuration(); - void flushToLambda(); + void flushToLambdaAsync(); + + InvokeResponse flushToLambdaSync(); OutputStream getOutputStream(); diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java similarity index 82% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java index 80afd2f1ca..e44bbd6aee 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import software.amazon.awssdk.services.lambda.LambdaClient; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java similarity index 84% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java index bba70c6e62..5d9d5a5134 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -62,7 +62,22 @@ public Duration getDuration() { @Override - public void flushToLambda() { + public void flushToLambdaAsync() { + InvokeResponse resp; + SdkBytes payload = getPayload(); + + // Setup an InvokeRequest. + InvokeRequest request = InvokeRequest.builder() + .functionName(functionName) + .payload(payload) + .invocationType(invocationType) + .build(); + + lambdaClient.invoke(request); + } + + @Override + public InvokeResponse flushToLambdaSync() { InvokeResponse resp; SdkBytes payload = getPayload(); @@ -74,6 +89,7 @@ public void flushToLambda() { .build(); resp = lambdaClient.invoke(request); + return resp; } private SdkBytes validatePayload(String payload_string) { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java similarity index 85% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java index e58952c5cb..37ad4a4105 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import software.amazon.awssdk.services.lambda.LambdaClient; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodec.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodec.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java index 5bf21f5e18..a1ccaa8561 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodec.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.codec; +package org.opensearch.dataprepper.plugins.lambda.common.codec; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; @@ -37,7 +37,6 @@ public String getExtension() { @Override public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { Objects.requireNonNull(outputStream); - Objects.requireNonNull(codecContext); this.codecContext = codecContext; generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); if(Objects.nonNull(keyName)){ diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java index 8d6c64829d..e40fa617ee 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; +package org.opensearch.dataprepper.plugins.lambda.common.config; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Size; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java similarity index 80% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java index 3773d4e6ed..099bed2b54 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; +package org.opensearch.dataprepper.plugins.lambda.common.config; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; @@ -18,7 +18,7 @@ public class BatchOptions { @JsonProperty("threshold") @NotNull - ThresholdOptions thresholdOptions; + ThresholdOptions thresholdOptions = new ThresholdOptions(); public String getBatchKey(){return batchKey;} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java index 031157c4be..1f92b90b48 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java @@ -3,15 +3,16 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; +package org.opensearch.dataprepper.plugins.lambda.common.config; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; import org.opensearch.dataprepper.model.types.ByteCount; + import java.time.Duration; -import jakarta.validation.constraints.NotNull; -import jakarta.validation.constraints.Size; public class ThresholdOptions { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java similarity index 84% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java index 74aa98e7f9..6bbf8a4ab8 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java @@ -3,10 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.common.util; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; import java.time.Duration; @@ -15,9 +15,6 @@ */ public class ThresholdCheck { - private ThresholdCheck() { - } - public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration, final Boolean isBatchEnabled) { if (!isBatchEnabled) return true; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java index 3e33a4e835..03b94340f0 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java @@ -3,11 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index b1ef905233..54e484fd13 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; @@ -17,9 +17,9 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.sink.SinkContext; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.lambda.LambdaClient; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java similarity index 90% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java index a20fa41181..bb50e2510e 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; @@ -10,11 +10,11 @@ import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; -import java.util.Objects; import java.util.Map; +import java.util.Objects; public class LambdaSinkConfig { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java similarity index 92% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java index f10607e7d1..9a788e6816 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java @@ -3,29 +3,30 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.codec.LambdaJsonCodec; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.codec.LambdaJsonCodec; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -48,7 +49,7 @@ public class LambdaSinkService { private final PluginSetting pluginSetting; private final Lock reentrantLock; private final LambdaSinkConfig lambdaSinkConfig; - private LambdaClient lambdaClient; + private final LambdaClient lambdaClient; private final String functionName; private int maxEvents = 0; private ByteCount maxBytes = null; @@ -65,9 +66,9 @@ public class LambdaSinkService { private final List events; private OutputCodec codec = null; private final BatchOptions batchOptions; - private Boolean isBatchEnabled; + private final Boolean isBatchEnabled; private OutputCodecContext codecContext = null; - private String batchKey; + private final String batchKey; public LambdaSinkService(final LambdaClient lambdaClient, final LambdaSinkConfig lambdaSinkConfig, @@ -213,7 +214,7 @@ protected boolean retryFlushToLambda(Buffer currentBuffer, do { try { - currentBuffer.flushToLambda(); + currentBuffer.flushToLambdaAsync(); isUploadedToLambda = Boolean.TRUE; } catch (AwsServiceException | SdkClientException e) { errorMsgObj.set(e.getMessage()); diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java similarity index 98% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java index 1bdeb0a394..da8c52eb4e 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.dlq; +package org.opensearch.dataprepper.plugins.lambda.sink.dlq; import com.fasterxml.jackson.databind.ObjectWriter; import io.micrometer.core.instrument.util.StringUtils; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java index 0808010e37..8941966b77 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.dlq; +package org.opensearch.dataprepper.plugins.lambda.sink.dlq; import com.fasterxml.jackson.core.JsonProcessingException; import software.amazon.awssdk.core.SdkBytes; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java index b63553911a..d56420d18f 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java @@ -3,23 +3,23 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.common; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck; import java.io.IOException; import java.time.Duration; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class ThresholdCheckTest { diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java similarity index 78% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java index d161b28bb0..37276db819 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java @@ -3,12 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; +package org.opensearch.dataprepper.plugins.lambda.common.accumulator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java index 478650a300..fb164b1ac1 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java @@ -3,16 +3,26 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; +package org.opensearch.dataprepper.plugins.lambda.common.accumulator; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; import org.hamcrest.Matchers; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import org.junit.jupiter.api.Assertions; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import static org.mockito.ArgumentMatchers.any; import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -25,17 +35,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class InMemoryBufferTest { @@ -119,7 +118,7 @@ void test_with_write_event_into_buffer_and_flush_toLambda() throws IOException { inMemoryBuffer.setEventCount(eventCount); } assertDoesNotThrow(() -> { - inMemoryBuffer.flushToLambda(); + inMemoryBuffer.flushToLambdaAsync(); }); } @@ -136,7 +135,7 @@ void test_uploadedToLambda_success() throws IOException { OutputStream outputStream = inMemoryBuffer.getOutputStream(); outputStream.write(generateByteArray()); assertDoesNotThrow(() -> { - inMemoryBuffer.flushToLambda(); + inMemoryBuffer.flushToLambdaAsync(); }); } @@ -153,7 +152,7 @@ void test_uploadedToLambda_fails() { inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); Assertions.assertNotNull(inMemoryBuffer); - SdkClientException actualException = assertThrows(SdkClientException.class, () -> inMemoryBuffer.flushToLambda()); + SdkClientException actualException = assertThrows(SdkClientException.class, () -> inMemoryBuffer.flushToLambdaAsync()); assertThat(actualException, Matchers.equalTo(sdkClientException)); } diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java similarity index 98% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java index 6de6ce8a0e..4b6e4c5caf 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.codec; +package org.opensearch.dataprepper.plugins.lambda.common.codec; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java index 53bd0a4edf..5d12aca3da 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java @@ -3,13 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; - -import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.model.types.ByteCount; +package org.opensearch.dataprepper.plugins.lambda.common.config; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; class ThresholdOptionsTest { private static final String DEFAULT_BYTE_CAPACITY = "6mb"; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java similarity index 96% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java index ab72ee44b8..9ed5c71fb2 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java @@ -2,35 +2,34 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; +import static org.mockito.ArgumentMatchers.any; import org.mockito.Mock; import org.mockito.MockedStatic; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; import software.amazon.awssdk.services.lambda.LambdaClientBuilder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.Map; import java.util.UUID; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java similarity index 94% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java index eda9488a04..2a6dad3a69 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java @@ -2,12 +2,13 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.Test; import software.amazon.awssdk.regions.Region; @@ -21,7 +22,7 @@ class LambdaSinkConfigTest { @Test void lambda_sink_default_max_connection_retries_test(){ - assertThat(new LambdaSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES)); + MatcherAssert.assertThat(new LambdaSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES)); } @Test diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java similarity index 94% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java index bbab8778c0..4e678c191d 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java @@ -2,34 +2,46 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import io.micrometer.core.instrument.Counter; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.http.SdkHttpResponse; @@ -37,7 +49,6 @@ import software.amazon.awssdk.services.lambda.model.InvokeRequest; import software.amazon.awssdk.services.lambda.model.InvokeResponse; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.time.Duration; @@ -47,19 +58,6 @@ import java.util.List; import java.util.Map; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.ArgumentMatchers.any; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; - public class LambdaSinkServiceTest { public static final int maxEvents = 10; @@ -193,7 +191,7 @@ public void lambda_sink_test_max_retires_works() throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream); when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer); - doThrow(AwsServiceException.class).when(buffer).flushToLambda(); + doThrow(AwsServiceException.class).when(buffer).flushToLambdaAsync(); LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient, lambdaSinkConfig, @@ -209,7 +207,7 @@ public void lambda_sink_test_max_retires_works() throws IOException { Collection> records = List.of(eventRecord); lambdaSinkService.output(records); - verify(buffer, times(3)).flushToLambda(); + verify(buffer, times(3)).flushToLambdaAsync(); } @Test @@ -232,7 +230,7 @@ public void lambda_sink_test_dlq_works() throws IOException { when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream); when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer); - doThrow(AwsServiceException.class).when(buffer).flushToLambda(); + doThrow(AwsServiceException.class).when(buffer).flushToLambdaAsync(); LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient, lambdaSinkConfig, @@ -249,7 +247,7 @@ public void lambda_sink_test_dlq_works() throws IOException { lambdaSinkService.output(records); - verify(buffer, times(3)).flushToLambda(); + verify(buffer, times(3)).flushToLambdaAsync(); verify(dlqPushHandler,times(1)).perform(any(PluginSetting.class),any(Object.class)); } diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java index 1687cbd285..9a042014f0 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java @@ -3,28 +3,27 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import org.junit.jupiter.api.Assertions; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.sink.SinkContext; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; import java.util.HashMap; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - class LambdaSinkTest { public static final String S3_REGION = "us-east-1"; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java index 17f39973b7..e1de3303a1 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java @@ -2,17 +2,24 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.dlq; +package org.opensearch.dataprepper.plugins.lambda.sink.dlq; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; import software.amazon.awssdk.core.SdkBytes; import java.io.IOException; @@ -20,14 +27,6 @@ import java.util.Map; import java.util.Optional; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - class DlqPushHandlerTest { private static final String BUCKET = "bucket"; diff --git a/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker similarity index 100% rename from data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker rename to data-prepper-plugins/lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/settings.gradle b/settings.gradle index 8400ff98c2..64d86219ea 100644 --- a/settings.gradle +++ b/settings.gradle @@ -175,4 +175,4 @@ include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' include 'data-prepper-plugins:http-source-common' include 'data-prepper-plugins:http-common' -include 'data-prepper-plugins:lambda-sink' \ No newline at end of file +include 'data-prepper-plugins:lambda' \ No newline at end of file From d29954c25363b827b7697e6073e724c2d427ec1d Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 25 Jun 2024 17:09:27 -0500 Subject: [PATCH 8/9] Updates the chunking algorithm for http source's JsonCodec to account for actual byte size. Test using Unicode characters to prove this was incorrectly chunking and verify against future changes. (#4656) Signed-off-by: David Venable --- .../dataprepper/http/codec/JsonCodec.java | 3 +- .../dataprepper/http/codec/JsonCodecTest.java | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/JsonCodec.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/JsonCodec.java index fc25193a9d..4c0020a83e 100644 --- a/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/JsonCodec.java +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/JsonCodec.java @@ -10,6 +10,7 @@ import com.linecorp.armeria.common.HttpData; import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -56,7 +57,7 @@ public List> parse(HttpData httpData, int maxSize) throws IOExcepti size = OVERHEAD_CHARACTERS.length(); } innerJsonList.add(recordString); - size += recordString.length() + COMMA_OVERHEAD_LENGTH; + size += recordString.getBytes(Charset.defaultCharset()).length + COMMA_OVERHEAD_LENGTH; } if (size > OVERHEAD_CHARACTERS.length()) { jsonList.add(innerJsonList); diff --git a/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/JsonCodecTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/JsonCodecTest.java index 4863667bc0..8843d8d6e7 100644 --- a/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/JsonCodecTest.java +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/JsonCodecTest.java @@ -7,16 +7,31 @@ import com.linecorp.armeria.common.HttpData; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import java.io.IOException; +import java.nio.charset.Charset; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.params.provider.Arguments.arguments; class JsonCodecTest { private final HttpData goodTestData = HttpData.ofUtf8("[{\"a\":\"b\"}, {\"c\":\"d\"}]"); private final HttpData goodLargeTestData = HttpData.ofUtf8("[{\"a1\":\"b1\"}, {\"a2\":\"b2\"}, {\"a3\":\"b3\"}, {\"a4\":\"b4\"}, {\"a5\":\"b5\"}]"); + private final HttpData goodLargeTestDataUnicode = HttpData.ofUtf8("[{\"ὊὊὊ1\":\"ὊὊὊ1\"}, {\"ὊὊὊ2\":\"ὊὊὊ2\"}, {\"a3\":\"b3\"}, {\"ὊὊὊ4\":\"ὊὊὊ4\"}]"); private final HttpData badTestDataJsonLine = HttpData.ofUtf8("{\"a\":\"b\"}"); private final HttpData badTestDataMultiJsonLines = HttpData.ofUtf8("{\"a\":\"b\"}{\"c\":\"d\"}"); private final HttpData badTestDataNonJson = HttpData.ofUtf8("non json content"); @@ -51,6 +66,25 @@ public void testParseSuccessWithMaxSize() throws IOException { assertEquals("{\"a5\":\"b5\"}", res.get(2).get(0)); } + @ParameterizedTest + @ArgumentsSource(JsonArrayWithKnownFirstArgumentsProvider.class) + public void parse_should_return_lists_smaller_than_provided_length( + final String inputJsonArray, final String knownFirstPart) throws IOException { + final int knownSingleBodySize = knownFirstPart.getBytes(Charset.defaultCharset()).length; + final int maxSize = (knownSingleBodySize * 2) + 3; + final List> chunkedBodies = objectUnderTest.parse(HttpData.ofUtf8(inputJsonArray), + maxSize); + + assertThat(chunkedBodies, notNullValue()); + assertThat(chunkedBodies.size(), greaterThanOrEqualTo(1)); + final String firstReconstructed = chunkedBodies.get(0).stream().collect(Collectors.joining(",", "[", "]")); + assertThat(firstReconstructed.getBytes(Charset.defaultCharset()).length, + lessThanOrEqualTo(maxSize)); + + assertThat(chunkedBodies.get(0).size(), greaterThanOrEqualTo(1)); + assertThat(chunkedBodies.get(0).get(0), equalTo(knownFirstPart)); + } + @Test public void testParseJsonLineFailure() { assertThrows(IOException.class, () -> objectUnderTest.parse(badTestDataJsonLine)); @@ -65,4 +99,18 @@ public void testParseMultiJsonLinesFailure() { public void testParseNonJsonFailure() { assertThrows(IOException.class, () -> objectUnderTest.parse(badTestDataNonJson)); } + + static class JsonArrayWithKnownFirstArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext extensionContext) throws Exception { + return Stream.of( + arguments( + "[{\"ὊὊὊ1\":\"ὊὊὊ1\"}, {\"ὊὊὊ2\":\"ὊὊὊ2\"}, {\"a3\":\"b3\"}, {\"ὊὊὊ4\":\"ὊὊὊ4\"}]", + "{\"ὊὊὊ1\":\"ὊὊὊ1\"}"), + arguments( + "[{\"aaa1\":\"aaa1\"}, {\"aaa2\":\"aaa2\"}, {\"a3\":\"b3\"}, {\"bbb4\":\"bbb4\"}]", + "{\"aaa1\":\"aaa1\"}") + ); + } + } } \ No newline at end of file From c5fe1b48990e659b2e0e7b45d5f4ee916c24ce2a Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Wed, 26 Jun 2024 07:15:04 -0700 Subject: [PATCH 9/9] Support default route option for Events that match no other route (#4662) Support default route option for Events that match no other route Signed-off-by: Krishna Kondaka --- .../Router_ThreeRoutesDefaultIT.java | 130 ++++++++++++++++++ .../route/three-route-with-default-route.yaml | 41 ++++++ .../router/DataFlowComponentRouter.java | 5 +- .../router/DataFlowComponentRouterTest.java | 38 +++++ 4 files changed, 213 insertions(+), 1 deletion(-) create mode 100644 data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java create mode 100644 data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java new file mode 100644 index 0000000000..fbc61053a5 --- /dev/null +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.integration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; + +class Router_ThreeRoutesDefaultIT { + private static final String TESTING_KEY = "ConditionalRoutingIT"; + private static final String ALL_SOURCE_KEY = TESTING_KEY + "_all"; + private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha"; + private static final String BETA_SOURCE_KEY = TESTING_KEY + "_beta"; + private static final String ALPHA_DEFAULT_SOURCE_KEY = TESTING_KEY + "_alpha_default"; + private static final String ALPHA_BETA_GAMMA_SOURCE_KEY = TESTING_KEY + "_alpha_beta_gamma"; + private static final String DEFAULT_SOURCE_KEY = TESTING_KEY + "_default"; + private static final String KNOWN_CONDITIONAL_KEY = "value"; + private static final String ALPHA_VALUE = "a"; + private static final String BETA_VALUE = "b"; + private static final String GAMMA_VALUE = "g"; + private static final String DEFAULT_VALUE = "z"; + private DataPrepperTestRunner dataPrepperTestRunner; + private InMemorySourceAccessor inMemorySourceAccessor; + private InMemorySinkAccessor inMemorySinkAccessor; + + @BeforeEach + void setUp() { + dataPrepperTestRunner = DataPrepperTestRunner.builder() + .withPipelinesDirectoryOrFile("route/three-route-with-default-route.yaml") + .build(); + + dataPrepperTestRunner.start(); + inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); + inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor(); + } + + @AfterEach + void tearDown() { + dataPrepperTestRunner.stop(); + } + + @Test + void test_default_route() { + final List> alphaEvents = createEvents(ALPHA_VALUE, 10); + final List> betaEvents = createEvents(BETA_VALUE, 20); + final List> gammaEvents = createEvents(GAMMA_VALUE, 20); + final List> defaultEvents = createEvents(DEFAULT_VALUE, 20); + + final List> allEvents = new ArrayList<>(alphaEvents); + allEvents.addAll(betaEvents); + allEvents.addAll(gammaEvents); + allEvents.addAll(defaultEvents); + Collections.shuffle(allEvents); + + inMemorySourceAccessor.submit(TESTING_KEY, allEvents); + + await().atMost(2, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(inMemorySinkAccessor.get(ALPHA_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(BETA_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(ALL_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(ALPHA_DEFAULT_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(ALPHA_BETA_GAMMA_SOURCE_KEY), not(empty())); + assertThat(inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY), not(empty())); + }); + + final List> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY); + + assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size())); + + assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream() + .filter(alphaEvents::contains).toArray())); + + final List> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY); + + assertThat(actualBetaRecords.size(), equalTo(betaEvents.size())); + + assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream() + .filter(betaEvents::contains).toArray())); + + final List> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY); + + assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size())); + assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream() + .filter(defaultEvents::contains).toArray())); + + final List> actualAlphaDefaultRecords = new ArrayList<>(); + actualAlphaDefaultRecords.addAll(actualAlphaRecords); + actualAlphaDefaultRecords.addAll(actualDefaultRecords); + assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size())); + assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream() + .filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray())); + + } + + private List> createEvents(final String value, final int numberToCreate) { + return IntStream.range(0, numberToCreate) + .mapToObj(i -> Map.of(KNOWN_CONDITIONAL_KEY, value, "arbitrary_field", UUID.randomUUID().toString())) + .map(map -> JacksonEvent.builder().withData(map).withEventType("TEST").build()) + .map(jacksonEvent -> (Event) jacksonEvent) + .map(Record::new) + .collect(Collectors.toList()); + } +} + diff --git a/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml new file mode 100644 index 0000000000..6d608a0d0b --- /dev/null +++ b/data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml @@ -0,0 +1,41 @@ +routing-pipeline: + workers: 4 + delay: 10 + source: + in_memory: + testing_key: ConditionalRoutingIT + buffer: + bounded_blocking: + # Use a small batch size to help ensure that multiple threads + # are picking up the different routes. + batch_size: 10 + route: + - alpha: '/value == "a"' + - beta: '/value == "b"' + - gamma: '/value == "g"' + sink: + - in_memory: + testing_key: ConditionalRoutingIT_alpha + routes: + - alpha + - in_memory: + testing_key: ConditionalRoutingIT_beta + routes: + - beta + - in_memory: + testing_key: ConditionalRoutingIT_alpha_default + routes: + - alpha + - _default + - in_memory: + testing_key: ConditionalRoutingIT_alpha_beta_gamma + routes: + - alpha + - beta + - gamma + - in_memory: + testing_key: ConditionalRoutingIT_default + routes: + - _default + - in_memory: + testing_key: ConditionalRoutingIT_all diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java index 1e8850219d..4e5c89cc29 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouter.java @@ -20,6 +20,7 @@ * intended to help break apart {@link Router} for better testing. */ class DataFlowComponentRouter { + static final String DEFAULT_ROUTE = "_default"; void route(final Collection allRecords, final DataFlowComponent dataFlowComponent, final Map> recordsToRoutes, @@ -37,7 +38,9 @@ void route(final Collection allRecords, final Set routesForEvent = recordsToRoutes .getOrDefault(record, Collections.emptySet()); - if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) { + if (routesForEvent.size() == 0 && dataFlowComponentRoutes.contains(DEFAULT_ROUTE)) { + recordsForComponent.add(getRecordStrategy.getRecord(record)); + } else if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) { recordsForComponent.add(getRecordStrategy.getRecord(record)); } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java index 3802356592..1ea74afe70 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/router/DataFlowComponentRouterTest.java @@ -158,6 +158,17 @@ void route_no_Events_when_none_have_matching_routes() { verify(componentRecordsConsumer).accept(testComponent, Collections.emptyList()); } + @Test + void route_no_Events_when_none_have_matching_routes_with_default_route() { + when(dataFlowComponent.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE)); + final Map> noMatchingRoutes = recordsIn.stream() + .collect(Collectors.toMap(Function.identity(), r -> Collections.emptySet())); + + createObjectUnderTest().route(recordsIn, dataFlowComponent, noMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + + verify(componentRecordsConsumer).accept(testComponent, recordsIn); + } + @Test void route_all_Events_when_all_have_matched_route() { @@ -236,6 +247,33 @@ void route_no_Events_when_none_have_matching_routes() { verify(componentRecordsConsumer).accept(testComponent, Collections.emptyList()); } + @Test + void route_no_Events_when_none_have_matching_routes_with_default_route() { + when(dataFlowComponent.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE)); + final Map> noMatchingRoutes = recordsIn.stream() + .collect(Collectors.toMap(Function.identity(), r -> Collections.emptySet())); + + createObjectUnderTest().route(recordsIn, dataFlowComponent, noMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + + verify(componentRecordsConsumer).accept(testComponent, recordsIn); + } + + @Test + void route_matched_events_with_none_to_default_route() { + DataFlowComponent dataFlowComponent2 = mock(DataFlowComponent.class); + when(dataFlowComponent2.getRoutes()).thenReturn(Set.of(DataFlowComponentRouter.DEFAULT_ROUTE)); + final Map> allMatchingRoutes = recordsIn.stream() + .collect(Collectors.toMap(Function.identity(), r -> Collections.singleton(knownRoute))); + + createObjectUnderTest().route(recordsIn, dataFlowComponent2, allMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + verify(componentRecordsConsumer).accept(null, Collections.emptyList()); + createObjectUnderTest().route(recordsIn, dataFlowComponent, allMatchingRoutes, getRecordStrategy, componentRecordsConsumer); + + verify(componentRecordsConsumer).accept(testComponent, recordsIn); + + } + + @Test void route_all_Events_when_all_have_matched_route() {