From fe2c1fbebb0d8ba861a5e14e9da8c8b17dd5f764 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Fri, 15 Nov 2024 11:03:10 -0600 Subject: [PATCH] Handle microseconds in datetime/timestamp and time MySQL data type (#5192) * Handle microseconds in datetime/timestamp and time MySQL data type Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * update test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * update test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * update test Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --------- Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../datatype/impl/TemporalTypeHandler.java | 19 ++++++++++++++++--- .../impl/TemporalTypeHandlerTest.java | 12 +++++++++--- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java index d8e4fedd4b..fe992c8bb5 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java @@ -24,7 +24,7 @@ * * RDS S3 export formats: * - DATE: "yyyy-MM-dd" (Example: "2024-01-15") - * - TIME: "HH:mm:ss" (Example: "14:30:00") + * - TIME: "HH:mm:ss[.SSSSSS]" (Example: "14:30:00.123456") * - DATETIME: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456") * - TIMESTAMP: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456") * Note: Fractional seconds are optional for DATETIME and TIMESTAMP @@ -32,7 +32,7 @@ */ public class TemporalTypeHandler implements DataTypeHandler { private static final String MYSQL_DATE_FORMAT = "yyyy-MM-dd"; - private static final String MYSQL_TIME_FORMAT = "HH:mm:ss"; + private static final String MYSQL_TIME_FORMAT = "HH:mm:ss[.SSSSSS]"; private static final String MYSQL_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss[.SSSSSS]"; // Thread-safe formatters @@ -87,6 +87,11 @@ private Long handleTime(final String timeStr) { private Long handleDate(final String dateStr) { try { + // Handle MySQL zero date special case + if ("0000-00-00".equals(dateStr)) { + return null; + } + // Try parsing as Unix timestamp first final Long dateEpoch = parseDateTimeStrAsEpochMillis(dateStr); if (dateEpoch != null) return dateEpoch; @@ -117,8 +122,16 @@ private Long handleDateTime(final String dateTimeStr) { // Binlog reader converts temporal fields to epoch millis // The Binlog reader is set with EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG private Long parseDateTimeStrAsEpochMillis(final String dateTimeStr) { - // Try parsing as Unix timestamp first + // Try parsing as Unix timestamp try { + // Date Time field in OpenSearch when represented as long value should be in milliseconds since the epoch + // If the value is already in microseconds (length > 13) convert to milliseconds + if (dateTimeStr.length() > 13) { + // Convert microseconds to milliseconds by dividing by 1000 + return Long.parseLong(dateTimeStr) / 1000; + } + + return Long.parseLong(dateTimeStr); } catch (NumberFormatException ignored) { // Continue with datetime parsing diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java index e9669409d7..82fbd26f36 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java @@ -49,7 +49,7 @@ void handle_whenValueIsNull_returnsNull() { @ParameterizedTest @MethodSource("provideDateTestCases") - void handle_withDateType_returnsCorrectEpochMillis(String input, long expected) { + void handle_withDateType_returnsCorrectEpochMillis(String input, Long expected) { Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", input, null); assertEquals(expected, result); } @@ -59,6 +59,7 @@ private static Stream provideDateTestCases() { Arguments.of("2023-12-25", getEpochMillisFromDate(2023, 12, 25)), Arguments.of("1970-01-01", getEpochMillisFromDate(1970, 1, 1)), Arguments.of("2024-02-29", getEpochMillisFromDate(2024, 2, 29)), // Leap year + Arguments.of("0000-00-00", null), Arguments.of("1684108800000", getEpochMillisFromDate(2023, 5, 15)) ); } @@ -83,7 +84,11 @@ private static Stream provideTimeTestCases() { Arguments.of("52200000", getEpochMillis(1970, 1, 1, 14, 30, 0, 0)), Arguments.of("52200123", - getEpochMillis(1970, 1, 1, 14, 30, 0, 123456000)) + getEpochMillis(1970, 1, 1, 14, 30, 0, 123456000)), + Arguments.of("16:30:00.000000", + getEpochMillis(1970, 1, 1, 16, 30, 0, 0)), + Arguments.of("07:17:00.456789", + getEpochMillis(1970, 1, 1, 7, 17, 0, 456789000)) ); } @@ -98,7 +103,8 @@ private static Stream provideDateTimeTestCases() { return Stream.of( Arguments.of("2023-12-25 14:30:00.123456", getEpochMillis(2023, 12, 25, 14, 30, 0, 123456000)), Arguments.of("1970-01-01 00:00:00", getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), - Arguments.of("1703509900000", 1703509900000L) + Arguments.of("1703509900000", 1703509900000L), + Arguments.of("1784161123456789", 1784161123456L) ); }