diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 12cf358b86..e014eb764e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -24,8 +24,11 @@ import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; +import java.util.List; +import java.util.Objects; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -108,6 +111,8 @@ void output(Collection> records) { return; } + List failedEvents = new ArrayList<>(); + Exception sampleException = null; reentrantLock.lock(); try { for (Record record : records) { @@ -126,10 +131,11 @@ void output(Collection> records) { bufferedEventHandles.add(event.getEventHandle()); } } catch (Exception ex) { - if (event.getEventHandle() != null) { - event.getEventHandle().release(false); + if(sampleException == null) { + sampleException = ex; } - LOG.error("Unable to add event to buffer. Dropping this event."); + + failedEvents.add(event); } flushToS3IfNeeded(); @@ -138,6 +144,15 @@ void output(Collection> records) { } finally { reentrantLock.unlock(); } + + if(!failedEvents.isEmpty()) { + failedEvents + .stream() + .map(Event::getEventHandle) + .filter(Objects::nonNull) + .forEach(eventHandle -> eventHandle.release(false)); + LOG.error("Unable to add {} events to buffer. Dropping these events. Sample exception provided.", failedEvents.size(), sampleException); + } } private void releaseEventHandles(final boolean result) {