diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java index f4f909858b..2aa1c6a02f 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java @@ -13,6 +13,10 @@ import com.amazon.dataprepper.plugins.source.compression.CompressionEngine; import com.amazon.dataprepper.plugins.source.compression.NoneCompressionEngine; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.noop.NoopTimer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; @@ -30,7 +34,6 @@ import java.util.function.Consumer; import java.util.stream.Stream; -import static com.amazon.dataprepper.plugins.source.S3ObjectWorker.S3_OBJECTS_FAILED_METRIC_NAME; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -38,6 +41,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -69,7 +73,9 @@ void setUp() { pluginMetrics = mock(PluginMetrics.class); final Counter counter = mock(Counter.class); - when(pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME)).thenReturn(counter); + final Timer timer = new NoopTimer(new Meter.Id("test", Tags.empty(), null, null, Meter.Type.TIMER)); + when(pluginMetrics.counter(anyString())).thenReturn(counter); + when(pluginMetrics.timer(anyString())).thenReturn(timer); } private void stubBufferWriter(final Consumer additionalEventAssertions) throws Exception { diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java index 2767ccfd03..4095a90e70 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java @@ -12,6 +12,7 @@ import com.amazon.dataprepper.plugins.source.codec.Codec; import com.amazon.dataprepper.plugins.source.compression.CompressionEngine; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.ResponseInputStream; @@ -22,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; +import java.util.concurrent.Callable; /** * Class responsible for taking an {@link S3ObjectReference} and creating all the necessary {@link Event} @@ -30,6 +32,8 @@ class S3ObjectWorker { private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class); static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed"; + static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded"; + static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed"; private final S3Client s3Client; private final Buffer> buffer; @@ -38,6 +42,8 @@ class S3ObjectWorker { private final Duration bufferTimeout; private final int numberOfRecordsToAccumulate; private final Counter s3ObjectsFailedCounter; + private final Counter s3ObjectsSucceededCounter; + private final Timer s3ObjectReadTimer; public S3ObjectWorker(final S3Client s3Client, final Buffer> buffer, @@ -54,6 +60,8 @@ public S3ObjectWorker(final S3Client s3Client, this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate; s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME); + s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME); + s3ObjectReadTimer = pluginMetrics.timer(S3_OBJECTS_TIME_ELAPSED_METRIC_NAME); } void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException { @@ -65,8 +73,24 @@ void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException .build(); final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, numberOfRecordsToAccumulate, bufferTimeout); + try { + s3ObjectReadTimer.recordCallable((Callable) () -> { + doParseObject(s3ObjectReference, getObjectRequest, bufferAccumulator); + return null; + }); + } catch (final IOException | RuntimeException e) { + throw e; + } catch (final Exception e) { + // doParseObject does not throw Exception, only IOException or RuntimeException. But, Callable has Exception as a checked + // exception on the interface. This catch block thus should not be reached, but in case it is, wrap it. + throw new RuntimeException(e); + } + + s3ObjectsSucceededCounter.increment(); + } + private void doParseObject(final S3ObjectReference s3ObjectReference, final GetObjectRequest getObjectRequest, final BufferAccumulator> bufferAccumulator) throws IOException { try (final ResponseInputStream responseInputStream = s3Client.getObject(getObjectRequest); final InputStream inputStream = compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream)) { codec.parse(inputStream, record -> { diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java index a166f91543..c195df1ed6 100644 --- a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java @@ -12,6 +12,7 @@ import com.amazon.dataprepper.plugins.source.codec.Codec; import com.amazon.dataprepper.plugins.source.compression.CompressionEngine; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -31,10 +32,12 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.function.Consumer; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -45,6 +48,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -72,13 +76,19 @@ class S3ObjectWorkerTest { private PluginMetrics pluginMetrics; @Mock private Counter s3ObjectsFailedCounter; + @Mock + private Counter s3ObjectsSucceededCounter; + @Mock + private Timer s3ObjectReadTimer; private String bucketName; private String key; private ResponseInputStream objectInputStream; + private Exception exceptionThrownByCallable; + @BeforeEach - void setUp() { + void setUp() throws Exception { final Random random = new Random(); bufferTimeout = Duration.ofMillis(random.nextInt(100) + 100); recordsToAccumulate = random.nextInt(10) + 2; @@ -88,7 +98,21 @@ void setUp() { when(s3ObjectReference.getBucketName()).thenReturn(bucketName); when(s3ObjectReference.getKey()).thenReturn(key); + exceptionThrownByCallable = null; + when(s3ObjectReadTimer.recordCallable(any(Callable.class))) + .thenAnswer(a -> { + try { + a.getArgument(0, Callable.class).call(); + } catch (final Exception ex) { + exceptionThrownByCallable = ex; + throw ex; + } + return null; + }); + when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_FAILED_METRIC_NAME)).thenReturn(s3ObjectsFailedCounter); + when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_SUCCEEDED_METRIC_NAME)).thenReturn(s3ObjectsSucceededCounter); + when(pluginMetrics.timer(S3ObjectWorker.S3_OBJECTS_TIME_ELAPSED_METRIC_NAME)).thenReturn(s3ObjectReadTimer); objectInputStream = mock(ResponseInputStream.class); } @@ -189,6 +213,20 @@ void parseS3Object_calls_BufferAccumulator_flush_after_Codec_parse() throws Exce inOrder.verify(bufferAccumulator).flush(); } + @Test + void parseS3Object_increments_success_counter_after_parsing_S3_object() throws IOException { + final ResponseInputStream objectInputStream = mock(ResponseInputStream.class); + when(s3Client.getObject(any(GetObjectRequest.class))) + .thenReturn(objectInputStream); + + final S3ObjectWorker objectUnderTest = createObjectUnderTest(); + objectUnderTest.parseS3Object(s3ObjectReference); + + verify(s3ObjectsSucceededCounter).increment(); + verifyNoInteractions(s3ObjectsFailedCounter); + assertThat(exceptionThrownByCallable, nullValue()); + } + @Test void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_get_S3_object() { final RuntimeException expectedException = mock(RuntimeException.class); @@ -201,10 +239,12 @@ void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_get_S3 assertThat(actualException, sameInstance(expectedException)); verify(s3ObjectsFailedCounter).increment(); + verifyNoInteractions(s3ObjectsSucceededCounter); + assertThat(exceptionThrownByCallable, sameInstance(expectedException)); } @Test - void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_parse_S3_object() throws IOException { + void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_to_parse_S3_object() throws IOException { when(compressionEngine.createInputStream(key, objectInputStream)).thenReturn(objectInputStream); when(s3Client.getObject(any(GetObjectRequest.class))) .thenReturn(objectInputStream); @@ -218,5 +258,55 @@ void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_parse_ assertThat(actualException, sameInstance(expectedException)); verify(s3ObjectsFailedCounter).increment(); + verifyNoInteractions(s3ObjectsSucceededCounter); + assertThat(exceptionThrownByCallable, sameInstance(expectedException)); + } + + @Test + void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_to_GetObject_from_S3() { + final RuntimeException expectedException = mock(RuntimeException.class); + when(s3Client.getObject(any(GetObjectRequest.class))) + .thenThrow(expectedException); + + final S3ObjectWorker objectUnderTest = createObjectUnderTest(); + final RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference)); + + assertThat(actualException, sameInstance(expectedException)); + + verify(s3ObjectsFailedCounter).increment(); + verifyNoInteractions(s3ObjectsSucceededCounter); + assertThat(exceptionThrownByCallable, sameInstance(expectedException)); + } + + @Test + void parseS3Object_throws_Exception_and_increments_failure_counter_when_CompressionEngine_fails() throws IOException { + when(s3Client.getObject(any(GetObjectRequest.class))) + .thenReturn(objectInputStream); + final IOException expectedException = mock(IOException.class); + when(compressionEngine.createInputStream(key, objectInputStream)).thenThrow(expectedException); + + final S3ObjectWorker objectUnderTest = createObjectUnderTest(); + final IOException actualException = assertThrows(IOException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference)); + + assertThat(actualException, sameInstance(expectedException)); + + verify(s3ObjectsFailedCounter).increment(); + verifyNoInteractions(s3ObjectsSucceededCounter); + assertThat(exceptionThrownByCallable, sameInstance(expectedException)); + } + + @Test + void parseS3Object_calls_GetObject_after_Callable() throws Exception { + final ResponseInputStream objectInputStream = mock(ResponseInputStream.class); + when(s3Client.getObject(any(GetObjectRequest.class))) + .thenReturn(objectInputStream); + + final S3ObjectWorker objectUnderTest = createObjectUnderTest(); + objectUnderTest.parseS3Object(s3ObjectReference); + + final InOrder inOrder = inOrder(s3ObjectReadTimer, s3Client); + + inOrder.verify(s3ObjectReadTimer).recordCallable(any(Callable.class)); + inOrder.verify(s3Client).getObject(any(GetObjectRequest.class)); } } \ No newline at end of file