diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java index 5511a36975..d8e4fedd4b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java @@ -4,22 +4,134 @@ import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + +/** + * Handles MySQL temporal data types (DATE, TIME, DATETIME, TIMESTAMP, YEAR) conversion between binlog and S3 export formats. + * + * The BinlogClient is configured with EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG. + * MySQL binlog temporal types are deserialized to use Unix time (milliseconds elapsed since 1970-01-01 00:00:00 UTC): + * - DATE: long value representing milliseconds since epoch (1970-01-01) + * - TIME: long value representing milliseconds since epoch (1970-01-01 00:00:00) + * - DATETIME: long value representing milliseconds since epoch (1970-01-01 00:00:00) + * - TIMESTAMP: long value representing milliseconds since epoch (1970-01-01 00:00:00) + * - YEAR: 4-digit year value (Example: 2024) + * + * RDS S3 export formats: + * - DATE: "yyyy-MM-dd" (Example: "2024-01-15") + * - TIME: "HH:mm:ss" (Example: "14:30:00") + * - DATETIME: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456") + * - TIMESTAMP: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456") + * Note: Fractional seconds are optional for DATETIME and TIMESTAMP + * - YEAR: "yyyy" (Example: "2024") + */ public class TemporalTypeHandler implements DataTypeHandler { + private static final String MYSQL_DATE_FORMAT = "yyyy-MM-dd"; + private static final String MYSQL_TIME_FORMAT = "HH:mm:ss"; + private static final String MYSQL_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss[.SSSSSS]"; + + // Thread-safe formatters + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_DATE_FORMAT); + private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_TIME_FORMAT); + private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_DATETIME_FORMAT); @Override - public String handle(final MySQLDataType columnType, final String columnName, final Object value, - final TableMetadata metadata) { - // Date and Time types - switch (columnType) { - // TODO: Implement the transformation - case DATE: - case TIME: - case TIMESTAMP: - case DATETIME: - case YEAR: - return value.toString(); - default: - throw new IllegalArgumentException("Unsupported temporal data type: " + columnType); + public Long handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + if (value == null) { + return null; + } + + final String strValue = value.toString().trim(); + + try { + switch (columnType) { + case DATE: + return handleDate(strValue); + case TIME: + return handleTime(strValue); + case DATETIME: + case TIMESTAMP: + return handleDateTime(strValue); + case YEAR: + return handleYear(strValue); + default: + throw new IllegalArgumentException("Unsupported temporal data type: " + columnType); + } + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Failed to parse %s value: %s", columnType, strValue), e); + } + } + + private Long handleTime(final String timeStr) { + try { + // Try parsing as Unix timestamp first + final Long timeEpoch = parseDateTimeStrAsEpochMillis(timeStr); + if (timeEpoch != null) return timeEpoch; + + final LocalTime time = LocalTime.parse(timeStr, TIME_FORMATTER); + // Combine with date from EPOCH + return time.atDate(LocalDate.EPOCH) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid time format: " + timeStr, e); + } + } + + private Long handleDate(final String dateStr) { + try { + // Try parsing as Unix timestamp first + final Long dateEpoch = parseDateTimeStrAsEpochMillis(dateStr); + if (dateEpoch != null) return dateEpoch; + + LocalDate date = LocalDate.parse(dateStr, DATE_FORMATTER); + return date.atStartOfDay(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid date format: " + dateStr, e); + } + } + + private Long handleDateTime(final String dateTimeStr) { + try { + final Long dateTimeEpoch = parseDateTimeStrAsEpochMillis(dateTimeStr); + if (dateTimeEpoch != null) return dateTimeEpoch; + + // Parse using standard MySQL datetime format + return LocalDateTime.parse(dateTimeStr, DATETIME_FORMATTER) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid datetime format: " + dateTimeStr, e); + } + } + + // Binlog reader converts temporal fields to epoch millis + // The Binlog reader is set with EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG + private Long parseDateTimeStrAsEpochMillis(final String dateTimeStr) { + // Try parsing as Unix timestamp first + try { + return Long.parseLong(dateTimeStr); + } catch (NumberFormatException ignored) { + // Continue with datetime parsing + } + return null; + } + + private Long handleYear(final String yearStr) { + try { + // MySQL YEAR values are typically four-digit numbers (e.g., 2024). + return Long.parseLong(yearStr); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid year format: " + yearStr, e); } } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java index 8ea7b68889..f580e59fac 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.network.SSLMode; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; @@ -35,6 +36,11 @@ public BinaryLogClient create() { username, password); binaryLogClient.setSSLMode(sslMode); + final EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode( + EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG + ); + binaryLogClient.setEventDeserializer(eventDeserializer); return binaryLogClient; } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java index 35c2b92e55..e9669409d7 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java @@ -1,32 +1,178 @@ package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; -import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.time.LocalDate; +import java.time.ZoneOffset; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; -public class TemporalTypeHandlerTest { + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.LocalDateTime; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertNull; + +class TemporalTypeHandlerTest { + private TemporalTypeHandler temporalTypeHandler; + + @BeforeEach + void setUp() { + temporalTypeHandler = new TemporalTypeHandler(); + } + + private static long getEpochMillisFromDate(final int year, final int month, final int day) { + return LocalDate.of(year, month, day) + .atStartOfDay(ZoneOffset.UTC) // Ensure UTC + .toInstant() + .toEpochMilli(); + } + + private static long getEpochMillis(int year, int month, int day, int hour, int minute, int second, int nanoSeconds) { + return LocalDateTime.of(year, month, day, hour, minute, second, nanoSeconds) + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(); + } @Test - public void test_handle() { - final DataTypeHandler handler = new TemporalTypeHandler(); - final MySQLDataType columnType = MySQLDataType.TIME; - final String columnName = "jsonColumn"; - final String value = UUID.randomUUID().toString(); - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - Object result = handler.handle(columnType, columnName, value, metadata); + void handle_whenValueIsNull_returnsNull() { + assertNull(temporalTypeHandler.handle(MySQLDataType.DATE, "test_column", null, null)); + } + + @ParameterizedTest + @MethodSource("provideDateTestCases") + void handle_withDateType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", input, null); + assertEquals(expected, result); + } - assertThat(result, is(instanceOf(String.class))); - assertThat(result, is(value)); + private static Stream provideDateTestCases() { + return Stream.of( + Arguments.of("2023-12-25", getEpochMillisFromDate(2023, 12, 25)), + Arguments.of("1970-01-01", getEpochMillisFromDate(1970, 1, 1)), + Arguments.of("2024-02-29", getEpochMillisFromDate(2024, 2, 29)), // Leap year + Arguments.of("1684108800000", getEpochMillisFromDate(2023, 5, 15)) + ); + } + + @ParameterizedTest + @MethodSource("provideTimeTestCases") + void handle_withTimeType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.TIME, "time_column", input, null); + assertEquals(result, expected); + } + + private static Stream provideTimeTestCases() { + return Stream.of( + Arguments.of("14:30:00", + getEpochMillis(1970, 1, 1, 14, 30, 0, 0)), + Arguments.of("00:00:00", + getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), + Arguments.of("23:59:59", + getEpochMillis(1970, 1, 1, 23, 59, 59, 0)), + Arguments.of("85647000", + getEpochMillis(1970, 1, 1, 23, 47, 27, 0)), + Arguments.of("52200000", + getEpochMillis(1970, 1, 1, 14, 30, 0, 0)), + Arguments.of("52200123", + getEpochMillis(1970, 1, 1, 14, 30, 0, 123456000)) + ); + } + + @ParameterizedTest + @MethodSource("provideDateTimeTestCases") + void handle_withDateTimeType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column", input, null); + assertEquals(expected, result); + } + + private static Stream provideDateTimeTestCases() { + return Stream.of( + Arguments.of("2023-12-25 14:30:00.123456", getEpochMillis(2023, 12, 25, 14, 30, 0, 123456000)), + Arguments.of("1970-01-01 00:00:00", getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), + Arguments.of("1703509900000", 1703509900000L) + ); + } + + @ParameterizedTest + @MethodSource("provideTimestampTestCases") + void handle_withTimestampType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.TIMESTAMP, "timestamp_column", input, null); + assertEquals(expected, result); + } + + private static Stream provideTimestampTestCases() { + return Stream.of( + Arguments.of("1703509800000", 1703509800000L), + Arguments.of("2023-12-25 14:30:00", getEpochMillis(2023, 12, 25, 14, 30, 0, 0)) + ); + } + + @ParameterizedTest + @MethodSource("provideYearTestCases") + void handle_withYearType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.YEAR, "year_column", input, null); + assertEquals(expected, result); + } + + private static Stream provideYearTestCases() { + return Stream.of( + Arguments.of("2023", 2023), + Arguments.of("1900", 1900), + Arguments.of("1997", 1997) + ); + } + + @Test + void handle_withInvalidFormat_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "invalid-date", null)); + } + + @Test + void handle_withUnsupportedType_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.VARCHAR, "varchar_column", "2023-12-25", null)); + } + + @Test + void handle_withEmptyString_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "", null)); + } + + @Test + void handle_withWhitespaceString_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", " ", null)); + } + + @Test + void handle_withLeapYearDate_returnsCorrectEpochMillis() { + Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "2024-02-29", null); + assertEquals(getEpochMillisFromDate(2024, 2, 29), result); + } + + @Test + void handle_withMaxDateTime_returnsCorrectEpochMillis() { + Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column", + "9999-12-31 23:59:59", null); + assertEquals(getEpochMillis(9999, 12, 31, 23, 59, 59, 0), result); + } + + @Test + void handle_withMinDateTime_returnsCorrectEpochMillis() { + Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column", + "1000-01-01 00:00:00", null); + assertEquals(getEpochMillis(1000, 1, 1, 0, 0, 0, 0), result); } } +