diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 54c084c0f8..b6d5905f4a 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,2 @@ # This should match the owning team set up in https://github.com/orgs/opensearch-project/teams -* @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh \ No newline at end of file +* @sb2k16 @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh \ No newline at end of file diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 6b3cd83eb1..d52c15f225 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -6,6 +6,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje | Maintainer | GitHub ID | Affiliation | | -------------------- | --------------------------------------------------------- | ----------- | +| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon | | Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon | | Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon | | Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon | diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java index efd36e123d..6093ce9c6b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java @@ -21,6 +21,14 @@ * dropped, etc) */ public interface AcknowledgementSet { + /** + * Adds an event handle to the acknowledgement set. Assigns initial reference + * count of 1. + * + * @param eventHandle event handle to be added + * @since 2.11 + */ + void add(EventHandle eventHandle); /** * Adds an event to the acknowledgement set. Assigns initial reference @@ -29,7 +37,9 @@ public interface AcknowledgementSet { * @param event event to be added * @since 2.2 */ - public void add(Event event); + default void add(Event event) { + add(event.getEventHandle()); + } /** * Aquires a reference to the event by incrementing the reference diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/ExampleValues.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/ExampleValues.java new file mode 100644 index 0000000000..5ab7593cd3 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/ExampleValues.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Use this annotation to provide example values for plugin configuration. + * + * @since 2.11 + */ +@Documented +@Retention(RUNTIME) +@Target({FIELD}) +public @interface ExampleValues { + /** + * One or more examples. + * @return the examples. + * @since 2.11 + */ + Example[] value(); + + /** + * A single example. + * + * @since 2.11 + */ + @interface Example { + /** + * The example value + * @return The example value + * + * @since 2.11 + */ + String value(); + + /** + * A description of the example value. + * + * @since 2.11 + */ + String description() default ""; + } +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java index 921d689a3c..511490ed5b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/AggregateEventHandle.java @@ -68,6 +68,18 @@ public boolean release(boolean result) { return returnValue; } + @Override + public void addEventHandle(EventHandle eventHandle) { + synchronized (this) { + for (WeakReference acknowledgementSetRef : acknowledgementSetRefList) { + AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get(); + if (acknowledgementSet != null) { + acknowledgementSet.add(eventHandle); + } + } + } + } + // For testing List> getAcknowledgementSetRefs() { return acknowledgementSetRefList; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index 340c104a14..2782fdfa7c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() { return acknowledgementSet != null; } + @Override + public void addEventHandle(EventHandle eventHandle) { + AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); + if (acknowledgementSet != null) { + acknowledgementSet.add(eventHandle); + } + } + @Override public void acquireReference() { synchronized (this) { diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java index 3ee88f698b..a842033b01 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/InternalEventHandle.java @@ -31,5 +31,12 @@ public interface InternalEventHandle { */ void acquireReference(); + /** + * Adds new event handle to the acknowledgement sets associated + * with this event handle + * @param eventHandle event handle to add + * @since 2.11 + */ + void addEventHandle(EventHandle eventHandle); } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java index e3037427f5..3ca0f2e40b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java @@ -79,11 +79,13 @@ public static ByteCount parse(final String string) { final String unitString = matcher.group("unit"); if(unitString == null) { - throw new ByteCountInvalidInputException("Byte counts must have a unit."); + throw new ByteCountInvalidInputException("Byte counts must have a unit. Valid byte units include: " + + Arrays.stream(Unit.values()).map(unitValue -> unitValue.unitString).collect(Collectors.toList())); } final Unit unit = Unit.fromString(unitString) - .orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'")); + .orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'. Valid byte units include: " + + Arrays.stream(Unit.values()).map(unitValue -> unitValue.unitString).collect(Collectors.toList()))); final BigDecimal valueBigDecimal = new BigDecimal(valueString); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetTests.java new file mode 100644 index 0000000000..d74ae9ceef --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetTests.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.acknowledgements; + +import org.junit.jupiter.api.Test; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.junit.jupiter.api.BeforeEach; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; + +public class AcknowledgementSetTests { + + Event event; + EventHandle eventHandle; + AcknowledgementSet acknowledgementSet; + + @BeforeEach + public void setup() { + event = mock(Event.class); + eventHandle = mock(EventHandle.class); + when(event.getEventHandle()).thenReturn(eventHandle); + acknowledgementSet = spy(AcknowledgementSet.class); + } + + @Test + public void testAcknowledgementSetAdd() { + acknowledgementSet.add(event); + verify(acknowledgementSet).add(eventHandle); + } +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java index 9998d6eb6d..c05739c940 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/AggregateEventHandleTests.java @@ -98,5 +98,18 @@ void testWithOnReleaseHandler() { } + @Test + void testAddEventHandle() { + Instant now = Instant.now(); + AggregateEventHandle eventHandle = new AggregateEventHandle(now); + acknowledgementSet1 = mock(AcknowledgementSet.class); + acknowledgementSet2 = mock(AcknowledgementSet.class); + eventHandle.addAcknowledgementSet(acknowledgementSet1); + eventHandle.addAcknowledgementSet(acknowledgementSet2); + AggregateEventHandle eventHandle2 = new AggregateEventHandle(now); + eventHandle.addEventHandle(eventHandle2); + verify(acknowledgementSet1).add(eventHandle2); + verify(acknowledgementSet2).add(eventHandle2); + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java index f351febd11..df1d8d814d 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java @@ -13,7 +13,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import org.mockito.Mock; import java.time.Instant; @@ -39,7 +38,6 @@ void testBasic() { void testWithAcknowledgementSet() { acknowledgementSet = mock(AcknowledgementSet.class); when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); - doNothing().when(acknowledgementSet).acquire(any(EventHandle.class)); Instant now = Instant.now(); DefaultEventHandle eventHandle = new DefaultEventHandle(now); assertThat(eventHandle.getAcknowledgementSet(), equalTo(null)); @@ -75,5 +73,16 @@ void testWithOnReleaseHandler() { assertThat(count, equalTo(1)); } + + @Test + void testAddEventHandle() { + Instant now = Instant.now(); + DefaultEventHandle eventHandle = new DefaultEventHandle(now); + acknowledgementSet = mock(AcknowledgementSet.class); + eventHandle.addAcknowledgementSet(acknowledgementSet); + DefaultEventHandle eventHandle2 = new DefaultEventHandle(now); + eventHandle.addEventHandle(eventHandle2); + verify(acknowledgementSet).add(eventHandle2); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java index b5481add14..b52399646f 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java @@ -7,11 +7,8 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; -import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.InternalEventHandle; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,18 +72,13 @@ public void checkProgress() { } @Override - public void add(Event event) { + public void add(EventHandle eventHandle) { lock.lock(); try { - if (event instanceof JacksonEvent) { - EventHandle eventHandle = event.getEventHandle(); - if (eventHandle instanceof DefaultEventHandle) { - InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; - internalEventHandle.addAcknowledgementSet(this); - pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); - totalEventsAdded.incrementAndGet(); - } - } + InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle; + internalEventHandle.addAcknowledgementSet(this); + pendingAcknowledgments.put(eventHandle, new AtomicInteger(1)); + totalEventsAdded.incrementAndGet(); } finally { lock.unlock(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java index 932c42c614..589ec7eda3 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategy.java @@ -9,7 +9,6 @@ import org.opensearch.dataprepper.core.parser.DataFlowComponent; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; @@ -95,13 +94,13 @@ public Record getRecord(final Record record) { final Event recordEvent = (Event) record.getData(); JacksonEvent newRecordEvent; Record newRecord; - DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { + InternalEventHandle internalHandle = (InternalEventHandle)recordEvent.getEventHandle(); + if (internalHandle != null && internalHandle.hasAcknowledgementSet()) { final EventMetadata eventMetadata = recordEvent.getMetadata(); final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap()); newRecordEvent = (JacksonEvent) eventBuilder.build(); - eventHandle.getAcknowledgementSet().add(newRecordEvent); + internalHandle.addEventHandle(newRecordEvent.getEventHandle()); newRecord = new Record<>(newRecordEvent); acquireEventReference(newRecord); } else { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java index 06521b9d38..dbf2bab1a6 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java @@ -129,7 +129,7 @@ void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception { @Test void testDefaultAcknowledgementInvalidAcquire() { - defaultAcknowledgementSet.add(event); + defaultAcknowledgementSet.add(event.getEventHandle()); defaultAcknowledgementSet.complete(); DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest(); defaultAcknowledgementSet.acquire(handle2); @@ -247,7 +247,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception { Duration.ofSeconds(1) ); defaultAcknowledgementSet.add(event); - defaultAcknowledgementSet.add(event2); + defaultAcknowledgementSet.add(event2.getEventHandle()); defaultAcknowledgementSet.complete(); lenient().doAnswer(a -> { AcknowledgementSet acknowledgementSet = a.getArgument(0); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java index 178460cbe9..aedf1f60a5 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/router/RouterCopyRecordStrategyTests.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventMetadata; @@ -241,10 +242,10 @@ void test_one_record_with_acknowledgements_and_multi_components() { attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { - JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - ((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1); + EventHandle handle = (EventHandle) i.getArgument(0); + ((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1); return null; - }).when(acknowledgementSet1).add(any(JacksonEvent.class)); + }).when(acknowledgementSet1).add(any(EventHandle.class)); } catch (Exception e){} eventBuilder = mock(EventBuilder.class); @@ -279,10 +280,10 @@ void test_multiple_records_with_acknowledgements_and_multi_components() { attachEventHandlesToRecordsIn(eventHandles); try { doAnswer((i) -> { - JacksonEvent e1 = (JacksonEvent) i.getArgument(0); - ((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1); + EventHandle handle = (EventHandle) i.getArgument(0); + ((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1); return null; - }).when(acknowledgementSet1).add(any(JacksonEvent.class)); + }).when(acknowledgementSet1).add(any(EventHandle.class)); } catch (Exception e){} eventBuilder = mock(EventBuilder.class); diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializer.java index 223821b7e0..d1fd2106a3 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializer.java @@ -34,7 +34,7 @@ public ByteCount deserialize(final JsonParser parser, final DeserializationConte try { return ByteCount.parse(byteString); } catch (final Exception ex) { - throw new IllegalArgumentException(ex); + throw new IllegalArgumentException(ex.getMessage()); } } } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/EnumDeserializer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/EnumDeserializer.java new file mode 100644 index 0000000000..e95f10b962 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/EnumDeserializer.java @@ -0,0 +1,111 @@ +package org.opensearch.dataprepper.pipeline.parser; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.BeanProperty; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.ContextualDeserializer; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + + +/** + * This deserializer is used for any Enum classes when converting the pipeline configuration file into the plugin model classes + * @since 2.11 + */ +public class EnumDeserializer extends JsonDeserializer> implements ContextualDeserializer { + + static final String INVALID_ENUM_VALUE_ERROR_FORMAT = "Invalid value \"%s\". Valid options include %s."; + + private Class enumClass; + + public EnumDeserializer() {} + + public EnumDeserializer(final Class enumClass) { + if (!enumClass.isEnum()) { + throw new IllegalArgumentException("The provided class is not an enum: " + enumClass.getName()); + } + + this.enumClass = enumClass; + } + @Override + public Enum deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + final JsonNode node = p.getCodec().readTree(p); + final String enumValue = node.asText(); + + final Optional jsonCreator = findJsonCreatorMethod(); + + try { + jsonCreator.ifPresent(method -> method.setAccessible(true)); + + for (Object enumConstant : enumClass.getEnumConstants()) { + try { + if (jsonCreator.isPresent() && enumConstant.equals(jsonCreator.get().invoke(null, enumValue))) { + return (Enum) enumConstant; + } else if (jsonCreator.isEmpty() && enumConstant.toString().toLowerCase().equals(enumValue)) { + return (Enum) enumConstant; + } + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + } finally { + jsonCreator.ifPresent(method -> method.setAccessible(false)); + } + + + + final Optional jsonValueMethod = findJsonValueMethodForClass(); + final List listOfEnums = jsonValueMethod.map(method -> Arrays.stream(enumClass.getEnumConstants()) + .map(valueEnum -> { + try { + return method.invoke(valueEnum); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList())).orElseGet(() -> Arrays.stream(enumClass.getEnumConstants()) + .map(valueEnum -> valueEnum.toString().toLowerCase()) + .collect(Collectors.toList())); + + throw new IllegalArgumentException(String.format(INVALID_ENUM_VALUE_ERROR_FORMAT, enumValue, listOfEnums)); + } + + @Override + public JsonDeserializer createContextual(final DeserializationContext ctxt, final BeanProperty property) { + final JavaType javaType = property.getType(); + final Class rawClass = javaType.getRawClass(); + + return new EnumDeserializer(rawClass); + } + + private Optional findJsonValueMethodForClass() { + for (final Method method : enumClass.getDeclaredMethods()) { + if (method.isAnnotationPresent(JsonValue.class)) { + return Optional.of(method); + } + } + + return Optional.empty(); + } + + private Optional findJsonCreatorMethod() { + for (final Method method : enumClass.getDeclaredMethods()) { + if (method.isAnnotationPresent(JsonCreator.class)) { + return Optional.of(method); + } + } + + return Optional.empty(); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializerTest.java index 21f0b000f3..6d4575bf4e 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/ByteCountDeserializerTest.java @@ -13,6 +13,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.model.types.ByteCount; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -31,9 +32,28 @@ void setUp() { } @ParameterizedTest - @ValueSource(strings = {"1", "1b 2b", "1vb", "bad"}) - void convert_with_invalid_values_throws(final String invalidByteString) { - assertThrows(IllegalArgumentException.class, () -> objectMapper.convertValue(invalidByteString, ByteCount.class)); + @ValueSource(strings = {"1", "10"}) + void convert_with_no_byte_unit_throws_expected_exception(final String invalidByteString) { + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> objectMapper.convertValue(invalidByteString, ByteCount.class)); + assertThat(exception.getMessage(), containsString("Byte counts must have a unit. Valid byte units include: [b, kb, mb, gb]")); + } + + @ParameterizedTest + @ValueSource(strings = {"10 2b", "bad"}) + void convert_with_non_parseable_values_throws(final String invalidByteString) { + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> objectMapper.convertValue(invalidByteString, ByteCount.class)); + assertThat(exception.getMessage(), containsString("Unable to parse bytes")); + } + + @ParameterizedTest + @CsvSource({ + "10f, f", + "1vb, vb", + "3g, g" + }) + void convert_with_invalid_byte_units_throws(final String invalidByteString, final String invalidUnit) { + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> objectMapper.convertValue(invalidByteString, ByteCount.class)); + assertThat(exception.getMessage(), containsString("Invalid byte unit: '" + invalidUnit + "'. Valid byte units include: [b, kb, mb, gb]")); } @ParameterizedTest diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/EnumDeserializerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/EnumDeserializerTest.java new file mode 100644 index 0000000000..990fd60e7d --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/EnumDeserializerTest.java @@ -0,0 +1,174 @@ +package org.opensearch.dataprepper.pipeline.parser; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.BeanProperty; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.TextNode; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class EnumDeserializerTest { + + private ObjectMapper objectMapper; + + @BeforeEach + void setup() { + objectMapper = mock(ObjectMapper.class); + } + + private EnumDeserializer createObjectUnderTest(final Class enumClass) { + return new EnumDeserializer(enumClass); + } + + @Test + void non_enum_class_throws_IllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> new EnumDeserializer(Duration.class)); + } + + @ParameterizedTest + @EnumSource(TestEnum.class) + void enum_class_with_json_creator_annotation_returns_expected_enum_constant(final TestEnum testEnumOption) throws IOException { + final EnumDeserializer objectUnderTest = createObjectUnderTest(TestEnum.class); + final JsonParser jsonParser = mock(JsonParser.class); + final DeserializationContext deserializationContext = mock(DeserializationContext.class); + when(jsonParser.getCodec()).thenReturn(objectMapper); + + when(objectMapper.readTree(jsonParser)).thenReturn(new TextNode(testEnumOption.toString())); + + Enum result = objectUnderTest.deserialize(jsonParser, deserializationContext); + + assertThat(result, equalTo(testEnumOption)); + } + + @ParameterizedTest + @EnumSource(TestEnumWithoutJsonCreator.class) + void enum_class_without_json_creator_annotation_returns_expected_enum_constant(final TestEnumWithoutJsonCreator enumWithoutJsonCreator) throws IOException { + final EnumDeserializer objectUnderTest = createObjectUnderTest(TestEnumWithoutJsonCreator.class); + final JsonParser jsonParser = mock(JsonParser.class); + final DeserializationContext deserializationContext = mock(DeserializationContext.class); + when(jsonParser.getCodec()).thenReturn(objectMapper); + + when(objectMapper.readTree(jsonParser)).thenReturn(new TextNode(enumWithoutJsonCreator.toString())); + + Enum result = objectUnderTest.deserialize(jsonParser, deserializationContext); + + assertThat(result, equalTo(enumWithoutJsonCreator)); + } + + @Test + void enum_class_with_invalid_value_and_jsonValue_annotation_throws_IllegalArgumentException() throws IOException { + final EnumDeserializer objectUnderTest = createObjectUnderTest(TestEnum.class); + final JsonParser jsonParser = mock(JsonParser.class); + final DeserializationContext deserializationContext = mock(DeserializationContext.class); + when(jsonParser.getCodec()).thenReturn(objectMapper); + + final String invalidValue = UUID.randomUUID().toString(); + when(objectMapper.readTree(jsonParser)).thenReturn(new TextNode(invalidValue)); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + objectUnderTest.deserialize(jsonParser, deserializationContext)); + + assertThat(exception, notNullValue()); + final String expectedErrorMessage = "Invalid value \"" + invalidValue + "\". Valid options include"; + assertThat(exception.getMessage(), Matchers.startsWith(expectedErrorMessage)); + assertThat(exception.getMessage(), containsString("[test_display_one, test_display_two, test_display_three]")); + } + + @Test + void enum_class_with_invalid_value_and_no_jsonValue_annotation_throws_IllegalArgumentException() throws IOException { + final EnumDeserializer objectUnderTest = createObjectUnderTest(TestEnumWithoutJsonCreator.class); + final JsonParser jsonParser = mock(JsonParser.class); + final DeserializationContext deserializationContext = mock(DeserializationContext.class); + when(jsonParser.getCodec()).thenReturn(objectMapper); + + final String invalidValue = UUID.randomUUID().toString(); + when(objectMapper.readTree(jsonParser)).thenReturn(new TextNode(invalidValue)); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + objectUnderTest.deserialize(jsonParser, deserializationContext)); + + assertThat(exception, notNullValue()); + final String expectedErrorMessage = "Invalid value \"" + invalidValue + "\". Valid options include"; + assertThat(exception.getMessage(), Matchers.startsWith(expectedErrorMessage)); + + } + + @Test + void create_contextual_returns_expected_enum_deserializer() { + final DeserializationContext context = mock(DeserializationContext.class); + final BeanProperty property = mock(BeanProperty.class); + + final ObjectMapper mapper = new ObjectMapper(); + final JavaType javaType = mapper.constructType(HandleFailedEventsOption.class); + when(property.getType()).thenReturn(javaType); + + final EnumDeserializer objectUnderTest = new EnumDeserializer(); + JsonDeserializer result = objectUnderTest.createContextual(context, property); + + assertThat(result, instanceOf(EnumDeserializer.class)); + } + + private enum TestEnum { + TEST_ONE("test_display_one"), + TEST_TWO("test_display_two"), + TEST_THREE("test_display_three"); + private static final Map NAMES_MAP = Arrays.stream(TestEnum.values()) + .collect(Collectors.toMap(TestEnum::toString, Function.identity())); + private final String name; + TestEnum(final String name) { + this.name = name; + } + + @JsonValue + public String toString() { + return this.name; + } + @JsonCreator + static TestEnum fromOptionValue(final String option) { + return NAMES_MAP.get(option); + } + } + + private enum TestEnumWithoutJsonCreator { + TEST("test"); + private static final Map NAMES_MAP = Arrays.stream(TestEnumWithoutJsonCreator.values()) + .collect(Collectors.toMap(TestEnumWithoutJsonCreator::toString, Function.identity())); + private final String name; + TestEnumWithoutJsonCreator(final String name) { + this.name = name; + } + public String toString() { + return this.name; + } + + static TestEnumWithoutJsonCreator fromOptionValue(final String option) { + return NAMES_MAP.get(option); + } + } +} diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java index ca2cea4ee8..0ec428397b 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.pipeline.parser.ByteCountDeserializer; import org.opensearch.dataprepper.pipeline.parser.DataPrepperDurationDeserializer; +import org.opensearch.dataprepper.pipeline.parser.EnumDeserializer; import org.opensearch.dataprepper.pipeline.parser.EventKeyDeserializer; import org.springframework.context.annotation.Bean; @@ -33,6 +34,7 @@ public class ObjectMapperConfiguration { ObjectMapper extensionPluginConfigObjectMapper() { final SimpleModule simpleModule = new SimpleModule(); simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); + simpleModule.addDeserializer(Enum.class, new EnumDeserializer()); simpleModule.addDeserializer(ByteCount.class, new ByteCountDeserializer()); return new ObjectMapper() @@ -47,6 +49,7 @@ ObjectMapper pluginConfigObjectMapper( final SimpleModule simpleModule = new SimpleModule(); simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); simpleModule.addDeserializer(ByteCount.class, new ByteCountDeserializer()); + simpleModule.addDeserializer(Enum.class, new EnumDeserializer()); simpleModule.addDeserializer(EventKey.class, new EventKeyDeserializer(eventKeyFactory)); TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES.stream().forEach(clazz -> simpleModule.addDeserializer( clazz, new DataPrepperScalarTypeDeserializer<>(variableExpander, clazz))); diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ObjectMapperConfigurationTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ObjectMapperConfigurationTest.java index 594d3a47c2..b2cdea61e0 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ObjectMapperConfigurationTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ObjectMapperConfigurationTest.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugin; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -44,11 +47,12 @@ void test_duration_with_pluginConfigObjectMapper() { } @Test - void test_enum_with_pluginConfigObjectMapper() { - final String testString = "test"; + void test_enum_with_pluginConfigObjectMapper() throws JsonProcessingException { + final String testModelAsString = "{ \"name\": \"my-name\", \"test_type\": \"test\" }"; final ObjectMapper objectMapper = objectMapperConfiguration.pluginConfigObjectMapper(variableExpander, eventKeyFactory); - final TestType duration = objectMapper.convertValue(testString, TestType.class); - assertThat(duration, equalTo(TestType.fromOptionValue(testString))); + final TestModel testModel = objectMapper.readValue(testModelAsString, TestModel.class); + assertThat(testModel, notNullValue()); + assertThat(testModel.getTestType(), equalTo(TestType.TEST)); } @Test @@ -60,11 +64,12 @@ void test_duration_with_extensionPluginConfigObjectMapper() { } @Test - void test_enum_with_extensionPluginConfigObjectMapper() { - final String testString = "test"; + void test_enum_with_extensionPluginConfigObjectMapper() throws JsonProcessingException { + final String testModelAsString = "{ \"name\": \"my-name\", \"test_type\": \"test\" }"; final ObjectMapper objectMapper = objectMapperConfiguration.extensionPluginConfigObjectMapper(); - final TestType duration = objectMapper.convertValue(testString, TestType.class); - assertThat(duration, equalTo(TestType.fromOptionValue(testString))); + final TestModel testModel = objectMapper.readValue(testModelAsString, TestModel.class); + assertThat(testModel, notNullValue()); + assertThat(testModel.getTestType(), equalTo(TestType.TEST)); } @Test @@ -100,4 +105,26 @@ static TestType fromOptionValue(final String option) { } } + private static class TestModel { + @JsonProperty("name") + private final String name; + + @JsonProperty("test_type") + private final TestType testType; + + public TestModel(@JsonProperty("name") final String name, + @JsonProperty("test_type") final TestType testType) { + this.name = name; + this.testType = testType; + } + + public String getName() { + return name; + } + + public TestType getTestType() { + return testType; + } + } + } \ No newline at end of file diff --git a/data-prepper-plugin-schema/src/main/java/org/opensearch/dataprepper/schemas/ExampleValuesInstanceAttributeOverride.java b/data-prepper-plugin-schema/src/main/java/org/opensearch/dataprepper/schemas/ExampleValuesInstanceAttributeOverride.java new file mode 100644 index 0000000000..49d332e403 --- /dev/null +++ b/data-prepper-plugin-schema/src/main/java/org/opensearch/dataprepper/schemas/ExampleValuesInstanceAttributeOverride.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.schemas; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.victools.jsonschema.generator.FieldScope; +import com.github.victools.jsonschema.generator.InstanceAttributeOverrideV2; +import com.github.victools.jsonschema.generator.SchemaGenerationContext; +import org.opensearch.dataprepper.model.annotations.ExampleValues; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +class ExampleValuesInstanceAttributeOverride implements InstanceAttributeOverrideV2 { + @Override + public void overrideInstanceAttributes(final ObjectNode fieldSchema, final FieldScope fieldScope, final SchemaGenerationContext context) { + final ExampleValues exampleValuesAnnotation = fieldScope.getAnnotationConsideringFieldAndGetterIfSupported(ExampleValues.class); + if(exampleValuesAnnotation != null && exampleValuesAnnotation.value().length > 0) { + final ObjectMapper objectMapper = context.getGeneratorConfig().getObjectMapper(); + + addExampleSchema(fieldSchema, objectMapper, exampleValuesAnnotation); + } + } + + private void addExampleSchema(final ObjectNode fieldSchema, final ObjectMapper objectMapper, final ExampleValues exampleValuesAnnotation) { + final List> exampleValues = Arrays.stream(exampleValuesAnnotation.value()) + .map(ExampleValuesInstanceAttributeOverride::createExampleMap).collect(Collectors.toList()); + final ArrayNode exampleNode = objectMapper.convertValue(exampleValues, ArrayNode.class); + + fieldSchema.putArray("examples") + .addAll(exampleNode); + } + + private static Map createExampleMap(final ExampleValues.Example example) { + final HashMap exampleMap = new HashMap<>(); + exampleMap.put("example", example.value()); + if(example.description() != null && !example.description().isEmpty()) { + exampleMap.put("description", example.description()); + } + return exampleMap; + } +} diff --git a/data-prepper-plugin-schema/src/main/java/org/opensearch/dataprepper/schemas/JsonSchemaConverter.java b/data-prepper-plugin-schema/src/main/java/org/opensearch/dataprepper/schemas/JsonSchemaConverter.java index 9bff7b2aa7..78e79c9fa0 100644 --- a/data-prepper-plugin-schema/src/main/java/org/opensearch/dataprepper/schemas/JsonSchemaConverter.java +++ b/data-prepper-plugin-schema/src/main/java/org/opensearch/dataprepper/schemas/JsonSchemaConverter.java @@ -53,6 +53,7 @@ public ObjectNode convertIntoJsonSchema( resolveDependentRequiresFields(scopeSchemaGeneratorConfigPart); overrideDataPrepperPluginTypeAttribute(configBuilder.forTypesInGeneral(), schemaVersion, optionPreset); resolveDataPrepperTypes(scopeSchemaGeneratorConfigPart); + scopeSchemaGeneratorConfigPart.withInstanceAttributeOverride(new ExampleValuesInstanceAttributeOverride()); final SchemaGeneratorConfig config = configBuilder.build(); final SchemaGenerator generator = new SchemaGenerator(config); diff --git a/data-prepper-plugin-schema/src/test/java/org/opensearch/dataprepper/schemas/ExampleValuesInstanceAttributeOverrideTest.java b/data-prepper-plugin-schema/src/test/java/org/opensearch/dataprepper/schemas/ExampleValuesInstanceAttributeOverrideTest.java new file mode 100644 index 0000000000..152cab2bd3 --- /dev/null +++ b/data-prepper-plugin-schema/src/test/java/org/opensearch/dataprepper/schemas/ExampleValuesInstanceAttributeOverrideTest.java @@ -0,0 +1,187 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.schemas; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.victools.jsonschema.generator.FieldScope; +import com.github.victools.jsonschema.generator.SchemaGenerationContext; +import com.github.victools.jsonschema.generator.SchemaGeneratorConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.annotations.ExampleValues; + +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ExampleValuesInstanceAttributeOverrideTest { + + @Mock + private ObjectNode fieldSchema; + + @Mock + private FieldScope fieldScope; + + @Mock + private SchemaGenerationContext context; + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + fieldSchema = spy(objectMapper.convertValue(Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()), ObjectNode.class)); + } + + private ExampleValuesInstanceAttributeOverride createObjectUnderTest() { + return new ExampleValuesInstanceAttributeOverride(); + } + + @Test + void overrideInstanceAttributes_does_not_modify_fieldSchema_if_no_ExampleValues_annotation() { + createObjectUnderTest().overrideInstanceAttributes(fieldSchema, fieldScope, context); + + verifyNoInteractions(fieldSchema); + } + + @Nested + class WithExampleValuesAnnotation { + @Mock + private ExampleValues exampleValuesAnnotation; + + @Mock + private SchemaGeneratorConfig schemaGeneratorConfig; + + @BeforeEach + void setUp() { + when(fieldScope.getAnnotationConsideringFieldAndGetterIfSupported(ExampleValues.class)) + .thenReturn(exampleValuesAnnotation); + } + + @Test + void overrideInstanceAttributes_does_not_modify_fieldSchema_if_no_ExampleValues_annotation_is_empty() { + when(exampleValuesAnnotation.value()).thenReturn(new ExampleValues.Example[]{}); + + createObjectUnderTest().overrideInstanceAttributes(fieldSchema, fieldScope, context); + + verifyNoInteractions(fieldSchema); + } + + @Test + void overrideInstanceAttributes_adds_examples_when_one_ExampleValue() { + when(context.getGeneratorConfig()).thenReturn(schemaGeneratorConfig); + when(schemaGeneratorConfig.getObjectMapper()).thenReturn(objectMapper); + + final ExampleValues.Example example = mock(ExampleValues.Example.class); + final String value = UUID.randomUUID().toString(); + final String description = UUID.randomUUID().toString(); + when(example.value()).thenReturn(value); + when(example.description()).thenReturn(description); + final ExampleValues.Example[] examples = {example}; + when(exampleValuesAnnotation.value()).thenReturn(examples); + + createObjectUnderTest().overrideInstanceAttributes(fieldSchema, fieldScope, context); + + final JsonNode examplesNode = fieldSchema.get("examples"); + assertThat(examplesNode, notNullValue()); + + assertThat(examplesNode.getNodeType(), equalTo(JsonNodeType.ARRAY)); + assertThat(examplesNode.size(), equalTo(1)); + final JsonNode firstExampleNode = examplesNode.get(0); + assertThat(firstExampleNode, notNullValue()); + assertThat(firstExampleNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertThat(firstExampleNode.get("example"), notNullValue()); + assertThat(firstExampleNode.get("example").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(firstExampleNode.get("example").textValue(), equalTo(value)); + assertThat(firstExampleNode.get("description"), notNullValue()); + assertThat(firstExampleNode.get("description").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(firstExampleNode.get("description").textValue(), equalTo(description)); + } + + @Test + void overrideInstanceAttributes_adds_examples_when_one_ExampleValue_with_no_description() { + when(context.getGeneratorConfig()).thenReturn(schemaGeneratorConfig); + when(schemaGeneratorConfig.getObjectMapper()).thenReturn(objectMapper); + + final ExampleValues.Example example = mock(ExampleValues.Example.class); + final String value = UUID.randomUUID().toString(); + final String description = UUID.randomUUID().toString(); + when(example.value()).thenReturn(value); + final ExampleValues.Example[] examples = {example}; + when(exampleValuesAnnotation.value()).thenReturn(examples); + + createObjectUnderTest().overrideInstanceAttributes(fieldSchema, fieldScope, context); + + final JsonNode examplesNode = fieldSchema.get("examples"); + assertThat(examplesNode, notNullValue()); + + assertThat(examplesNode.getNodeType(), equalTo(JsonNodeType.ARRAY)); + assertThat(examplesNode.size(), equalTo(1)); + final JsonNode firstExampleNode = examplesNode.get(0); + assertThat(firstExampleNode, notNullValue()); + assertThat(firstExampleNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertThat(firstExampleNode.get("example"), notNullValue()); + assertThat(firstExampleNode.get("example").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(firstExampleNode.get("example").textValue(), equalTo(value)); + assertThat(firstExampleNode.has("description"), equalTo(false)); + } + + @ParameterizedTest + @ValueSource(ints = {2, 3, 5}) + void overrideInstanceAttributes_adds_examples_when_multiple_ExampleValue(final int numberOfExamples) { + when(context.getGeneratorConfig()).thenReturn(schemaGeneratorConfig); + when(schemaGeneratorConfig.getObjectMapper()).thenReturn(objectMapper); + + final ExampleValues.Example[] examples = new ExampleValues.Example[numberOfExamples]; + for (int i = 0; i < numberOfExamples; i++) { + final ExampleValues.Example example = mock(ExampleValues.Example.class); + final String value = UUID.randomUUID().toString(); + final String description = UUID.randomUUID().toString(); + when(example.value()).thenReturn(value); + when(example.description()).thenReturn(description); + + examples[i] = example; + } + when(exampleValuesAnnotation.value()).thenReturn(examples); + + createObjectUnderTest().overrideInstanceAttributes(fieldSchema, fieldScope, context); + + final JsonNode examplesNode = fieldSchema.get("examples"); + assertThat(examplesNode, notNullValue()); + + assertThat(examplesNode.getNodeType(), equalTo(JsonNodeType.ARRAY)); + assertThat(examplesNode.size(), equalTo(numberOfExamples)); + + for (int i = 0; i < numberOfExamples; i++) { + final JsonNode exampleNode = examplesNode.get(0); + assertThat(exampleNode, notNullValue()); + assertThat(exampleNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertThat(exampleNode.get("example"), notNullValue()); + assertThat(exampleNode.get("example").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(exampleNode.get("example").textValue(), notNullValue()); + assertThat(exampleNode.get("description"), notNullValue()); + assertThat(exampleNode.get("description").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(exampleNode.get("description").textValue(), notNullValue()); + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugin-schema/src/test/java/org/opensearch/dataprepper/schemas/JsonSchemaConverterIT.java b/data-prepper-plugin-schema/src/test/java/org/opensearch/dataprepper/schemas/JsonSchemaConverterIT.java index 7a3dca5991..fcd0bddebc 100644 --- a/data-prepper-plugin-schema/src/test/java/org/opensearch/dataprepper/schemas/JsonSchemaConverterIT.java +++ b/data-prepper-plugin-schema/src/test/java/org/opensearch/dataprepper/schemas/JsonSchemaConverterIT.java @@ -1,10 +1,12 @@ package org.opensearch.dataprepper.schemas; import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; import com.fasterxml.jackson.databind.node.ObjectNode; import com.github.victools.jsonschema.generator.Module; import com.github.victools.jsonschema.generator.OptionPreset; @@ -14,6 +16,7 @@ import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.annotations.ExampleValues; import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugin.ClasspathPluginProvider; @@ -23,8 +26,10 @@ import java.util.List; import static com.github.victools.jsonschema.module.jackson.JacksonOption.RESPECT_JSONPROPERTY_REQUIRED; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; public class JsonSchemaConverterIT { @@ -59,6 +64,38 @@ void testSubTypes() throws JsonProcessingException { anyOfNode.forEach(aggregateActionNode -> assertThat(aggregateActionNode.has(PROPERTIES_KEY), is(true))); } + @Test + void test_examples() throws JsonProcessingException { + final ObjectNode jsonSchemaNode = objectUnderTest.convertIntoJsonSchema( + SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, TestConfig.class); + assertThat(jsonSchemaNode, instanceOf(ObjectNode.class)); + final JsonNode propertiesNode = jsonSchemaNode.at("/" + PROPERTIES_KEY); + assertThat(propertiesNode, instanceOf(ObjectNode.class)); + assertThat(propertiesNode.has("string_value_with_two_examples"), is(true)); + final JsonNode propertyNode = propertiesNode.at("/string_value_with_two_examples"); + assertThat(propertyNode.has("type"), equalTo(true)); + assertThat(propertyNode.get("type").getNodeType(), equalTo(JsonNodeType.STRING)); + + assertThat(propertyNode.has("examples"), equalTo(true)); + assertThat(propertyNode.get("examples").getNodeType(), equalTo(JsonNodeType.ARRAY)); + assertThat(propertyNode.get("examples").size(), equalTo(2)); + assertThat(propertyNode.get("examples").get(0), notNullValue()); + assertThat(propertyNode.get("examples").get(0).getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertThat(propertyNode.get("examples").get(0).has("example"), equalTo(true)); + assertThat(propertyNode.get("examples").get(0).get("example").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(propertyNode.get("examples").get(0).get("example").textValue(), equalTo("some example value")); + assertThat(propertyNode.get("examples").get(0).has("description"), equalTo(false)); + + assertThat(propertyNode.get("examples").get(1), notNullValue()); + assertThat(propertyNode.get("examples").get(1).getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertThat(propertyNode.get("examples").get(1).has("example"), equalTo(true)); + assertThat(propertyNode.get("examples").get(1).get("example").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(propertyNode.get("examples").get(1).get("example").textValue(), equalTo("second example value")); + assertThat(propertyNode.get("examples").get(1).has("description"), equalTo(true)); + assertThat(propertyNode.get("examples").get(1).get("description").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(propertyNode.get("examples").get(1).get("description").textValue(), equalTo("This is the second value.")); + } + @JsonClassDescription("test config") static class TestConfig { @JsonPropertyDescription("The aggregate action description") @@ -68,5 +105,16 @@ static class TestConfig { public PluginModel getAction() { return action; } + + @JsonProperty("string_value_with_two_examples") + @ExampleValues({ + @ExampleValues.Example("some example value"), + @ExampleValues.Example(value = "second example value", description = "This is the second value.") + }) + private String stringValueWithTwoExamples; + + public String getStringValueWithTwoExamples() { + return stringValueWithTwoExamples; + } } } diff --git a/data-prepper-plugins/aws-plugin-api/src/main/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptions.java b/data-prepper-plugins/aws-plugin-api/src/main/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptions.java index f0be710968..1c0baea3bd 100644 --- a/data-prepper-plugins/aws-plugin-api/src/main/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptions.java +++ b/data-prepper-plugins/aws-plugin-api/src/main/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptions.java @@ -16,16 +16,20 @@ */ public class AwsCredentialsOptions { private static final AwsCredentialsOptions DEFAULT_OPTIONS = new AwsCredentialsOptions(); + private static final AwsCredentialsOptions DEFAULT_OPTIONS_WITH_DEFAULT_CREDS_PROVIDER = + AwsCredentialsOptions.builder().withUseDefaultCredentialsProvider(true).build(); private final String stsRoleArn; private final String stsExternalId; private final Region region; private final Map stsHeaderOverrides; + private final boolean useDefaultCredentialsProvider; private AwsCredentialsOptions(final Builder builder) { this.stsRoleArn = builder.stsRoleArn; this.stsExternalId = builder.stsExternalId; this.region = builder.region; this.stsHeaderOverrides = builder.stsHeaderOverrides != null ? new HashMap<>(builder.stsHeaderOverrides) : Collections.emptyMap(); + this.useDefaultCredentialsProvider = builder.useDefaultCredentialsProvider; } private AwsCredentialsOptions() { @@ -33,6 +37,7 @@ private AwsCredentialsOptions() { this.stsExternalId = null; this.region = null; this.stsHeaderOverrides = Collections.emptyMap(); + this.useDefaultCredentialsProvider = false; } /** @@ -49,6 +54,10 @@ public static AwsCredentialsOptions defaultOptions() { return DEFAULT_OPTIONS; } + public static AwsCredentialsOptions defaultOptionsWithDefaultCredentialsProvider() { + return DEFAULT_OPTIONS_WITH_DEFAULT_CREDS_PROVIDER; + } + public String getStsRoleArn() { return stsRoleArn; } @@ -65,6 +74,10 @@ public Map getStsHeaderOverrides() { return stsHeaderOverrides; } + public boolean isUseDefaultCredentialsProvider() { + return useDefaultCredentialsProvider; + } + /** * Builder class for {@link AwsCredentialsOptions}. */ @@ -73,6 +86,7 @@ public static class Builder { private String stsExternalId; private Region region; private Map stsHeaderOverrides = Collections.emptyMap(); + private boolean useDefaultCredentialsProvider = false; /** * Sets the STS role ARN to use. @@ -122,6 +136,17 @@ public Builder withStsHeaderOverrides(final Map stsHeaderOverrid return this; } + /** + * Configures whether to use default credentials. + * + * @param useDefaultCredentialsProvider + * @return The {@link Builder} for continuing to build + */ + public Builder withUseDefaultCredentialsProvider(final boolean useDefaultCredentialsProvider) { + this.useDefaultCredentialsProvider = useDefaultCredentialsProvider; + return this; + } + /** * Builds the {@link AwsCredentialsOptions}. * diff --git a/data-prepper-plugins/aws-plugin-api/src/test/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptionsTest.java b/data-prepper-plugins/aws-plugin-api/src/test/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptionsTest.java index 5f4200069e..c30ef133fc 100644 --- a/data-prepper-plugins/aws-plugin-api/src/test/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptionsTest.java +++ b/data-prepper-plugins/aws-plugin-api/src/test/java/org/opensearch/dataprepper/aws/api/AwsCredentialsOptionsTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; class AwsCredentialsOptionsTest { @Test @@ -150,4 +151,21 @@ void defaultOptions_returns_same_instance_on_multiple_calls() { assertThat(AwsCredentialsOptions.defaultOptions(), sameInstance(AwsCredentialsOptions.defaultOptions())); } + + + @Test + void with_DefaultRole() { + final AwsCredentialsOptions awsCredentialsOptionsWithDefaultCredentialsProvider + = AwsCredentialsOptions.defaultOptionsWithDefaultCredentialsProvider(); + + assertThat(awsCredentialsOptionsWithDefaultCredentialsProvider, notNullValue()); + assertThat(awsCredentialsOptionsWithDefaultCredentialsProvider.getStsRoleArn(), nullValue()); + assertTrue(awsCredentialsOptionsWithDefaultCredentialsProvider.isUseDefaultCredentialsProvider()); + } + + @Test + void defaultCredentialsOptions_returns_same_instance_on_multiple_calls() { + assertThat(AwsCredentialsOptions.defaultOptionsWithDefaultCredentialsProvider(), + sameInstance(AwsCredentialsOptions.defaultOptionsWithDefaultCredentialsProvider())); + } } \ No newline at end of file diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/CredentialsProviderFactory.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/CredentialsProviderFactory.java index 222051beab..5d014998d2 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/CredentialsProviderFactory.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/CredentialsProviderFactory.java @@ -50,6 +50,10 @@ Region getDefaultRegion() { AwsCredentialsProvider providerFromOptions(final AwsCredentialsOptions credentialsOptions) { Objects.requireNonNull(credentialsOptions); + if (credentialsOptions.isUseDefaultCredentialsProvider()) { + return DefaultCredentialsProvider.create(); + } + if(credentialsOptions.getStsRoleArn() != null || defaultStsConfiguration.getAwsStsRoleArn() != null) { return createStsCredentials(credentialsOptions); } diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginIT.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginIT.java index a1e81198c6..d2daa7545e 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginIT.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginIT.java @@ -157,6 +157,42 @@ void test_AwsPlugin_without_STS_role_and_with_default_role_uses_default_role() { assertThat(awsCredentialsProvider2, sameInstance(awsCredentialsProvider1)); } + @Test + void test_AwsPlugin_without_STS_role_and_without_default_role_uses_default_role() { + + createObjectUnderTest().apply(extensionPoints); + + final ArgumentCaptor> extensionProviderArgumentCaptor = ArgumentCaptor.forClass(ExtensionProvider.class); + verify(extensionPoints).addExtensionProvider(extensionProviderArgumentCaptor.capture()); + + final ExtensionProvider extensionProvider = extensionProviderArgumentCaptor.getValue(); + + final Optional optionalSupplier = extensionProvider.provideInstance(context); + assertThat(optionalSupplier, notNullValue()); + assertThat(optionalSupplier.isPresent(), equalTo(true)); + + final AwsCredentialsSupplier awsCredentialsSupplier = optionalSupplier.get(); + + final AwsCredentialsOptions awsCredentialsOptions1 = AwsCredentialsOptions.builder() + .withRegion(Region.US_EAST_1) + .withUseDefaultCredentialsProvider(true) + .build(); + + final AwsCredentialsProvider awsCredentialsProvider1 = awsCredentialsSupplier.getProvider(awsCredentialsOptions1); + + assertThat(awsCredentialsProvider1, instanceOf(DefaultCredentialsProvider.class)); + + final AwsCredentialsOptions awsCredentialsOptions2 = AwsCredentialsOptions.builder() + .withRegion(Region.US_EAST_1) + .withUseDefaultCredentialsProvider(true) + .build(); + + final AwsCredentialsProvider awsCredentialsProvider2 = awsCredentialsSupplier.getProvider(awsCredentialsOptions2); + + assertThat(awsCredentialsProvider2, instanceOf(DefaultCredentialsProvider.class)); + assertThat(awsCredentialsProvider2, sameInstance(awsCredentialsProvider1)); + } + private String createStsRole() { return String.format("arn:aws:iam::123456789012:role/%s", UUID.randomUUID()); } diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java index 901bde4c9d..c81a3a1ffb 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java @@ -12,6 +12,8 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.model.annotations.AlsoRequired; +import org.opensearch.dataprepper.model.annotations.ExampleValues; +import org.opensearch.dataprepper.model.annotations.ExampleValues.Example; import java.time.ZoneId; import java.util.List; @@ -47,6 +49,11 @@ public static class DateMatch { "The timestamp value also supports epoch_second, epoch_milli, and epoch_nano values, " + "which represent the timestamp as the number of seconds, milliseconds, and nanoseconds since the epoch. " + "Epoch values always use the UTC time zone.") + @ExampleValues({ + @Example(value = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX", description = "Matches ISO-8601 formatted strings."), + @Example(value = "dd/MMM/yyyy:HH:mm:ss Z", description = "Matches Apache Common Log Format."), + @Example(value = "epoch_second", description = "Matches against strings that represent seconds since Unix epoch time.") + }) private List patterns; public DateMatch() { @@ -129,6 +136,10 @@ public static boolean isValidPattern(final String pattern) { @JsonProperty(value = "output_format", defaultValue = DEFAULT_OUTPUT_FORMAT) @JsonPropertyDescription("Determines the format of the timestamp added to an event.") + @ExampleValues({ + @Example(value = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX", description = "Outputs ISO-8601 formatted strings."), + @Example(value = "dd/MMM/yyyy:HH:mm:ss Z", description = "Outputs in Apache Common Log Format.") + }) private String outputFormat = DEFAULT_OUTPUT_FORMAT; @JsonProperty("to_origination_metadata") @@ -141,12 +152,20 @@ public static boolean isValidPattern(final String pattern) { "from the value. If the zone or offset are part of the value, then the time zone is ignored. " + "A list of all the available time zones is contained in the TZ database name column of " + "this table.") + @ExampleValues({ + @Example(value = "UTC", description = "Coordinated Universal Time (UTC)."), + @Example(value = "US/Pacific", description = "United States Pacific time zone.") + }) private String sourceTimezone = DEFAULT_SOURCE_TIMEZONE; @JsonProperty("destination_timezone") @JsonPropertyDescription("The time zone used for storing the timestamp in the destination field. " + "A list of all the available time zones is contained in the TZ database name column of " + "this table.") + @ExampleValues({ + @Example(value = "UTC", description = "Coordinated Universal Time (UTC)."), + @Example(value = "US/Pacific", description = "United States Pacific time zone.") + }) private String destinationTimezone = DEFAULT_DESTINATION_TIMEZONE; @JsonProperty("locale") @@ -157,6 +176,10 @@ public static boolean isValidPattern(final String pattern) { "A full list of locale fields, including language, country, and variant, can be found " + "here." + "Default is Locale.ROOT.") + @ExampleValues({ + @Example("en-US"), + @Example("fr-FR") + }) private String locale; @JsonProperty("date_when") diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java index 6196894beb..3355e15fde 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java +++ b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java @@ -36,7 +36,8 @@ public DropEventsProcessor( if (dropEventProcessorConfig.getDropWhen() != null && (!expressionEvaluator.isValidExpressionStatement(dropEventProcessorConfig.getDropWhen()))) { - throw new InvalidPluginConfigurationException("drop_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + throw new InvalidPluginConfigurationException(String.format("drop_when \"%s\" is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", dropEventProcessorConfig.getDropWhen())); } whenCondition = new DropEventsWhenCondition.Builder() diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java index 8f3bac38aa..9398f208e1 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisClientFactory.java @@ -33,7 +33,8 @@ public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier, .withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId()) .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides()) .build()); - defaultCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions()); + defaultCredentialsProvider = awsCredentialsSupplier.getProvider( + AwsCredentialsOptions.defaultOptionsWithDefaultCredentialsProvider()); this.awsAuthenticationConfig = awsAuthenticationConfig; } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index e13c403676..e044676799 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -294,7 +294,7 @@ void testProcessRecordsWithAcknowledgementsEnabled() doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/ocsf/build.gradle b/data-prepper-plugins/ocsf/build.gradle new file mode 100644 index 0000000000..205686105c --- /dev/null +++ b/data-prepper-plugins/ocsf/build.gradle @@ -0,0 +1,6 @@ + +dependencies { + implementation project(':data-prepper-api') +} + + diff --git a/data-prepper-plugins/ocsf/src/main/resources/org/opensearch/dataprepper/transforms/rules/ocsf-v1.1-panw-traffic-rule.yaml b/data-prepper-plugins/ocsf/src/main/resources/org/opensearch/dataprepper/transforms/rules/ocsf-v1.1-panw-traffic-rule.yaml new file mode 100644 index 0000000000..52e5b9be3b --- /dev/null +++ b/data-prepper-plugins/ocsf/src/main/resources/org/opensearch/dataprepper/transforms/rules/ocsf-v1.1-panw-traffic-rule.yaml @@ -0,0 +1,6 @@ +plugin_name: "ocsf-v1.1-panw-traffic" +apply_when: + - "$..processor[?(@.ocsf.type == 'palo_alto_networks_traffic_logs')]" + - "$..processor[?(@.ocsf.version == '1.1')]" + + diff --git a/data-prepper-plugins/ocsf/src/main/resources/org/opensearch/dataprepper/transforms/templates/ocsf-v1.1-panw-traffic-template.yaml b/data-prepper-plugins/ocsf/src/main/resources/org/opensearch/dataprepper/transforms/templates/ocsf-v1.1-panw-traffic-template.yaml new file mode 100644 index 0000000000..dba9929bf8 --- /dev/null +++ b/data-prepper-plugins/ocsf/src/main/resources/org/opensearch/dataprepper/transforms/templates/ocsf-v1.1-panw-traffic-template.yaml @@ -0,0 +1,673 @@ +"<>": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: "<<$.<>.source>>" + sink: "<<$.<>.sink>>" + processor: + - date: + match: + - key: Start_Time + patterns: + - 'yyyy-MM-dd''T''HH:mm:ss' + - 'yyyy-MM-dd''T''HH:mm:ss''Z''' + destination: time_dt + output_format: 'yyyy-MM-dd''T''HH:mm:ss' + - date: + match: + - key: Start_Time + patterns: + - 'yyyy-MM-dd''T''HH:mm:ss' + - 'yyyy-MM-dd''T''HH:mm:ss''Z''' + destination: time + output_format: epoch_second + - date: + match: + - key: Generated_Time + patterns: + - 'yyyy-MM-dd''T''HH:mm:ss' + - 'yyyy-MM-dd''T''HH:mm:ss''Z''' + destination: metadata/processed_time + output_format: epoch_second + - date: + match: + - key: Generated_Time + patterns: + - 'yyyy-MM-dd''T''HH:mm:ss' + - 'yyyy-MM-dd''T''HH:mm:ss''Z''' + destination: metadata/processed_time_dt + output_format: 'yyyy-MM-dd''T''HH:mm:ss' + - date: + match: + - key: Receive_Time + patterns: + - 'yyyy-MM-dd''T''HH:mm:ss' + - 'yyyy-MM-dd''T''HH:mm:ss''Z''' + destination: metadata/logged_time + output_format: epoch_second + - date: + match: + - key: Receive_Time + patterns: + - 'yyyy-MM-dd''T''HH:mm:ss' + - 'yyyy-MM-dd''T''HH:mm:ss''Z''' + destination: metadata/logged_time_dt + output_format: 'yyyy-MM-dd''T''HH:mm:ss' + - convert_type: + key: time + type: integer + - convert_type: + key: metadata/processed_time + type: integer + - add_entries: + entries: + - key: category_uid + value: 4 + - key: category_name + value: Network Activity + - key: class_uid + value: 4001 + - key: class_name + value: Network Activity + - key: metadata/product/name + value: Palo Alto Networks Next-Generation Firewall + - key: metadata/product/vendor_name + value: Palo Alto Networks + - key: metadata/profiles + value: + - security_control + - network_proxy + - host + - datetime + - key: metadata/version + value: 1.1.0 + - key: severity_id + value: 1 + - key: severity + value: Informational + - key: device/type_id + value: 9 + - key: connection_info/direction_id + value: 1 + - add_entries: + entries: + - key: observables_0/name + value: src_endpoint.ip + - key: observables_0/type + value: IP Address + - key: observables_0/type_id + value: '2' + - key: observables_0/value + format: '${Source_Address}' + - add_entries: + entries: + - key: observables_1/name + value: dst_endpoint.ip + - key: observables_1/type + value: IP Address + - key: observables_1/type_id + value: '2' + - key: observables_1/value + format: '${Destination_Address}' + - add_entries: + entries: + - key: observables_2/name + value: firewall_rule.uid + - key: observables_2/type + value: Resource UID + - key: observables_2/type_id + value: '10' + - key: observables_2/value + format: '${Rule_UUID}' + - convert_type: + key: observables_0/type_id + type: integer + - convert_type: + key: observables_1/type_id + type: integer + - convert_type: + key: observables_2/type_id + type: integer + - add_entries: + entries: + - key: observables + value: [] + - key: observables + value_expression: /observables_0 + append_if_key_exists: true + - add_entries: + entries: + - key: observables + value_expression: /observables_1 + append_if_key_exists: true + - add_entries: + entries: + - key: observables + value_expression: /observables_2 + append_if_key_exists: true + - rename_keys: + entries: + - from_key: Source_Address + to_key: src_endpoint/ip + overwrite_if_to_key_exists: true + - from_key: Source_Port + to_key: src_endpoint/port + overwrite_if_to_key_exists: true + - from_key: Virtual_System + to_key: src_endpoint/instance_uid + overwrite_if_to_key_exists: true + - from_key: Virtual_System_Name + to_key: src_endpoint/name + overwrite_if_to_key_exists: true + - from_key: NAT_Source_IP + to_key: src_endpoint/proxy_endpoint/ip + overwrite_if_to_key_exists: true + - from_key: NAT_Source_Port + to_key: src_endpoint/proxy_endpoint/port + overwrite_if_to_key_exists: true + - from_key: Source_Zone + to_key: src_endpoint/zone + overwrite_if_to_key_exists: true + - from_key: Inbound_Interface + to_key: src_endpoint/interface_uid + overwrite_if_to_key_exists: true + - from_key: Source_Location + to_key: src_endpoint/location/country + overwrite_if_to_key_exists: true + - from_key: Source_Device_Category + to_key: src_endpoint/type + overwrite_if_to_key_exists: true + - from_key: Source_MAC_Address + to_key: src_endpoint/mac + overwrite_if_to_key_exists: true + - from_key: Source_Hostname + to_key: src_endpoint/hostname + overwrite_if_to_key_exists: true + - from_key: Source_Device_OS_Version + to_key: src_endpoint/os/version + overwrite_if_to_key_exists: true + - from_key: Source_Device_OS_Family + to_key: src_endpoint/os/type + overwrite_if_to_key_exists: true + - from_key: Source_Device_Model + to_key: src_endpoint/device_hw_info/cpu_type + overwrite_if_to_key_exists: true + - from_key: Source_Device_Profile + to_key: unmapped/Source_Device_Profile + overwrite_if_to_key_exists: true + - from_key: Source_Device_Vendor + to_key: src_endpoint/device_hw_info/bios_manufacturer + overwrite_if_to_key_exists: true + - from_key: Destination_Device_Category + to_key: dst_endpoint/type + overwrite_if_to_key_exists: true + - from_key: Destination_MAC_Address + to_key: dst_endpoint/mac + overwrite_if_to_key_exists: true + - from_key: Destination_Hostname + to_key: dst_endpoint/hostname + overwrite_if_to_key_exists: true + - from_key: Destination_Device_OS_Version + to_key: dst_endpoint/os/version + overwrite_if_to_key_exists: true + - from_key: Destination_Device_OS_Family + to_key: dst_endpoint/os/type + overwrite_if_to_key_exists: true + - from_key: Destination_Device_Model + to_key: dst_endpoint/device_hw_info/cpu_type + overwrite_if_to_key_exists: true + - from_key: Destination_Device_Vendor + to_key: dst_endpoint/device_hw_info/bios_manufacturer + overwrite_if_to_key_exists: true + - from_key: Destination_Device_Profile + to_key: unmapped/Destination_Device_Profile + overwrite_if_to_key_exists: true + - from_key: Destination_Location + to_key: dst_endpoint/location/country + overwrite_if_to_key_exists: true + - from_key: Destination_Zone + to_key: dst_endpoint/zone + overwrite_if_to_key_exists: true + - from_key: Destination_Address + to_key: dst_endpoint/ip + overwrite_if_to_key_exists: true + - from_key: Destination_Port + to_key: dst_endpoint/port + overwrite_if_to_key_exists: true + - from_key: NAT_Destination_IP + to_key: dst_endpoint/proxy_endpoint/ip + overwrite_if_to_key_exists: true + - from_key: NAT_Destination_Port + to_key: dst_endpoint/proxy_endpoint/port + overwrite_if_to_key_exists: true + - from_key: Outbound_Interface + to_key: dst_endpoint/interface_uid + overwrite_if_to_key_exists: true + - from_key: XFF_Address + to_key: proxy_endpoint/ip + overwrite_if_to_key_exists: true + - from_key: Application_Subcategory + to_key: unmapped/Application_Risk + overwrite_if_to_key_exists: true + - from_key: Application_Category + to_key: unmapped/Application_Category + overwrite_if_to_key_exists: true + - from_key: Application_Technology + to_key: unmapped/Application_Technology + overwrite_if_to_key_exists: true + - from_key: Application_Risk + to_key: unmapped/Application_Risk + overwrite_if_to_key_exists: true + - from_key: XFF_Address + to_key: proxy_endpoint/ip + overwrite_if_to_key_exists: true + - from_key: Session_ID + to_key: connection_info/session/uid + overwrite_if_to_key_exists: true + - from_key: Repeat_Count + to_key: connection_info/session/count + overwrite_if_to_key_exists: true + - from_key: Flags + to_key: connection_info/tcp_flags + overwrite_if_to_key_exists: true + - from_key: Protocol + to_key: connection_info/protocol_name + overwrite_if_to_key_exists: true + - from_key: Serial_Number + to_key: device/hw_info/serial_number + overwrite_if_to_key_exists: true + - from_key: Device_Name + to_key: device/hostname + overwrite_if_to_key_exists: true + - from_key: Host_ID + to_key: device/uid + overwrite_if_to_key_exists: true + - from_key: Container_ID + to_key: device/container/uid + overwrite_if_to_key_exists: true + - from_key: POD_Namespace + to_key: unmapped/POD_Namespace + overwrite_if_to_key_exists: true + - from_key: POD_Name + to_key: device/container/pod_uuid + overwrite_if_to_key_exists: true + - from_key: HTTP2_Connection + to_key: unmapped/HTTP2_Connection + overwrite_if_to_key_exists: true + - from_key: Parent_Session_ID + to_key: unmapped/Parent_Session_ID + overwrite_if_to_key_exists: true + - from_key: Source_VM_UUID + to_key: unmapped/Source_VM_UUID + overwrite_if_to_key_exists: true + - from_key: Destination_VM_UUID + to_key: unmapped/Source_VM_UUID + overwrite_if_to_key_exists: true + - from_key: Device_Group_Hierarchy_Level_1 + to_key: unmapped/Device_Group_Hierarchy_Level_1 + overwrite_if_to_key_exists: true + - from_key: Device_Group_Hierarchy_Level_2 + to_key: unmapped/Device_Group_Hierarchy_Level_2 + overwrite_if_to_key_exists: true + - from_key: Device_Group_Hierarchy_Level_3 + to_key: unmapped/Device_Group_Hierarchy_Level_3 + overwrite_if_to_key_exists: true + - from_key: Device_Group_Hierarchy_Level_4 + to_key: unmapped/Device_Group_Hierarchy_Level_4 + overwrite_if_to_key_exists: true + - from_key: High_Resolution_Timestamp + to_key: unmapped/High_Resolution_Timestamp + overwrite_if_to_key_exists: true + - from_key: Log_Action + to_key: unmapped/Log_Action + overwrite_if_to_key_exists: true + - from_key: Action_Flags + to_key: unmapped/Action_Flags + overwrite_if_to_key_exists: true + - from_key: Tunnel_ID_IMSI + to_key: unmapped/Tunnel_ID_IMSI + overwrite_if_to_key_exists: true + - from_key: Monitor_Tag_IMEI + to_key: unmapped/Monitor_Tag_IMEI + overwrite_if_to_key_exists: true + - from_key: Tunnel_Type + to_key: unmapped/Tunnel_Type + overwrite_if_to_key_exists: true + - from_key: SCTP_Association_ID + to_key: unmapped/SCTP_Association_ID + overwrite_if_to_key_exists: true + - from_key: App_Flap_Count + to_key: unmapped/App_Flap_Count + overwrite_if_to_key_exists: true + - from_key: Policy_ID + to_key: unmapped/Policy_ID + overwrite_if_to_key_exists: true + - from_key: SD_WAN_Cluster + to_key: unmapped/SD_WAN_Cluster + overwrite_if_to_key_exists: true + - from_key: SD_WAN_Device_Type + to_key: unmapped/SD_WAN_Device_Type + overwrite_if_to_key_exists: true + - from_key: SD_WAN_Cluster_Type + to_key: unmapped/SD_WAN_Cluster_Type + overwrite_if_to_key_exists: true + - from_key: SD_WAN_Site + to_key: unmapped/SD_WAN_Site + overwrite_if_to_key_exists: true + - from_key: Link_Switches + to_key: unmapped/Link_Switches + overwrite_if_to_key_exists: true + - from_key: A_Slice_Service_Type + to_key: unmapped/A_Slice_Service_Type + overwrite_if_to_key_exists: true + - from_key: A_Slice_Differentiator + to_key: unmapped/A_Slice_Differentiator + overwrite_if_to_key_exists: true + - from_key: Source_External_Dynamic_List + to_key: unmapped/Source_External_Dynamic_List + overwrite_if_to_key_exists: true + - from_key: Destination_External_Dynamic_List + to_key: unmapped/Destination_External_Dynamic_List + overwrite_if_to_key_exists: true + - from_key: Source_Dynamic_Address_Group + to_key: unmapped/Source_Dynamic_Address_Group + overwrite_if_to_key_exists: true + - from_key: Destination_Dynamic_Address_Group + to_key: unmapped/Destination_Dynamic_Address_Group + overwrite_if_to_key_exists: true + - from_key: Application_Characteristic + to_key: unmapped/Application_Characteristic + overwrite_if_to_key_exists: true + - from_key: Application_Container + to_key: unmapped/Application_Container + overwrite_if_to_key_exists: true + - from_key: Tunneled_Application + to_key: unmapped/Tunneled_Application + overwrite_if_to_key_exists: true + - from_key: Application_SaaS + to_key: unmapped/Application_SaaS + overwrite_if_to_key_exists: true + - from_key: Application_Sanctioned_State + to_key: unmapped/Application_Sanctioned_State + overwrite_if_to_key_exists: true + - from_key: Offloaded + to_key: unmapped/Offloaded + overwrite_if_to_key_exists: true + - from_key: Session_Owner + to_key: unmapped/Session_Owner + overwrite_if_to_key_exists: true + - from_key: Packets_Sent + to_key: traffic/packets_out + overwrite_if_to_key_exists: true + - from_key: Packets_Received + to_key: traffic/packets_in + overwrite_if_to_key_exists: true + - from_key: Packets + to_key: traffic/packets + overwrite_if_to_key_exists: true + - from_key: Bytes_Sent + to_key: traffic/bytes_out + overwrite_if_to_key_exists: true + - from_key: Bytes_Received + to_key: traffic/bytes_in + overwrite_if_to_key_exists: true + - from_key: Bytes + to_key: traffic/bytes + overwrite_if_to_key_exists: true + - from_key: SCTP_Chunks_Sent + to_key: traffic/chunks_out + overwrite_if_to_key_exists: true + - from_key: SCTP_Chunks_Received + to_key: traffic/chunks_in + overwrite_if_to_key_exists: true + - from_key: SCTP_Chunks + to_key: traffic/chunks + overwrite_if_to_key_exists: true + - from_key: Application + to_key: unmapped/Application + overwrite_if_to_key_exists: true + - from_key: Source_User + to_key: actor/user/name + overwrite_if_to_key_exists: true + - from_key: Destination_User + to_key: actor/invoked_by + overwrite_if_to_key_exists: true + - from_key: Dynamic_User_Group_Name + to_key: unmapped/Dynamic_User_Group_Name + overwrite_if_to_key_exists: true + - from_key: Rule_Name + to_key: firewall_rule/name + overwrite_if_to_key_exists: true + - from_key: Rule_UUID + to_key: firewall_rule/uid + overwrite_if_to_key_exists: true + - from_key: Session_End_Reason + to_key: firewall_rule/condition + overwrite_if_to_key_exists: true + - from_key: Action_Source + to_key: firewall_rule/match_location + overwrite_if_to_key_exists: true + - from_key: Category + to_key: unmapped/Category + overwrite_if_to_key_exists: true + - from_key: Type + to_key: metadata/product/feature/name + overwrite_if_to_key_exists: true + - from_key: Threat_Content_Type + to_key: metadata/log_name + overwrite_if_to_key_exists: true + - from_key: Sequence_Number + to_key: metadata/log_version + overwrite_if_to_key_exists: true + - from_key: Elapsed_Time + to_key: unmapped/Elapsed_Time + overwrite_if_to_key_exists: true + - from_key: Parent_Start_Time + to_key: unmapped/Parent_Start_Time + overwrite_if_to_key_exists: true + - translate: + mappings: + - source: Action + targets: + - target: activity_id + default: 99 + map: + allow: 1 + deny: 5 + drop: 2 + drop ICMP: 2 + reset both: 3 + reset client: 3 + reset server: 3 + - translate: + mappings: + - source: Action + targets: + - target: activity_name + default: Other + map: + allow: Open + deny: Refuse + drop: Close + drop ICMP: Close + reset both: Reset + reset client: Reset + reset server: Reset + - translate: + mappings: + - source: Action + targets: + - target: action_id + default: 0 + map: + allow: 1 + deny: 2 + drop: 99 + drop ICMP: 99 + reset both: 99 + reset client: 99 + reset server: 99 + - translate: + mappings: + - source: Action + targets: + - target: action + default: Other + map: + allow: Allowed + deny: Denied + drop: Other + drop ICMP: Other + reset both: Other + reset client: Other + reset server: Other + - translate: + mappings: + - source: Action + targets: + - target: type_uid + default: 400199 + map: + allow: 400101 + deny: 400105 + drop: 400102 + drop ICMP: 400102 + reset both: 400103 + reset client: 400103 + reset server: 400103 + - translate: + mappings: + - source: Action + targets: + - target: type_name + default: 'Network Activity: Unknown' + map: + allow: 'Network Activity: Open' + deny: 'Network Activity: Refuse' + drop: 'Network Activity: Close' + drop ICMP: 'Network Activity: Close' + reset both: 'Network Activity: Reset' + reset client: 'Network Activity: Reset' + reset server: 'Network Activity: Reset' + - translate: + mappings: + - source: Action + targets: + - target: status_id + default: 0 + map: + allow: 1 + deny: 2 + drop: 99 + drop ICMP: 99 + reset both: 99 + reset client: 99 + reset server: 99 + - translate: + mappings: + - source: Action + targets: + - target: status + default: 0 + map: + allow: Success + deny: Failure + drop: Other + drop ICMP: Other + reset both: Other + reset client: Other + reset server: Other + - rename_keys: + entries: + - from_key: Action + to_key: unmapped/Action + overwrite_if_to_key_exists: true + - convert_type: + key: status_id + type: integer + - convert_type: + key: action_id + type: integer + - convert_type: + key: type_uid + type: integer + - convert_type: + key: activity_id + type: integer + - convert_type: + key: metadata/log_version + type: string + - convert_type: + key: metadata/logged_time + type: integer + - convert_type: + key: connection_info/uid + type: string + - convert_type: + key: connection_info/session/uid + type: string + - convert_type: + key: connection_info/direction_id + type: integer + - convert_type: + key: connection_info/session/count + type: integer + - convert_type: + key: connection_info/tcp_flags + type: integer + - convert_type: + key: dst_endpoint/port + type: integer + - convert_type: + key: dst_endpoint/proxy_endpoint/port + type: integer + - convert_type: + key: src_endpoint/port + type: integer + - convert_type: + key: src_endpoint/proxy_endpoint/port + type: integer + - convert_type: + key: traffic/bytes + type: integer + - convert_type: + key: traffic/bytes_in + type: integer + - convert_type: + key: traffic/bytes_out + type: integer + - convert_type: + key: traffic/packets + type: integer + - convert_type: + key: traffic/packets_in + type: integer + - convert_type: + key: traffic/packets_out + type: integer + - convert_type: + key: traffic/chunks + type: integer + - convert_type: + key: traffic/chunks_in + type: integer + - convert_type: + key: traffic/chunks_out + type: integer + - delete_entries: + with_keys: + - s3 + - message + - Generated_Time + - Start_Time + - Receive_Time + - FUTURE_USE_1 + - FUTURE_USE_2 + - FUTURE_USE_3 + - FUTURE_USE_4 + - FUTURE_USE_5 + - observables + - observables_0 + - observables_1 + - observables_2 + diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index b5d0d4ff28..a162e352c5 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index dac0db3fa9..e715d54563 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -259,7 +259,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index ba9b32b7df..09349579e4 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a doAnswer(a -> { numEventsAdded.getAndSet(numEventsAdded.get() + 1); return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); doAnswer(invocation -> { Consumer consumer = invocation.getArgument(0); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/configuration/ThresholdOptions.java index d6dbf6877f..0ba43c2c02 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/configuration/ThresholdOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/configuration/ThresholdOptions.java @@ -28,7 +28,7 @@ public class ThresholdOptions { private int eventCount; @JsonProperty("maximum_size") - private String maximumSize = DEFAULT_BYTE_CAPACITY; + private ByteCount maximumSize = ByteCount.parse(DEFAULT_BYTE_CAPACITY); @JsonProperty("event_collect_timeout") @DurationMin(seconds = 1) @@ -49,7 +49,7 @@ public Duration getEventCollectTimeOut() { * @return maximum byte count. */ public ByteCount getMaximumSize() { - return ByteCount.parse(maximumSize); + return maximumSize; } /** diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java index eadc9a7b8b..0b9fcfdaee 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java @@ -140,7 +140,7 @@ void setUp() throws Exception { lenient().doAnswer(a -> { numEventsAdded++; return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); bucketName = UUID.randomUUID().toString(); key = UUID.randomUUID().toString(); when(s3ObjectReference.getBucketName()).thenReturn(bucketName); @@ -196,6 +196,8 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg numEventsAdded = 0; doAnswer(a -> { Record record = mock(Record.class); + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); Consumer c = (Consumer)a.getArgument(2); c.accept(record); return null; diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java index a911964ab6..603bc5d77e 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerTest.java @@ -127,7 +127,7 @@ void setup() { lenient().doAnswer(a -> { numEventsAdded++; return null; - }).when(acknowledgementSet).add(any()); + }).when(acknowledgementSet).add(any(Event.class)); final String bucketName = UUID.randomUUID().toString(); final String objectKey = UUID.randomUUID().toString(); diff --git a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java index cadd463ae9..0f0b24fbdf 100644 --- a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java +++ b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java @@ -106,8 +106,8 @@ protected Record createNewRecordFromEvent(final Event recordEvent, String splitV protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) { DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle(); - if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { - eventHandle.getAcknowledgementSet().add(recordEvent); + if (eventHandle != null) { + eventHandle.addEventHandle(recordEvent.getEventHandle()); } } diff --git a/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java index 4e8944ab91..5c67e123ba 100644 --- a/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java +++ b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java @@ -376,7 +376,7 @@ public void testAddToAcknowledgementSetFromOriginEvent() { DefaultEventHandle spyEventHandle = (DefaultEventHandle) spyEvent.getEventHandle(); // Verify that the add method is called on the acknowledgement set - verify(spyEventHandle).getAcknowledgementSet(); + verify(spyEventHandle).addEventHandle(recordEvent.getEventHandle()); AcknowledgementSet spyAckSet = spyEventHandle.getAcknowledgementSet(); DefaultEventHandle eventHandle = (DefaultEventHandle) recordEvent.getEventHandle(); diff --git a/release/release-notes/data-prepper.release-notes-2.10.1.md b/release/release-notes/data-prepper.release-notes-2.10.1.md new file mode 100644 index 0000000000..8d4ca73eb7 --- /dev/null +++ b/release/release-notes/data-prepper.release-notes-2.10.1.md @@ -0,0 +1,6 @@ +## 2024-10-21 Version 2.10.1 + +--- + +### Bug Fixes +* [BUG] Kinesis source is failing on startup ([#5084](https://github.com/opensearch-project/data-prepper/issues/5084)) \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 0e56fcf4a6..c8349901f5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -109,6 +109,7 @@ include 'data-prepper-plugins:common' include 'data-prepper-plugins:armeria-common' include 'data-prepper-plugins:anomaly-detector-processor' include 'data-prepper-plugins:opensearch' +include 'data-prepper-plugins:ocsf' include 'data-prepper-plugins:service-map-stateful' include 'data-prepper-plugins:mapdb-processor-state' include 'data-prepper-plugins:otel-proto-common'