From d3a1dd26bfd3d4a6ffdfb7dc0ba68cdfa4656975 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Thu, 7 Nov 2024 18:13:08 -0600 Subject: [PATCH] Support Data type transformation for MySQL export and stream processing Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../plugins/source/rds/RdsService.java | 16 +++- .../source/rds/datatype/DataTypeHandler.java | 1 + .../source/rds/datatype/MySQLDataType.java | 10 +- .../rds/datatype/impl/BinaryTypeHandler.java | 18 +++- .../rds/datatype/impl/JsonTypeHandler.java | 14 +-- .../rds/datatype/impl/NumericTypeHandler.java | 94 +++++++++++++++---- .../rds/datatype/impl/SpatialTypeHandler.java | 10 ++ .../rds/datatype/impl/StringTypeHandler.java | 2 +- .../source/rds/export/DataFileLoader.java | 25 ++++- .../source/rds/export/DataFileScheduler.java | 10 +- .../source/rds/leader/LeaderScheduler.java | 10 +- .../source/rds/model/DbTableMetadata.java | 4 + .../rds/stream/BinlogEventListener.java | 68 ++++++++++++-- .../rds/stream/StreamWorkerTaskRefresher.java | 16 +++- .../datatype/impl/BinaryTypeHandlerTest.java | 64 +++++++++++-- .../datatype/impl/JsonTypeHandlerTest.java | 30 +++++- .../datatype/impl/NumericTypeHandlerTest.java | 38 +++++++- .../datatype/impl/SpatialTypeHandlerTest.java | 37 ++++++++ .../source/rds/export/DataFileLoaderTest.java | 6 +- .../rds/export/DataFileSchedulerTest.java | 29 +++++- .../rds/leader/LeaderSchedulerTest.java | 6 +- .../rds/stream/BinlogEventListenerTest.java | 7 +- .../stream/StreamWorkerTaskRefresherTest.java | 71 ++++++++++++-- 23 files changed, 511 insertions(+), 75 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 880586b6fe..d74c4dc89f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -23,6 +23,7 @@ import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler; import org.opensearch.dataprepper.plugins.source.rds.leader.RdsApiStrategy; import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager; import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager; import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory; @@ -35,8 +36,10 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; public class RdsService { private static final Logger LOG = LoggerFactory.getLogger(RdsService.class); @@ -95,9 +98,12 @@ public void start(Buffer> buffer) { new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient); final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier()); final String s3PathPrefix = getS3PathPrefix(); + final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata); + final Map> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager); + final DbTableMetadata dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap); leaderScheduler = new LeaderScheduler( - sourceCoordinator, sourceConfig, s3PathPrefix, getSchemaManager(sourceConfig, dbMetadata), dbMetadata); + sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata); runnableList.add(leaderScheduler); if (sourceConfig.isExportEnabled()) { @@ -178,5 +184,13 @@ private String getS3PathPrefix() { return s3PathPrefix; } + private Map> getColumnDataTypeMap(final SchemaManager schemaManager) { + return sourceConfig.getTableNames().stream() + .collect(Collectors.toMap( + fullTableName -> fullTableName, + fullTableName -> schemaManager.getColumnDataTypes(fullTableName.split("\\.")[0], fullTableName.split("\\.")[1]) + )); + } + } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java index 174d105b71..078617892b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/DataTypeHandler.java @@ -8,6 +8,7 @@ * to appropriate string representations based on their data types. */ public interface DataTypeHandler { + String BYTES_KEY = "bytes"; /** * Handles the conversion of a MySQL column value to its string representation. * diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java index e1d54dd6b9..a80ff19ada 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/MySQLDataType.java @@ -121,10 +121,18 @@ public boolean isNumeric() { return category == DataCategory.NUMERIC; } - public boolean isUnsigned() { + public boolean isNumericUnsigned() { return category == DataCategory.NUMERIC && subCategory == DataSubCategory.UNSIGNED; } + public boolean isBigIntUnsigned() { + return this == MySQLDataType.BIGINT_UNSIGNED; + } + + public boolean isBit() { + return this == MySQLDataType.BIT; + } + public boolean isString() { return category == DataCategory.STRING; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java index 368b4c6cd3..de0ddfea54 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandler.java @@ -4,13 +4,25 @@ import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; -import java.util.Base64; +import java.util.Map; public class BinaryTypeHandler implements DataTypeHandler { - @Override public String handle(final MySQLDataType columnType, final String columnName, final Object value, final TableMetadata metadata) { - return Base64.getEncoder().encodeToString((byte[]) value); + if (value instanceof byte[]) { + return new String((byte[]) value); + } + + if (value instanceof Map) { + Object data = ((Map)value).get(BYTES_KEY); + if (data instanceof byte[]) { + return new String((byte[]) data); + } else { + return data.toString(); + } + } + + return value.toString(); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java index 3f99959279..9ca8789690 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandler.java @@ -5,20 +5,22 @@ import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; -import java.io.IOException; - public class JsonTypeHandler implements DataTypeHandler { @Override public String handle(final MySQLDataType columnType, final String columnName, final Object value, final TableMetadata metadata) { - return convertToJson((byte[]) value); + return convertToJson(value); } - private String convertToJson(final byte[] jsonBytes) { + private String convertToJson(final Object value) { try { - return JsonBinary.parseAsString(jsonBytes); - } catch (IOException e) { + if (value instanceof byte[]) { + return JsonBinary.parseAsString((byte[])value); + } else { + return value.toString(); + } + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java index 37bac6f465..e072fd3b9e 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandler.java @@ -5,6 +5,9 @@ import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Map; public class NumericTypeHandler implements DataTypeHandler { @@ -15,40 +18,93 @@ public Number handle(final MySQLDataType columnType, final String columnName, fi return null; } - if (!columnType.isNumeric()) { + if (!(columnType.isNumeric() || columnType.isBit())) { throw new IllegalArgumentException("ColumnType is not numeric: " + columnType); } - if (!(value instanceof Number)) { - throw new IllegalArgumentException("Value is not a number: " + value); + return handleNumericType(columnType, value); + } + + private Number handleNumericType(final MySQLDataType columnType, final Object value) { + if (columnType.isNumericUnsigned()) { + if (columnType.isBigIntUnsigned()) { + return handleUnsignedBigInt(value); + } else { + return handleUnsignedNumber(value, columnType.getUnsignedMask()); + } + } + + if (columnType.isBit()) { + return handleBit(value); + } + + if (value instanceof Number) { + return (Number)value; } - return handleNumericType(columnType, (Number) value); + throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass()); } - private Number handleNumericType(final MySQLDataType columnType, final Number value) { - if (columnType.isUnsigned()) { - if (columnType == MySQLDataType.BIGINT_UNSIGNED) { - return handleUnsignedDouble(value); + private Number handleBit(final Object value) { + if (value instanceof BitSet) { + return bitSetToBigInteger((BitSet) value); + } + + if (value instanceof Map) { + Object data = ((Map)value).get(BYTES_KEY); + if (data instanceof byte[]) { + return new BigInteger(1, (byte[]) data); } else { - return handleUnsignedNumber(value, columnType.getUnsignedMask()); + byte[] bytes = ((String)data).getBytes(); + return new BigInteger(1, bytes); } } - return value; + + throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass()); } - private Number handleUnsignedNumber(final Number value, final long mask) { - final long longVal = value.longValue(); + private Number handleUnsignedNumber(final Object value, final long mask) { + if (!(value instanceof Number)) { + throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass()); + } + + final long longVal = ((Number)value).longValue(); return longVal < 0 ? longVal & mask : longVal; } - private Number handleUnsignedDouble(final Number value) { - long longVal = value.longValue(); - if (longVal < 0) { - return BigInteger.valueOf(longVal & Long.MAX_VALUE) - .add(BigInteger.valueOf(Long.MAX_VALUE)) - .add(BigInteger.ONE); + private Number handleUnsignedBigInt(final Object value) { + if (value instanceof Number) { + long longVal = ((Number)value).longValue(); + if (longVal < 0) { + return BigInteger.valueOf(longVal & Long.MAX_VALUE) + .add(BigInteger.valueOf(Long.MAX_VALUE)) + .add(BigInteger.ONE); + } + return (Number)value; + } + + if (value instanceof ArrayList) { + ArrayList list = (ArrayList) value; + + // Convert ArrayList to byte array + byte[] bytes = new byte[list.size()]; + for (int i = 0; i < list.size(); i++) { + bytes[i] = ((Number) list.get(i)).byteValue(); + } + + return new BigInteger(1, bytes); + } + + throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass().getName()); + } + + private static BigInteger bitSetToBigInteger(BitSet bitSet) { + BigInteger result = BigInteger.ZERO; + for (int i = 0; i < bitSet.length(); i++) { + if (bitSet.get(i)) { + result = result.setBit(i); + } } - return value; + return result; } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java index 023b1613e5..73694c0073 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandler.java @@ -4,12 +4,22 @@ import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import java.util.Map; + public class SpatialTypeHandler implements DataTypeHandler { @Override public String handle(final MySQLDataType columnType, final String columnName, final Object value, final TableMetadata metadata) { // TODO: Implement the transformation + if (value instanceof Map) { + Object data = ((Map)value).get(BYTES_KEY); + if (data instanceof byte[]) { + return new String((byte[]) data); + } else { + return data.toString(); + } + } return new String((byte[]) value); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java index 8a80eb5b63..e01aee0bea 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/StringTypeHandler.java @@ -12,7 +12,7 @@ public class StringTypeHandler implements DataTypeHandler { @Override public String handle(final MySQLDataType columnType, final String columnName, final Object value, final TableMetadata metadata) { - if (columnType.isStringBytes()) { + if (columnType.isStringBytes() && (value instanceof byte[])) { return new String((byte[]) value); } else if (columnType.isStringEnum() && value instanceof Integer) { return getEnumValue((int) value, metadata.getEnumStrValues().get(columnName)); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java index 5e0fe9ecf3..41c278721a 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java @@ -19,12 +19,16 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; @@ -52,6 +56,7 @@ public class DataFileLoader implements Runnable { private final EnhancedSourceCoordinator sourceCoordinator; private final AcknowledgementSet acknowledgementSet; private final Duration acknowledgmentTimeout; + private final DbTableMetadata dbTableMetadata; private final Counter exportRecordsTotalCounter; private final Counter exportRecordSuccessCounter; private final Counter exportRecordErrorCounter; @@ -66,7 +71,8 @@ private DataFileLoader(final DataFilePartition dataFilePartition, final PluginMetrics pluginMetrics, final EnhancedSourceCoordinator sourceCoordinator, final AcknowledgementSet acknowledgementSet, - final Duration acknowledgmentTimeout) { + final Duration acknowledgmentTimeout, + final DbTableMetadata dbTableMetadata) { this.dataFilePartition = dataFilePartition; bucket = dataFilePartition.getBucket(); objectKey = dataFilePartition.getKey(); @@ -77,6 +83,7 @@ private DataFileLoader(final DataFilePartition dataFilePartition, this.sourceCoordinator = sourceCoordinator; this.acknowledgementSet = acknowledgementSet; this.acknowledgmentTimeout = acknowledgmentTimeout; + this.dbTableMetadata = dbTableMetadata; exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT); exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT); @@ -93,9 +100,10 @@ public static DataFileLoader create(final DataFilePartition dataFilePartition, final PluginMetrics pluginMetrics, final EnhancedSourceCoordinator sourceCoordinator, final AcknowledgementSet acknowledgementSet, - final Duration acknowledgmentTimeout) { + final Duration acknowledgmentTimeout, + final DbTableMetadata dbTableMetadata) { return new DataFileLoader(dataFilePartition, codec, buffer, objectReader, recordConverter, - pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout); + pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout, dbTableMetadata); } @Override @@ -118,6 +126,7 @@ public void run() { final String fullTableName = progressState.getSourceDatabase() + "." + progressState.getSourceTable(); final List primaryKeys = progressState.getPrimaryKeyMap().getOrDefault(fullTableName, List.of()); + transformEvent(event, fullTableName); final long snapshotTime = progressState.getSnapshotTime(); final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis(); @@ -162,4 +171,14 @@ public void run() { exportRecordErrorCounter.increment(eventCount.get()); } } + + private void transformEvent(final Event event, final String fullTableName) { + Map columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName); + for (Map.Entry entry : event.toMap().entrySet()) { + final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(), + entry.getValue(), null); + event.put(entry.getKey(), data); + } + } + } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java index 00af2b7db7..22860b65ff 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java @@ -21,6 +21,7 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.model.LoadStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,7 +157,8 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) { Runnable loader = DataFileLoader.create( dataFilePartition, codec, buffer, objectReader, recordConverter, pluginMetrics, - sourceCoordinator, acknowledgementSet, sourceConfig.getDataFileAcknowledgmentTimeout()); + sourceCoordinator, acknowledgementSet, sourceConfig.getDataFileAcknowledgmentTimeout(), + getDBTableMetadata()); CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor); if (isAcknowledgmentsEnabled) { @@ -220,4 +222,10 @@ private BiConsumer completeDataLoader(DataFilePartition dataFil numOfWorkers.decrementAndGet(); }; } + + private DbTableMetadata getDBTableMetadata() { + final Optional globalStatePartition = sourceCoordinator.getPartition(sourceConfig.getDbIdentifier()); + final GlobalState globalState = (GlobalState) globalStatePartition.get(); + return DbTableMetadata.fromMap(globalState.getProgressState().get()); + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index ed80f136dc..9abe56b5d1 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -16,7 +16,7 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; -import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ public class LeaderScheduler implements Runnable { private final RdsSourceConfig sourceConfig; private final String s3Prefix; private final SchemaManager schemaManager; - private final DbMetadata dbMetadata; + private final DbTableMetadata dbTableMetadataMetadata; private LeaderPartition leaderPartition; private volatile boolean shutdownRequested = false; @@ -48,12 +48,12 @@ public LeaderScheduler(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, final String s3Prefix, final SchemaManager schemaManager, - final DbMetadata dbMetadata) { + final DbTableMetadata dbTableMetadataMetadata) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; this.s3Prefix = s3Prefix; this.schemaManager = schemaManager; - this.dbMetadata = dbMetadata; + this.dbTableMetadataMetadata = dbTableMetadataMetadata; } @Override @@ -113,7 +113,7 @@ private void init() { // Create a Global state in the coordination table for rds cluster/instance information. // Global State here is designed to be able to read whenever needed // So that the jobs can refer to the configuration. - sourceCoordinator.createPartition(new GlobalState(sourceConfig.getDbIdentifier(), dbMetadata.toMap())); + sourceCoordinator.createPartition(new GlobalState(sourceConfig.getDbIdentifier(), dbTableMetadataMetadata.toMap())); LOG.debug("Created global state for DB: {}", sourceConfig.getDbIdentifier()); if (sourceConfig.isExportEnabled()) { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadata.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadata.java index 0627e9571a..ce9f565a30 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadata.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/DbTableMetadata.java @@ -12,6 +12,10 @@ public class DbTableMetadata { private static final String DB_METADATA_KEY = "dbMetadata"; private static final String TABLE_COLUMN_METADATA_KEY = "tableColumnDataTypeMap"; private final DbMetadata dbMetadata; + /** + * Map of table name to table column data type map + * e.g. { "table1" : { "column1" : "int", "column2" : "varchar" }, "table2" : { "column1" : "int" } } + */ private final Map> tableColumnDataTypeMap; public DbTableMetadata(final DbMetadata dbMetadata, final Map> tableColumnDataTypeMap) { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java index 6bf3800337..2dbd8922b6 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -30,13 +30,17 @@ import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper; +import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -58,6 +62,7 @@ public class BinlogEventListener implements BinaryLogClient.EventListener { static final String BYTES_RECEIVED = "bytesReceived"; static final String BYTES_PROCESSED = "bytesProcessed"; static final String REPLICATION_LOG_EVENT_PROCESSING_TIME = "replicationLogEntryProcessingTime"; + static final String SEPARATOR = "."; /** * TableId to TableMetadata mapping @@ -73,6 +78,7 @@ public class BinlogEventListener implements BinaryLogClient.EventListener { private final PluginMetrics pluginMetrics; private final List pipelineEvents; private final StreamCheckpointManager streamCheckpointManager; + private final DbTableMetadata dbTableMetadata; private final ExecutorService binlogEventExecutorService; private final Counter changeEventSuccessCounter; private final Counter changeEventErrorCounter; @@ -92,7 +98,8 @@ public BinlogEventListener(final Buffer> buffer, final PluginMetrics pluginMetrics, final BinaryLogClient binaryLogClient, final StreamCheckpointer streamCheckpointer, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final DbTableMetadata dbTableMetadata) { this.buffer = buffer; this.binaryLogClient = binaryLogClient; tableMetadataMap = new HashMap<>(); @@ -105,6 +112,7 @@ public BinlogEventListener(final Buffer> buffer, binlogEventExecutorService = Executors.newFixedThreadPool( sourceConfig.getStream().getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("rds-source-binlog-processor")); + this.dbTableMetadata = dbTableMetadata; this.streamCheckpointManager = new StreamCheckpointManager( streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(), acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout()); @@ -123,8 +131,10 @@ public static BinlogEventListener create(final Buffer> buffer, final PluginMetrics pluginMetrics, final BinaryLogClient binaryLogClient, final StreamCheckpointer streamCheckpointer, - final AcknowledgementSetManager acknowledgementSetManager) { - return new BinlogEventListener(buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager); + final AcknowledgementSetManager acknowledgementSetManager, + final DbTableMetadata dbTableMetadata) { + return new BinlogEventListener(buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, + streamCheckpointer, acknowledgementSetManager, dbTableMetadata); } @Override @@ -178,16 +188,53 @@ void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) { } void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) { - final TableMapEventData data = event.getData(); - final TableMapEventMetadata tableMapEventMetadata = data.getEventMetadata(); + final TableMapEventData eventData = event.getData(); + final TableMapEventMetadata tableMapEventMetadata = eventData.getEventMetadata(); final List columnNames = tableMapEventMetadata.getColumnNames(); final List primaryKeys = tableMapEventMetadata.getSimplePrimaryKeys().stream() .map(columnNames::get) .collect(Collectors.toList()); final TableMetadata tableMetadata = new TableMetadata( - data.getTable(), data.getDatabase(), columnNames, primaryKeys); + eventData.getTable(), eventData.getDatabase(), columnNames, primaryKeys, + getSetStrValues(eventData), getEnumStrValues(eventData)); if (isTableOfInterest(tableMetadata.getFullTableName())) { - tableMetadataMap.put(data.getTableId(), tableMetadata); + tableMetadataMap.put(eventData.getTableId(), tableMetadata); + } + } + + private Map getSetStrValues(final TableMapEventData eventData) { + return getStrValuesMap(eventData, MySQLDataType.SET); + } + + private Map getEnumStrValues(final TableMapEventData eventData) { + return getStrValuesMap(eventData, MySQLDataType.ENUM); + } + + private Map getStrValuesMap(final TableMapEventData eventData, final MySQLDataType columnType) { + Map strValuesMap = new HashMap<>(); + List columnNames = eventData.getEventMetadata().getColumnNames(); + List strValues = getStrValues(eventData, columnType); + + final Map tbMetadata = dbTableMetadata.getTableColumnDataTypeMap() + .get(eventData.getDatabase() + SEPARATOR + eventData.getTable()); + + for (int i = 0, j=0; i < columnNames.size(); i++) { + final String dataType = tbMetadata.get(columnNames.get(i)); + if (MySQLDataType.byDataType(dataType) == columnType) { + strValuesMap.put(columnNames.get(i), strValues.get(j++)); + } + } + + return strValuesMap; + } + + private List getStrValues(final TableMapEventData eventData, final MySQLDataType columnType) { + if (columnType == MySQLDataType.ENUM) { + return eventData.getEventMetadata().getEnumStrValues(); + } else if (columnType == MySQLDataType.SET) { + return eventData.getEventMetadata().getSetStrValues(); + } else { + return Collections.emptyList(); } } @@ -251,11 +298,14 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event eve final long eventTimestampMillis = event.getHeader().getTimestamp(); final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); - for (Object[] rowDataArray : rows) { final Map rowDataMap = new HashMap<>(); for (int i = 0; i < rowDataArray.length; i++) { - rowDataMap.put(columnNames.get(i), rowDataArray[i]); + final Map tbColumnDatatypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(tableMetadata.getFullTableName()); + final String columnDataType = tbColumnDatatypeMap.get(columnNames.get(i)); + final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataType), columnNames.get(i), + rowDataArray[i], tableMetadata); + rowDataMap.put(columnNames.get(i), data); } final Event dataPrepperEvent = JacksonEvent.builder() diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java index 270a4f3fbf..a665ea35ad 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java @@ -14,12 +14,16 @@ import org.opensearch.dataprepper.model.plugin.PluginConfigObserver; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -113,8 +117,10 @@ public void shutdown() { private void refreshTask(RdsSourceConfig sourceConfig) { final BinaryLogClient binaryLogClient = binlogClientFactory.create(); + final DbTableMetadata dbTableMetadata = getDBTableMetadata(streamPartition); binaryLogClient.registerEventListener(BinlogEventListener.create( - buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager)); + buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, + acknowledgementSetManager, dbTableMetadata)); final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics); executorService.submit(() -> streamWorker.processStream(streamPartition)); } @@ -124,4 +130,12 @@ private boolean basicAuthChanged(final RdsSourceConfig.AuthenticationConfig newA return !Objects.equals(currentAuthConfig.getUsername(), newAuthConfig.getUsername()) || !Objects.equals(currentAuthConfig.getPassword(), newAuthConfig.getPassword()); } + + private DbTableMetadata getDBTableMetadata(final StreamPartition streamPartition) { + final String dbIdentifier = streamPartition.getPartitionKey(); + final Optional globalStatePartition = sourceCoordinator.getPartition(dbIdentifier); + final GlobalState globalState = (GlobalState) globalStatePartition.get(); + return DbTableMetadata.fromMap(globalState.getProgressState().get()); + } } + diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java index 7e47a0d9e0..5321c80e3a 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/BinaryTypeHandlerTest.java @@ -9,25 +9,73 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import java.util.Base64; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; public class BinaryTypeHandlerTest { @Test - public void test_handle() { + public void testHandleByteArrayData() { final DataTypeHandler handler = new BinaryTypeHandler(); final MySQLDataType columnType = MySQLDataType.BINARY; final String columnName = "binaryColumn"; - final byte[] testData = "Test binary data".getBytes(); + final String testData = UUID.randomUUID().toString(); final TableMetadata metadata = new TableMetadata( - UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), - Collections.emptyMap(), Collections.emptyMap()); - Object result = handler.handle(columnType, columnName, testData, metadata); + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); + final Object result = handler.handle(columnType, columnName, testData.getBytes(), metadata); assertThat(result, is(instanceOf(String.class))); - assertThat(result, is(Base64.getEncoder().encodeToString(testData))); + assertThat(result, is(testData)); + } + + @Test + public void testHandleMapWithByteArrayData() { + final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataType columnType = MySQLDataType.BINARY; + final String columnName = "test_column"; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); + final String testData = UUID.randomUUID().toString(); + final Map value = new HashMap<>(); + value.put("bytes", testData.getBytes()); + + final Object result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(testData)); + } + + @Test + public void testHandleMapValueNotByteArray() { + final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataType columnType = MySQLDataType.BINARY; + final String columnName = "test_column"; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); + final Map value = new HashMap<>(); + final String testData = UUID.randomUUID().toString(); + value.put("bytes", testData); + + final Object result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(testData)); + } + + @Test + public void testHandleNonByteArrayNonMapValue() { + final DataTypeHandler handler = new BinaryTypeHandler(); + final MySQLDataType columnType = MySQLDataType.BINARY; + final String columnName = "test_column"; + final Integer value = 42; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName)); + + final Object result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is("42")); } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java index d7166a192d..769915a1f3 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/JsonTypeHandlerTest.java @@ -12,11 +12,12 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; public class JsonTypeHandlerTest { @Test - public void test_handle() { + public void testHandleJsonBytes() { final DataTypeHandler handler = new JsonTypeHandler(); final MySQLDataType columnType = MySQLDataType.JSON; final String columnName = "jsonColumn"; @@ -30,4 +31,31 @@ public void test_handle() { assertThat(result, is(instanceOf(String.class))); assertThat(result, is(jsonValue)); } + + @Test + public void testHandleJsonString() { + final DataTypeHandler handler = new JsonTypeHandler(); + final MySQLDataType columnType = MySQLDataType.JSON; + final String columnName = "jsonColumn"; + final String jsonValue = "{\"key\":\"value\"}"; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + Object result = handler.handle(columnType, columnName, jsonValue, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(jsonValue)); + } + + @Test + public void testHandleInvalidJsonBytes() { + final DataTypeHandler handler = new JsonTypeHandler(); + final MySQLDataType columnType = MySQLDataType.JSON; + final String columnName = "jsonColumn"; + final byte[] testData = new byte[]{5}; + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + assertThrows(RuntimeException.class, () -> handler.handle(columnType, columnName, testData, metadata)); + } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java index d1a8e4f763..faa809894e 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/NumericTypeHandlerTest.java @@ -10,8 +10,11 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Stream; @@ -82,6 +85,7 @@ private static Stream provideNumericTypeData() { Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", BigInteger.ZERO, new BigInteger("0")), Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", new BigInteger("9223372036854775808"), new BigInteger("9223372036854775808")), Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", new BigInteger("-1"), new BigInteger("18446744073709551615")), + Arguments.of(MySQLDataType.BIGINT_UNSIGNED, "bigint_unsigned_col", new ArrayList<>(List.of(0, -1, -1, -1, -1, -1, -1, -1, -1)), new BigInteger("18446744073709551615")), // DECIMAL/NUMERIC tests Arguments.of(MySQLDataType.DECIMAL, "decimal_col", new BigDecimal("123.45"), new BigDecimal("123.45")), @@ -99,7 +103,12 @@ private static Stream provideNumericTypeData() { Arguments.of(MySQLDataType.DOUBLE, "double_col", 123.45678901234, 123.45678901234), Arguments.of(MySQLDataType.DOUBLE, "double_col", -123.45678901234, -123.45678901234), Arguments.of(MySQLDataType.DOUBLE, "double_col", 0.0, 0.0), - Arguments.of(MySQLDataType.DOUBLE, "double_col", Double.MAX_VALUE, Double.MAX_VALUE) + Arguments.of(MySQLDataType.DOUBLE, "double_col", Double.MAX_VALUE, Double.MAX_VALUE), + + + // BIT tests + Arguments.of(MySQLDataType.BIT, "bit_col", BitSet.valueOf(new byte[]{ 4, 3, 2, 1 }), new BigInteger("16909060")), // BitSet interprets the bytes in little-endian order + Arguments.of(MySQLDataType.BIT, "bit_col", Map.of("bytes", new byte[]{ 1, 2, 3, 4 }), new BigInteger("16909060")) // Direct BigInteger interprets the bytes in big-endian order. ); } @@ -128,4 +137,31 @@ public void test_handleInvalidValue() { numericTypeHandler.handle(MySQLDataType.INT, "int_col", "not_a_number", metadata); }); } + + @Test + public void test_handleInvalidUnsignedBigInt() { + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), + List.of("int_col"), List.of("int_col"), + Collections.emptyMap(), Collections.emptyMap()); + final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + + assertThrows(IllegalArgumentException.class, () -> { + numericTypeHandler.handle(MySQLDataType.BIGINT, "bigint_col", "not_a_number", metadata); + }); + } + + + @Test + public void test_handleInvalidBit() { + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), + List.of("int_col"), List.of("int_col"), + Collections.emptyMap(), Collections.emptyMap()); + final DataTypeHandler numericTypeHandler = new NumericTypeHandler(); + + assertThrows(IllegalArgumentException.class, () -> { + numericTypeHandler.handle(MySQLDataType.BIT, "bit_col", "not_a_number", metadata); + }); + } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java index ac0449b696..5dd2beb33f 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/datatype/impl/SpatialTypeHandlerTest.java @@ -6,7 +6,9 @@ import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import static org.hamcrest.CoreMatchers.instanceOf; @@ -29,4 +31,39 @@ public void test_handle() { assertThat(result, is(instanceOf(String.class))); assertThat(result, is(value)); } + + + @Test + public void testHandleMapWithByteArrayData() { + final DataTypeHandler handler = new SpatialTypeHandler(); + final MySQLDataType columnType = MySQLDataType.GEOMETRY; + final String columnName = "geometryColumn"; + final String testData = UUID.randomUUID().toString(); + final Map value = new HashMap<>(); + value.put("bytes", testData.getBytes()); + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + Object result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(testData)); + } + + @Test + public void testHandleMapWithByteStringData() { + final DataTypeHandler handler = new SpatialTypeHandler(); + final MySQLDataType columnType = MySQLDataType.GEOMETRY; + final String columnName = "geometryColumn"; + final String testData = UUID.randomUUID().toString(); + final Map value = new HashMap<>(); + value.put("bytes", testData); + final TableMetadata metadata = new TableMetadata( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), List.of(columnName), List.of(columnName), + Collections.emptyMap(), Collections.emptyMap()); + Object result = handler.handle(columnType, columnName, value, metadata); + + assertThat(result, is(instanceOf(String.class))); + assertThat(result, is(testData)); + } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java index 656e393cee..6eeedfcd0f 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java @@ -33,6 +33,7 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import java.io.InputStream; import java.time.Duration; @@ -82,6 +83,9 @@ class DataFileLoaderTest { @Mock private AcknowledgementSet acknowledgementSet; + @Mock + private DbTableMetadata dbTableMetadata; + @Mock private Duration acknowledgmentTimeout; @@ -211,6 +215,6 @@ void test_flush_failure_then_error_metric_updated() throws Exception { private DataFileLoader createObjectUnderTest() { final InputCodec codec = new ParquetInputCodec(eventFactory); return DataFileLoader.create(dataFilePartition, codec, buffer, s3ObjectReader, recordConverter, - pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout); + pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout, dbTableMetadata); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java index 96a45588c8..34ec5f7d60 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java @@ -24,10 +24,13 @@ import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.model.LoadStatus; import software.amazon.awssdk.services.s3.S3Client; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -130,6 +133,7 @@ void test_given_available_datafile_partition_then_load_datafile() throws Interru DataFileScheduler objectUnderTest = createObjectUnderTest(); final ExecutorService executorService = Executors.newSingleThreadExecutor(); + mockDbTableMetadata(); executorService.submit(() -> { // MockedStatic needs to be created on the same thread it's used try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) { @@ -137,7 +141,7 @@ void test_given_available_datafile_partition_then_load_datafile() throws Interru dataFileLoaderMockedStatic.when(() -> DataFileLoader.create(eq(dataFilePartition), any(InputCodec.class), any(Buffer.class), any(S3ObjectReader.class), any(ExportRecordConverter.class), any(PluginMetrics.class), - any(EnhancedSourceCoordinator.class), any(), any(Duration.class))) + any(EnhancedSourceCoordinator.class), any(), any(Duration.class), any(DbTableMetadata.class))) .thenReturn(dataFileLoader); doNothing().when(dataFileLoader).run(); objectUnderTest.run(); @@ -159,6 +163,7 @@ void test_data_file_loader_throws_exception_then_give_up_partition() { DataFileScheduler objectUnderTest = createObjectUnderTest(); final ExecutorService executorService = Executors.newSingleThreadExecutor(); + mockDbTableMetadata(); executorService.submit(() -> { // MockedStatic needs to be created on the same thread it's used try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) { @@ -166,7 +171,7 @@ void test_data_file_loader_throws_exception_then_give_up_partition() { dataFileLoaderMockedStatic.when(() -> DataFileLoader.create(eq(dataFilePartition), any(InputCodec.class), any(Buffer.class), any(S3ObjectReader.class), any(ExportRecordConverter.class), any(PluginMetrics.class), - any(EnhancedSourceCoordinator.class), any(), any(Duration.class))) + any(EnhancedSourceCoordinator.class), any(), any(Duration.class), any(DbTableMetadata.class))) .thenReturn(dataFileLoader); doThrow(new RuntimeException()).when(dataFileLoader).run(); objectUnderTest.run(); @@ -196,4 +201,24 @@ void test_shutdown() { private DataFileScheduler createObjectUnderTest() { return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Prefix, s3Client, eventFactory, buffer, pluginMetrics, acknowledgementSetManager); } + + private void mockDbTableMetadata() { + final Map> tableColumnDataTypeMap = new HashMap<>(); + final Map columnDataTypeMap = new HashMap<>(); + columnDataTypeMap.put("int_column", "INTEGER"); + final String tableName = UUID.randomUUID().toString(); + tableColumnDataTypeMap.put(tableName, columnDataTypeMap); + + final String dbIdentifier = UUID.randomUUID().toString(); + final String hostName = UUID.randomUUID().toString(); + final int port = new Random().nextInt(); + + final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port); + final Map map = new HashMap<>(); + map.put("dbMetadata", dbMetadata.toMap()); + map.put("tableColumnDataTypeMap", tableColumnDataTypeMap); + final GlobalState globalStatePartition = mock(GlobalState.class); + when(sourceCoordinator.getPartition(sourceConfig.getDbIdentifier())).thenReturn(Optional.of(globalStatePartition)); + when(globalStatePartition.getProgressState()).thenReturn(Optional.of(map)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java index 11381c8a23..dbd21cbe4a 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderSchedulerTest.java @@ -19,7 +19,7 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; -import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager; import java.time.Duration; @@ -52,7 +52,7 @@ class LeaderSchedulerTest { private SchemaManager schemaManager; @Mock - private DbMetadata dbMetadata; + private DbTableMetadata dbTableMetadata; @Mock private LeaderPartition leaderPartition; @@ -140,6 +140,6 @@ void test_shutDown() { } private LeaderScheduler createObjectUnderTest() { - return new LeaderScheduler(sourceCoordinator, sourceConfig, s3Prefix, schemaManager, dbMetadata); + return new LeaderScheduler(sourceCoordinator, sourceConfig, s3Prefix, schemaManager, dbTableMetadata); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java index 1312607821..d41bb55808 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -26,6 +26,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; import java.io.IOException; import java.util.UUID; @@ -73,6 +74,9 @@ class BinlogEventListenerTest { @Mock private ThreadFactory threadFactory; + @Mock + private DbTableMetadata dbTableMetadata; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private com.github.shyiko.mysql.binlog.event.Event binlogEvent; @@ -153,7 +157,8 @@ void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) } private BinlogEventListener createObjectUnderTest() { - return new BinlogEventListener(buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager); + return new BinlogEventListener(buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, + streamCheckpointer, acknowledgementSetManager, dbTableMetadata); } private void verifyHandlerCallHelper() { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java index 13078e65cb..e9dc012b82 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java @@ -22,8 +22,15 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; +import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -91,6 +98,12 @@ class StreamWorkerTaskRefresherTest { @Mock private BinlogEventListener binlogEventListener; + @Mock + private DbTableMetadata dbTableMetadata; + + @Mock + private GlobalState globalState; + private StreamWorkerTaskRefresher streamWorkerTaskRefresher; @BeforeEach @@ -104,11 +117,16 @@ void setUp() { @Test void test_initialize_then_process_stream() { when(binlogClientFactory.create()).thenReturn(binlogClient); + final Map progressState = mockGlobalStateAndProgressState(); try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class); - MockedStatic binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class)) { + MockedStatic binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class); + MockedStatic dbTableMetadataMockedStatic = mockStatic(DbTableMetadata.class)) { + dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(BinaryLogClient.class), eq(pluginMetrics))) .thenReturn(streamWorker); - binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), eq(acknowledgementSetManager))) + binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), + any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), + eq(acknowledgementSetManager), eq(dbTableMetadata))) .thenReturn(binlogEventListener); streamWorkerTaskRefresher.initialize(sourceConfig); } @@ -137,12 +155,16 @@ void test_update_when_credentials_changed_then_refresh_task() { when(sourceConfig2.getAuthenticationConfig().getPassword()).thenReturn(password2); when(binlogClientFactory.create()).thenReturn(binlogClient).thenReturn(binlogClient); - + final Map progressState = mockGlobalStateAndProgressState(); try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class); - MockedStatic binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class)) { + MockedStatic binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class); + MockedStatic dbTableMetadataMockedStatic = mockStatic(DbTableMetadata.class)) { + dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(BinaryLogClient.class), eq(pluginMetrics))) .thenReturn(streamWorker); - binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), eq(acknowledgementSetManager))) + binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), + any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), + eq(acknowledgementSetManager), eq(dbTableMetadata))) .thenReturn(binlogEventListener); streamWorkerTaskRefresher.initialize(sourceConfig); streamWorkerTaskRefresher.update(sourceConfig2); @@ -170,12 +192,16 @@ void test_update_when_credentials_unchanged_then_do_nothing() { when(sourceConfig.getAuthenticationConfig().getPassword()).thenReturn(password); when(binlogClientFactory.create()).thenReturn(binlogClient); - + final Map progressState = mockGlobalStateAndProgressState(); try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class); - MockedStatic binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class)) { + MockedStatic binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class); + MockedStatic dbTableMetadataMockedStatic = mockStatic(DbTableMetadata.class)) { + dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(BinaryLogClient.class), eq(pluginMetrics))) .thenReturn(streamWorker); - binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), eq(acknowledgementSetManager))) + binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(buffer), any(RdsSourceConfig.class), + any(String.class), eq(pluginMetrics), eq(binlogClient), eq(streamCheckpointer), + eq(acknowledgementSetManager), eq(dbTableMetadata))) .thenReturn(binlogEventListener); streamWorkerTaskRefresher.initialize(sourceConfig); streamWorkerTaskRefresher.update(sourceConfig); @@ -197,4 +223,33 @@ private StreamWorkerTaskRefresher createObjectUnderTest() { return new StreamWorkerTaskRefresher( sourceCoordinator, streamPartition, streamCheckpointer, s3Prefix, binlogClientFactory, buffer, executorServiceSupplier, acknowledgementSetManager, pluginMetrics); } + + private Map mockGlobalStateAndProgressState() { + final String dbIdentifier = UUID.randomUUID().toString(); + when(streamPartition.getPartitionKey()).thenReturn(dbIdentifier); + when(sourceCoordinator.getPartition(dbIdentifier)).thenReturn(Optional.of(globalState)); + final Map progressState = getDbTableMetaDataMap(); + when(globalState.getProgressState()).thenReturn(Optional.of(progressState)); + return progressState; + } + + private Map getDbTableMetaDataMap() { + final String dbIdentifier = UUID.randomUUID().toString(); + final String hostName = UUID.randomUUID().toString(); + final int port = new Random().nextInt(); + final String tableName = UUID.randomUUID().toString(); + + final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port); + final Map> tableColumnDataTypeMap = new HashMap<>(); + final Map columnDataTypeMap = new HashMap<>(); + columnDataTypeMap.put("int_column", "INTEGER"); + tableColumnDataTypeMap.put(tableName, columnDataTypeMap); + + + final Map map = new HashMap<>(); + map.put("dbMetadata", dbMetadata.toMap()); + map.put("tableColumnDataTypeMap", tableColumnDataTypeMap); + + return map; + } }