From 03eca1e7b2e633adc17fe9671538ac1828317616 Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Tue, 22 Aug 2023 15:17:57 -0700
Subject: [PATCH] Catch exceptions when writing to the output codec and drop the event. (#3210) (#3216)

Catch exceptions when writing to the output codec and drop the event.
Correctly release failed events in the S3 sink.

Signed-off-by: David Venable
(cherry picked from commit c1cbb22fc5a80003d881a66bd028772909b33bba)

Co-authored-by: David Venable
---
 .../plugins/sink/s3/S3SinkService.java        | 38 ++++++++++---------
 .../plugins/sink/s3/S3SinkServiceTest.java    | 34 +++++++++++++++++
 2 files changed, 55 insertions(+), 17 deletions(-)

diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
index 4ac43721f1..12cf358b86 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
@@ -112,28 +112,32 @@ void output(Collection<Record<Event>> records) {
         try {
             for (Record<Event> record : records) {
 
-                if (currentBuffer.getEventCount() == 0) {
-                    final Event eventForSchemaAutoGenerate = record.getData();
-                    codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
-                }
-
                 final Event event = record.getData();
 
-                codec.writeEvent(event, currentBuffer.getOutputStream());
-                int count = currentBuffer.getEventCount() + 1;
-                currentBuffer.setEventCount(count);
-
-                if (event.getEventHandle() != null) {
-                    bufferedEventHandles.add(event.getEventHandle());
+                try {
+                    if (currentBuffer.getEventCount() == 0) {
+                        codec.start(currentBuffer.getOutputStream(), event, codecContext);
+                    }
+
+                    codec.writeEvent(event, currentBuffer.getOutputStream());
+                    int count = currentBuffer.getEventCount() + 1;
+                    currentBuffer.setEventCount(count);
+
+                    if (event.getEventHandle() != null) {
+                        bufferedEventHandles.add(event.getEventHandle());
+                    }
+                } catch (Exception ex) {
+                    if (event.getEventHandle() != null) {
+                        event.getEventHandle().release(false);
+                    }
+                    LOG.error("Unable to add event to buffer. Dropping this event.");
                 }
+                flushToS3IfNeeded();
             }
-        } catch (IOException e) {
-            LOG.error("Exception while write event into buffer :", e);
+            flushToS3IfNeeded();
+        } finally {
+            reentrantLock.unlock();
         }
-
-        flushToS3IfNeeded();
-
-        reentrantLock.unlock();
     }
 
     private void releaseEventHandles(final boolean result) {
diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java
index 37c33adb2a..b154d30526 100644
--- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java
+++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java
@@ -10,6 +10,7 @@
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.codec.OutputCodec;
 import org.opensearch.dataprepper.model.configuration.PluginModel;
@@ -54,6 +55,7 @@
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -471,6 +473,38 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException {
         }
     }
 
+    @Test
+    void output_will_skip_and_drop_failed_records() throws IOException {
+        bufferFactory = mock(BufferFactory.class);
+        final Buffer buffer = mock(Buffer.class);
+        when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);
+
+        final long objectSize = random.nextInt(1_000_000) + 10_000;
+        when(buffer.getSize()).thenReturn(objectSize);
+
+        final OutputStream outputStream = mock(OutputStream.class);
+        when(buffer.getOutputStream()).thenReturn(outputStream);
+
+
+        List<Record<Event>> records = generateEventRecords(2);
+        Event event1 = records.get(0).getData();
+        Event event2 = records.get(1).getData();
+
+        doThrow(RuntimeException.class).when(codec).writeEvent(event1, outputStream);
+
+        createObjectUnderTest().output(records);
+
+        InOrder inOrder = inOrder(codec);
+        inOrder.verify(codec).start(eq(outputStream), eq(event1), any());
+        inOrder.verify(codec).writeEvent(event1, outputStream);
+        inOrder.verify(codec).writeEvent(event2, outputStream);
+
+        verify(event1.getEventHandle()).release(false);
+        verify(event1.getEventHandle(), never()).release(true);
+        verify(event2.getEventHandle()).release(true);
+        verify(event2.getEventHandle(), never()).release(false);
+    }
+
     @Test
     void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws IOException {
         bufferFactory = mock(BufferFactory.class);