Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.4] Catch exceptions when writing to the output codec and drop the event. #3216

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,32 @@ void output(Collection<Record<Event>> records) {
try {
for (Record<Event> record : records) {

if (currentBuffer.getEventCount() == 0) {
final Event eventForSchemaAutoGenerate = record.getData();
codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
}

final Event event = record.getData();
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

if (event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
try {
if (currentBuffer.getEventCount() == 0) {
codec.start(currentBuffer.getOutputStream(), event, codecContext);
}

codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

if (event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
} catch (Exception ex) {
if (event.getEventHandle() != null) {
event.getEventHandle().release(false);
}
LOG.error("Unable to add event to buffer. Dropping this event.");
}

flushToS3IfNeeded();
}
} catch (IOException e) {
LOG.error("Exception while write event into buffer :", e);
flushToS3IfNeeded();
} finally {
reentrantLock.unlock();
}

flushToS3IfNeeded();

reentrantLock.unlock();
}

private void releaseEventHandles(final boolean result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
Expand Down Expand Up @@ -54,6 +55,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -471,6 +473,38 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException {
}
}

@Test
void output_will_skip_and_drop_failed_records() throws IOException {
bufferFactory = mock(BufferFactory.class);
final Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);

final long objectSize = random.nextInt(1_000_000) + 10_000;
when(buffer.getSize()).thenReturn(objectSize);

final OutputStream outputStream = mock(OutputStream.class);
when(buffer.getOutputStream()).thenReturn(outputStream);


List<Record<Event>> records = generateEventRecords(2);
Event event1 = records.get(0).getData();
Event event2 = records.get(1).getData();

doThrow(RuntimeException.class).when(codec).writeEvent(event1, outputStream);

createObjectUnderTest().output(records);

InOrder inOrder = inOrder(codec);
inOrder.verify(codec).start(eq(outputStream), eq(event1), any());
inOrder.verify(codec).writeEvent(event1, outputStream);
inOrder.verify(codec).writeEvent(event2, outputStream);

verify(event1.getEventHandle()).release(false);
verify(event1.getEventHandle(), never()).release(true);
verify(event2.getEventHandle()).release(true);
verify(event2.getEventHandle(), never()).release(false);
}

@Test
void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws IOException {
bufferFactory = mock(BufferFactory.class);
Expand Down
Loading