diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
index 7aa20a5dac..351ebcf0e1 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
@@ -106,9 +106,6 @@ private void doInitializeInternal() {
      */
     @Override
     public void doOutput(final Collection<Record<Event>> records) {
-        if (records.isEmpty()) {
-            return;
-        }
         s3SinkService.output(records);
     }
 }
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
index e7d62751a8..7007259ebf 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
@@ -23,7 +23,6 @@
 import software.amazon.awssdk.services.s3.S3Client;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.concurrent.locks.Lock;
@@ -92,59 +91,44 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
         numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
         numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
         s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);
+
+        currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
     }
 
     /**
      * @param records received records and add into buffer.
      */
     void output(Collection<Record<Event>> records) {
-        reentrantLock.lock();
-        if (currentBuffer == null) {
-            currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
+        // Don't acquire the lock if there's no work to be done
+        if (records.isEmpty() && currentBuffer.getEventCount() == 0) {
+            return;
         }
-        try {
-            OutputStream outputStream = currentBuffer.getOutputStream();
+        reentrantLock.lock();
+        try {
             for (Record<Event> record : records) {
 
                 if (currentBuffer.getEventCount() == 0) {
                     final Event eventForSchemaAutoGenerate = record.getData();
-                    codec.start(outputStream, eventForSchemaAutoGenerate, codecContext);
+                    codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
                 }
 
                 final Event event = record.getData();
-                codec.writeEvent(event, outputStream);
+                codec.writeEvent(event, currentBuffer.getOutputStream());
                 int count = currentBuffer.getEventCount() + 1;
                 currentBuffer.setEventCount(count);
 
                 if (event.getEventHandle() != null) {
                     bufferedEventHandles.add(event.getEventHandle());
                 }
 
-                if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
-                    codec.complete(outputStream);
-                    String s3Key = currentBuffer.getKey();
-                    LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
-                            s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
-                    final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
-                    if (isFlushToS3) {
-                        LOG.info("Successfully saved {} to S3.", s3Key);
-                        numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
-                        objectsSucceededCounter.increment();
-                        s3ObjectSizeSummary.record(currentBuffer.getSize());
-                        releaseEventHandles(true);
-                    } else {
-                        LOG.error("Failed to save {} to S3.", s3Key);
-                        numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
-                        objectsFailedCounter.increment();
-                        releaseEventHandles(false);
-                    }
-                    currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
-                    outputStream = currentBuffer.getOutputStream();
-                }
+                flushToS3IfNeeded();
             }
-        } catch (IOException | InterruptedException e) {
+        } catch (IOException e) {
             LOG.error("Exception while write event into buffer :", e);
         }
+
+        flushToS3IfNeeded();
+        reentrantLock.unlock();
     }
@@ -156,15 +140,41 @@ private void releaseEventHandles(final boolean result) {
         bufferedEventHandles.clear();
     }
 
+    private void flushToS3IfNeeded() {
+        if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
+            try {
+                codec.complete(currentBuffer.getOutputStream());
+                String s3Key = currentBuffer.getKey();
+                LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
+                        s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
+                final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
+                if (isFlushToS3) {
+                    LOG.info("Successfully saved {} to S3.", s3Key);
+                    numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
+                    objectsSucceededCounter.increment();
+                    s3ObjectSizeSummary.record(currentBuffer.getSize());
+                    releaseEventHandles(true);
+                } else {
+                    LOG.error("Failed to save {} to S3.", s3Key);
+                    numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
+                    objectsFailedCounter.increment();
+                    releaseEventHandles(false);
+                }
+                currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
+            } catch (final IOException e) {
+                LOG.error("Exception while completing codec", e);
+            }
+        }
+    }
+
     /**
      * perform retry in-case any issue occurred, based on max_upload_retries configuration.
      *
      * @param currentBuffer current buffer.
      * @param s3Key
      * @return boolean based on object upload status.
-     * @throws InterruptedException interruption during sleep.
      */
-    protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException {
+    protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) {
         boolean isUploadedToS3 = Boolean.FALSE;
         int retryCount = maxRetries;
         do {
@@ -174,12 +184,17 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
             } catch (AwsServiceException | SdkClientException e) {
                 LOG.error("Exception occurred while uploading records to s3 bucket. Retry countdown : {} | exception:",
                         retryCount, e);
-                LOG.info("Error Massage {}", e.getMessage());
+                LOG.info("Error Message {}", e.getMessage());
                 --retryCount;
                 if (retryCount == 0) {
                     return isUploadedToS3;
                 }
-                Thread.sleep(5000);
+
+                try {
+                    Thread.sleep(5000);
+                } catch (final InterruptedException ex) {
+                    LOG.warn("Interrupted while backing off before retrying S3 upload", ex);
+                }
             }
         } while (!isUploadedToS3);
         return isUploadedToS3;
diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java
index 79724bc306..cb311d178a 100644
--- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java
+++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java
@@ -158,7 +158,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc
         S3SinkService s3SinkService = createObjectUnderTest();
         assertNotNull(s3SinkService);
         s3SinkService.output(generateRandomStringEventRecord());
-        verify(snapshotSuccessCounter, times(50)).increment();
+        verify(snapshotSuccessCounter, times(51)).increment();
     }
@@ -181,7 +181,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException {
         S3SinkService s3SinkService = createObjectUnderTest();
         assertNotNull(s3SinkService);
         s3SinkService.output(generateRandomStringEventRecord());
-        verify(snapshotSuccessCounter, times(50)).increment();
+        verify(snapshotSuccessCounter, times(51)).increment();
     }
 
     @Test
@@ -200,7 +200,7 @@ void test_output_with_uploadedToS3_success() throws IOException {
         assertNotNull(s3SinkService);
         assertThat(s3SinkService, instanceOf(S3SinkService.class));
         s3SinkService.output(generateRandomStringEventRecord());
-        verify(snapshotSuccessCounter, times(50)).increment();
+        verify(snapshotSuccessCounter, times(51)).increment();
     }
 
     @Test
@@ -219,7 +219,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti
 
         final S3SinkService s3SinkService = createObjectUnderTest();
         s3SinkService.output(generateRandomStringEventRecord());
-        verify(s3ObjectSizeSummary, times(50)).record(objectSize);
+        verify(s3ObjectSizeSummary, times(51)).record(objectSize);
     }
 
     @Test
@@ -245,7 +245,7 @@ void test_output_with_uploadedToS3_midBatch_generatesNewOutputStream() throws IO
 
         s3SinkService.output(generateEventRecords(2));
 
-        verify(snapshotSuccessCounter, times(2)).increment();
+        verify(snapshotSuccessCounter, times(3)).increment();
         verify(codec).writeEvent(any(), eq(outputStream1));
         verify(codec).writeEvent(any(), eq(outputStream2));
     }
@@ -283,7 +283,45 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I
         s3SinkService.output(Collections.singletonList(new Record<>(event)));
 
         verify(s3ObjectSizeSummary, never()).record(anyLong());
-        verify(buffer, times(3)).flushToS3();
+        verify(buffer, times(6)).flushToS3();
+    }
+
+    @Test
+    void test_output_with_no_incoming_records_flushes_batch() throws IOException {
+
+        bufferFactory = mock(BufferFactory.class);
+        Buffer buffer = mock(Buffer.class);
+        when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);
+        when(buffer.getEventCount()).thenReturn(10);
+
+        final OutputStream outputStream = mock(OutputStream.class);
+        final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
+        doNothing().when(codec).writeEvent(event, outputStream);
+        final S3SinkService s3SinkService = createObjectUnderTest();
+        s3SinkService.output(Collections.emptyList());
+
+        verify(snapshotSuccessCounter, times(1)).increment();
+        verify(buffer, times(1)).flushToS3();
+    }
+
+    @Test
+    void test_output_with_no_incoming_records_or_buffered_records_short_circuits() throws IOException {
+
+        bufferFactory = mock(BufferFactory.class);
+        Buffer buffer = mock(Buffer.class);
+        when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);
+        when(buffer.getEventCount()).thenReturn(0);
+        final long objectSize = random.nextInt(1_000_000) + 10_000;
+        when(buffer.getSize()).thenReturn(objectSize);
+
+        final OutputStream outputStream = mock(OutputStream.class);
+        final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
+        doNothing().when(codec).writeEvent(event, outputStream);
+        final S3SinkService s3SinkService = createObjectUnderTest();
+        s3SinkService.output(Collections.emptyList());
+
+        verify(snapshotSuccessCounter, times(0)).increment();
+        verify(buffer, times(0)).flushToS3();
     }
 
     @Test