Skip to content

Commit

Permalink
Modify AcknowledgementSet add API to accept EventHandle instead of Ev…
Browse files Browse the repository at this point in the history
…ent (#4948)

* Addressed review comments. Rebased to latest

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed javadoc error

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed jacoco coverage failure

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing build

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing build

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Oct 21, 2024
1 parent 3aecf2f commit 38b8db5
Show file tree
Hide file tree
Showing 19 changed files with 128 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
* dropped, etc)
*/
public interface AcknowledgementSet {
/**
* Adds an event handle to the acknowledgement set. Assigns initial reference
* count of 1.
*
* @param eventHandle event handle to be added
* @since 2.11
*/
void add(EventHandle eventHandle);

/**
* Adds an event to the acknowledgement set. Assigns initial reference
Expand All @@ -29,7 +37,9 @@ public interface AcknowledgementSet {
* @param event event to be added
* @since 2.2
*/
public void add(Event event);
default void add(Event event) {
add(event.getEventHandle());
}

/**
* Aquires a reference to the event by incrementing the reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public boolean release(boolean result) {
return returnValue;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef : acknowledgementSetRefList) {
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}
}
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public boolean hasAcknowledgementSet() {
return acknowledgementSet != null;
}

@Override
public void addEventHandle(EventHandle eventHandle) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.add(eventHandle);
}
}

@Override
public void acquireReference() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,12 @@ public interface InternalEventHandle {
*/
void acquireReference();

/**
* Adds new event handle to the acknowledgement sets associated
* with this event handle
* @param eventHandle event handle to add
* @since 2.11
*/
void addEventHandle(EventHandle eventHandle);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.acknowledgements;

import org.junit.jupiter.api.Test;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.junit.jupiter.api.BeforeEach;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;

public class AcknowledgementSetTests {

Event event;
EventHandle eventHandle;
AcknowledgementSet acknowledgementSet;

@BeforeEach
public void setup() {
event = mock(Event.class);
eventHandle = mock(EventHandle.class);
when(event.getEventHandle()).thenReturn(eventHandle);
acknowledgementSet = spy(AcknowledgementSet.class);
}

@Test
public void testAcknowledgementSetAdd() {
acknowledgementSet.add(event);
verify(acknowledgementSet).add(eventHandle);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,18 @@ void testWithOnReleaseHandler() {

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
acknowledgementSet1 = mock(AcknowledgementSet.class);
acknowledgementSet2 = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet1);
eventHandle.addAcknowledgementSet(acknowledgementSet2);
AggregateEventHandle eventHandle2 = new AggregateEventHandle(now);
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet1).add(eventHandle2);
verify(acknowledgementSet2).add(eventHandle2);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import org.mockito.Mock;

import java.time.Instant;
Expand All @@ -39,7 +38,6 @@ void testBasic() {
void testWithAcknowledgementSet() {
acknowledgementSet = mock(AcknowledgementSet.class);
when(acknowledgementSet.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
doNothing().when(acknowledgementSet).acquire(any(EventHandle.class));
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
assertThat(eventHandle.getAcknowledgementSet(), equalTo(null));
Expand Down Expand Up @@ -75,5 +73,16 @@ void testWithOnReleaseHandler() {
assertThat(count, equalTo(1));

}

@Test
void testAddEventHandle() {
Instant now = Instant.now();
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
acknowledgementSet = mock(AcknowledgementSet.class);
eventHandle.addAcknowledgementSet(acknowledgementSet);
DefaultEventHandle eventHandle2 = new DefaultEventHandle(now);
eventHandle.addEventHandle(eventHandle2);
verify(acknowledgementSet).add(eventHandle2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,18 +72,13 @@ public void checkProgress() {
}

@Override
public void add(Event event) {
public void add(EventHandle eventHandle) {
lock.lock();
try {
if (event instanceof JacksonEvent) {
EventHandle eventHandle = event.getEventHandle();
if (eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
}
}
InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle;
internalEventHandle.addAcknowledgementSet(this);
pendingAcknowledgments.put(eventHandle, new AtomicInteger(1));
totalEventsAdded.incrementAndGet();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.opensearch.dataprepper.core.parser.DataFlowComponent;
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
Expand Down Expand Up @@ -95,13 +94,13 @@ public Record getRecord(final Record record) {
final Event recordEvent = (Event) record.getData();
JacksonEvent newRecordEvent;
Record newRecord;
DefaultEventHandle eventHandle = (DefaultEventHandle)recordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
InternalEventHandle internalHandle = (InternalEventHandle)recordEvent.getEventHandle();
if (internalHandle != null && internalHandle.hasAcknowledgementSet()) {
final EventMetadata eventMetadata = recordEvent.getMetadata();
final EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withEventMetadata(eventMetadata).withData(recordEvent.toMap());
newRecordEvent = (JacksonEvent) eventBuilder.build();

eventHandle.getAcknowledgementSet().add(newRecordEvent);
internalHandle.addEventHandle(newRecordEvent.getEventHandle());
newRecord = new Record<>(newRecordEvent);
acquireEventReference(newRecord);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void testDefaultAcknowledgementSetMultipleAcquireAndRelease() throws Exception {

@Test
void testDefaultAcknowledgementInvalidAcquire() {
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.add(event.getEventHandle());
defaultAcknowledgementSet.complete();
DefaultAcknowledgementSet secondAcknowledgementSet = createObjectUnderTest();
defaultAcknowledgementSet.acquire(handle2);
Expand Down Expand Up @@ -247,7 +247,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception {
Duration.ofSeconds(1)
);
defaultAcknowledgementSet.add(event);
defaultAcknowledgementSet.add(event2);
defaultAcknowledgementSet.add(event2.getEventHandle());
defaultAcknowledgementSet.complete();
lenient().doAnswer(a -> {
AcknowledgementSet acknowledgementSet = a.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand Down Expand Up @@ -241,10 +242,10 @@ void test_one_record_with_acknowledgements_and_multi_components() {
attachEventHandlesToRecordsIn(eventHandles);
try {
doAnswer((i) -> {
JacksonEvent e1 = (JacksonEvent) i.getArgument(0);
((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1);
EventHandle handle = (EventHandle) i.getArgument(0);
((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1);
return null;
}).when(acknowledgementSet1).add(any(JacksonEvent.class));
}).when(acknowledgementSet1).add(any(EventHandle.class));
} catch (Exception e){}

eventBuilder = mock(EventBuilder.class);
Expand Down Expand Up @@ -279,10 +280,10 @@ void test_multiple_records_with_acknowledgements_and_multi_components() {
attachEventHandlesToRecordsIn(eventHandles);
try {
doAnswer((i) -> {
JacksonEvent e1 = (JacksonEvent) i.getArgument(0);
((DefaultEventHandle)e1.getEventHandle()).addAcknowledgementSet(acknowledgementSet1);
EventHandle handle = (EventHandle) i.getArgument(0);
((DefaultEventHandle)handle).addAcknowledgementSet(acknowledgementSet1);
return null;
}).when(acknowledgementSet1).add(any(JacksonEvent.class));
}).when(acknowledgementSet1).add(any(EventHandle.class));
} catch (Exception e){}

eventBuilder = mock(EventBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void testProcessRecordsWithAcknowledgementsEnabled()
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a
doAnswer(a -> {
numEventsAdded.getAndSet(numEventsAdded.get() + 1);
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));

doAnswer(invocation -> {
Consumer<Boolean> consumer = invocation.getArgument(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void setUp() throws Exception {
lenient().doAnswer(a -> {
numEventsAdded++;
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));
bucketName = UUID.randomUUID().toString();
key = UUID.randomUUID().toString();
when(s3ObjectReference.getBucketName()).thenReturn(bucketName);
Expand Down Expand Up @@ -196,6 +196,8 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg
numEventsAdded = 0;
doAnswer(a -> {
Record record = mock(Record.class);
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
Consumer c = (Consumer)a.getArgument(2);
c.accept(record);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void setup() {
lenient().doAnswer(a -> {
numEventsAdded++;
return null;
}).when(acknowledgementSet).add(any());
}).when(acknowledgementSet).add(any(Event.class));
final String bucketName = UUID.randomUUID().toString();
final String objectKey = UUID.randomUUID().toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ protected Record createNewRecordFromEvent(final Event recordEvent, String splitV

protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) {
DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
eventHandle.getAcknowledgementSet().add(recordEvent);
if (eventHandle != null) {
eventHandle.addEventHandle(recordEvent.getEventHandle());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public void testAddToAcknowledgementSetFromOriginEvent() {

DefaultEventHandle spyEventHandle = (DefaultEventHandle) spyEvent.getEventHandle();
// Verify that the add method is called on the acknowledgement set
verify(spyEventHandle).getAcknowledgementSet();
verify(spyEventHandle).addEventHandle(recordEvent.getEventHandle());

AcknowledgementSet spyAckSet = spyEventHandle.getAcknowledgementSet();
DefaultEventHandle eventHandle = (DefaultEventHandle) recordEvent.getEventHandle();
Expand Down

0 comments on commit 38b8db5

Please sign in to comment.