Skip to content

Commit

Permalink
Continue calling S3SinkService::output even if records is empty to fl…
Browse files Browse the repository at this point in the history
…ush stale batches (#3187)

Continue calling S3SinkService::output even if records is empty to flush stale batches (#3187)

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas authored Aug 18, 2023
1 parent 3527424 commit 0d31f15
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ private void doInitializeInternal() {
*/
@Override
public void doOutput(final Collection<Record<Event>> records) {
    // Forward every batch, including empty ones. S3SinkService.output evaluates the
    // flush thresholds after processing the records it is given, so returning early
    // here on an empty collection would leave already-buffered events unflushed for
    // as long as no new records arrive. S3SinkService.output performs its own cheap
    // short-circuit when there are neither incoming nor buffered events.
    s3SinkService.output(records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -92,59 +91,44 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);

currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
}

/**
* @param records records received by the sink; each one is written to the current buffer.
*/
void output(Collection<Record<Event>> records) {
reentrantLock.lock();
if (currentBuffer == null) {
currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
// Don't acquire the lock if there's no work to be done
if (records.isEmpty() && currentBuffer.getEventCount() == 0) {
return;
}
try {
OutputStream outputStream = currentBuffer.getOutputStream();

reentrantLock.lock();
try {
for (Record<Event> record : records) {

if (currentBuffer.getEventCount() == 0) {
final Event eventForSchemaAutoGenerate = record.getData();
codec.start(outputStream, eventForSchemaAutoGenerate, codecContext);
codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
}

final Event event = record.getData();
codec.writeEvent(event, outputStream);
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

if (event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
codec.complete(outputStream);
String s3Key = currentBuffer.getKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
if (isFlushToS3) {
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
}
currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
outputStream = currentBuffer.getOutputStream();
}
flushToS3IfNeeded();
}
} catch (IOException | InterruptedException e) {
} catch (IOException e) {
LOG.error("Exception while write event into buffer :", e);
}

flushToS3IfNeeded();

reentrantLock.unlock();
}

Expand All @@ -156,15 +140,41 @@ private void releaseEventHandles(final boolean result) {
bufferedEventHandles.clear();
}

/**
 * Flushes the current buffer to S3 when {@code ThresholdCheck.checkThresholdExceed} reports that
 * the buffer has exceeded the configured {@code maxEvents}, {@code maxBytes} or
 * {@code maxCollectionDuration}; otherwise does nothing.
 *
 * On a flush attempt the codec is completed, the upload is tried via {@code retryFlushToS3},
 * the success or failure metrics are updated, the buffered event handles are released with the
 * upload result, and a new buffer is obtained from the buffer factory. If an {@link IOException}
 * is thrown inside the flush attempt, it is logged and the current buffer is left in place.
 */
private void flushToS3IfNeeded() {
    final boolean thresholdExceeded =
            ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration);
    if (!thresholdExceeded) {
        return;
    }

    try {
        codec.complete(currentBuffer.getOutputStream());

        final String objectKey = currentBuffer.getKey();
        LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
                objectKey, currentBuffer.getEventCount(), currentBuffer.getSize());

        final boolean uploaded = retryFlushToS3(currentBuffer, objectKey);
        if (!uploaded) {
            LOG.error("Failed to save {} to S3.", objectKey);
            numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
            objectsFailedCounter.increment();
            releaseEventHandles(false);
        } else {
            LOG.info("Successfully saved {} to S3.", objectKey);
            numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
            objectsSucceededCounter.increment();
            s3ObjectSizeSummary.record(currentBuffer.getSize());
            releaseEventHandles(true);
        }

        // Whether or not the upload succeeded, continue with a fresh buffer.
        currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
    } catch (final IOException e) {
        LOG.error("Exception while completing codec", e);
    }
}

/**
* Retries the upload if an issue occurs, based on the max_upload_retries configuration.
*
* @param currentBuffer current buffer.
* @param s3Key key of the S3 object being written.
* @return boolean based on object upload status.
* @throws InterruptedException interruption during sleep.
*/
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException {
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) {
boolean isUploadedToS3 = Boolean.FALSE;
int retryCount = maxRetries;
do {
Expand All @@ -174,12 +184,17 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
} catch (AwsServiceException | SdkClientException e) {
LOG.error("Exception occurred while uploading records to s3 bucket. Retry countdown : {} | exception:",
retryCount, e);
LOG.info("Error Massage {}", e.getMessage());
LOG.info("Error Message {}", e.getMessage());
--retryCount;
if (retryCount == 0) {
return isUploadedToS3;
}
Thread.sleep(5000);

try {
Thread.sleep(5000);
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while backing off before retrying S3 upload", ex);
}
}
} while (!isUploadedToS3);
return isUploadedToS3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc
S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(50)).increment();
verify(snapshotSuccessCounter, times(51)).increment();
}


Expand All @@ -181,7 +181,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException {
S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(50)).increment();
verify(snapshotSuccessCounter, times(51)).increment();
}

@Test
Expand All @@ -200,7 +200,7 @@ void test_output_with_uploadedToS3_success() throws IOException {
assertNotNull(s3SinkService);
assertThat(s3SinkService, instanceOf(S3SinkService.class));
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(50)).increment();
verify(snapshotSuccessCounter, times(51)).increment();
}

@Test
Expand All @@ -219,7 +219,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti
final S3SinkService s3SinkService = createObjectUnderTest();
s3SinkService.output(generateRandomStringEventRecord());

verify(s3ObjectSizeSummary, times(50)).record(objectSize);
verify(s3ObjectSizeSummary, times(51)).record(objectSize);
}

@Test
Expand All @@ -245,7 +245,7 @@ void test_output_with_uploadedToS3_midBatch_generatesNewOutputStream() throws IO

s3SinkService.output(generateEventRecords(2));

verify(snapshotSuccessCounter, times(2)).increment();
verify(snapshotSuccessCounter, times(3)).increment();
verify(codec).writeEvent(any(), eq(outputStream1));
verify(codec).writeEvent(any(), eq(outputStream2));
}
Expand Down Expand Up @@ -283,7 +283,45 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I
s3SinkService.output(Collections.singletonList(new Record<>(event)));

verify(s3ObjectSizeSummary, never()).record(anyLong());
verify(buffer, times(3)).flushToS3();
verify(buffer, times(6)).flushToS3();
}

@Test
void test_output_with_no_incoming_records_flushes_batch() throws IOException {
    // The buffer reports events that were written by an earlier call, so output()
    // must not short-circuit even though no new records are supplied.
    bufferFactory = mock(BufferFactory.class);
    final Buffer buffer = mock(Buffer.class);
    when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);
    when(buffer.getEventCount()).thenReturn(10);

    final S3SinkService s3SinkService = createObjectUnderTest();
    s3SinkService.output(Collections.emptyList());

    // Exactly one flush of the stale batch is expected.
    verify(snapshotSuccessCounter, times(1)).increment();
    verify(buffer, times(1)).flushToS3();
}

@Test
void test_output_with_no_incoming_records_or_buffered_records_short_circuits() throws IOException {
    // With no incoming records and an empty buffer there is no work to do,
    // so output() is expected to return before attempting any flush.
    bufferFactory = mock(BufferFactory.class);
    final Buffer buffer = mock(Buffer.class);
    when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);
    when(buffer.getEventCount()).thenReturn(0);

    final S3SinkService s3SinkService = createObjectUnderTest();
    s3SinkService.output(Collections.emptyList());

    verify(snapshotSuccessCounter, never()).increment();
    verify(buffer, never()).flushToS3();
}

@Test
Expand Down

0 comments on commit 0d31f15

Please sign in to comment.