Skip to content

Commit

Permalink
Add external origination time for events created from S3 Object
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Oct 24, 2024
1 parent 09ed988 commit 431132a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

import java.time.Duration;
import java.time.Instant;

public class S3InputFile implements InputFile {

Expand Down Expand Up @@ -46,6 +47,15 @@ public long getLength() {
return getMetadata().contentLength();
}

/**
 * Looks up when this file was last modified, as reported by the
 * metadata obtained from {@code getMetadata()}.
 *
 * @return the last-modified timestamp taken from the file's metadata.
 *         NOTE(review): this may be {@code null} when the metadata
 *         carries no such value -- confirm before dereferencing.
 */
public Instant getLastModified() {
    final Instant lastModifiedTime = getMetadata().lastModified();
    return lastModifiedTime;
}

/**
* Create an input stream from the input file
* @return an implementation of a SeekableInputStream into the S3 object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -43,6 +44,7 @@ class S3ObjectWorker implements S3ObjectHandler {
private final int numberOfRecordsToAccumulate;
private final BiConsumer<Event, S3ObjectReference> eventConsumer;
private final S3ObjectPluginMetrics s3ObjectPluginMetrics;
private Instant lastModified;

public S3ObjectWorker(final S3ObjectRequest s3ObjectRequest) {
this.buffer = s3ObjectRequest.getBuffer();
Expand All @@ -53,6 +55,7 @@ public S3ObjectWorker(final S3ObjectRequest s3ObjectRequest) {
this.numberOfRecordsToAccumulate = s3ObjectRequest.getNumberOfRecordsToAccumulate();
this.eventConsumer = s3ObjectRequest.getEventConsumer();
this.s3Client = s3ObjectRequest.getS3Client();
this.lastModified = Instant.now();
this.s3ObjectPluginMetrics = s3ObjectRequest.getS3ObjectPluginMetrics();
}

Expand Down Expand Up @@ -95,16 +98,22 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet,
final AtomicInteger saveStateCounter = new AtomicInteger();
try {
s3ObjectSize = inputFile.getLength();
final Instant lastModifiedTime = inputFile.getLastModified();
final Instant now = Instant.now();
final Instant originationTime = (lastModifiedTime == null || lastModifiedTime.isAfter(now)) ? now : lastModifiedTime;

codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> {
try {
eventConsumer.accept(record.getData(), s3ObjectReference);
Event event = record.getData();
eventConsumer.accept(event, s3ObjectReference);
event.getMetadata().setExternalOriginationTime(originationTime);
event.getEventHandle().setExternalOriginationTime(originationTime);
// Always add record to acknowledgementSet before adding to
// buffer because another thread may take and process
// buffer contents before the event record is added
// to acknowledgement set
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
acknowledgementSet.add(event);
}
bufferAccumulator.add(record);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
Expand Down Expand Up @@ -196,6 +198,12 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest_with_Acknowledg
numEventsAdded = 0;
doAnswer(a -> {
Record record = mock(Record.class);
final Event event = mock(Event.class);
final EventMetadata metadata = mock(EventMetadata.class);
final EventHandle eventHandle = mock(EventHandle.class);
when(record.getData()).thenReturn(event);
when(event.getMetadata()).thenReturn(metadata);
when(event.getEventHandle()).thenReturn(eventHandle);
Consumer c = (Consumer)a.getArgument(2);
c.accept(record);
return null;
Expand Down Expand Up @@ -272,7 +280,11 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato

final Record<Event> record = mock(Record.class);
final Event event = mock(Event.class);
final EventMetadata metadata = mock(EventMetadata.class);
final EventHandle eventHandle = mock(EventHandle.class);
when(record.getData()).thenReturn(event);
when(event.getMetadata()).thenReturn(metadata);
when(event.getEventHandle()).thenReturn(eventHandle);

consumerUnderTest.accept(record);

Expand Down Expand Up @@ -305,7 +317,11 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato

final Record<Event> record = mock(Record.class);
final Event event = mock(Event.class);
final EventMetadata metadata = mock(EventMetadata.class);
final EventHandle eventHandle = mock(EventHandle.class);
when(record.getData()).thenReturn(event);
when(event.getMetadata()).thenReturn(metadata);
when(event.getEventHandle()).thenReturn(eventHandle);
consumerUnderTest.accept(record);

final InOrder inOrder = inOrder(eventConsumer, bufferAccumulator, sourceCoordinator);
Expand Down

0 comments on commit 431132a

Please sign in to comment.