diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java new file mode 100644 index 0000000000..174d105b71 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java @@ -0,0 +1,21 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype; + +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +/** + * Interface for handling MySQL data type conversions. + * Implementations of this interface are responsible for converting MySQL column values + * to appropriate string representations based on their data types. + */ +public interface DataTypeHandler { + /** + * Handles the conversion of a MySQL column value to its string representation. + * + * @param columnType The MySQL data type of the column being processed + * @param columnName The name of the column being processed + * @param value The value to be converted, can be null + * @param metadata Additional metadata about the table structure and properties + * @return A string representation of the converted value + */ + Object handle(MySQLDataType columnType, String columnName, Object value, TableMetadata metadata); +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHelper.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHelper.java new file mode 100644 index 0000000000..3279fe74c2 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHelper.java @@ -0,0 +1,31 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.BinaryTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.JsonTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.NumericTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.SpatialTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.StringTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.TemporalTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import java.util.Map; + +public class DataTypeHelper { + private static final Map typeHandlers = Map.of( + MySQLDataType.DataCategory.NUMERIC, new NumericTypeHandler(), + MySQLDataType.DataCategory.STRING, new StringTypeHandler(), + MySQLDataType.DataCategory.TEMPORAL, new TemporalTypeHandler(), + MySQLDataType.DataCategory.BINARY, new BinaryTypeHandler(), + MySQLDataType.DataCategory.JSON, new JsonTypeHandler(), + MySQLDataType.DataCategory.SPATIAL, new SpatialTypeHandler() + ); + + public static Object getDataByColumnType(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + if (value == null) { + return null; + } + + return typeHandlers.get(columnType.getCategory()).handle(columnType, columnName, value, metadata); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java new file mode 100644 index 0000000000..e1d54dd6b9 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java @@ -0,0 +1,176 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype; + +import java.util.HashMap; +import java.util.Map; + +public enum MySQLDataType { + // Numeric types + TINYINT("tinyint", DataCategory.NUMERIC, DataSubCategory.SIGNED), + TINYINT_UNSIGNED("tinyint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED), + SMALLINT("smallint", DataCategory.NUMERIC, DataSubCategory.SIGNED), + SMALLINT_UNSIGNED("smallint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED), + MEDIUMINT("mediumint", DataCategory.NUMERIC, DataSubCategory.SIGNED), + MEDIUMINT_UNSIGNED("mediumint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED), + INT("int", DataCategory.NUMERIC, DataSubCategory.SIGNED), + INT_UNSIGNED("int unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED), + BIGINT("bigint", DataCategory.NUMERIC, DataSubCategory.SIGNED), + BIGINT_UNSIGNED("bigint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED), + DECIMAL("decimal", DataCategory.NUMERIC, DataSubCategory.SIGNED), + FLOAT("float", DataCategory.NUMERIC, DataSubCategory.SIGNED), + DOUBLE("double", DataCategory.NUMERIC, DataSubCategory.SIGNED), + BIT("bit", DataCategory.NUMERIC, DataSubCategory.BIT), + + // String types + CHAR("char", DataCategory.STRING, DataSubCategory.CHAR), + VARCHAR("varchar", DataCategory.STRING, DataSubCategory.CHAR), + TINYTEXT("tinytext", DataCategory.STRING, DataSubCategory.BYTES), + TEXT("text", DataCategory.STRING, DataSubCategory.BYTES), + MEDIUMTEXT("mediumtext", DataCategory.STRING, DataSubCategory.BYTES), + LONGTEXT("longtext", DataCategory.STRING, DataSubCategory.BYTES), + ENUM("enum", DataCategory.STRING, DataSubCategory.ENUM), + SET("set", DataCategory.STRING, DataSubCategory.SET), + + // Date and time types + DATE("date", DataCategory.TEMPORAL), + TIME("time", DataCategory.TEMPORAL), + DATETIME("datetime", DataCategory.TEMPORAL), + TIMESTAMP("timestamp", DataCategory.TEMPORAL), + YEAR("year", DataCategory.TEMPORAL), + + // Binary types + BINARY("binary", DataCategory.BINARY), + VARBINARY("varbinary", DataCategory.BINARY), + TINYBLOB("tinyblob", DataCategory.BINARY), + BLOB("blob", DataCategory.BINARY), + MEDIUMBLOB("mediumblob", DataCategory.BINARY), + LONGBLOB("longblob", DataCategory.BINARY), + + // Special types + JSON("json", DataCategory.JSON), + GEOMETRY("geometry", DataCategory.SPATIAL); + + private static final Map TYPE_MAP; + + static { + TYPE_MAP = new HashMap<>(values().length); + for (MySQLDataType dataType : values()) { + TYPE_MAP.put(dataType.dataType, dataType); + } + } + + private final String dataType; + private final DataCategory category; + private final DataSubCategory subCategory; + + MySQLDataType(String dataType, DataCategory category) { + this.dataType = dataType; + this.category = category; + this.subCategory = null; + } + + MySQLDataType(String dataType, DataCategory category, DataSubCategory subCategory) { + this.dataType = dataType; + this.category = category; + this.subCategory = subCategory; + } + + public String getDataType() { + return dataType; + } + + public DataCategory getCategory() { + return category; + } + + public DataSubCategory getSubCategory() { + return subCategory; + } + + public static MySQLDataType byDataType(final String dataType) { + final MySQLDataType type = TYPE_MAP.get(dataType.toLowerCase()); + if (type == null) { + throw new IllegalArgumentException("Unsupported MySQL data type: " + dataType); + } + return type; + } + + public enum DataCategory { + NUMERIC, + STRING, + TEMPORAL, + BINARY, + JSON, + SPATIAL + } + + public enum DataSubCategory { + BIT, + SIGNED, + UNSIGNED, + CHAR, + BYTES, + TEMPORAL, + BINARY, + JSON, + SPATIAL, + ENUM, + SET + } + + public boolean isNumeric() { + return category == DataCategory.NUMERIC; + } + + public boolean isUnsigned() { + return category == DataCategory.NUMERIC && subCategory == DataSubCategory.UNSIGNED; + } + + public boolean isString() { + return category == DataCategory.STRING; + } + + public boolean isStringBytes() { + return category == DataCategory.STRING && subCategory == DataSubCategory.BYTES; + } + + public boolean isStringSet() { + return category == DataCategory.STRING && subCategory == DataSubCategory.SET; + } + + public boolean isStringEnum() { + return category == DataCategory.STRING && subCategory == DataSubCategory.ENUM; + } + + public boolean isTemporal() { + return category == DataCategory.TEMPORAL; + } + + public boolean isBinary() { + return category == DataCategory.BINARY; + } + + public boolean isJson() { + return category == DataCategory.JSON; + } + + public boolean isSpatial() { + return category == DataCategory.SPATIAL; + } + + public long getUnsignedMask() { + switch (this) { + case TINYINT_UNSIGNED: + return 0xFFL; + case SMALLINT_UNSIGNED: + return 0xFFFFL; + case MEDIUMINT_UNSIGNED: + return 0xFFFFFFL; + case INT_UNSIGNED: + return 0xFFFFFFFFL; + case BIGINT_UNSIGNED: + return 0xFFFFFFFFFFFFFFFFL; + default: + throw new UnsupportedOperationException("No mask for non-unsigned type: " + this); + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java new file mode 100644 index 0000000000..368b4c6cd3 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java @@ -0,0 +1,16 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; + +import java.util.Base64; + +public class BinaryTypeHandler implements DataTypeHandler { + + @Override + public String handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + return Base64.getEncoder().encodeToString((byte[]) value); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java new file mode 100644 index 0000000000..3f99959279 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java @@ -0,0 +1,25 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; + +import java.io.IOException; + +public class JsonTypeHandler implements DataTypeHandler { + + @Override + public String handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + return convertToJson((byte[]) value); + } + + private String convertToJson(final byte[] jsonBytes) { + try { + return JsonBinary.parseAsString(jsonBytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java new file mode 100644 index 0000000000..37bac6f465 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java @@ -0,0 +1,54 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; + +import java.math.BigInteger; + +public class NumericTypeHandler implements DataTypeHandler { + + @Override + public Number handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + if (value == null) { + return null; + } + + if (!columnType.isNumeric()) { + throw new IllegalArgumentException("ColumnType is not numeric: " + columnType); + } + + if (!(value instanceof Number)) { + throw new IllegalArgumentException("Value is not a number: " + value); + } + + return handleNumericType(columnType, (Number) value); + } + + private Number handleNumericType(final MySQLDataType columnType, final Number value) { + if (columnType.isUnsigned()) { + if (columnType == MySQLDataType.BIGINT_UNSIGNED) { + return handleUnsignedDouble(value); + } else { + return handleUnsignedNumber(value, columnType.getUnsignedMask()); + } + } + return value; + } + + private Number handleUnsignedNumber(final Number value, final long mask) { + final long longVal = value.longValue(); + return longVal < 0 ? longVal & mask : longVal; + } + + private Number handleUnsignedDouble(final Number value) { + long longVal = value.longValue(); + if (longVal < 0) { + return BigInteger.valueOf(longVal & Long.MAX_VALUE) + .add(BigInteger.valueOf(Long.MAX_VALUE)) + .add(BigInteger.ONE); + } + return value; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java new file mode 100644 index 0000000000..023b1613e5 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java @@ -0,0 +1,15 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; + +public class SpatialTypeHandler implements DataTypeHandler { + + @Override + public String handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + // TODO: Implement the transformation + return new String((byte[]) value); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java new file mode 100644 index 0000000000..8a80eb5b63 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java @@ -0,0 +1,40 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; + +import java.util.ArrayList; +import java.util.List; + +public class StringTypeHandler implements DataTypeHandler { + + @Override + public String handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + if (columnType.isStringBytes()) { + return new String((byte[]) value); + } else if (columnType.isStringEnum() && value instanceof Integer) { + return getEnumValue((int) value, metadata.getEnumStrValues().get(columnName)); + } else if (columnType.isStringSet() && value instanceof Long) { + return getSetValues((long) value, metadata.getSetStrValues().get(columnName)).toString(); + } else { + return value.toString(); + } + } + + private List getSetValues(final long numericValue, final String[] setStrValues) { + final List setValues = new ArrayList<>(); + for (int i = 0; i < setStrValues.length; i++) { + if ((numericValue & (1L << i)) != 0) { + setValues.add(setStrValues[i].trim()); + } + } + + return setValues; + } + + private String getEnumValue(final int numericValue, final String[] enumStrValues) { + return enumStrValues[numericValue - 1]; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java new file mode 100644 index 0000000000..5511a36975 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandler.java @@ -0,0 +1,25 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; + +public class TemporalTypeHandler implements DataTypeHandler { + + @Override + public String handle(final MySQLDataType columnType, final String columnName, final Object value, + final TableMetadata metadata) { + // Date and Time types + switch (columnType) { + // TODO: Implement the transformation + case DATE: + case TIME: + case TIMESTAMP: + case DATETIME: + case YEAR: + return value.toString(); + default: + throw new IllegalArgumentException("Unsupported temporal data type: " + columnType); + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java index f9ce48a3cc..310919c8ca 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java @@ -5,19 +5,30 @@ package org.opensearch.dataprepper.plugins.source.rds.model; +import java.util.Collections; import java.util.List; +import java.util.Map; public class TableMetadata { private String databaseName; private String tableName; private List columnNames; private List primaryKeys; + private Map setStrValues; + private Map enumStrValues; public TableMetadata(String tableName, String databaseName, List columnNames, List primaryKeys) { + this(tableName, databaseName, columnNames, primaryKeys, Collections.emptyMap(), Collections.emptyMap()); + } + + public TableMetadata(String tableName, String databaseName, List columnNames, List primaryKeys, + Map setStrValues, Map enumStrValues) { this.tableName = tableName; this.databaseName = databaseName; this.columnNames = columnNames; this.primaryKeys = primaryKeys; + this.setStrValues = setStrValues; + this.enumStrValues = enumStrValues; } public String getDatabaseName() { @@ -56,4 +67,19 @@ public void setPrimaryKeys(List primaryKeys) { this.primaryKeys = primaryKeys; } + public Map getSetStrValues() { + return setStrValues; + } + + public void setSetStrValues(Map setStrValues) { + this.setStrValues = setStrValues; + } + + public Map getEnumStrValues() { + return enumStrValues; + } + + public void setEnumStrValues(Map enumStrValues) { + this.enumStrValues = enumStrValues; + } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java new file mode 100644 index 0000000000..7e47a0d9e0 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java @@ -0,0 +1,33 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +public class BinaryTypeHandlerTest { + + @Test + public void test_handle() { + final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataType columnType = MySQLDataType.BINARY; + final String columnName = "binaryColumn"; + final byte[] testData = "Test binary data".getBytes(); + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + Object result = handler.handle(columnType, columnName, testData, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(Base64.getEncoder().encodeToString(testData))); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java new file mode 100644 index 0000000000..d7166a192d --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java @@ -0,0 +1,33 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class JsonTypeHandlerTest { + + @Test + public void test_handle() { + final DataTypeHandler handler = new JsonTypeHandler(); + final MySQLDataType columnType = MySQLDataType.JSON; + final String columnName = "jsonColumn"; + final String jsonValue = "{\"key\":\"value\"}"; + final byte[] testData = jsonValue.getBytes(); + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + Object result = handler.handle(columnType, columnName, testData, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(jsonValue)); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java new file mode 100644 index 0000000000..d1a8e4f763 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java @@ -0,0 +1,131 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class NumericTypeHandlerTest { + @ParameterizedTest + @MethodSource("provideNumericTypeData") + public void test_handle(final MySQLDataType mySQLDataType, final String columnName, final Object value, final Object expectedValue) { + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + Object result = numericTypeHandler.handle(mySQLDataType, columnName, value, metadata); + + if (result != null) { + assertThat(result, instanceOf(expectedValue.getClass())); + } + assertThat(result, is(expectedValue)); + } + + private static Stream provideNumericTypeData() { + return Stream.of( + // TINYINT tests (signed: -128 to 127) + Arguments.of(MySQLDataType.TINYINT, "tinyint_col", (byte)1, (byte)1), + Arguments.of(MySQLDataType.TINYINT, "tinyint_col", (byte)-128, (byte)-128), + Arguments.of(MySQLDataType.TINYINT, "tinyint_col", (byte)127, (byte)127), + Arguments.of(MySQLDataType.TINYINT, "tinyint_col", null, null), + + // TINYINT UNSIGNED tests (0 to 255) + Arguments.of(MySQLDataType.TINYINT_UNSIGNED, "tinyint_unsigned_col", (short)0, 0L), + Arguments.of(MySQLDataType.TINYINT_UNSIGNED, "tinyint_unsigned_col", (short)255, 255L), + Arguments.of(MySQLDataType.TINYINT_UNSIGNED, "tinyint_unsigned_col", (short)128, 128L), + Arguments.of(MySQLDataType.TINYINT_UNSIGNED, "tinyint_unsigned_col", (short)-1, 255L), + + // SMALLINT tests (signed: -32,768 to 32,767) + Arguments.of(MySQLDataType.SMALLINT, "smallint_col", (short)32767, (short)32767), + Arguments.of(MySQLDataType.SMALLINT, "smallint_col", (short)-32768, (short)-32768), + Arguments.of(MySQLDataType.SMALLINT, "smallint_col", (short)0, (short)0), + + // SMALLINT UNSIGNED tests (0 to 65,535) + Arguments.of(MySQLDataType.SMALLINT_UNSIGNED, "smallint_unsigned_col", 0, 0L), + Arguments.of(MySQLDataType.SMALLINT_UNSIGNED, "smallint_unsigned_col", 65535, 65535L), + Arguments.of(MySQLDataType.SMALLINT_UNSIGNED, "smallint_unsigned_col", 32768, 32768L), + Arguments.of(MySQLDataType.SMALLINT_UNSIGNED, "smallint_unsigned_col", -1, 65535L), + + // INTEGER/INT tests (signed: -2,147,483,648 to 2,147,483,647) + Arguments.of(MySQLDataType.INT, "int_col", 2147483647, 2147483647), + Arguments.of(MySQLDataType.INT, "int_col", -2147483648, -2147483648), + Arguments.of(MySQLDataType.INT, "int_col", 0, 0), + + // INTEGER/INT UNSIGNED tests (0 to 4,294,967,295) + Arguments.of(MySQLDataType.INT_UNSIGNED, "int_unsigned_col", 4294967295L, 4294967295L), + Arguments.of(MySQLDataType.INT_UNSIGNED, "int_unsigned_col", 0L, 0L), + Arguments.of(MySQLDataType.INT_UNSIGNED, "int_unsigned_col", 2147483648L, 2147483648L), + Arguments.of(MySQLDataType.INT_UNSIGNED, "int_unsigned_col", -1, 4294967295L), + + // BIGINT tests (signed: -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807) + Arguments.of(MySQLDataType.BIGINT, "bigint_col", 9223372036854775807L, 9223372036854775807L), + Arguments.of(MySQLDataType.BIGINT, "bigint_col", -9223372036854775808L, -9223372036854775808L), + Arguments.of(MySQLDataType.BIGINT, "bigint_col", 0L, 0L), + + // BIGINT UNSIGNED tests (0 to 18,446,744,073,709,551,615) + Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", new BigInteger("18446744073709551615"), new BigInteger("18446744073709551615")), + Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", BigInteger.ZERO, new BigInteger("0")), + Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", new BigInteger("9223372036854775808"), new BigInteger("9223372036854775808")), + Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", new BigInteger("-1"), new BigInteger("18446744073709551615")), + + // DECIMAL/NUMERIC tests + Arguments.of(MySQLDataType.DECIMAL, "decimal_col", new BigDecimal("123.45"), new BigDecimal("123.45")), + Arguments.of(MySQLDataType.DECIMAL, "decimal_col", new BigDecimal("-123.45"), new BigDecimal("-123.45")), + Arguments.of(MySQLDataType.DECIMAL, "decimal_col", new BigDecimal("0.0"), new BigDecimal("0.0")), + Arguments.of(MySQLDataType.DECIMAL, "decimal_col", new BigDecimal("999999.99"), new BigDecimal("999999.99")), + + // FLOAT tests + Arguments.of(MySQLDataType.FLOAT, "float_col", 123.45f, 123.45f), + Arguments.of(MySQLDataType.FLOAT, "float_col", -123.45f, -123.45f), + Arguments.of(MySQLDataType.FLOAT, "float_col", 0.0f, 0.0f), + Arguments.of(MySQLDataType.FLOAT, "float_col", Float.MAX_VALUE, Float.MAX_VALUE), + + // DOUBLE tests + Arguments.of(MySQLDataType.DOUBLE, "double_col", 123.45678901234, 123.45678901234), + Arguments.of(MySQLDataType.DOUBLE, "double_col", -123.45678901234, -123.45678901234), + Arguments.of(MySQLDataType.DOUBLE, "double_col", 0.0, 0.0), + Arguments.of(MySQLDataType.DOUBLE, "double_col", Double.MAX_VALUE, Double.MAX_VALUE) + ); + } + + @Test + public void test_handleInvalidType() { + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), + List.of("invalid_col"), List.of("invalid_col"), + Collections.emptyMap(), Collections.emptyMap()); + final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + + assertThrows(IllegalArgumentException.class, () -> { + numericTypeHandler.handle(MySQLDataType.INT_UNSIGNED, "invalid_col", "not_a_number", metadata); + }); + } + + @Test + public void test_handleInvalidValue() { + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), + List.of("int_col"), List.of("int_col"), + Collections.emptyMap(), Collections.emptyMap()); + final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + + assertThrows(IllegalArgumentException.class, () -> { + numericTypeHandler.handle(MySQLDataType.INT, "int_col", "not_a_number", metadata); + }); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java new file mode 100644 index 0000000000..ac0449b696 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java @@ -0,0 +1,32 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class SpatialTypeHandlerTest { + + @Test + public void test_handle() { + final DataTypeHandler handler = new SpatialTypeHandler(); + final MySQLDataType columnType = MySQLDataType.GEOMETRY; + final String columnName = "geometryColumn"; + final String value = UUID.randomUUID().toString(); + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + Object result = handler.handle(columnType, columnName, value.getBytes(), metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandlerTest.java new file mode 100644 index 0000000000..434648b871 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandlerTest.java @@ -0,0 +1,86 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class StringTypeHandlerTest { + @Test + public void test_handle_char_string() { + DataTypeHandler handler = new StringTypeHandler(); + String columnName = "testColumn"; + MySQLDataType columnType = MySQLDataType.VARCHAR; + final String value = "Hello, World!"; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + + Object result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } + + @Test + public void test_handle_byte_string() { + DataTypeHandler handler = new StringTypeHandler(); + String columnName = "testColumn"; + MySQLDataType columnType = MySQLDataType.TEXT; + final String value = "Hello, World!"; + byte[] testBytes = value.getBytes(); + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + + Object result = handler.handle(columnType, columnName, testBytes, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } + + @Test + public void test_handle_enum_string() { + StringTypeHandler handler = new StringTypeHandler(); + String columnName = "testColumn"; + Integer value = 2; + String[] enumValues = { "ENUM1", "ENUM2", "ENUM3" }; + MySQLDataType columnType = MySQLDataType.ENUM; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Map.of(columnName, enumValues)); + + String result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is("ENUM2")); + } + + @Test + public void test_handle_set_string() { + StringTypeHandler handler = new StringTypeHandler(); + String columnName = "testColumn"; + Long value = 3L; + String[] setStrValues = { "Value1", "Value2", "Value3" }; + Map setStrValuesMap = new HashMap<>(); + setStrValuesMap.put(columnName, setStrValues); + MySQLDataType columnType = MySQLDataType.SET; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + setStrValuesMap, Collections.emptyMap()); + + String result = handler.handle(columnType, columnName, value, metadata); + + assertEquals("[Value1, Value2]", result); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java new file mode 100644 index 0000000000..35c2b92e55 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/TemporalTypeHandlerTest.java @@ -0,0 +1,32 @@ +package org.opensearch.dataprepper.plugins.source.rds.datatype.impl; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class TemporalTypeHandlerTest { + + @Test + public void test_handle() { + final DataTypeHandler handler = new TemporalTypeHandler(); + final MySQLDataType columnType = MySQLDataType.TIME; + final String columnName = "jsonColumn"; + final String value = UUID.randomUUID().toString(); + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + Object result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(value)); + } +}