diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java index d8e4fedd4b..e0e06ee032 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java @@ -24,7 +24,7 @@ * * RDS S3 export formats: * - DATE: "yyyy-MM-dd" (Example: "2024-01-15") - * - TIME: "HH:mm:ss" (Example: "14:30:00") + * - TIME: "HH:mm:ss[.SSSSSS]" (Example: "14:30:00.123456") * - DATETIME: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456") * - TIMESTAMP: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456") * Note: Fractional seconds are optional for DATETIME and TIMESTAMP @@ -32,7 +32,7 @@ */ public class TemporalTypeHandler implements DataTypeHandler { private static final String MYSQL_DATE_FORMAT = "yyyy-MM-dd"; - private static final String MYSQL_TIME_FORMAT = "HH:mm:ss"; + private static final String MYSQL_TIME_FORMAT = "HH:mm:ss[.SSSSSS]"; private static final String MYSQL_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss[.SSSSSS]"; // Thread-safe formatters @@ -87,6 +87,11 @@ private Long handleTime(final String timeStr) { private Long handleDate(final String dateStr) { try { + // Handle MySQL zero date special case + if ("0000-00-00".equals(dateStr)) { + return null; // or return a specific default value + } + // Try parsing as Unix timestamp first final Long dateEpoch = parseDateTimeStrAsEpochMillis(dateStr); if (dateEpoch != null) return dateEpoch; @@ -117,8 +122,16 @@ private Long handleDateTime(final String dateTimeStr) { // Binlog reader converts temporal fields to epoch millis // The Binlog reader is set with EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG private Long parseDateTimeStrAsEpochMillis(final String dateTimeStr) { - // Try parsing as Unix timestamp first + // Try parsing as Unix timestamp try { + // Date Time field in OpenSearch when represented as long value should be in milliseconds since the epoch + // If the value is already in microseconds (length > 13) convert to milliseconds + if (dateTimeStr.length() > 13) { + // Convert microseconds to milliseconds by dividing by 1000 + return Long.parseLong(dateTimeStr) / 1000; + } + + return Long.parseLong(dateTimeStr); } catch (NumberFormatException ignored) { // Continue with datetime parsing diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java index e9669409d7..5b48315021 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java @@ -49,7 +49,7 @@ void handle_whenValueIsNull_returnsNull() { @ParameterizedTest @MethodSource("provideDateTestCases") - void handle_withDateType_returnsCorrectEpochMillis(String input, long expected) { + void handle_withDateType_returnsCorrectEpochMillis(String input, Long expected) { Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", input, null); assertEquals(expected, result); } @@ -59,6 +59,7 @@ private static Stream provideDateTestCases() { Arguments.of("2023-12-25", getEpochMillisFromDate(2023, 12, 25)), Arguments.of("1970-01-01", getEpochMillisFromDate(1970, 1, 1)), Arguments.of("2024-02-29", getEpochMillisFromDate(2024, 2, 29)), // Leap year + Arguments.of("0000-00-00", null), // Leap year Arguments.of("1684108800000", getEpochMillisFromDate(2023, 5, 15)) ); } @@ -83,7 +84,9 @@ private static Stream provideTimeTestCases() { Arguments.of("52200000", getEpochMillis(1970, 1, 1, 14, 30, 0, 0)), Arguments.of("52200123", - getEpochMillis(1970, 1, 1, 14, 30, 0, 123456000)) + getEpochMillis(1970, 1, 1, 14, 30, 0, 123456000)), + Arguments.of("16:30:00.000000", + getEpochMillis(1970, 1, 1, 16, 30, 0, 0)) ); }