From 04098b3287cc796b8962f91c765996ebc1932250 Mon Sep 17 00:00:00 2001 From: David Venable Date: Mon, 19 Aug 2024 13:12:27 -0500 Subject: [PATCH 1/4] Update the parse JSON/XML/ION processors to use EventKey. (#4842) Signed-off-by: David Venable --- .../parse/AbstractParseProcessor.java | 30 +++++++++++-------- .../parse/ion/ParseIonProcessor.java | 6 ++-- .../parse/json/ParseJsonProcessor.java | 6 ++-- .../parse/xml/ParseXmlProcessor.java | 6 ++-- .../parse/ion/ParseIonProcessorTest.java | 4 +-- .../parse/json/ParseJsonProcessorTest.java | 25 +++++++++------- .../parse/xml/ParseXmlProcessorTest.java | 15 ++++++---- 7 files changed, 55 insertions(+), 37 deletions(-) diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index 878316c183..ffb0855590 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -9,6 +9,8 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.record.Record; @@ -30,8 +32,8 @@ public abstract class AbstractParseProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class); - private final String source; - private final String destination; + private final EventKey source; + private final EventKey destination; private final String pointer; private final String parseWhen; private final List tagsOnFailure; @@ -39,20 +41,23 @@ public abstract class AbstractParseProcessor extends AbstractProcessor> doExecute(final Collection> recor if(deleteSourceRequested) { event.delete(this.source); } - } catch (Exception e) { + } catch (final Exception e) { LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); } } @@ -128,7 +133,8 @@ private String getProcessorName() { private Map parseUsingPointer(final Event event, final Map parsedJson, final String pointer, final boolean doWriteToRoot) { final Event temporaryEvent = JacksonEvent.builder().withEventType("event").build(); - temporaryEvent.put(source, parsedJson); + final EventKey temporaryPutKey = eventKeyFactory.createEventKey(source.getKey(), EventKeyFactory.EventAction.PUT); + temporaryEvent.put(temporaryPutKey, parsedJson); final String trimmedPointer = trimPointer(pointer); final String actualPointer = source + "/" + trimmedPointer; @@ -170,15 +176,15 @@ private String normalizePointerStructure(final String pointer) { return pointer.replace('/','.'); } - private String trimPointer(String pointer) { + private String trimPointer(final String pointer) { final String trimmedLeadingSlash = pointer.startsWith("/") ? pointer.substring(1) : pointer; return trimmedLeadingSlash.endsWith("/") ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 1) : trimmedLeadingSlash; } private void writeToRoot(final Event event, final Map parsedJson) { - for (Map.Entry entry : parsedJson.entrySet()) { + for (final Map.Entry entry : parsedJson.entrySet()) { if (overwriteIfDestinationExists || !event.containsKey(entry.getKey())) { - event.put(entry.getKey(), entry.getValue()); + event.put(eventKeyFactory.createEventKey(entry.getKey(), EventKeyFactory.EventAction.PUT), entry.getValue()); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java index 9a58594da7..9d2677e0be 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -32,8 +33,9 @@ public class ParseIonProcessor extends AbstractParseProcessor { @DataPrepperPluginConstructor public ParseIonProcessor(final PluginMetrics pluginMetrics, final ParseIonProcessorConfig parseIonProcessorConfig, - final ExpressionEvaluator expressionEvaluator) { - super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator); + final ExpressionEvaluator expressionEvaluator, + final EventKeyFactory eventKeyFactory) { + super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator, eventKeyFactory); // Convert Timestamps to ISO-8601 Z strings objectMapper.registerModule(new IonTimestampConverterModule()); diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java index dd7b471b33..637cbdea0d 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -32,8 +33,9 @@ public class ParseJsonProcessor extends AbstractParseProcessor { @DataPrepperPluginConstructor public ParseJsonProcessor(final PluginMetrics pluginMetrics, final ParseJsonProcessorConfig parseJsonProcessorConfig, - final ExpressionEvaluator expressionEvaluator) { - super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator); + final ExpressionEvaluator expressionEvaluator, + final EventKeyFactory eventKeyFactory) { + super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator, eventKeyFactory); } @Override diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java index debacad49c..984a49964a 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -27,8 +28,9 @@ public class ParseXmlProcessor extends AbstractParseProcessor { @DataPrepperPluginConstructor public ParseXmlProcessor(final PluginMetrics pluginMetrics, final ParseXmlProcessorConfig parseXmlProcessorConfig, - final ExpressionEvaluator expressionEvaluator) { - super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator); + final ExpressionEvaluator expressionEvaluator, + final EventKeyFactory eventKeyFactory) { + super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator, eventKeyFactory); } @Override diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java index c9a8fdf4e5..d7eb14c28a 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java @@ -37,7 +37,7 @@ public void setup() { @Override protected AbstractParseProcessor createObjectUnderTest() { - return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator); + return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator, testEventKeyFactory); } @Test @@ -58,7 +58,7 @@ void test_when_using_ion_features_then_processorParsesCorrectly() { @Test void test_when_deleteSourceFlagEnabled() { when(processorConfig.isDeleteSourceRequested()).thenReturn(true); - parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator); + parseJsonProcessor = createObjectUnderTest(); final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }"; final Event parsedEvent = createAndParseMessageEvent(serializedMessage); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java index 1416d6cf35..cf71f2251f 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java @@ -5,16 +5,20 @@ package org.opensearch.dataprepper.plugins.processor.parse.json; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.event.TestEventKeyFactory; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventBuilder; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; @@ -48,6 +52,8 @@ public class ParseJsonProcessorTest { protected ExpressionEvaluator expressionEvaluator; protected AbstractParseProcessor parseJsonProcessor; + private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory(); + protected final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory(); @BeforeEach public void setup() { @@ -61,7 +67,7 @@ public void setup() { } protected AbstractParseProcessor createObjectUnderTest() { - return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator); + return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator, testEventKeyFactory); } @Test @@ -197,7 +203,7 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() { @Test void test_when_deleteSourceFlagEnabled() { when(processorConfig.isDeleteSourceRequested()).thenReturn(true); - parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator); + parseJsonProcessor = createObjectUnderTest(); final String key = "key"; final ArrayList value = new ArrayList<>(List.of("Element0","Element1","Element2")); @@ -434,10 +440,7 @@ private Record createMessageEvent(final String message) { } private Record buildRecordWithEvent(final Map data) { - return new Record<>(JacksonEvent.builder() - .withData(data) - .withEventType("event") - .build()); + return new Record<>(testEventFactory.eventBuilder(EventBuilder.class).withData(data).build()); } private void assertThatKeyEquals(final Event parsedEvent, final String key, final Object value) { diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java index 8d9bc4cde3..5cd9037e5b 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java @@ -5,10 +5,14 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.event.TestEventKeyFactory; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.EventBuilder; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; @@ -37,6 +41,8 @@ public class ParseXmlProcessorTest { private ExpressionEvaluator expressionEvaluator; private AbstractParseProcessor parseXmlProcessor; + private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory(); + private final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory(); @BeforeEach public void setup() { @@ -46,7 +52,7 @@ public void setup() { } protected AbstractParseProcessor createObjectUnderTest() { - return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator); + return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator, testEventKeyFactory); } @Test @@ -104,9 +110,6 @@ private Record createMessageEvent(final String message) { } private Record buildRecordWithEvent(final Map data) { - return new Record<>(JacksonEvent.builder() - .withData(data) - .withEventType("event") - .build()); + return new Record<>(testEventFactory.eventBuilder(EventBuilder.class).withData(data).build()); } } From 9a825909398f1e9064e01c4eb8860d5232f8426a Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 19 Aug 2024 16:15:05 -0500 Subject: [PATCH 2/4] Fix bug where race condition on ack callback could cause S3 folder partition to not be given up (#4835) Signed-off-by: Taylor Gray --- .../dataprepper/plugins/source/s3/ScanObjectWorker.java | 5 ++--- .../plugins/source/s3/S3ScanObjectWorkerTest.java | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 3cddb1a2e8..471a0efa3d 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -280,8 +280,6 @@ private void processFolderPartition(final SourcePartition sourceCoordinator.saveProgressStateForPartition(folderPartition.getPartitionKey(), folderPartitionState.get()); processObjectsForFolderPartition(objectsToProcess, folderPartition); - - sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); } private List getObjectsForPrefix(final String bucket, final String s3Prefix) { @@ -364,7 +362,8 @@ private void processObjectsForFolderPartition(final List obje objectIndex++; } - // Complete the final acknowledgment set + sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); + if (acknowledgementSet != null) { acknowledgementSet.complete(); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java index a02fec1af4..fa1645db8d 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java @@ -460,8 +460,8 @@ void processing_with_folder_partition_processes_objects_in_folder_and_deletes_th inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key()); inOrder.verify(acknowledgementSet1).complete(); inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, secondObject.key()); - inOrder.verify(acknowledgementSet2).complete(); inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); + inOrder.verify(acknowledgementSet2).complete(); final Consumer firstAckCallback = ackCallbacks.get(0); firstAckCallback.accept(true); @@ -532,8 +532,8 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet1, s3ObjectDeleteWorker); inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key()); - inOrder.verify(acknowledgementSet1).complete(); inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); + inOrder.verify(acknowledgementSet1).complete(); final Consumer ackCallback = consumerArgumentCaptor.getValue(); ackCallback.accept(true); From ff2de268f9f111f329552f70c67c969652ab269b Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 19 Aug 2024 17:12:29 -0500 Subject: [PATCH 3/4] Add handle failed events option to parse json processors (#4844) Signed-off-by: Taylor Gray --- .../event}/HandleFailedEventsOption.java | 25 +++--- .../event/HandleFailedEventsOptionTest.java | 33 +++++++ .../drop/DropEventProcessorConfig.java | 1 + .../drop/DropEventsWhenCondition.java | 8 +- .../drop/DropEventProcessorConfigTest.java | 1 + .../drop/DropEventsProcessorTests.java | 1 + .../DropEventsWhenConditionBuilderTest.java | 1 + .../drop/DropEventsWhenConditionTest.java | 1 + .../drop/HandleFailedEventsOptionTest.java | 20 ----- .../parse-json-processor/build.gradle | 1 + .../parse/AbstractParseProcessor.java | 16 +++- .../processor/parse/CommonParseConfig.java | 7 ++ .../parse/ion/ParseIonProcessor.java | 20 ++++- .../parse/ion/ParseIonProcessorConfig.java | 29 +++++++ .../parse/json/ParseJsonProcessor.java | 18 +++- .../parse/json/ParseJsonProcessorConfig.java | 29 +++++++ .../parse/xml/ParseXmlProcessor.java | 19 +++- .../parse/xml/ParseXmlProcessorConfig.java | 29 +++++++ .../ion/ParseIonProcessorConfigTest.java | 17 ++++ .../parse/ion/ParseIonProcessorTest.java | 17 ++++ .../json/ParseJsonProcessorConfigTest.java | 21 ++++- .../parse/json/ParseJsonProcessorTest.java | 87 +++++++++++++++++++ .../xml/ParseXmlProcessorConfigTest.java | 17 ++++ .../parse/xml/ParseXmlProcessorTest.java | 57 ++++++++++++ 24 files changed, 430 insertions(+), 45 deletions(-) rename {data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop => data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event}/HandleFailedEventsOption.java (61%) create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOptionTest.java delete mode 100644 data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOptionTest.java diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOption.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOption.java similarity index 61% rename from data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOption.java rename to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOption.java index b3f4532e65..6c310eb395 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOption.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOption.java @@ -3,23 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.drop; +package org.opensearch.dataprepper.model.event; -import org.opensearch.dataprepper.model.event.Event; import com.fasterxml.jackson.annotation.JsonCreator; -import org.slf4j.Logger; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; -import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; - -enum HandleFailedEventsOption { - DROP("drop", true, false), - DROP_SILENTLY("drop_silently", true, true), - SKIP("skip", false, false), - SKIP_SILENTLY("skip_silently", false, true); +public enum HandleFailedEventsOption { + DROP("drop", true, true), + DROP_SILENTLY("drop_silently", true, false), + SKIP("skip", false, true), + SKIP_SILENTLY("skip_silently", false, false); private static final Map OPTIONS_MAP = Arrays.stream(HandleFailedEventsOption.values()) .collect(Collectors.toMap( @@ -37,13 +33,14 @@ enum HandleFailedEventsOption { this.isLogRequired = isLogRequired; } - public boolean isDropEventOption(final Event event, final Throwable cause, final Logger log) { - if (isLogRequired) { - log.warn(EVENT, "An exception occurred while processing when expression for event {}", event, cause); - } + public boolean shouldDropEvent() { return isDropEventOption; } + public boolean shouldLog() { + return isLogRequired; + } + @JsonCreator static HandleFailedEventsOption fromOptionValue(final String option) { return OPTIONS_MAP.get(option.toLowerCase()); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOptionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOptionTest.java new file mode 100644 index 0000000000..90a319ad24 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOptionTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +class HandleFailedEventsOptionTest { + @ParameterizedTest + @EnumSource(HandleFailedEventsOption.class) + void fromOptionValue(final HandleFailedEventsOption option) { + assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option)); + + if (option == HandleFailedEventsOption.SKIP || option == HandleFailedEventsOption.SKIP_SILENTLY) { + assertThat(option.shouldDropEvent(), equalTo(false)); + } else { + assertThat(option.shouldDropEvent(), equalTo(true)); + } + + if (option == HandleFailedEventsOption.SKIP_SILENTLY || option == HandleFailedEventsOption.DROP_SILENTLY) { + assertThat(option.shouldLog(), equalTo(false)); + } else { + assertThat(option.shouldLog(), equalTo(true)); + } + } +} diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java index 587a482064..ecc2d2d065 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java +++ b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotEmpty; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; public class DropEventProcessorConfig { @JsonProperty("drop_when") diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java index 7e2887e320..8d74ba6efb 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java +++ b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java @@ -7,11 +7,14 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + /** * @since 1.3 * @@ -57,7 +60,10 @@ public boolean isStatementFalseWith(final Event event) { try { return !expressionEvaluator.evaluateConditional(dropWhen, event); } catch (final Exception e) { - return handleFailedEventsSetting.isDropEventOption(event, e, LOG); + if (handleFailedEventsSetting.shouldLog()) { + LOG.warn(EVENT, "An exception occurred while processing when expression for event [{}]", event, e); + } + return handleFailedEventsSetting.shouldDropEvent(); } } diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java index 5cbdf91aad..84669606a3 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java index 3c5694f632..1af2146139 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.BeforeEach; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java index 860b7042ae..a962e37b39 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java @@ -10,6 +10,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.UUID; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java index 98ec1536fc..8b309210db 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java @@ -15,6 +15,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.UUID; import java.util.stream.Stream; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOptionTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOptionTest.java deleted file mode 100644 index 04b377ad2a..0000000000 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOptionTest.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.drop; - -import org.hamcrest.CoreMatchers; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; - -import static org.hamcrest.MatcherAssert.assertThat; - -class HandleFailedEventsOptionTest { - @ParameterizedTest - @EnumSource(HandleFailedEventsOption.class) - void fromOptionValue(final HandleFailedEventsOption option) { - assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option)); - } -} diff --git a/data-prepper-plugins/parse-json-processor/build.gradle b/data-prepper-plugins/parse-json-processor/build.gradle index 5125409731..91275eb799 100644 --- a/data-prepper-plugins/parse-json-processor/build.gradle +++ b/data-prepper-plugins/parse-json-processor/build.gradle @@ -10,6 +10,7 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') + implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index ffb0855590..18acb3dfd8 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -11,9 +11,11 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKey; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.record.Record; +import io.micrometer.core.instrument.Counter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +33,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class); + private static final String PROCESSING_FAILURES = "processingFailures"; private final EventKey source; private final EventKey destination; @@ -40,6 +43,10 @@ public abstract class AbstractParseProcessor extends AbstractProcessor> doExecute(final Collection> recor if(deleteSourceRequested) { event.delete(this.source); } - } catch (final Exception e) { - LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); + } catch (Exception e) { + processingFailuresCounter.increment(); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); + } } } return records; diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java index 5fd5050b3d..f10537bc7c 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.parse; import java.util.List; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; public interface CommonParseConfig { /** @@ -59,4 +60,10 @@ public interface CommonParseConfig { * Defaults to false. */ boolean isDeleteSourceRequested(); + + /** + * An optional setting used to determine how to handle parsing errors. Default is skip, which includes logging the error + * and passing the failed Event downstream to the next processor. + */ + HandleFailedEventsOption getHandleFailedEventsOption(); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java index 9d2677e0be..4bfb88ded6 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java @@ -8,12 +8,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.dataformat.ion.IonObjectMapper; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -27,9 +29,14 @@ @DataPrepperPlugin(name = "parse_ion", pluginType = Processor.class, pluginConfigurationType = ParseIonProcessorConfig.class) public class ParseIonProcessor extends AbstractParseProcessor { private static final Logger LOG = LoggerFactory.getLogger(ParseIonProcessor.class); + private static final String PARSE_ERRORS = "parseErrors"; private final IonObjectMapper objectMapper = new IonObjectMapper(); + private final Counter parseErrorsCounter; + + private final HandleFailedEventsOption handleFailedEventsOption; + @DataPrepperPluginConstructor public ParseIonProcessor(final PluginMetrics pluginMetrics, final ParseIonProcessorConfig parseIonProcessorConfig, @@ -39,6 +46,9 @@ public ParseIonProcessor(final PluginMetrics pluginMetrics, // Convert Timestamps to ISO-8601 Z strings objectMapper.registerModule(new IonTimestampConverterModule()); + + handleFailedEventsOption = parseIonProcessorConfig.getHandleFailedEventsOption(); + parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS); } @Override @@ -47,10 +57,16 @@ protected Optional> readValue(String message, Event cont // We need to do a two-step process here, read the value in, then convert away any Ion types like Timestamp return Optional.of(objectMapper.convertValue(objectMapper.readValue(message, new TypeReference<>() {}), new TypeReference<>() {})); } catch (JsonProcessingException e) { - LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage()); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage()); + } + parseErrorsCounter.increment(); return Optional.empty(); } catch (Exception e) { - LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e); + } + processingFailuresCounter.increment(); return Optional.empty(); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java index fcc2950477..6fad364e17 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java @@ -6,8 +6,11 @@ package org.opensearch.dataprepper.plugins.processor.parse.ion; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.List; @@ -38,6 +41,14 @@ public class ParseIonProcessorConfig implements CommonParseConfig { @JsonProperty private boolean deleteSource = false; + @JsonProperty("handle_failed_events") + @JsonPropertyDescription("Determines how to handle events with ION processing errors. Options include 'skip', " + + "which will log the error and send the Event downstream to the next processor, and 'skip_silently', " + + "which will send the Event downstream to the next processor without logging the error. " + + "Default is 'skip'.") + @NotNull + private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP; + @Override public String getSource() { return source; @@ -78,4 +89,22 @@ boolean isValidDestination() { public boolean isDeleteSourceRequested() { return deleteSource; } + + @Override + public HandleFailedEventsOption getHandleFailedEventsOption() { + return handleFailedEventsOption; + } + + @AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.") + boolean isHandleFailedEventsOptionValid() { + if (handleFailedEventsOption == null) { + return true; + } + + if (handleFailedEventsOption.shouldDropEvent()) { + return false; + } + + return true; + } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java index 637cbdea0d..407b59fab1 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java @@ -8,12 +8,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -27,15 +29,21 @@ @DataPrepperPlugin(name = "parse_json", pluginType = Processor.class, pluginConfigurationType = ParseJsonProcessorConfig.class) public class ParseJsonProcessor extends AbstractParseProcessor { private static final Logger LOG = LoggerFactory.getLogger(ParseJsonProcessor.class); + private static final String PARSE_ERRORS = "parseErrors"; private final ObjectMapper objectMapper = new ObjectMapper(); + private final HandleFailedEventsOption handleFailedEventsOption; + private final Counter parseErrorsCounter; + @DataPrepperPluginConstructor public ParseJsonProcessor(final PluginMetrics pluginMetrics, final ParseJsonProcessorConfig parseJsonProcessorConfig, final ExpressionEvaluator expressionEvaluator, final EventKeyFactory eventKeyFactory) { super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator, eventKeyFactory); + this.handleFailedEventsOption = parseJsonProcessorConfig.getHandleFailedEventsOption(); + parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS); } @Override @@ -43,10 +51,16 @@ protected Optional> readValue(String message, Event cont try { return Optional.of(objectMapper.readValue(message, new TypeReference<>() {})); } catch (JsonProcessingException e) { - LOG.error(SENSITIVE, "An exception occurred due to invalid JSON while parsing [{}] due to {}", message, e.getMessage()); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred due to invalid JSON while parsing [{}] due to {}", message, e.getMessage()); + } + parseErrorsCounter.increment(); return Optional.empty(); } catch (Exception e) { - LOG.error(SENSITIVE, "An exception occurred while using the parse_json processor while parsing [{}]", message, e); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred while using the parse_json processor while parsing [{}]", message, e); + } + processingFailuresCounter.increment(); return Optional.empty(); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java index 49ff2a5969..b6a1b14a23 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java @@ -6,8 +6,11 @@ package org.opensearch.dataprepper.plugins.processor.parse.json; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.Objects; @@ -38,6 +41,14 @@ public class ParseJsonProcessorConfig implements CommonParseConfig { @JsonProperty private boolean deleteSource = false; + @JsonProperty("handle_failed_events") + @JsonPropertyDescription("Determines how to handle events with JSON processing errors. Options include 'skip', " + + "which will log the error and send the Event downstream to the next processor, and 'skip_silently', " + + "which will send the Event downstream to the next processor without logging the error. " + + "Default is 'skip'.") + @NotNull + private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP; + @Override public String getSource() { return source; @@ -71,6 +82,11 @@ public boolean isDeleteSourceRequested() { return deleteSource; } + @Override + public HandleFailedEventsOption getHandleFailedEventsOption() { + return handleFailedEventsOption; + } + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") boolean isValidDestination() { if (Objects.isNull(destination)) return true; @@ -78,4 +94,17 @@ boolean isValidDestination() { final String trimmedDestination = destination.trim(); return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); } + + @AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.") + boolean isHandleFailedEventsOptionValid() { + if (handleFailedEventsOption == null) { + return true; + } + + if (handleFailedEventsOption.shouldDropEvent()) { + return false; + } + + return true; + } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java index 984a49964a..9dced55355 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java @@ -3,12 +3,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -22,8 +24,12 @@ @DataPrepperPlugin(name = "parse_xml", pluginType =Processor.class, pluginConfigurationType =ParseXmlProcessorConfig.class) public class ParseXmlProcessor extends AbstractParseProcessor { private static final Logger LOG = LoggerFactory.getLogger(ParseXmlProcessor.class); + private static final String PARSE_ERRORS = "parseErrors"; private final XmlMapper xmlMapper = new XmlMapper(); + private final Counter parseErrorsCounter; + + private final HandleFailedEventsOption handleFailedEventsOption; @DataPrepperPluginConstructor public ParseXmlProcessor(final PluginMetrics pluginMetrics, @@ -31,6 +37,9 @@ public ParseXmlProcessor(final PluginMetrics pluginMetrics, final ExpressionEvaluator expressionEvaluator, final EventKeyFactory eventKeyFactory) { super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator, eventKeyFactory); + + handleFailedEventsOption = parseXmlProcessorConfig.getHandleFailedEventsOption(); + parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS); } @Override @@ -38,10 +47,16 @@ protected Optional> readValue(final String message, fina try { return Optional.of(xmlMapper.readValue(message, new TypeReference<>() {})); } catch (JsonProcessingException e) { - LOG.error(SENSITIVE, "An exception occurred due to invalid XML while parsing [{}] due to {}", message, e.getMessage()); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred due to invalid XML while parsing [{}] due to {}", message, e.getMessage()); + } + parseErrorsCounter.increment(); return Optional.empty(); } catch (Exception e) { - LOG.error(SENSITIVE, "An exception occurred while using the parse_xml processor while parsing [{}]", message, e); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred while using the parse_xml processor while parsing [{}]", message, e); + } + processingFailuresCounter.increment(); return Optional.empty(); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java index c90173dc43..f84f2de4b6 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java @@ -1,8 +1,11 @@ package org.opensearch.dataprepper.plugins.processor.parse.xml; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.List; @@ -33,6 +36,14 @@ public class ParseXmlProcessorConfig implements CommonParseConfig { @JsonProperty private boolean deleteSource = false; + @JsonProperty("handle_failed_events") + @JsonPropertyDescription("Determines how to handle events with XML processing errors. Options include 'skip', " + + "which will log the error and send the Event downstream to the next processor, and 'skip_silently', " + + "which will send the Event downstream to the next processor without logging the error. " + + "Default is 'skip'.") + @NotNull + private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP; + @Override public String getSource() { return source; @@ -75,4 +86,22 @@ boolean isValidDestination() { public boolean isDeleteSourceRequested() { return deleteSource; } + + @Override + public HandleFailedEventsOption getHandleFailedEventsOption() { + return handleFailedEventsOption; + } + + @AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.") + boolean isHandleFailedEventsOptionValid() { + if (handleFailedEventsOption == null) { + return true; + } + + if (handleFailedEventsOption.shouldDropEvent()) { + return false; + } + + return true; + } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java index 8c47650c05..1768b701bb 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.List; @@ -29,6 +30,8 @@ public void test_when_defaultParseIonProcessorConfig_then_returns_default_values assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + assertThat(objectUnderTest.getHandleFailedEventsOption(), equalTo(HandleFailedEventsOption.SKIP)); + assertThat(objectUnderTest.isHandleFailedEventsOptionValid(), equalTo(true)); } @Nested @@ -61,5 +64,19 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseIonProcessorConfig.class, config, "deleteSource", true); assertThat(config.isDeleteSourceRequested(), equalTo(true)); } + + @Test + void isHandleFailedEventsOptionValid_returns_false_with_drop_option() throws NoSuchFieldException, IllegalAccessException { + setField(ParseIonProcessorConfig.class, config, "handleFailedEventsOption", HandleFailedEventsOption.DROP); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(false)); + } + + @Test + void isHandleFailedEventsOptionValid_returns_true_with_null_handle_events() throws NoSuchFieldException, IllegalAccessException { + setField(ParseIonProcessorConfig.class, config, "handleFailedEventsOption", null); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(true)); + } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java index d7eb14c28a..8bd63c3eec 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.processor.parse.ion; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -16,6 +17,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -33,6 +36,12 @@ public void setup() { when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); when(processorConfig.getParseWhen()).thenReturn(null); when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + + when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("processingFailures")).thenReturn(this.processingFailuresCounter); + when(pluginMetrics.counter("parseErrors")).thenReturn(this.parseErrorsCounter); + when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } @Override @@ -53,6 +62,10 @@ void test_when_using_ion_features_then_processorParsesCorrectly() { assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -68,5 +81,9 @@ void test_when_deleteSourceFlagEnabled() { assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java index aa138a0e7e..8d27120b36 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java @@ -7,13 +7,14 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; + +import java.util.List; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; -import java.util.List; - public class ParseJsonProcessorConfigTest { private ParseJsonProcessorConfig createObjectUnderTest() { @@ -30,6 +31,8 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); assertThat(objectUnderTest.isDeleteSourceRequested(), equalTo(false)); + assertThat(objectUnderTest.getHandleFailedEventsOption(), equalTo(HandleFailedEventsOption.SKIP)); + assertThat(objectUnderTest.isHandleFailedEventsOptionValid(), equalTo(true)); } @Nested @@ -62,5 +65,19 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseJsonProcessorConfig.class, config, "deleteSource", true); assertThat(config.isDeleteSourceRequested(), equalTo(true)); } + + @Test + void isHandleFailedEventsOptionValid_returns_false_with_drop_option() throws NoSuchFieldException, IllegalAccessException { + setField(ParseJsonProcessorConfig.class, config, "handleFailedEventsOption", HandleFailedEventsOption.DROP); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(false)); + } + + @Test + void isHandleFailedEventsOptionValid_returns_true_with_null_handle_events() throws NoSuchFieldException, IllegalAccessException { + setField(ParseJsonProcessorConfig.class, config, "handleFailedEventsOption", null); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(true)); + } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java index cf71f2251f..9aac54b23f 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.processor.parse.json; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -18,6 +19,7 @@ import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; @@ -34,6 +36,9 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -51,6 +56,15 @@ public class ParseJsonProcessorTest { @Mock protected ExpressionEvaluator expressionEvaluator; + @Mock + protected HandleFailedEventsOption handleFailedEventsOption; + + @Mock + protected Counter processingFailuresCounter; + + @Mock + protected Counter parseErrorsCounter; + protected AbstractParseProcessor parseJsonProcessor; private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory(); protected final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory(); @@ -64,6 +78,12 @@ public void setup() { when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); when(processorConfig.getParseWhen()).thenReturn(null); when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + + when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("processingFailures")).thenReturn(processingFailuresCounter); + when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); + when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } protected AbstractParseProcessor createObjectUnderTest() { @@ -86,6 +106,10 @@ void test_when_differentSourceAndDestination_then_processorParsesCorrectly() { assertThat(parsedEvent.containsKey(destination), equalTo(true)); assertThatFirstMapIsSubsetOfSecondMap(data, parsedEvent.get(destination, Map.class)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -104,6 +128,10 @@ void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() { assertThatKeyEquals(parsedEvent, source, "value_that_will_overwrite_source"); assertThatKeyEquals(parsedEvent, "key", "value"); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -119,6 +147,10 @@ void test_when_dataFieldEqualToRootField_then_notOverwritesOriginalFields() { final Event parsedEvent = createAndParseMessageEvent(serializedMessage); assertThatKeyEquals(parsedEvent, source, "{\"root_source\":\"value_that_will_not_be_overwritten\"}"); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -136,6 +168,10 @@ void test_when_dataFieldEqualToDestinationField_then_notOverwritesOriginalFields assertThatKeyEquals(parsedEvent, source, "{\"key\":\"value\"}"); assertThat(parsedEvent.containsKey("key"), equalTo(false)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -149,6 +185,8 @@ void test_when_valueIsEmpty_then_notParsed() { assertThatKeyEquals(parsedEvent, processorConfig.getSource(), serializedMessage); assertThat(parsedEvent.toMap().size(), equalTo(1)); + + verify(parseErrorsCounter).increment(); } @Test @@ -164,6 +202,10 @@ void test_when_deeplyNestedFieldInRoot_then_canReachDeepestLayer() { assertThatKeyEquals(parsedEvent, DEEPLY_NESTED_KEY_NAME, messageMap.get(DEEPLY_NESTED_KEY_NAME)); final String jsonPointerToValue = constructDeeplyNestedJsonPointer(numberOfLayers); assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -183,6 +225,10 @@ void test_when_deeplyNestedFieldInKey_then_canReachDeepestLayer() { final String jsonPointerToValue = destination + constructDeeplyNestedJsonPointer(numberOfLayers); assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -198,6 +244,10 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() { assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); final String pointerToFirstElement = key + "/0"; assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -214,6 +264,10 @@ void test_when_deleteSourceFlagEnabled() { assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); final String pointerToFirstElement = key + "/0"; assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -232,6 +286,10 @@ void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() final String pointerToInternalValue = key + "/0/key0"; assertThat(parsedEvent.get(pointerToInternalValue, String.class), equalTo("value0")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -250,6 +308,10 @@ void test_when_nestedJSONArrayOfJSONAndPointer_then_parsedIntoValue() { assertThat(parsedEvent.get("key0", String.class), equalTo("value0")); assertThat(parsedEvent.containsKey("key1"),equalTo(false)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -263,6 +325,10 @@ void test_when_nestedJSONArrayAndIndexPointer_then_parsedIntoArrayAndIndicesAcce assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); assertThat(parsedEvent.get("key.0", String.class), equalTo(value.get(0))); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -283,6 +349,10 @@ void test_when_pointerKeyAlreadyPresentInEvent_then_usesAbsolutePath() { assertThatKeyEquals(parsedEvent, "s3", data.get("s3")); assertThatKeyEquals(parsedEvent, "log.s3", Collections.singletonMap("data", "sample data")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -298,6 +368,10 @@ void test_when_nestedDestinationField_then_writesToNestedDestination() { assertThat(parsedEvent.get(location, String.class), equalTo("value")); assertThat(parsedEvent.get(destination, Map.class), equalTo(data)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -346,10 +420,16 @@ void test_when_condition_skips_processing_when_evaluates_to_false() { assertThat(parsedEvent.toMap(), equalTo(testEvent.getData().toMap())); + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); + } @Test void test_tags_when_json_parse_fails() { + when(handleFailedEventsOption.shouldLog()).thenReturn(true); + final String source = "different_source"; final String destination = "destination_key"; when(processorConfig.getSource()).thenReturn(source); @@ -364,10 +444,14 @@ void test_tags_when_json_parse_fails() { final Event parsedEvent = createAndParseMessageEvent(testEvent); assertTrue(parsedEvent.getMetadata().hasTags(testTags)); + + verify(parseErrorsCounter).increment(); } @Test void when_evaluate_conditional_throws_RuntimeException_events_are_not_dropped() { + when(handleFailedEventsOption.shouldLog()).thenReturn(true); + final String source = "different_source"; final String destination = "destination_key"; when(processorConfig.getSource()).thenReturn(source); @@ -383,6 +467,9 @@ void when_evaluate_conditional_throws_RuntimeException_events_are_not_dropped() final Event parsedEvent = createAndParseMessageEvent(testEvent); assertThat(parsedEvent.toMap(), equalTo(testEvent.getData().toMap())); + + verify(processingFailuresCounter).increment(); + verifyNoInteractions(parseErrorsCounter); } private String constructDeeplyNestedJsonPointer(final int numberOfLayers) { diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java index bab6d6e919..a0ef665124 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java @@ -2,6 +2,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.List; @@ -24,6 +25,8 @@ public void test_when_defaultParseXmlProcessorConfig_then_returns_default_values assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + assertThat(objectUnderTest.getHandleFailedEventsOption(), equalTo(HandleFailedEventsOption.SKIP)); + assertThat(objectUnderTest.isHandleFailedEventsOptionValid(), equalTo(true)); } @Nested @@ -56,5 +59,19 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseXmlProcessorConfig.class, config, "deleteSource", true); assertThat(config.isDeleteSourceRequested(), equalTo(true)); } + + @Test + void isHandleFailedEventsOptionValid_returns_false_with_drop_option() throws NoSuchFieldException, IllegalAccessException { + setField(ParseXmlProcessorConfig.class, config, "handleFailedEventsOption", HandleFailedEventsOption.DROP); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(false)); + } + + @Test + void isHandleFailedEventsOptionValid_returns_true_with_null_handle_events() throws NoSuchFieldException, IllegalAccessException { + setField(ParseXmlProcessorConfig.class, config, "handleFailedEventsOption", null); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(true)); + } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java index 5cd9037e5b..900a7a7bef 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java @@ -1,5 +1,9 @@ package org.opensearch.dataprepper.plugins.processor.parse.xml; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -13,8 +17,10 @@ import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import java.util.Collections; import java.util.HashMap; @@ -24,6 +30,11 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.processor.parse.xml.ParseXmlProcessorConfig.DEFAULT_SOURCE; @@ -40,6 +51,15 @@ public class ParseXmlProcessorTest { @Mock private ExpressionEvaluator expressionEvaluator; + @Mock + private Counter processingFailuresCounter; + + @Mock + private Counter parseErrorsCounter; + + @Mock + private HandleFailedEventsOption handleFailedEventsOption; + private AbstractParseProcessor parseXmlProcessor; private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory(); private final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory(); @@ -49,6 +69,11 @@ public void setup() { when(processorConfig.getSource()).thenReturn(DEFAULT_SOURCE); when(processorConfig.getParseWhen()).thenReturn(null); when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("processingFailures")).thenReturn(processingFailuresCounter); + when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); + when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } protected AbstractParseProcessor createObjectUnderTest() { @@ -64,6 +89,9 @@ void test_when_using_xml_features_then_processorParsesCorrectly() { assertThat(parsedEvent.get("name", String.class), equalTo("John Doe")); assertThat(parsedEvent.get("age", String.class), equalTo("30")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -80,6 +108,9 @@ void test_when_deleteSourceFlagEnabled() { assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); assertThat(parsedEvent.get("name", String.class), equalTo("John Doe")); assertThat(parsedEvent.get("age", String.class), equalTo("30")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -87,6 +118,7 @@ void test_when_using_invalid_xml_tags_correctly() { final String tagOnFailure = UUID.randomUUID().toString(); when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure)); + when(handleFailedEventsOption.shouldLog()).thenReturn(true); parseXmlProcessor = createObjectUnderTest(); @@ -94,6 +126,31 @@ void test_when_using_invalid_xml_tags_correctly() { final Event parsedEvent = createAndParseMessageEvent(serializedMessage); assertThat(parsedEvent.getMetadata().hasTags(List.of(tagOnFailure)), equalTo(true)); + + verify(parseErrorsCounter).increment(); + verifyNoInteractions(processingFailuresCounter); + } + + @Test + void test_when_object_mapper_throws_other_exception_tags_correctly() throws JsonProcessingException, NoSuchFieldException, IllegalAccessException { + + final String tagOnFailure = UUID.randomUUID().toString(); + when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure)); + when(handleFailedEventsOption.shouldLog()).thenReturn(true); + + parseXmlProcessor = createObjectUnderTest(); + + final XmlMapper mockMapper = mock(XmlMapper.class); + when(mockMapper.readValue(anyString(), any(TypeReference.class))).thenThrow(IllegalArgumentException.class); + + ReflectivelySetField.setField(ParseXmlProcessor.class, parseXmlProcessor, "xmlMapper", mockMapper); + + final String serializedMessage = "invalidXml"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThat(parsedEvent.getMetadata().hasTags(List.of(tagOnFailure)), equalTo(true)); + + verify(processingFailuresCounter).increment(); } private Event createAndParseMessageEvent(final String message) { From 385d43862879485cc387df50cc7cc0dd123c8dcb Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:19:42 -0700 Subject: [PATCH 4/4] Add support for AWS security lake sink as a bucket selector mode in S3 sink (#4846) * dplive1.yaml Signed-off-by: Kondaka * Delete .github/workflows/static.yml Signed-off-by: Kondaka * Add support for AWS security lake sink as a bucket selector mode in S3 sink Signed-off-by: Kondaka * Fixed tests Signed-off-by: Kondaka * Added javadoc for S3BucketSelector Signed-off-by: Kondaka * Added new tests for KeyGenerator Signed-off-by: Kondaka * Added new tests and fixed style errors Signed-off-by: Kondaka * Addressed review comments Signed-off-by: Kondaka * Fixed test build failure Signed-off-by: Kondaka --------- Signed-off-by: Kondaka --- data-prepper-plugins/s3-sink/build.gradle | 1 + .../plugins/sink/s3/S3SinkServiceIT.java | 6 +- .../plugins/sink/s3/KeyGenerator.java | 5 +- .../sink/s3/PredefinedObjectMetadata.java | 17 ++++ .../plugins/sink/s3/S3BucketSelector.java | 29 ++++++ .../dataprepper/plugins/sink/s3/S3Sink.java | 24 ++++- .../plugins/sink/s3/S3SinkConfig.java | 29 +++++- .../plugins/sink/s3/S3SinkService.java | 2 - .../sink/s3/SecurityLakeBucketSelector.java | 80 ++++++++++++++++ .../s3/SecurityLakeBucketSelectorConfig.java | 44 +++++++++ .../sink/s3/accumulator/BufferFactory.java | 4 +- .../sink/s3/accumulator/BufferUtilities.java | 16 ++-- .../s3/accumulator/CodecBufferFactory.java | 5 +- .../accumulator/CompressionBufferFactory.java | 5 +- .../sink/s3/accumulator/InMemoryBuffer.java | 17 +++- .../s3/accumulator/InMemoryBufferFactory.java | 5 +- .../sink/s3/accumulator/LocalFileBuffer.java | 9 +- .../accumulator/LocalFileBufferFactory.java | 3 + .../accumulator/MultipartBufferFactory.java | 4 + .../sink/s3/grouping/S3GroupIdentifier.java | 5 + .../s3/grouping/S3GroupIdentifierFactory.java | 18 +++- .../sink/s3/grouping/S3GroupManager.java | 2 +- .../plugins/sink/s3/KeyGeneratorTest.java | 66 ++++++++++++- .../s3/SecurityLakeBucketSelectorTest.java | 94 +++++++++++++++++++ .../s3/accumulator/BufferUtilitiesTest.java | 10 +- .../CompressionBufferFactoryTest.java | 16 ++-- .../InMemoryBufferFactoryTest.java | 2 +- .../s3/accumulator/InMemoryBufferTest.java | 16 ++-- .../LocalFileBufferFactoryTest.java | 2 +- .../s3/accumulator/LocalFileBufferTest.java | 2 +- .../S3GroupIdentifierFactoryTest.java | 2 +- .../s3/grouping/S3GroupIdentifierTest.java | 8 +- .../sink/s3/grouping/S3GroupManagerTest.java | 13 +-- 33 files changed, 488 insertions(+), 73 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/PredefinedObjectMetadata.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3BucketSelector.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelector.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorConfig.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorTest.java diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle index 4ea0a364fd..57198bf274 100644 --- a/data-prepper-plugins/s3-sink/build.gradle +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -16,6 +16,7 @@ dependencies { implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:securitylake:2.26.18' implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.9.22' implementation project(':data-prepper-plugins:avro-codecs') implementation libs.avro.core diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index b7bbb1b97d..68c5ffb9cd 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -199,7 +199,7 @@ void verify_flushed_object_count_into_s3_bucket() { void configureNewLineCodec() { codec = new NdjsonOutputCodec(ndjsonOutputConfig); - keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator); + keyGenerator = new KeyGenerator(s3SinkConfig, null, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator); } @Test @@ -272,7 +272,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx private S3SinkService createObjectUnderTest() { OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); - final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); + final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig, null); s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient, bucketOwnerProvider); return new S3SinkService(s3SinkConfig, codecContext, Duration.ofSeconds(5), pluginMetrics, s3GroupManager); @@ -389,7 +389,7 @@ private void configureParquetCodec() { parquetOutputCodecConfig = new ParquetOutputCodecConfig(); parquetOutputCodecConfig.setSchema(parseSchema().toString()); codec = new ParquetOutputCodec(parquetOutputCodecConfig); - keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator); + keyGenerator = new KeyGenerator(s3SinkConfig, null, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator); } private Collection> getRecordList() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java index 7a86687684..7a742d0217 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java @@ -12,14 +12,17 @@ public class KeyGenerator { private final S3SinkConfig s3SinkConfig; + private final S3BucketSelector s3BucketSelector; private final ExtensionProvider extensionProvider; private final ExpressionEvaluator expressionEvaluator; public KeyGenerator(final S3SinkConfig s3SinkConfig, + final S3BucketSelector s3BucketSelector, final ExtensionProvider extensionProvider, final ExpressionEvaluator expressionEvaluator) { this.s3SinkConfig = s3SinkConfig; + this.s3BucketSelector = s3BucketSelector; this.extensionProvider = extensionProvider; this.expressionEvaluator = expressionEvaluator; } @@ -30,7 +33,7 @@ public KeyGenerator(final S3SinkConfig s3SinkConfig, * @return object key path. */ public String generateKeyForEvent(final Event event) { - final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator); + final String pathPrefix = s3BucketSelector != null ? s3BucketSelector.getPathPrefix() : ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator); final String namePattern = ObjectKey.objectFileName(s3SinkConfig, extensionProvider.getExtension(), event, expressionEvaluator); return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern; } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/PredefinedObjectMetadata.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/PredefinedObjectMetadata.java new file mode 100644 index 0000000000..3e24c07ddd --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/PredefinedObjectMetadata.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import com.fasterxml.jackson.annotation.JsonProperty; +public class PredefinedObjectMetadata { + @JsonProperty("number_of_objects") + private String numberOfObjects; + + public String getNumberOfObjects() { + return numberOfObjects; + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3BucketSelector.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3BucketSelector.java new file mode 100644 index 0000000000..e48314e447 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3BucketSelector.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +public interface S3BucketSelector { + /** + * initialize - initializes the selector + * @param s3SinkConfig - s3 sink configuration + */ + void initialize(S3SinkConfig s3SinkConfig); + + /** + * getBucketName - returns the name of the bucket created by the bucket selector + * + * @return - bucket name + */ + String getBucketName(); + + /** + * getPathPrefix - returns the prefix to be used for the objects created in the bucket + * + * @return path prefix + */ + String getPathPrefix(); +} + diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 4aa2898476..c4c88dc323 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -94,7 +94,17 @@ public S3Sink(final PluginSetting pluginSetting, bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, testCodec); ExtensionProvider extensionProvider = StandardExtensionProvider.create(testCodec, compressionOption); - KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator); + String bucketName; + S3BucketSelector s3BucketSelector = null; + if (s3SinkConfig.getBucketSelector() != null) { + s3BucketSelector = loadS3BucketSelector(pluginFactory); + s3BucketSelector.initialize(s3SinkConfig); + bucketName = s3BucketSelector.getBucketName(); + } else { + bucketName = s3SinkConfig.getBucketName(); + } + + KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, s3BucketSelector, extensionProvider, expressionEvaluator); if (s3SinkConfig.getObjectKeyOptions().getPathPrefix() != null && !expressionEvaluator.isValidFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix())) { @@ -106,8 +116,8 @@ public S3Sink(final PluginSetting pluginSetting, throw new InvalidPluginConfigurationException("name_pattern is not a valid format expression"); } - if (s3SinkConfig.getBucketName() != null && - !expressionEvaluator.isValidFormatExpression(s3SinkConfig.getBucketName())) { + if (bucketName != null && + !expressionEvaluator.isValidFormatExpression(bucketName)) { throw new InvalidPluginConfigurationException("bucket name is not a valid format expression"); } @@ -115,13 +125,19 @@ public S3Sink(final PluginSetting pluginSetting, testCodec.validateAgainstCodecContext(s3OutputCodecContext); - final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); + final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig, s3BucketSelector); final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider); s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager); } + private S3BucketSelector loadS3BucketSelector(PluginFactory pluginFactory) { + final PluginModel modeConfiguration = s3SinkConfig.getBucketSelector(); + final PluginSetting modePluginSetting = new PluginSetting(modeConfiguration.getPluginName(), modeConfiguration.getPluginSettings()); + return pluginFactory.loadPlugin(S3BucketSelector.class, modePluginSetting); + } + @Override public boolean isReady() { return sinkInitialized; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java index eb12f0790b..71e523e5f6 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java @@ -6,8 +6,8 @@ package org.opensearch.dataprepper.plugins.sink.s3; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.Valid; -import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; import org.opensearch.dataprepper.aws.validator.AwsAccountId; @@ -36,10 +36,21 @@ public class S3SinkConfig { private AwsAuthenticationOptions awsAuthenticationOptions; @JsonProperty("bucket") - @NotEmpty @Size(min = 3, max = 500, message = "bucket length should be at least 3 characters") private String bucketName; + @JsonProperty("bucket_selector") + private PluginModel bucketSelector; + + @JsonProperty("predefined_object_metadata") + private PredefinedObjectMetadata predefinedObjectMetadata; + + @AssertTrue(message = "You may not use both bucket and bucket_selector together in one S3 sink.") + private boolean isValidBucketConfig() { + return (bucketName != null && bucketSelector == null) || + (bucketName == null && bucketSelector != null); + } + /** * The default bucket to send to if using a dynamic bucket name and failures occur * for any reason when sending to a dynamic bucket @@ -127,6 +138,18 @@ public ObjectKeyOptions getObjectKeyOptions() { return objectKeyOptions; } + public PredefinedObjectMetadata getPredefinedObjectMetadata() { + return predefinedObjectMetadata; + } + + /** + * Bucket selector configuration options. + * @return bucketSelector plugin model. + */ + public PluginModel getBucketSelector() { + return bucketSelector; + } + /** * Sink codec configuration Options. * @return codec plugin model. @@ -172,4 +195,4 @@ public Map getBucketOwners() { public String getDefaultBucketOwner() { return defaultBucketOwner; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index c0b7c18db5..571a952f01 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -52,7 +52,6 @@ public class S3SinkService { private final int maxEvents; private final ByteCount maxBytes; private final Duration maxCollectionDuration; - private final String bucket; private final int maxRetries; private final Counter objectsSucceededCounter; private final Counter objectsFailedCounter; @@ -84,7 +83,6 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize(); maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut(); - bucket = s3SinkConfig.getBucketName(); maxRetries = s3SinkConfig.getMaxUploadRetries(); objectsSucceededCounter = pluginMetrics.counter(OBJECTS_SUCCEEDED); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelector.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelector.java new file mode 100644 index 0000000000..3fee07fabb --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelector.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import org.apache.commons.lang3.RandomStringUtils; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import software.amazon.awssdk.services.securitylake.SecurityLakeClient; +import software.amazon.awssdk.services.securitylake.model.AwsIdentity; +import software.amazon.awssdk.services.securitylake.model.CreateCustomLogSourceRequest; +import software.amazon.awssdk.services.securitylake.model.CustomLogSourceProvider; +import software.amazon.awssdk.services.securitylake.model.CustomLogSourceConfiguration; +import software.amazon.awssdk.services.securitylake.model.CreateCustomLogSourceResponse; +import software.amazon.awssdk.services.securitylake.model.CustomLogSourceCrawlerConfiguration; + +import java.time.LocalDate; +import java.util.List; + +@DataPrepperPlugin(name = "aws_security_lake", pluginType = S3BucketSelector.class, pluginConfigurationType = SecurityLakeBucketSelectorConfig.class) +public class SecurityLakeBucketSelector implements S3BucketSelector { + private static final String EXT_PATH = "/ext/"; + private final SecurityLakeBucketSelectorConfig securityLakeBucketSelectorConfig; + + private S3SinkConfig s3SinkConfig; + + private String pathPrefix; + + private String sourceLocation; + + @DataPrepperPluginConstructor + public SecurityLakeBucketSelector(final SecurityLakeBucketSelectorConfig securityLakeBucketSelectorConfig) { + this.securityLakeBucketSelectorConfig = securityLakeBucketSelectorConfig; + } + + public void initialize(S3SinkConfig s3SinkConfig) { + this.s3SinkConfig = s3SinkConfig; + SecurityLakeClient securityLakeClient = SecurityLakeClient.create(); + String arn = s3SinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(); + String principal = arn.split(":")[4]; + String sourceName = securityLakeBucketSelectorConfig.getSourceName() != null ? securityLakeBucketSelectorConfig.getSourceName() : RandomStringUtils.randomAlphabetic(7); + CreateCustomLogSourceResponse response = + securityLakeClient.createCustomLogSource( + CreateCustomLogSourceRequest.builder() + .sourceName(sourceName+RandomStringUtils.randomAlphabetic(4)) + .eventClasses(List.of(securityLakeBucketSelectorConfig.getLogClass())) + .sourceVersion(securityLakeBucketSelectorConfig.getSourceVersion()) + .configuration(CustomLogSourceConfiguration.builder() + .crawlerConfiguration(CustomLogSourceCrawlerConfiguration.builder() + .roleArn(arn) + .build()) + .providerIdentity(AwsIdentity.builder() + .externalId(securityLakeBucketSelectorConfig.getExternalId()) + .principal(principal) + .build()) + .build()) + .build()); + CustomLogSourceProvider provider = response.source().provider(); + this.sourceLocation = provider.location(); + final String region=s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion().toString(); + final String accountId=arn.split(":")[4]; + + final LocalDate now = LocalDate.now(); + final String eventDay = String.format("%d%02d%02d", now.getYear(), now.getMonthValue(), now.getDayOfMonth()); + int locIndex = sourceLocation.indexOf(EXT_PATH); + pathPrefix = String.format("%sregion=%s/accountId=%s/eventDay=%s/",sourceLocation.substring(locIndex+1), region, accountId, eventDay); + } + + public String getPathPrefix() { + return pathPrefix; + } + + @Override + public String getBucketName() { + int locIndex = sourceLocation.indexOf(EXT_PATH); + return sourceLocation.substring(EXT_PATH.length(), locIndex); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorConfig.java new file mode 100644 index 0000000000..b4ff100020 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorConfig.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class SecurityLakeBucketSelectorConfig { + static final String DEFAULT_SOURCE_VERSION = "1.0"; + + static final String DEFAULT_EXTERNAL_ID = "extid"; + + @JsonProperty("source_name") + private String sourceName; + + @JsonProperty("source_version") + private String sourceVersion = DEFAULT_SOURCE_VERSION; + + @JsonProperty("external_id") + private String externalId = DEFAULT_EXTERNAL_ID; + + @JsonProperty("log_class") + private String logClass; + + + public String getSourceName() { + return sourceName; + } + + public String getSourceVersion() { + return sourceVersion; + } + + public String getExternalId() { + return externalId; + } + + public String getLogClass() { + return logClass; + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java index 84ad85fdd8..05a5a2425b 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java @@ -8,8 +8,10 @@ import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; +import java.util.Map; import java.util.function.Supplier; +import java.util.function.Function; public interface BufferFactory { - Buffer getBuffer(S3AsyncClient s3Client, Supplier bucketSupplier, Supplier keySupplier, String defaultBucket, BucketOwnerProvider bucketOwnerProvider); + Buffer getBuffer(S3AsyncClient s3Client, Supplier bucketSupplier, Supplier keySupplier, String defaultBucket, Function> metadataSupplier, BucketOwnerProvider bucketOwnerProvider); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java index 9fd051b3d5..7af1f8e764 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java @@ -14,6 +14,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; @@ -32,15 +33,18 @@ static CompletableFuture putObjectOrSendToDefaultBucket(final final String objectKey, final String targetBucket, final String defaultBucket, + final Map objectMetadata, final BucketOwnerProvider bucketOwnerProvider) { final boolean[] defaultBucketAttempted = new boolean[1]; - return s3Client.putObject( - PutObjectRequest.builder() - .bucket(targetBucket) - .key(objectKey) - .expectedBucketOwner(bucketOwnerProvider.getBucketOwner(targetBucket).orElse(null)) - .build(), requestBody) + PutObjectRequest.Builder builder = PutObjectRequest.builder() + .bucket(targetBucket) + .key(objectKey) + .expectedBucketOwner(bucketOwnerProvider.getBucketOwner(targetBucket).orElse(null)); + if (objectMetadata != null) { + builder = builder.metadata(objectMetadata); + } + return s3Client.putObject(builder.build(), requestBody) .handle((result, ex) -> { if (ex != null) { runOnFailure.accept(ex); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java index d263926849..8ed5f32d64 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java @@ -4,7 +4,9 @@ import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; +import java.util.Map; import java.util.function.Supplier; +import java.util.function.Function; public class CodecBufferFactory implements BufferFactory { private final BufferFactory innerBufferFactory; @@ -20,8 +22,9 @@ public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, final String defaultBucket, + final Function> metadataSupplier, final BucketOwnerProvider bucketOwnerProvider) { - Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, metadataSupplier, bucketOwnerProvider); return new CodecBuffer(innerBuffer, bufferedCodec); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java index f79cdd0779..891c1327a7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java @@ -11,7 +11,9 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.Objects; +import java.util.Map; import java.util.function.Supplier; +import java.util.function.Function; public class CompressionBufferFactory implements BufferFactory { private final BufferFactory innerBufferFactory; @@ -31,8 +33,9 @@ public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, final String defaultBucket, + final Function> metadataSupplier, final BucketOwnerProvider bucketOwnerProvider) { - final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, metadataSupplier, bucketOwnerProvider); if(compressionInternal) return internalBuffer; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java index 5334f42313..624ade0809 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java @@ -13,11 +13,13 @@ import java.io.ByteArrayOutputStream; import java.io.OutputStream; import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.function.Function; /** * A buffer can hold in memory data and flushing it to S3. @@ -29,6 +31,7 @@ public class InMemoryBuffer implements Buffer { private final S3AsyncClient s3Client; private final Supplier bucketSupplier; private final Supplier keySupplier; + private final Function> metadataSupplier; private final BucketOwnerProvider bucketOwnerProvider; private int eventCount; @@ -42,11 +45,13 @@ public class InMemoryBuffer implements Buffer { InMemoryBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, + final Function> metadataSupplier, final String defaultBucket, final BucketOwnerProvider bucketOwnerProvider) { this.s3Client = s3Client; this.bucketSupplier = bucketSupplier; this.keySupplier = keySupplier; + this.metadataSupplier = metadataSupplier; byteArrayOutputStream.reset(); eventCount = 0; watch = new StopWatch(); @@ -78,7 +83,7 @@ public Optional> flushToS3(final Consumer consumeO final byte[] byteArray = byteArrayOutputStream.toByteArray(); return Optional.ofNullable(BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, AsyncRequestBody.fromBytes(byteArray), consumeOnCompletion, consumeOnException, - getKey(), getBucket(), defaultBucket, bucketOwnerProvider)); + getKey(), getBucket(), defaultBucket, getMetadata(getEventCount()), bucketOwnerProvider)); } private String getBucket() { @@ -87,6 +92,14 @@ private String getBucket() { return bucket; } + private Map getMetadata(int eventCount) { + if (metadataSupplier != null) { + return metadataSupplier.apply(getEventCount()); + } else { + return null; + } + } + @Override public void setEventCount(int eventCount) { this.eventCount = eventCount; @@ -103,4 +116,4 @@ public String getKey() { public OutputStream getOutputStream() { return byteArrayPositionOutputStream; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java index 8e9cb8c7d9..2606a0a280 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java @@ -8,7 +8,9 @@ import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; +import java.util.Map; import java.util.function.Supplier; +import java.util.function.Function; public class InMemoryBufferFactory implements BufferFactory { @Override @@ -16,7 +18,8 @@ public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, final String defaultBucket, + final Function> metadataSupplier, final BucketOwnerProvider bucketOwnerProvider) { - return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, metadataSupplier, defaultBucket, bucketOwnerProvider); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java index eec6c77996..8fdee31c03 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java @@ -22,6 +22,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -98,7 +99,7 @@ public Optional> flushToS3(final Consumer consumeO final CompletableFuture putObjectResponseCompletableFuture = BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, AsyncRequestBody.fromFile(localFile), consumeOnCompletion, consumeOnException, - getKey(), getBucket(), defaultBucket, bucketOwnerProvider) + getKey(), getBucket(), defaultBucket, null, bucketOwnerProvider) .whenComplete(((response, throwable) -> removeTemporaryFile())); return Optional.of(putObjectResponseCompletableFuture); } @@ -138,6 +139,10 @@ public OutputStream getOutputStream() { return outputStream; } + private Map getMetadata() { + return null; + } + private String getBucket() { if(bucket == null) @@ -151,4 +156,4 @@ public String getKey() { key = keySupplier.get(); return key; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java index b3eb3caf42..1f1176b3c2 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java @@ -12,7 +12,9 @@ import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.function.Supplier; +import java.util.function.Function; public class LocalFileBufferFactory implements BufferFactory { @@ -25,6 +27,7 @@ public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, final String defaultBucket, + final Function> metadataSupplier, final BucketOwnerProvider bucketOwnerProvider) { File tempFile = null; Buffer localfileBuffer = null; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java index 55d8cec616..ae10c45bab 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java @@ -9,7 +9,10 @@ import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; +import java.util.Map; import java.util.function.Supplier; +import java.util.function.Function; + public class MultipartBufferFactory implements BufferFactory { @Override @@ -17,6 +20,7 @@ public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, final String defaultBucket, + final Function> metadataSupplier, final BucketOwnerProvider bucketOwnerProvider) { return new MultipartBuffer(new S3OutputStream(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider)); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java index 170a2426dd..52fa2578fd 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.grouping; +import org.opensearch.dataprepper.plugins.sink.s3.PredefinedObjectMetadata; import java.util.Map; import java.util.Objects; @@ -12,13 +13,16 @@ class S3GroupIdentifier { private final Map groupIdentifierHash; private final String groupIdentifierFullObjectKey; + private final PredefinedObjectMetadata predefinedObjectMetadata; private final String fullBucketName; public S3GroupIdentifier(final Map groupIdentifierHash, final String groupIdentifierFullObjectKey, + final PredefinedObjectMetadata predefineObjectMetadata, final String fullBucketName) { this.groupIdentifierHash = groupIdentifierHash; this.groupIdentifierFullObjectKey = groupIdentifierFullObjectKey; + this.predefinedObjectMetadata = predefineObjectMetadata; this.fullBucketName = fullBucketName; } @@ -39,5 +43,6 @@ public int hashCode() { public Map getGroupIdentifierHash() { return groupIdentifierHash; } + public Map getMetadata(int eventCount) { return predefinedObjectMetadata != null ? Map.of(predefinedObjectMetadata.getNumberOfObjects(), Integer.toString(eventCount)) : null; } public String getFullBucketName() { return fullBucketName; } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java index 5ec264616a..89315d95a1 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.sink.s3.KeyGenerator; import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.s3.S3BucketSelector; import java.util.HashMap; import java.util.List; @@ -25,30 +26,36 @@ public class S3GroupIdentifierFactory { private final ExpressionEvaluator expressionEvaluator; private final S3SinkConfig s3SinkConfig; + private final S3BucketSelector s3BucketSelector; private static final String BUCKET_NAME_REPLACEMENT_FOR_NON_EXISTING_KEYS = ""; public S3GroupIdentifierFactory(final KeyGenerator keyGenerator, final ExpressionEvaluator expressionEvaluator, - final S3SinkConfig s3SinkConfig) { + final S3SinkConfig s3SinkConfig, + final S3BucketSelector s3BucketSelector) { this.keyGenerator = keyGenerator; this.expressionEvaluator = expressionEvaluator; this.s3SinkConfig = s3SinkConfig; + this.s3BucketSelector = s3BucketSelector; dynamicExpressions = expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix()); dynamicExpressions.addAll(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())); - dynamicExpressions.addAll(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getBucketName())); + if (s3BucketSelector == null) + dynamicExpressions.addAll(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getBucketName())); dynamicEventsKeys = expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix()); dynamicEventsKeys.addAll(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())); - dynamicEventsKeys.addAll(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getBucketName())); + if (s3BucketSelector == null) + dynamicEventsKeys.addAll(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getBucketName())); } public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) { final String fullObjectKey = keyGenerator.generateKeyForEvent(event); - final String fullBucketName = event.formatString(s3SinkConfig.getBucketName(), expressionEvaluator, BUCKET_NAME_REPLACEMENT_FOR_NON_EXISTING_KEYS); + final String fullBucketName = s3BucketSelector != null ? s3BucketSelector.getBucketName() : + event.formatString(s3SinkConfig.getBucketName(), expressionEvaluator, BUCKET_NAME_REPLACEMENT_FOR_NON_EXISTING_KEYS); final Map groupIdentificationHash = new HashMap<>(); @@ -62,6 +69,7 @@ public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) { groupIdentificationHash.put(expression, value); } - return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey, fullBucketName); + + return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey, s3SinkConfig.getPredefinedObjectMetadata(), fullBucketName); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java index beae9ed157..5ac8adc325 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java @@ -79,7 +79,7 @@ public S3Group getOrCreateGroupForEvent(final Event event) { if (allGroups.containsKey(s3GroupIdentifier)) { return allGroups.get(s3GroupIdentifier); } else { - final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3GroupIdentifier::getFullBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket(), bucketOwnerProvider); + final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3GroupIdentifier::getFullBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket(), s3GroupIdentifier::getMetadata, bucketOwnerProvider); final OutputCodec outputCodec = codecFactory.provideCodec(); final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec); allGroups.put(s3GroupIdentifier, s3Group); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java index 64189cd939..edd8b18953 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.s3; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -41,8 +42,8 @@ void setUp() { } - private KeyGenerator createObjectUnderTest() { - return new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator); + private KeyGenerator createObjectUnderTest(S3BucketSelector s3BucketSelector) { + return new KeyGenerator(s3SinkConfig, s3BucketSelector, extensionProvider, expressionEvaluator); } @Test @@ -51,7 +52,7 @@ void test_generateKey_with_date_prefix() { final String objectName = UUID.randomUUID().toString(); when(extensionProvider.getExtension()).thenReturn(null); - final KeyGenerator objectUnderTest = createObjectUnderTest(); + final KeyGenerator objectUnderTest = createObjectUnderTest(null); final Event event = mock(Event.class); @@ -78,7 +79,7 @@ void generateKey_with_extension() { String pathPrefix = "events/"; final Event event = mock(Event.class); - final KeyGenerator objectUnderTest = createObjectUnderTest(); + final KeyGenerator objectUnderTest = createObjectUnderTest(null); try (final MockedStatic objectKeyMockedStatic = mockStatic(ObjectKey.class)) { objectKeyMockedStatic.when(() -> ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator)) @@ -93,4 +94,59 @@ void generateKey_with_extension() { } } -} \ No newline at end of file + + @Test + void test_generateKey_with_date_prefix_with_bucketSelector() { + String objectKeyPathPrefix = "logdata/"; + final String objectName = UUID.randomUUID().toString(); + when(extensionProvider.getExtension()).thenReturn(null); + + String pathPrefix = RandomStringUtils.randomAlphabetic(5); + S3BucketSelector s3BucketSelector = mock(S3BucketSelector.class); + when(s3BucketSelector.getPathPrefix()).thenReturn(pathPrefix); + final KeyGenerator objectUnderTest = createObjectUnderTest(s3BucketSelector); + + final Event event = mock(Event.class); + + try (final MockedStatic objectKeyMockedStatic = mockStatic(ObjectKey.class)) { + + objectKeyMockedStatic.when(() -> ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator)) + .thenReturn(objectKeyPathPrefix); + objectKeyMockedStatic.when(() -> ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator)) + .thenReturn(objectName); + + String key = objectUnderTest.generateKeyForEvent(event); + assertNotNull(key); + assertThat(key, true); + assertThat(key.contains(pathPrefix), equalTo(true)); + assertThat(key.contains(objectName), equalTo(true)); + } + } + + @Test + void generateKey_with_extension_with_bucketSelector() { + String extension = UUID.randomUUID().toString(); + final String objectName = UUID.randomUUID().toString(); + when(extensionProvider.getExtension()).thenReturn(extension); + String objectKeyPathPrefix = "events/"; + + final Event event = mock(Event.class); + String pathPrefix = RandomStringUtils.randomAlphabetic(5); + S3BucketSelector s3BucketSelector = mock(S3BucketSelector.class); + when(s3BucketSelector.getPathPrefix()).thenReturn(pathPrefix); + final KeyGenerator objectUnderTest = createObjectUnderTest(s3BucketSelector); + try (final MockedStatic objectKeyMockedStatic = mockStatic(ObjectKey.class)) { + + objectKeyMockedStatic.when(() -> ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator)) + .thenReturn(objectKeyPathPrefix); + objectKeyMockedStatic.when(() -> ObjectKey.objectFileName(s3SinkConfig, extension, event, expressionEvaluator)) + .thenReturn(objectName); + + String key = objectUnderTest.generateKeyForEvent(event); + assertThat(key, notNullValue()); + assertThat(key.contains(pathPrefix), equalTo(true)); + assertThat(key.contains(objectName), equalTo(true)); + } + + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorTest.java new file mode 100644 index 0000000000..d250c3481a --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/SecurityLakeBucketSelectorTest.java @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockedStatic; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; + + +import software.amazon.awssdk.regions.Region; +import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.services.securitylake.SecurityLakeClient; +import software.amazon.awssdk.services.securitylake.model.CreateCustomLogSourceRequest; +import software.amazon.awssdk.services.securitylake.model.CustomLogSourceProvider; +import software.amazon.awssdk.services.securitylake.model.CustomLogSourceResource; +import software.amazon.awssdk.services.securitylake.model.CreateCustomLogSourceResponse; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +public class SecurityLakeBucketSelectorTest { + + @Mock + SecurityLakeBucketSelectorConfig securityLakeBucketSelectorConfig; + + @Mock + S3SinkConfig s3SinkConfig; + + @Mock + AwsAuthenticationOptions awsOptions; + + static final String accountId = "123456789123"; + + static final String regionStr = "us-west-2"; + + @BeforeEach + void setup() { + s3SinkConfig = mock(S3SinkConfig.class); + securityLakeBucketSelectorConfig = mock(SecurityLakeBucketSelectorConfig.class); + awsOptions = mock(AwsAuthenticationOptions.class); + when(awsOptions.getAwsRegion()).thenReturn(Region.of(regionStr)); + when(awsOptions.getAwsStsRoleArn()).thenReturn("arn:aws:iam::"+accountId+":role/Admin"); + when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsOptions); + when(securityLakeBucketSelectorConfig.getSourceName()).thenReturn(RandomStringUtils.randomAlphabetic(5)); + when(securityLakeBucketSelectorConfig.getLogClass()).thenReturn(RandomStringUtils.randomAlphabetic(5)); + when(securityLakeBucketSelectorConfig.getSourceVersion()).thenReturn(RandomStringUtils.randomAlphabetic(5)); + when(securityLakeBucketSelectorConfig.getExternalId()).thenReturn(RandomStringUtils.randomAlphabetic(5)); + } + + private SecurityLakeBucketSelector createObjectUnderTest() { + return new SecurityLakeBucketSelector(securityLakeBucketSelectorConfig); + } + + @Test + public void test_securityLakeBucketSelector() { + final SecurityLakeClient securityLakeClient = mock(SecurityLakeClient.class); + CreateCustomLogSourceRequest createCustomLogSourceRequest = mock(CreateCustomLogSourceRequest.class); + CreateCustomLogSourceResponse createCustomLogSourceResponse = mock(CreateCustomLogSourceResponse.class); + CustomLogSourceResource customLogSourceResource = mock(CustomLogSourceResource.class); + CustomLogSourceProvider customLogSourceProvider = mock(CustomLogSourceProvider.class); + when(createCustomLogSourceResponse.source()).thenReturn(customLogSourceResource); + when(customLogSourceResource.provider()).thenReturn(customLogSourceProvider); + String testLocation = "/aws/bucket1/ext/location1/"; + when(customLogSourceProvider.location()).thenReturn(testLocation); + when(securityLakeClient.createCustomLogSource(any(CreateCustomLogSourceRequest.class))).thenReturn(createCustomLogSourceResponse); + try (final MockedStatic securityLakeClientMockedStatic = mockStatic(SecurityLakeClient.class)) { + securityLakeClientMockedStatic.when(() -> SecurityLakeClient.create()) + .thenReturn(securityLakeClient); + SecurityLakeBucketSelector securityLakeBucketSelector = createObjectUnderTest(); + securityLakeBucketSelector.initialize(s3SinkConfig); + assertThat(securityLakeBucketSelector.getBucketName(), equalTo("bucket1")); + int index = testLocation.indexOf("/ext/"); + LocalDate today = LocalDate.now(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd"); + String formattedDate = today.format(formatter); + String expectedPathPrefix = testLocation.substring(index+1)+"region="+regionStr+"/accountId="+accountId+"/eventDay="+formattedDate+"/"; + assertThat(securityLakeBucketSelector.getPathPrefix(), equalTo(expectedPathPrefix)); + } + } +} + + diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java index 6107e75205..a3ce02e667 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java @@ -80,7 +80,7 @@ void putObjectOrSendToDefaultBucket_with_no_exception_sends_to_target_bucket() { when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(successfulFuture); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, bucketOwnerProvider).join(); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, null, bucketOwnerProvider).join(); final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); verify(s3Client, times(1)).putObject(argumentCaptor.capture(), eq(requestBody)); @@ -101,7 +101,7 @@ void putObjectOrSendToDefaultBucket_with_no_such_bucket_exception_and_null_defau final CompletableFuture failedFuture = CompletableFuture.failedFuture(NoSuchBucketException.builder().build()); when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(failedFuture); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, null, bucketOwnerProvider).join(); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, null, null, bucketOwnerProvider).join(); verify(s3Client, times(1)).putObject(any(PutObjectRequest.class), eq(requestBody)); verify(mockRunOnCompletion).accept(false); @@ -115,7 +115,7 @@ void putObjectOrSendToDefaultBucket_with_S3Exception_that_is_not_access_denied_o when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(failedFuture); BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, - defaultBucketEnabled ? defaultBucket : null, bucketOwnerProvider); + defaultBucketEnabled ? defaultBucket : null, null, bucketOwnerProvider); verify(s3Client, times(1)).putObject(any(PutObjectRequest.class), eq(requestBody)); verify(mockRunOnCompletion).accept(false); @@ -130,7 +130,7 @@ void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_or_access_denied_ when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(failedFuture).thenReturn(successfulFuture); when(bucketOwnerProvider.getBucketOwner(anyString())).thenReturn(Optional.empty()); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, bucketOwnerProvider); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, null, bucketOwnerProvider); final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); verify(s3Client, times(2)).putObject(argumentCaptor.capture(), eq(requestBody)); @@ -168,7 +168,7 @@ void putObject_failing_to_send_to_bucket_and_default_bucket_completes_as_expecte when(bucketOwnerProvider.getBucketOwner(targetBucket)).thenReturn(Optional.of(bucketOwner)); when(bucketOwnerProvider.getBucketOwner(defaultBucket)).thenReturn(Optional.of(defaultBucketOwner)); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, bucketOwnerProvider); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, null, bucketOwnerProvider); final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); verify(s3Client, times(2)).putObject(argumentCaptor.capture(), eq(requestBody)); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java index d9c1384ae7..d07a1e71a5 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java @@ -91,21 +91,21 @@ class WithBuffer { @BeforeEach void setUp() { - when(innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider)).thenReturn(innerBuffer); + when(innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, null, bucketOwnerProvider)).thenReturn(innerBuffer); } @Test void getBuffer_returns_CompressionBuffer() { - final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, null, bucketOwnerProvider); assertThat(buffer, instanceOf(CompressionBuffer.class)); } @Test void getBuffer_returns_new_on_each_call() { final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); - final Buffer firstBuffer = objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + final Buffer firstBuffer = objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, null, bucketOwnerProvider); - assertThat(objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider), not(equalTo(firstBuffer))); + assertThat(objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, null, bucketOwnerProvider), not(equalTo(firstBuffer))); } @Nested @@ -117,17 +117,17 @@ void setUp() { @Test void getBuffer_returns_innerBuffer_directly() { - final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, null, bucketOwnerProvider); assertThat(buffer, sameInstance(innerBuffer)); } @Test void getBuffer_calls_on_each_call() { final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); - objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); - objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, null, bucketOwnerProvider); + objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, null, bucketOwnerProvider); - verify(innerBufferFactory, times(2)).getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + verify(innerBufferFactory, times(2)).getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket,null, bucketOwnerProvider); } } } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java index 33cca9dd99..945717520c 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java @@ -22,7 +22,7 @@ void test_inMemoryBufferFactory_notNull(){ void test_buffer_notNull(){ InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); Assertions.assertNotNull(inMemoryBufferFactory); - Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null, null, null); + Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null, null, null, null); Assertions.assertNotNull(buffer); assertThat(buffer, instanceOf(Buffer.class)); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java index ea42dafd27..59677f2f59 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java @@ -65,7 +65,7 @@ class InMemoryBufferTest { @Test void test_with_write_event_into_buffer() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, null, bucketOwnerProvider); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -93,7 +93,7 @@ void test_with_write_event_into_buffer() throws IOException { */ void getDuration_provides_duration_within_expected_range() throws IOException, InterruptedException { Instant startTime = Instant.now(); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, null, bucketOwnerProvider); Instant endTime = Instant.now(); @@ -122,7 +122,7 @@ void test_flush_to_s3_success() { when(keySupplier.get()).thenReturn(key); when(bucketSupplier.get()).thenReturn(bucket); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, null, bucketOwnerProvider); Assertions.assertNotNull(inMemoryBuffer); final CompletableFuture expectedFuture = mock(CompletableFuture.class); @@ -130,7 +130,7 @@ void test_flush_to_s3_success() { try (final MockedStatic bufferUtilitiesMockedStatic = mockStatic(BufferUtilities.class)) { bufferUtilitiesMockedStatic.when(() -> BufferUtilities.putObjectOrSendToDefaultBucket(eq(s3Client), any(AsyncRequestBody.class), - eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(key), eq(bucket), eq(null), eq(bucketOwnerProvider))) + eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(key), eq(bucket), eq(null), eq(null), eq(bucketOwnerProvider))) .thenReturn(expectedFuture); final Optional> result = inMemoryBuffer.flushToS3(mockRunOnCompletion, mockRunOnFailure); @@ -143,14 +143,14 @@ void test_flush_to_s3_success() { @Test void getOutputStream_is_PositionOutputStream() { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, null, bucketOwnerProvider); assertThat(inMemoryBuffer.getOutputStream(), instanceOf(PositionOutputStream.class)); } @Test void getOutputStream_getPos_equals_written_size() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, null, bucketOwnerProvider); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -167,7 +167,7 @@ void getOutputStream_getPos_equals_written_size() throws IOException { @Test void getSize_across_multiple_in_sequence() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, null, bucketOwnerProvider); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -177,7 +177,7 @@ void getSize_across_multiple_in_sequence() throws IOException { } assertThat(inMemoryBuffer.getSize(), equalTo((long) MAX_EVENTS * 1000)); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, null, bucketOwnerProvider); assertThat(inMemoryBuffer.getSize(), equalTo(0L)); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java index fb45e781aa..f19c42a433 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java @@ -21,7 +21,7 @@ void test_localFileBufferFactory_notNull() { void test_buffer_notNull() { LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory(); Assertions.assertNotNull(localFileBufferFactory); - Buffer buffer = localFileBufferFactory.getBuffer(null, null, null, null, null); + Buffer buffer = localFileBufferFactory.getBuffer(null, null, null, null, null, null); Assertions.assertNotNull(buffer); assertThat(buffer, instanceOf(LocalFileBuffer.class)); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java index fcca931caa..bb83988e38 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java @@ -123,7 +123,7 @@ void test_with_write_events_into_buffer_and_flush_toS3() throws IOException { try (final MockedStatic bufferUtilitiesMockedStatic = mockStatic(BufferUtilities.class)) { bufferUtilitiesMockedStatic.when(() -> BufferUtilities.putObjectOrSendToDefaultBucket(eq(s3Client), any(AsyncRequestBody.class), - eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(KEY), eq(BUCKET_NAME), eq(defaultBucket), eq(bucketOwnerProvider))) + eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(KEY), eq(BUCKET_NAME), eq(defaultBucket), eq(null), eq(bucketOwnerProvider))) .thenReturn(expectedFuture); final Optional> result = localFileBuffer.flushToS3(mockRunOnCompletion, mockRunOnFailure); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java index e76f8ecaaf..0a8bf72645 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java @@ -48,7 +48,7 @@ void setup() { } private S3GroupIdentifierFactory createObjectUnderTest() { - return new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); + return new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig, null); } @Test diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java index b9d22b8c12..5ef5c3a294 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java @@ -24,8 +24,8 @@ void S3GroupIdentifier_with_the_same_identificationHash_and_different_fullObject final String groupTwoFullObjectKey = UUID.randomUUID().toString(); final String fullBucketName = UUID.randomUUID().toString(); - final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupOneFullObjectKey, fullBucketName); - final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupTwoFullObjectKey, fullBucketName); + final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupOneFullObjectKey, null, fullBucketName); + final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupTwoFullObjectKey, null, fullBucketName); assertThat(s3GroupIdentifier.equals(seconds3GroupIdentifier), equalTo(true)); assertThat(s3GroupIdentifier.hashCode(), equalTo(seconds3GroupIdentifier.hashCode())); @@ -39,8 +39,8 @@ void S3GroupIdentifier_with_different_identificationHash_is_not_considered_equal final String groupTwoFullObjectKey = UUID.randomUUID().toString(); final String fullBucketName = UUID.randomUUID().toString(); - final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHashOne, groupOneFullObjectKey, fullBucketName); - final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHashTwo, groupTwoFullObjectKey, fullBucketName); + final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHashOne, groupOneFullObjectKey, null, fullBucketName); + final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHashTwo, groupTwoFullObjectKey, null, fullBucketName); assertThat(s3GroupIdentifier.equals(seconds3GroupIdentifier), equalTo(false)); assertNotEquals(s3GroupIdentifier.hashCode(), seconds3GroupIdentifier.hashCode()); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java index 545b6feb77..8449738594 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.UUID; import java.util.function.Supplier; +import java.util.function.Function; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -55,7 +56,7 @@ public class S3GroupManagerTest { private BucketOwnerProvider bucketOwnerProvider; private S3GroupManager createObjectUnderTest() { - return new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider); + return new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider); } @Test @@ -74,7 +75,7 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { when(s3SinkConfig.getDefaultBucket()).thenReturn(defaultBucket); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), any(Function.class), eq(bucketOwnerProvider))) .thenAnswer(invocation -> { Supplier bucketSupplier = invocation.getArgument(1); Supplier objectKeySupplier = invocation.getArgument(2); @@ -116,7 +117,7 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { final Buffer buffer = mock(Buffer.class); final OutputCodec outputCodec = mock(OutputCodec.class); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), any(Function.class), eq(bucketOwnerProvider))) .thenReturn(buffer); when(codecFactory.provideCodec()).thenReturn(outputCodec); @@ -137,7 +138,7 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { assertThat(secondResult.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); assertThat(secondResult.getBuffer(), equalTo(buffer)); - verify(bufferFactory, times(1)).getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider)); + verify(bufferFactory, times(1)).getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), any(Function.class), eq(bucketOwnerProvider)); final Collection groups = objectUnderTest.getS3GroupEntries(); assertThat(groups, notNullValue()); @@ -177,7 +178,7 @@ void recalculateAndGetGroupSize_returns_expected_size() { final Buffer thirdBuffer = mock(Buffer.class); when(thirdBuffer.getSize()).thenReturn(bufferSizeBase * 3); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), any(Function.class), eq(bucketOwnerProvider))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); final OutputCodec outputCodec = mock(OutputCodec.class); @@ -223,7 +224,7 @@ void getGroupsOrderedBySize_returns_groups_in_expected_order() { final Buffer thirdBuffer = mock(Buffer.class); when(thirdBuffer.getSize()).thenReturn(bufferSizeBase * 3); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), any(Function.class), eq(bucketOwnerProvider))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); final OutputCodec firstOutputCodec = mock(OutputCodec.class);