Skip to content

Commit

Permalink
Batch the errors writing to the S3 sink to reduce the number of error…
Browse files Browse the repository at this point in the history
…s reported. (#3242)

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Aug 24, 2023
1 parent c2fc949 commit 2cca4b1
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -108,6 +111,8 @@ void output(Collection<Record<Event>> records) {
return;
}

List<Event> failedEvents = new ArrayList<>();
Exception sampleException = null;
reentrantLock.lock();
try {
for (Record<Event> record : records) {
Expand All @@ -126,10 +131,11 @@ void output(Collection<Record<Event>> records) {
bufferedEventHandles.add(event.getEventHandle());
}
} catch (Exception ex) {
if (event.getEventHandle() != null) {
event.getEventHandle().release(false);
if(sampleException == null) {
sampleException = ex;
}
LOG.error("Unable to add event to buffer. Dropping this event.");

failedEvents.add(event);
}

flushToS3IfNeeded();
Expand All @@ -138,6 +144,15 @@ void output(Collection<Record<Event>> records) {
} finally {
reentrantLock.unlock();
}

if(!failedEvents.isEmpty()) {
failedEvents
.stream()
.map(Event::getEventHandle)
.filter(Objects::nonNull)
.forEach(eventHandle -> eventHandle.release(false));
LOG.error("Unable to add {} events to buffer. Dropping these events. Sample exception provided.", failedEvents.size(), sampleException);
}
}

private void releaseEventHandles(final boolean result) {
Expand Down

0 comments on commit 2cca4b1

Please sign in to comment.