Skip to content

Commit

Permalink
Add support for MySQL Date/Time/TimeStamp/DateTime date type transfor…
Browse files Browse the repository at this point in the history
…mation (#5190)

* Add support for MySQL Date/Time/TimeStamp/DateTime date type transformation to OpenSearch

Signed-off-by: Dinu John <[email protected]>

* Configure BinlogClient to deserialize data and time as long

Signed-off-by: Dinu John <[email protected]>

* Add documentation

Signed-off-by: Dinu John <[email protected]>

* Update documentation

Signed-off-by: Dinu John <[email protected]>

* Update documentation

Signed-off-by: Dinu John <[email protected]>

* Update unit test and documentation

Signed-off-by: Dinu John <[email protected]>

* Update documentation

Signed-off-by: Dinu John <[email protected]>

---------

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored Nov 15, 2024
1 parent 7e0c7d1 commit 8e6874d
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,134 @@
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/**
* Handles MySQL temporal data types (DATE, TIME, DATETIME, TIMESTAMP, YEAR) conversion between binlog and S3 export formats.
*
* The BinlogClient is configured with EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG.
* MySQL binlog temporal types are deserialized to use Unix time (milliseconds elapsed since 1970-01-01 00:00:00 UTC):
* - DATE: long value representing milliseconds since epoch (1970-01-01)
* - TIME: long value representing milliseconds since epoch (1970-01-01 00:00:00)
* - DATETIME: long value representing milliseconds since epoch (1970-01-01 00:00:00)
* - TIMESTAMP: long value representing milliseconds since epoch (1970-01-01 00:00:00)
* - YEAR: 4-digit year value (Example: 2024)
*
* RDS S3 export formats:
* - DATE: "yyyy-MM-dd" (Example: "2024-01-15")
* - TIME: "HH:mm:ss" (Example: "14:30:00")
* - DATETIME: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456")
* - TIMESTAMP: "yyyy-MM-dd HH:mm:ss[.SSSSSS]" (Example: "2024-01-15 14:30:00.123456")
* Note: Fractional seconds are optional for DATETIME and TIMESTAMP
* - YEAR: "yyyy" (Example: "2024")
*/
public class TemporalTypeHandler implements DataTypeHandler {
private static final String MYSQL_DATE_FORMAT = "yyyy-MM-dd";
private static final String MYSQL_TIME_FORMAT = "HH:mm:ss";
private static final String MYSQL_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss[.SSSSSS]";

// Thread-safe formatters
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_DATE_FORMAT);
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_TIME_FORMAT);
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern(MYSQL_DATETIME_FORMAT);

@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
// Date and Time types
switch (columnType) {
// TODO: Implement the transformation
case DATE:
case TIME:
case TIMESTAMP:
case DATETIME:
case YEAR:
return value.toString();
default:
throw new IllegalArgumentException("Unsupported temporal data type: " + columnType);
public Long handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
if (value == null) {
return null;
}

final String strValue = value.toString().trim();

try {
switch (columnType) {
case DATE:
return handleDate(strValue);
case TIME:
return handleTime(strValue);
case DATETIME:
case TIMESTAMP:
return handleDateTime(strValue);
case YEAR:
return handleYear(strValue);
default:
throw new IllegalArgumentException("Unsupported temporal data type: " + columnType);
}
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Failed to parse %s value: %s", columnType, strValue), e);
}
}

private Long handleTime(final String timeStr) {
try {
// Try parsing as Unix timestamp first
final Long timeEpoch = parseDateTimeStrAsEpochMillis(timeStr);
if (timeEpoch != null) return timeEpoch;

final LocalTime time = LocalTime.parse(timeStr, TIME_FORMATTER);
// Combine with date from EPOCH
return time.atDate(LocalDate.EPOCH)
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
} catch (DateTimeParseException e) {
throw new IllegalArgumentException("Invalid time format: " + timeStr, e);
}
}

private Long handleDate(final String dateStr) {
try {
// Try parsing as Unix timestamp first
final Long dateEpoch = parseDateTimeStrAsEpochMillis(dateStr);
if (dateEpoch != null) return dateEpoch;

LocalDate date = LocalDate.parse(dateStr, DATE_FORMATTER);
return date.atStartOfDay(ZoneOffset.UTC)
.toInstant()
.toEpochMilli();
} catch (DateTimeParseException e) {
throw new IllegalArgumentException("Invalid date format: " + dateStr, e);
}
}

private Long handleDateTime(final String dateTimeStr) {
try {
final Long dateTimeEpoch = parseDateTimeStrAsEpochMillis(dateTimeStr);
if (dateTimeEpoch != null) return dateTimeEpoch;

// Parse using standard MySQL datetime format
return LocalDateTime.parse(dateTimeStr, DATETIME_FORMATTER)
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
} catch (DateTimeParseException e) {
throw new IllegalArgumentException("Invalid datetime format: " + dateTimeStr, e);
}
}

// Binlog reader converts temporal fields to epoch millis
// The Binlog reader is set with EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
private Long parseDateTimeStrAsEpochMillis(final String dateTimeStr) {
// Try parsing as Unix timestamp first
try {
return Long.parseLong(dateTimeStr);
} catch (NumberFormatException ignored) {
// Continue with datetime parsing
}
return null;
}

private Long handleYear(final String yearStr) {
try {
// MySQL YEAR values are typically four-digit numbers (e.g., 2024).
return Long.parseLong(yearStr);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid year format: " + yearStr, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.rds.stream;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
Expand Down Expand Up @@ -35,6 +36,11 @@ public BinaryLogClient create() {
username,
password);
binaryLogClient.setSSLMode(sslMode);
final EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
);
binaryLogClient.setEventDeserializer(eventDeserializer);
return binaryLogClient;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,178 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.time.LocalDate;
import java.time.ZoneOffset;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TemporalTypeHandlerTest {

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.time.LocalDateTime;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertNull;

class TemporalTypeHandlerTest {
private TemporalTypeHandler temporalTypeHandler;

@BeforeEach
void setUp() {
temporalTypeHandler = new TemporalTypeHandler();
}

private static long getEpochMillisFromDate(final int year, final int month, final int day) {
return LocalDate.of(year, month, day)
.atStartOfDay(ZoneOffset.UTC) // Ensure UTC
.toInstant()
.toEpochMilli();
}

private static long getEpochMillis(int year, int month, int day, int hour, int minute, int second, int nanoSeconds) {
return LocalDateTime.of(year, month, day, hour, minute, second, nanoSeconds)
.atZone(ZoneOffset.UTC)
.toInstant()
.toEpochMilli();
}

@Test
public void test_handle() {
final DataTypeHandler handler = new TemporalTypeHandler();
final MySQLDataType columnType = MySQLDataType.TIME;
final String columnName = "jsonColumn";
final String value = UUID.randomUUID().toString();
final TableMetadata metadata = new TableMetadata(
UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName),
Collections.emptyMap(), Collections.emptyMap());
Object result = handler.handle(columnType, columnName, value, metadata);
void handle_whenValueIsNull_returnsNull() {
assertNull(temporalTypeHandler.handle(MySQLDataType.DATE, "test_column", null, null));
}

@ParameterizedTest
@MethodSource("provideDateTestCases")
void handle_withDateType_returnsCorrectEpochMillis(String input, long expected) {
Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", input, null);
assertEquals(expected, result);
}

assertThat(result, is(instanceOf(String.class)));
assertThat(result, is(value));
private static Stream<Arguments> provideDateTestCases() {
return Stream.of(
Arguments.of("2023-12-25", getEpochMillisFromDate(2023, 12, 25)),
Arguments.of("1970-01-01", getEpochMillisFromDate(1970, 1, 1)),
Arguments.of("2024-02-29", getEpochMillisFromDate(2024, 2, 29)), // Leap year
Arguments.of("1684108800000", getEpochMillisFromDate(2023, 5, 15))
);
}

@ParameterizedTest
@MethodSource("provideTimeTestCases")
void handle_withTimeType_returnsCorrectEpochMillis(String input, long expected) {
Long result = temporalTypeHandler.handle(MySQLDataType.TIME, "time_column", input, null);
assertEquals(result, expected);
}

private static Stream<Arguments> provideTimeTestCases() {
return Stream.of(
Arguments.of("14:30:00",
getEpochMillis(1970, 1, 1, 14, 30, 0, 0)),
Arguments.of("00:00:00",
getEpochMillis(1970, 1, 1, 0, 0, 0, 0)),
Arguments.of("23:59:59",
getEpochMillis(1970, 1, 1, 23, 59, 59, 0)),
Arguments.of("85647000",
getEpochMillis(1970, 1, 1, 23, 47, 27, 0)),
Arguments.of("52200000",
getEpochMillis(1970, 1, 1, 14, 30, 0, 0)),
Arguments.of("52200123",
getEpochMillis(1970, 1, 1, 14, 30, 0, 123456000))
);
}

@ParameterizedTest
@MethodSource("provideDateTimeTestCases")
void handle_withDateTimeType_returnsCorrectEpochMillis(String input, long expected) {
Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column", input, null);
assertEquals(expected, result);
}

private static Stream<Arguments> provideDateTimeTestCases() {
return Stream.of(
Arguments.of("2023-12-25 14:30:00.123456", getEpochMillis(2023, 12, 25, 14, 30, 0, 123456000)),
Arguments.of("1970-01-01 00:00:00", getEpochMillis(1970, 1, 1, 0, 0, 0, 0)),
Arguments.of("1703509900000", 1703509900000L)
);
}

@ParameterizedTest
@MethodSource("provideTimestampTestCases")
void handle_withTimestampType_returnsCorrectEpochMillis(String input, long expected) {
Long result = temporalTypeHandler.handle(MySQLDataType.TIMESTAMP, "timestamp_column", input, null);
assertEquals(expected, result);
}

private static Stream<Arguments> provideTimestampTestCases() {
return Stream.of(
Arguments.of("1703509800000", 1703509800000L),
Arguments.of("2023-12-25 14:30:00", getEpochMillis(2023, 12, 25, 14, 30, 0, 0))
);
}

@ParameterizedTest
@MethodSource("provideYearTestCases")
void handle_withYearType_returnsCorrectEpochMillis(String input, long expected) {
Long result = temporalTypeHandler.handle(MySQLDataType.YEAR, "year_column", input, null);
assertEquals(expected, result);
}

private static Stream<Arguments> provideYearTestCases() {
return Stream.of(
Arguments.of("2023", 2023),
Arguments.of("1900", 1900),
Arguments.of("1997", 1997)
);
}

@Test
void handle_withInvalidFormat_throwsIllegalArgumentException() {
assertThrows(IllegalArgumentException.class, () ->
temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "invalid-date", null));
}

@Test
void handle_withUnsupportedType_throwsIllegalArgumentException() {
assertThrows(IllegalArgumentException.class, () ->
temporalTypeHandler.handle(MySQLDataType.VARCHAR, "varchar_column", "2023-12-25", null));
}

@Test
void handle_withEmptyString_throwsIllegalArgumentException() {
assertThrows(IllegalArgumentException.class, () ->
temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "", null));
}

@Test
void handle_withWhitespaceString_throwsIllegalArgumentException() {
assertThrows(IllegalArgumentException.class, () ->
temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", " ", null));
}

@Test
void handle_withLeapYearDate_returnsCorrectEpochMillis() {
Long result = temporalTypeHandler.handle(MySQLDataType.DATE, "date_column", "2024-02-29", null);
assertEquals(getEpochMillisFromDate(2024, 2, 29), result);
}

@Test
void handle_withMaxDateTime_returnsCorrectEpochMillis() {
Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column",
"9999-12-31 23:59:59", null);
assertEquals(getEpochMillis(9999, 12, 31, 23, 59, 59, 0), result);
}

@Test
void handle_withMinDateTime_returnsCorrectEpochMillis() {
Long result = temporalTypeHandler.handle(MySQLDataType.DATETIME, "datetime_column",
"1000-01-01 00:00:00", null);
assertEquals(getEpochMillis(1000, 1, 1, 0, 0, 0, 0), result);
}
}

0 comments on commit 8e6874d

Please sign in to comment.