diff --git a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java
index 0e01a73f..da9d94bf 100644
--- a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java
+++ b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java
@@ -49,6 +49,11 @@ public interface BlobSinkConfig extends AppConfig {
     @Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME")
     String getFilePartitionProtoTimestampFieldName();
 
+    // When true, partition paths are derived from the current processing time instead of the proto timestamp field.
+    @Key("SINK_BLOB_FILE_PARTITION_PROCESSING_TIME_ENABLED")
+    @DefaultValue("false")
+    boolean getFilePartitionProcessingTimeEnabled();
+
     @Key("SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE")
     @DefaultValue("day")
     @ConverterClass(BlobSinkFilePartitionTypeConverter.class)
diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java
index 1bec503d..7f39fb1f 100644
--- a/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java
+++ b/src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java
@@ -2,11 +2,14 @@
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
+import com.gotocompany.firehose.config.BlobSinkConfig;
 import com.gotocompany.firehose.sink.blob.proto.KafkaMetadataProtoMessage;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 
 @AllArgsConstructor
 @Data
@@ -34,4 +37,21 @@ public Instant getTimestamp(String fieldName) {
         int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos"));
         return Instant.ofEpochSecond(seconds, nanos);
     }
+
+    /**
+     * Resolves the date-time used to partition this record.
+     * When processing-time partitioning is enabled the current time is used; otherwise the
+     * configured proto timestamp field is read. Both are expressed in the configured
+     * partition timezone, so the result does not depend on the JVM default zone.
+     *
+     * @param config blob sink configuration supplying the partitioning mode, field name and timezone
+     * @return the date-time to partition by, in the configured timezone
+     */
+    public LocalDateTime getLocalDateTime(BlobSinkConfig config) {
+        ZoneId zone = ZoneId.of(config.getFilePartitionProtoTimestampTimezone());
+        if (config.getFilePartitionProcessingTimeEnabled()) {
+            return LocalDateTime.now(zone);
+        }
+        return LocalDateTime.ofInstant(getTimestamp(config.getFilePartitionProtoTimestampFieldName()), zone);
+    }
 }
diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java
index 97e24241..5c4bd6d8 100644
--- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java
+++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java
@@ -7,11 +7,7 @@
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.time.Instant;
-import java.time.LocalDate;
 import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 
 /**
@@ -25,14 +21,13 @@ public class TimePartitionedPathUtils {
 
     public static Path getTimePartitionedPath(Record record, BlobSinkConfig sinkConfig) {
         String topic = record.getTopic(sinkConfig.getOutputKafkaMetadataColumnName());
-        Instant timestamp = record.getTimestamp(sinkConfig.getFilePartitionProtoTimestampFieldName());
+        Path path = Paths.get(topic);
         if (sinkConfig.getFilePartitionTimeGranularityType() == Constants.FilePartitionType.NONE) {
-            return Paths.get(topic);
+            return path;
         }
-        LocalDate localDate = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalDate();
-        String datePart = DATE_FORMATTER.format(localDate);
-        LocalTime localTime = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalTime();
-        String hourPart = HOUR_FORMATTER.format(localTime);
+        LocalDateTime dateTime = record.getLocalDateTime(sinkConfig);
+        String datePart = DATE_FORMATTER.format(dateTime.toLocalDate());
+        String hourPart = HOUR_FORMATTER.format(dateTime.toLocalTime());
 
         String dateSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeDatePrefix(), datePart);
         String hourSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeHourPrefix(), hourPart);
@@ -40,7 +35,7 @@ public static Path getTimePartitionedPath(Record record, BlobSinkConf
         String dateTimePartition;
         switch (sinkConfig.getFilePartitionTimeGranularityType()) {
             case NONE:
-                return Paths.get(topic);
+                return path;
             case DAY:
                 dateTimePartition = String.format("%s", dateSegment);
                 break;
diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java
index e997729e..c7c2504e 100644
--- a/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java
+++ b/src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java
@@ -1,11 +1,15 @@
 package com.gotocompany.firehose.sink.blob.message;
 
 import com.google.protobuf.DynamicMessage;
+import com.gotocompany.firehose.config.BlobSinkConfig;
 import com.gotocompany.firehose.sink.blob.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 
 public class RecordTest {
 
@@ -39,4 +43,34 @@ public void shouldGetTimeStampFromMessage() {
         Record record = new Record(message, metadata);
         Assert.assertEquals(defaultTimestamp, record.getTimestamp("created_time"));
     }
+
+    @Test
+    public void shouldGetDateTimeLocally() {
+        ZoneId zone = ZoneId.of("Asia/Jakarta");
+        BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
+        Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(true);
+        Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn(zone.getId());
+        DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
+        DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
+        Record record = new Record(message, metadata);
+        // Bracket the call with clock reads in the same zone; inclusive bounds need no sleeps.
+        LocalDateTime before = LocalDateTime.now(zone);
+        LocalDateTime localDateTime = record.getLocalDateTime(config);
+        LocalDateTime after = LocalDateTime.now(zone);
+        Assert.assertFalse(localDateTime.isBefore(before));
+        Assert.assertFalse(localDateTime.isAfter(after));
+    }
+
+    @Test
+    public void shouldGetDateTimeFromMessage() {
+        BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
+        Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(false);
+        Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
+        Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC");
+        DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
+        DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
+        Record record = new Record(message, metadata);
+        LocalDateTime localDateTime = record.getLocalDateTime(config);
+        Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime);
+    }
 }
diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java
index e26f0077..915c0e2f 100644
--- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java
+++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java
@@ -24,6 +24,8 @@
 
 import java.io.IOException;
 import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -58,6 +60,7 @@ public void setUp() {
         MockitoAnnotations.initMocks(this);
         this.sinkConfig = Mockito.mock(BlobSinkConfig.class);
         Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
+        Mockito.when(sinkConfig.getFilePartitionProcessingTimeEnabled()).thenReturn(false);
         Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn("");
         Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
         Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR);
@@ -68,6 +71,7 @@ public void setUp() {
     @Test
     public void shouldCreateLocalFileWriter() throws Exception {
         Record record = Mockito.mock(Record.class);
+        Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
         Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
         Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
         Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test");
@@ -82,6 +86,7 @@ public void shouldCreateLocalFileWriter() throws Exception {
     @Test
     public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
         Record record1 = Mockito.mock(Record.class);
+        Mockito.when(record1.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(3600000L), ZoneId.of(zone)));
         Mockito.when(record1.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
         Mockito.when(record1.getTopic("")).thenReturn(defaultTopic);
         Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record1, sinkConfig))).thenReturn(localFileWriter1);
@@ -89,6 +94,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
         Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test1");
 
         Record record2 = Mockito.mock(Record.class);
+        Mockito.when(record2.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(7200000L), ZoneId.of(zone)));
         Mockito.when(record2.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L));
         Mockito.when(record2.getTopic("")).thenReturn(defaultTopic);
         Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record2, sinkConfig))).thenReturn(localFileWriter2);
@@ -106,6 +112,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
     @Test(expected = IOException.class)
     public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception {
         Record record = Mockito.mock(Record.class);
+        Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
         Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
         Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
         Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0));
@@ -120,6 +127,7 @@ public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception {
     public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception {
         expectedException.expect(LocalFileWriterFailedException.class);
         Record record = Mockito.mock(Record.class);
+        Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
         Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
         Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
         Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0));
@@ -133,6 +141,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception {
     public void shouldGetEmptyFlushedPath() throws Exception {
         Record record = Mockito.mock(Record.class);
         Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
+        Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of(zone)));
         Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
         Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test");
         Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1);