From 112aa334f126b02c46e06e8294165d636905c8b5 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Thu, 14 Nov 2024 12:34:43 -0600 Subject: [PATCH] Add support for MySQL Date/Time/TimeStamp/DateTime date type transformation to OpenSearch Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../datatype/impl/TemporalTypeHandler.java | 119 +++++++++-- .../impl/TemporalTypeHandlerTest.java | 190 ++++++++++++++++-- 2 files changed, 276 insertions(+), 33 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java index 5511a36975..f0d5b4dbcb 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java @@ -4,22 +4,115 @@ import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + public class TemporalTypeHandler implements DataTypeHandler { + private static final String MYSQL_DATE_FORMAT = "yyyy-MM-dd"; + private static final String MYSQL_TIME_FORMAT = "HH:mm:ss"; + private static final String MYSQL_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss[.SSSSSS]"; + + // Thread-safe formatters + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_DATE_FORMAT); + private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_TIME_FORMAT); + private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_DATETIME_FORMAT); @Override - public String handle(final MySQLDataType columnType, final String columnName, final Object value, - final TableMetadata metadata) { - // Date and Time types - switch (columnType) { - // TODO: Implement the transformation - case DATE: - case TIME: - case TIMESTAMP: - case DATETIME: - case YEAR: - return value.toString(); - default: - throw new IllegalArgumentException("Unsupported temporal data type: " + columnType); + public Long handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + if (value == null) { + return null; + } + + final String strValue = value.toString().trim(); + + try { + switch (columnType) { + case DATE: + return handleDate(strValue); + case TIME: + return handleTime(strValue); + case DATETIME: + case TIMESTAMP: + return handleDateTime(strValue); + case YEAR: + return handleYear(strValue); + default: + throw new IllegalArgumentException("Unsupported temporal data type: " + columnType); + } + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Failed to parse %s value: %s", columnType, strValue), e); + } + } + + private Long handleTime(final String timeStr) { + try { + // Try parsing as Unix timestamp first + final Long timeEpoch = parseDateTimeStrAsEpoch(timeStr); + if (timeEpoch != null) return timeEpoch; + + final LocalTime time = LocalTime.parse(timeStr, TIME_FORMATTER); + // Combine with date from EPOCH + return time.atDate(LocalDate.EPOCH) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid time format: " + timeStr, e); + } + } + + private Long handleDate(final String dateStr) { + try { + // Try parsing as Unix timestamp first + final Long dateEpoch = parseDateTimeStrAsEpoch(dateStr); + if (dateEpoch != null) return dateEpoch; + + LocalDate date = LocalDate.parse(dateStr, DATE_FORMATTER); + return date.atStartOfDay(ZoneId.systemDefault()) + .withZoneSameInstant(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid date format: " + dateStr, e); + } + } + + private Long handleDateTime(final String dateTimeStr) { + try { + final Long dateTimeEpoch = parseDateTimeStrAsEpoch(dateTimeStr); + if (dateTimeEpoch != null) return dateTimeEpoch; + + // Parse using standard MySQL datetime format + return LocalDateTime.parse(dateTimeStr, DATETIME_FORMATTER) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Invalid datetime format: " + dateTimeStr, e); + } + } + + private Long parseDateTimeStrAsEpoch(final String dateTimeStr) { + // Try parsing as Unix timestamp first + try { + return Long.parseLong(dateTimeStr); + } catch (NumberFormatException ignored) { + // Continue with datetime parsing + } + return null; + } + + private Long handleYear(final String yearStr) { + try { + // MySQL YEAR values are typically four-digit numbers (e.g., 2024). + return Long.parseLong(yearStr); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid year format: " + yearStr, e); } } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java index 35c2b92e55..151bba7919 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java @@ -1,32 +1,182 @@ package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; -import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.ZoneOffset; +import java.util.Date; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; -public class TemporalTypeHandlerTest { + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertNull; + +class TemporalTypeHandlerTest { + private TemporalTypeHandler temporalTypeHandler; + + @BeforeEach + void setUp() { + temporalTypeHandler = new TemporalTypeHandler(); + } + + private static long getEpochMillisFromDateStr(final String dateStr) throws ParseException { + final SimpleDateFormat mysqlDateFormat = new SimpleDateFormat("yyyy-MM-dd"); + + final Date date = mysqlDateFormat.parse(dateStr); + return date.getTime(); + } + + private static long getEpochMillis(int year, int month, int day, int hour, int minute, int second, int nanoSeconds) { + return LocalDateTime.of(year, month, day, hour, minute, second, nanoSeconds) + .toInstant(ZoneOffset.UTC) + .toEpochMilli(); + } + + @Test + void handle_whenValueIsNull_returnsNull() { + assertNull(temporalTypeHandler.handle(MySQLDataType.DATE, "test_column", null, null)); + } + + @ParameterizedTest + @MethodSource("provideDateTestCases") + void handle_withDateType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", input, null); + assertEquals(expected, result); + } + + private static Stream provideDateTestCases() throws ParseException { + return Stream.of( + Arguments.of("2023-12-25", getEpochMillisFromDateStr("2023-12-25")), + Arguments.of("1970-01-01", getEpochMillisFromDateStr("1970-1-1")), + Arguments.of("2024-02-29", getEpochMillisFromDateStr("2024-2-29")) // Leap year + ); + } + + @ParameterizedTest + @MethodSource("provideTimeTestCases") + void handle_withTimeType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.TIME, "time_column", input, null); + assertEquals(result, expected); + } + + private static Stream provideTimeTestCases() { + return Stream.of( + Arguments.of("14:30:00", + LocalDateTime.of(1970, 1, 1, 14, 30, 0) + .atZone(ZoneId.of("UTC")) + .toInstant() + .toEpochMilli()), + Arguments.of("00:00:00", + LocalDateTime.of(1970, 1, 1, 0, 0, 0) + .atZone(ZoneId.of("UTC")) + .toInstant() + .toEpochMilli()), + Arguments.of("23:59:59", + LocalDateTime.of(1970, 1, 1, 23, 59, 59) + .atZone(ZoneId.of("UTC")) + .toInstant() + .toEpochMilli()) + ); + } + + @ParameterizedTest + @MethodSource("provideDateTimeTestCases") + void handle_withDateTimeType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column", input, null); + assertEquals(expected, result); + } + + private static Stream provideDateTimeTestCases() { + return Stream.of( + Arguments.of("2023-12-25 14:30:00.123456", getEpochMillis(2023, 12, 25, 14, 30, 0, 123456000)), + Arguments.of("1970-01-01 00:00:00", getEpochMillis(1970, 1, 1, 0, 0, 0, 0)), + Arguments.of("1703509900000", 1703509900000L) + ); + } + + @ParameterizedTest + @MethodSource("provideTimestampTestCases") + void handle_withTimestampType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.TIMESTAMP, "timestamp_column", input, null); + assertEquals(expected, result); + } + + private static Stream provideTimestampTestCases() { + return Stream.of( + Arguments.of("1703509800000", 1703509800000L), + Arguments.of("2023-12-25 14:30:00", getEpochMillis(2023, 12, 25, 14, 30, 0, 0)) + ); + } + + @ParameterizedTest + @MethodSource("provideYearTestCases") + void handle_withYearType_returnsCorrectEpochMillis(String input, long expected) { + Long result = temporalTypeHandler.handle(MySQLDataType.YEAR, "year_column", input, null); + assertEquals(expected, result); + } + + private static Stream provideYearTestCases() { + return Stream.of( + Arguments.of("2023", 2023), + Arguments.of("1900", 1900), + Arguments.of("1997", 1997) + ); + } + + @Test + void handle_withInvalidFormat_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "invalid-date", null)); + } + + @Test + void handle_withUnsupportedType_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.VARCHAR, "varchar_column", "2023-12-25", null)); + } @Test - public void test_handle() { - final DataTypeHandler handler = new TemporalTypeHandler(); - final MySQLDataType columnType = MySQLDataType.TIME; - final String columnName = "jsonColumn"; - final String value = UUID.randomUUID().toString(); - final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - Object result = handler.handle(columnType, columnName, value, metadata); + void handle_withEmptyString_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "", null)); + } - assertThat(result, is(instanceOf(String.class))); - assertThat(result, is(value)); + @Test + void handle_withWhitespaceString_throwsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> + temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", " ", null)); + } + + @Test + void handle_withLeapYearDate_returnsCorrectEpochMillis() throws ParseException { + Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "2024-02-29", null); + assertEquals(getEpochMillisFromDateStr("2024-2-29"), result); + } + + @Test + void handle_withMaxDateTime_returnsCorrectEpochMillis() { + Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column", + "9999-12-31 23:59:59", null); + assertEquals(getEpochMillis(9999, 12, 31, 23, 59, 59, 0), result); + } + + @Test + void handle_withMinDateTime_returnsCorrectEpochMillis() { + Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column", + "1000-01-01 00:00:00", null); + assertEquals(getEpochMillis(1000, 1, 1, 0, 0, 0, 0), result); } } +