Skip to content

Commit

Permalink
main merging
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Oct 23, 2024
2 parents f1883af + e9bffee commit 54f8388
Show file tree
Hide file tree
Showing 46 changed files with 1,623 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
* @sb2k16 @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
1 change: 1 addition & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje

| Maintainer | GitHub ID | Affiliation |
| -------------------- | --------------------------------------------------------- | ----------- |
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
* dropped, etc)
*/
public interface AcknowledgementSet {
/**
* Adds an event handle to the acknowledgement set. Assigns initial reference
* count of 1.
*
* @param eventHandle event handle to be added
* @since 2.11
*/
void add(EventHandle eventHandle);

/**
* Adds an event to the acknowledgement set. Assigns initial reference
Expand All @@ -29,7 +37,9 @@ public interface AcknowledgementSet {
* @param event event to be added
* @since 2.2
*/
public void add(Event event);
default void add(Event event) {
add(event.getEventHandle());
}

/**
* Aquires a reference to the event by incrementing the reference
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* Use this annotation to provide example values for plugin configuration.
*
* @since 2.11
*/
@Documented
@Retention(RUNTIME)
@Target({FIELD})
public @interface ExampleValues {
/**
* One or more examples.
* @return the examples.
* @since 2.11
*/
Example[] value();

/**
* A single example.
*
* @since 2.11
*/
@interface Example {
/**
* The example value
* @return The example value
*
* @since 2.11
*/
String value();

/**
* A description of the example value.
*
* @since 2.11
*/
String description() default "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public boolean release(boolean result) {
return returnValue;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef : acknowledgementSetRefList) {
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}
}
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() {
return acknowledgementSet != null;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}

@Override
public void acquireReference() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,12 @@ public interface InternalEventHandle {
*/
void acquireReference();

/**
* Adds new event handle to the acknowledgement sets associated
* with this event handle
* @param eventHandle event handle to add
* @since 2.11
*/
void addEventHandle(EventHandle eventHandle);
}

Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ public static ByteCount parse(final String string) {
final String unitString = matcher.group("unit");

if(unitString == null) {
throw new ByteCountInvalidInputException("Byte counts must have a unit.");
throw new ByteCountInvalidInputException("Byte counts must have a unit. Valid byte units include: " +
Arrays.stream(Unit.values()).map(unitValue -> unitValue.unitString).collect(Collectors.toList()));
}

final Unit unit = Unit.fromString(unitString)
.orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'"));
.orElseThrow(() -> new ByteCountInvalidInputException("Invalid byte unit: '" + unitString + "'. Valid byte units include: "
+ Arrays.stream(Unit.values()).map(unitValue -> unitValue.unitString).collect(Collectors.toList())));

final BigDecimal valueBigDecimal = new BigDecimal(valueString);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.acknowledgements;

import org.junit.jupiter.api.Test;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.junit.jupiter.api.BeforeEach;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;

public class AcknowledgementSetTests {

Event event;
EventHandle eventHandle;
AcknowledgementSet acknowledgementSet;

@BeforeEach
public void setup() {
event = mock(Event.class);
eventHandle = mock(EventHandle.class);
when(event.getEventHandle()).thenReturn(eventHandle);
acknowledgementSet = spy(AcknowledgementSet.class);
}

@Test
public void testAcknowledgementSetAdd() {
acknowledgementSet.add(event);
verify(acknowledgementSet).add(eventHandle);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,18 @@ void testWithOnReleaseHandler() {

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
acknowledgementSet1 = mock(AcknowledgementSet.class);
acknowledgementSet2 = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet1);
eventHandle.addAcknowledgementSet(acknowledgementSet2);
AggregateEventHandle eventHandle2 = new AggregateEventHandle(now);
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet1).add(eventHandle2);
verify(acknowledgementSet2).add(eventHandle2);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import org.mockito.Mock;

import java.time.Instant;
Expand All @@ -39,7 +38,6 @@ void testBasic() {
void testWithAcknowledgementSet() {
acknowledgementSet = mock(AcknowledgementSet.class);
when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
doNothing().when(acknowledgementSet).acquire(any(EventHandle.class));
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
Expand Down Expand Up @@ -75,5 +73,16 @@ void testWithOnReleaseHandler() {
assertThat(count, equalTo(1));

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
acknowledgementSet = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet);
DefaultEventHandle eventHandle2 = new DefaultEventHandle(now);
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet).add(eventHandle2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,18 +72,13 @@ public void checkProgress() {
}

@Override
public void add(Event event) {
public void add(EventHandle eventHandle) {
lock.lock();
try {
if (event instanceof JacksonEvent) {
EventHandle eventHandle = event.getEventHandle();
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
}
}
InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.opensearch.dataprepper.core.parser.DataFlowComponent;
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
Expand Down Expand Up @@ -95,13 +94,13 @@ public Record getRecord(final Record record) {
final Event recordEvent = (Event) record.getData();
JacksonEvent newRecordEvent;
Record newRecord;
DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
InternalEventHandle internalHandle = (InternalEventHandle)recordEvent.getEventHandle();
if (internalHandle != null && internalHandle.hasAcknowledgementSet()) {
final EventMetadata eventMetadata = recordEvent.getMetadata();
final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap());
newRecordEvent = (JacksonEvent) eventBuilder.build();

eventHandle.getAcknowledgementSet().add(newRecordEvent);
internalHandle.addEventHandle(newRecordEvent.getEventHandle());
newRecord = new Record<>(newRecordEvent);
acquireEventReference(newRecord);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception {

@Test
void testDefaultAcknowledgementInvalidAcquire() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.add(event.getEventHandle());
defaultAcknowledgementSet.complete();
DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest();
defaultAcknowledgementSet.acquire(handle2);
Expand Down Expand Up @@ -247,7 +247,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception {
Duration.ofSeconds(1)
);
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.add(event2);
defaultAcknowledgementSet.add(event2.getEventHandle());
defaultAcknowledgementSet.complete();
lenient().doAnswer(a -> {
AcknowledgementSet acknowledgementSet = a.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand Down Expand Up @@ -241,10 +242,10 @@ void test_one_record_with_acknowledgements_and_multi_components() {
attachEventHandlesToRecordsIn(eventHandles);
try {
doAnswer((i) -> {
JacksonEvent e1 = (JacksonEvent) i.getArgument(0);
((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1);
EventHandle handle = (EventHandle) i.getArgument(0);
((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1);
return null;
}).when(acknowledgementSet1).add(any(JacksonEvent.class));
}).when(acknowledgementSet1).add(any(EventHandle.class));
} catch (Exception e){}

eventBuilder = mock(EventBuilder.class);
Expand Down Expand Up @@ -279,10 +280,10 @@ void test_multiple_records_with_acknowledgements_and_multi_components() {
attachEventHandlesToRecordsIn(eventHandles);
try {
doAnswer((i) -> {
JacksonEvent e1 = (JacksonEvent) i.getArgument(0);
((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1);
EventHandle handle = (EventHandle) i.getArgument(0);
((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1);
return null;
}).when(acknowledgementSet1).add(any(JacksonEvent.class));
}).when(acknowledgementSet1).add(any(EventHandle.class));
} catch (Exception e){}

eventBuilder = mock(EventBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public ByteCount deserialize(final JsonParser parser, final DeserializationConte
try {
return ByteCount.parse(byteString);
} catch (final Exception ex) {
throw new IllegalArgumentException(ex);
throw new IllegalArgumentException(ex.getMessage());
}
}
}
Loading

0 comments on commit 54f8388

Please sign in to comment.