From 431132ad6f61d1caf59b7b4ecc5ce9890a23dffe Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 24 Oct 2024 18:23:01 +0000 Subject: [PATCH] Add external origination time for events created from S3 Object Signed-off-by: Krishna Kondaka --- .../plugins/source/s3/S3InputFile.java | 10 ++++++++++ .../plugins/source/s3/S3ObjectWorker.java | 13 +++++++++++-- .../plugins/source/s3/S3ObjectWorkerTest.java | 16 ++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputFile.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputFile.java index 5b80a3fec0..401a8869ea 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputFile.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3InputFile.java @@ -8,6 +8,7 @@ import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import java.time.Duration; +import java.time.Instant; public class S3InputFile implements InputFile { @@ -46,6 +47,15 @@ public long getLength() { return getMetadata().contentLength(); } + /** + * Return the last modified time of the file + * + * @return last modified time + */ + public Instant getLastModified() { + return getMetadata().lastModified(); + } + /** * Create an input stream from the input file * @return an implementation of a SeekableInputStream into the S3 object. diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java index 6750ada0f3..aca42fb8a8 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorker.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -43,6 +44,7 @@ class S3ObjectWorker implements S3ObjectHandler { private final int numberOfRecordsToAccumulate; private final BiConsumer eventConsumer; private final S3ObjectPluginMetrics s3ObjectPluginMetrics; + private Instant lastModified; public S3ObjectWorker(final S3ObjectRequest s3ObjectRequest) { this.buffer = s3ObjectRequest.getBuffer(); @@ -53,6 +55,7 @@ public S3ObjectWorker(final S3ObjectRequest s3ObjectRequest) { this.numberOfRecordsToAccumulate = s3ObjectRequest.getNumberOfRecordsToAccumulate(); this.eventConsumer = s3ObjectRequest.getEventConsumer(); this.s3Client = s3ObjectRequest.getS3Client(); + this.lastModified = Instant.now(); this.s3ObjectPluginMetrics = s3ObjectRequest.getS3ObjectPluginMetrics(); } @@ -95,16 +98,22 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final AtomicInteger saveStateCounter = new AtomicInteger(); try { s3ObjectSize = inputFile.getLength(); + final Instant lastModifiedTime = inputFile.getLastModified(); + final Instant now = Instant.now(); + final Instant originationTime = (lastModifiedTime == null || lastModifiedTime.isAfter(now)) ? now : lastModifiedTime; codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> { try { - eventConsumer.accept(record.getData(), s3ObjectReference); + Event event = record.getData(); + eventConsumer.accept(event, s3ObjectReference); + event.getMetadata().setExternalOriginationTime(originationTime); + event.getEventHandle().setExternalOriginationTime(originationTime); // Always add record to acknowledgementSet before adding to // buffer because another thread may take and process // buffer contents before the event record is added // to acknowledgement set if (acknowledgementSet != null) { - acknowledgementSet.add(record.getData()); + acknowledgementSet.add(event); } bufferAccumulator.add(record); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java index eadc9a7b8b..2f1d0161fd 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerTest.java @@ -22,6 +22,8 @@ import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.io.InputFile; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; @@ -196,6 +198,12 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg numEventsAdded = 0; doAnswer(a -> { Record record = mock(Record.class); + final Event event = mock(Event.class); + final EventMetadata metadata = mock(EventMetadata.class); + final EventHandle eventHandle = mock(EventHandle.class); + when(record.getData()).thenReturn(event); + when(event.getMetadata()).thenReturn(metadata); + when(event.getEventHandle()).thenReturn(eventHandle); Consumer c = (Consumer)a.getArgument(2); c.accept(record); return null; @@ -272,7 +280,11 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato final Record record = mock(Record.class); final Event event = mock(Event.class); + final EventMetadata metadata = mock(EventMetadata.class); + final EventHandle eventHandle = mock(EventHandle.class); when(record.getData()).thenReturn(event); + when(event.getMetadata()).thenReturn(metadata); + when(event.getEventHandle()).thenReturn(eventHandle); consumerUnderTest.accept(record); @@ -305,7 +317,11 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato final Record record = mock(Record.class); final Event event = mock(Event.class); + final EventMetadata metadata = mock(EventMetadata.class); + final EventHandle eventHandle = mock(EventHandle.class); when(record.getData()).thenReturn(event); + when(event.getMetadata()).thenReturn(metadata); + when(event.getEventHandle()).thenReturn(eventHandle); consumerUnderTest.accept(record); final InOrder inOrder = inOrder(eventConsumer, bufferAccumulator, sourceCoordinator);