Skip to content

Commit

Permalink
Support Data type transformation for MySQL export and stream processing
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Nov 8, 2024
1 parent 8fd743d commit d3a1dd2
Show file tree
Hide file tree
Showing 23 changed files with 511 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.source.rds.leader.RdsApiStrategy;
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
Expand All @@ -35,8 +36,10 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class RdsService {
private static final Logger LOG = LoggerFactory.getLogger(RdsService.class);
Expand Down Expand Up @@ -95,9 +98,12 @@ public void start(Buffer<Record<Event>> buffer) {
new ClusterApiStrategy(rdsClient) : new InstanceApiStrategy(rdsClient);
final DbMetadata dbMetadata = rdsApiStrategy.describeDb(sourceConfig.getDbIdentifier());
final String s3PathPrefix = getS3PathPrefix();
final SchemaManager schemaManager = getSchemaManager(sourceConfig, dbMetadata);
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap(schemaManager);
final DbTableMetadata dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);

leaderScheduler = new LeaderScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, getSchemaManager(sourceConfig, dbMetadata), dbMetadata);
sourceCoordinator, sourceConfig, s3PathPrefix, schemaManager, dbTableMetadata);
runnableList.add(leaderScheduler);

if (sourceConfig.isExportEnabled()) {
Expand Down Expand Up @@ -178,5 +184,13 @@ private String getS3PathPrefix() {
return s3PathPrefix;
}

/**
 * Builds a mapping from each configured full table name ("database.table") to that
 * table's column-name -> MySQL-data-type map, as reported by the schema manager.
 *
 * @param schemaManager source of per-table column data types
 * @return map keyed by full table name; value maps column name to its data type string
 */
private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) {
    return sourceConfig.getTableNames().stream()
            .collect(Collectors.toMap(
                    fullTableName -> fullTableName,
                    fullTableName -> {
                        // Split once instead of twice; expected format is "<database>.<table>"
                        final String[] parts = fullTableName.split("\\.");
                        return schemaManager.getColumnDataTypes(parts[0], parts[1]);
                    }
            ));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* to appropriate string representations based on their data types.
*/
public interface DataTypeHandler {
String BYTES_KEY = "bytes";
/**
* Handles the conversion of a MySQL column value to its string representation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,18 @@ public boolean isNumeric() {
return category == DataCategory.NUMERIC;
}

public boolean isUnsigned() {
public boolean isNumericUnsigned() {
return category == DataCategory.NUMERIC && subCategory == DataSubCategory.UNSIGNED;
}

// True only for BIGINT UNSIGNED, whose full value range exceeds a signed long
// and therefore needs BigInteger promotion by the numeric handler.
public boolean isBigIntUnsigned() {
    return this == MySQLDataType.BIGINT_UNSIGNED;
}

// True for the BIT type, which carries a bit pattern rather than a plain number.
public boolean isBit() {
    return this == MySQLDataType.BIT;
}

// True for any type in the STRING data category.
public boolean isString() {
    return category == DataCategory.STRING;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,25 @@
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.util.Base64;
import java.util.Map;

/**
 * Converts MySQL binary column values to their string representation.
 * Export rows deliver the raw column bytes directly as byte[]; stream (binlog)
 * rows may wrap the payload in a map under the {@code BYTES_KEY} entry.
 */
public class BinaryTypeHandler implements DataTypeHandler {

    @Override
    public String handle(final MySQLDataType columnType, final String columnName, final Object value,
                         final TableMetadata metadata) {
        // Raw bytes straight from the driver.
        // NOTE(review): new String(byte[]) uses the platform default charset (pre-Java 18);
        // confirm the payload is guaranteed UTF-8 or pass StandardCharsets.UTF_8 explicitly.
        if (value instanceof byte[]) {
            return new String((byte[]) value);
        }

        // Wrapped payload: look up the bytes under the shared BYTES_KEY.
        if (value instanceof Map) {
            final Object data = ((Map<?, ?>) value).get(BYTES_KEY);
            if (data instanceof byte[]) {
                return new String((byte[]) data);
            }
            // Guard against a missing payload instead of throwing NPE on data.toString().
            return data == null ? null : data.toString();
        }

        // Anything else (e.g. already-decoded string) is passed through.
        return value.toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.io.IOException;

public class JsonTypeHandler implements DataTypeHandler {

/**
 * Converts a MySQL JSON column value to its JSON string representation.
 * Accepts either the raw JSON-binary payload (byte[]) or an already-decoded value.
 */
@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
                     final TableMetadata metadata) {
    return convertToJson(value);
}

private String convertToJson(final byte[] jsonBytes) {
private String convertToJson(final Object value) {
try {
return JsonBinary.parseAsString(jsonBytes);
} catch (IOException e) {
if (value instanceof byte[]) {
return JsonBinary.parseAsString((byte[])value);
} else {
return value.toString();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

public class NumericTypeHandler implements DataTypeHandler {

Expand All @@ -15,40 +18,93 @@ public Number handle(final MySQLDataType columnType, final String columnName, fi
return null;
}

if (!columnType.isNumeric()) {
if (!(columnType.isNumeric() || columnType.isBit())) {
throw new IllegalArgumentException("ColumnType is not numeric: " + columnType);
}

if (!(value instanceof Number)) {
throw new IllegalArgumentException("Value is not a number: " + value);
return handleNumericType(columnType, value);
}

private Number handleNumericType(final MySQLDataType columnType, final Object value) {
if (columnType.isNumericUnsigned()) {
if (columnType.isBigIntUnsigned()) {
return handleUnsignedBigInt(value);
} else {
return handleUnsignedNumber(value, columnType.getUnsignedMask());
}
}

if (columnType.isBit()) {
return handleBit(value);
}

if (value instanceof Number) {
return (Number)value;
}

return handleNumericType(columnType, (Number) value);
throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass());
}

private Number handleNumericType(final MySQLDataType columnType, final Number value) {
if (columnType.isUnsigned()) {
if (columnType == MySQLDataType.BIGINT_UNSIGNED) {
return handleUnsignedDouble(value);
private Number handleBit(final Object value) {
if (value instanceof BitSet) {
return bitSetToBigInteger((BitSet) value);
}

if (value instanceof Map) {
Object data = ((Map<?, ?>)value).get(BYTES_KEY);
if (data instanceof byte[]) {
return new BigInteger(1, (byte[]) data);
} else {
return handleUnsignedNumber(value, columnType.getUnsignedMask());
byte[] bytes = ((String)data).getBytes();
return new BigInteger(1, bytes);
}
}
return value;

throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass());
}

private Number handleUnsignedNumber(final Number value, final long mask) {
final long longVal = value.longValue();
private Number handleUnsignedNumber(final Object value, final long mask) {
if (!(value instanceof Number)) {
throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass());
}

final long longVal = ((Number)value).longValue();
return longVal < 0 ? longVal & mask : longVal;
}

private Number handleUnsignedDouble(final Number value) {
long longVal = value.longValue();
if (longVal < 0) {
return BigInteger.valueOf(longVal & Long.MAX_VALUE)
.add(BigInteger.valueOf(Long.MAX_VALUE))
.add(BigInteger.ONE);
private Number handleUnsignedBigInt(final Object value) {
if (value instanceof Number) {
long longVal = ((Number)value).longValue();
if (longVal < 0) {
return BigInteger.valueOf(longVal & Long.MAX_VALUE)
.add(BigInteger.valueOf(Long.MAX_VALUE))
.add(BigInteger.ONE);
}
return (Number)value;
}

if (value instanceof ArrayList<?>) {
ArrayList<?> list = (ArrayList<?>) value;

// Convert ArrayList to byte array
byte[] bytes = new byte[list.size()];
for (int i = 0; i < list.size(); i++) {
bytes[i] = ((Number) list.get(i)).byteValue();
}

return new BigInteger(1, bytes);
}

throw new IllegalArgumentException("Unsupported value type. The value is of type: " + value.getClass().getName());
}

private static BigInteger bitSetToBigInteger(BitSet bitSet) {
BigInteger result = BigInteger.ZERO;
for (int i = 0; i < bitSet.length(); i++) {
if (bitSet.get(i)) {
result = result.setBit(i);
}
}
return value;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,22 @@
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.util.Map;

/**
 * Converts MySQL spatial (geometry) column values to a string representation.
 * Stream (binlog) rows may wrap the raw geometry bytes in a map under BYTES_KEY;
 * export rows deliver the bytes directly.
 */
public class SpatialTypeHandler implements DataTypeHandler {

    @Override
    public String handle(final MySQLDataType columnType, final String columnName, final Object value,
                         final TableMetadata metadata) {
        // NOTE(review): new String(byte[]) uses the platform default charset (pre-Java 18);
        // geometry payloads are binary WKB-like data — confirm this stringification is intended.
        if (value instanceof Map) {
            final Object data = ((Map<?, ?>) value).get(BYTES_KEY);
            if (data instanceof byte[]) {
                return new String((byte[]) data);
            }
            // Guard against a missing payload instead of throwing NPE on data.toString().
            return data == null ? null : data.toString();
        }
        return new String((byte[]) value);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class StringTypeHandler implements DataTypeHandler {
@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
if (columnType.isStringBytes()) {
if (columnType.isStringBytes() && (value instanceof byte[])) {
return new String((byte[]) value);
} else if (columnType.isStringEnum() && value instanceof Integer) {
return getEnumValue((int) value, metadata.getEnumStrValues().get(columnName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;
import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;
Expand Down Expand Up @@ -52,6 +56,7 @@ public class DataFileLoader implements Runnable {
private final EnhancedSourceCoordinator sourceCoordinator;
private final AcknowledgementSet acknowledgementSet;
private final Duration acknowledgmentTimeout;
private final DbTableMetadata dbTableMetadata;
private final Counter exportRecordsTotalCounter;
private final Counter exportRecordSuccessCounter;
private final Counter exportRecordErrorCounter;
Expand All @@ -66,7 +71,8 @@ private DataFileLoader(final DataFilePartition dataFilePartition,
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
final Duration acknowledgmentTimeout,
final DbTableMetadata dbTableMetadata) {
this.dataFilePartition = dataFilePartition;
bucket = dataFilePartition.getBucket();
objectKey = dataFilePartition.getKey();
Expand All @@ -77,6 +83,7 @@ private DataFileLoader(final DataFilePartition dataFilePartition,
this.sourceCoordinator = sourceCoordinator;
this.acknowledgementSet = acknowledgementSet;
this.acknowledgmentTimeout = acknowledgmentTimeout;
this.dbTableMetadata = dbTableMetadata;

exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
Expand All @@ -93,9 +100,10 @@ public static DataFileLoader create(final DataFilePartition dataFilePartition,
final PluginMetrics pluginMetrics,
final EnhancedSourceCoordinator sourceCoordinator,
final AcknowledgementSet acknowledgementSet,
final Duration acknowledgmentTimeout) {
final Duration acknowledgmentTimeout,
final DbTableMetadata dbTableMetadata) {
return new DataFileLoader(dataFilePartition, codec, buffer, objectReader, recordConverter,
pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout);
pluginMetrics, sourceCoordinator, acknowledgementSet, acknowledgmentTimeout, dbTableMetadata);
}

@Override
Expand All @@ -118,6 +126,7 @@ public void run() {

final String fullTableName = progressState.getSourceDatabase() + "." + progressState.getSourceTable();
final List<String> primaryKeys = progressState.getPrimaryKeyMap().getOrDefault(fullTableName, List.of());
transformEvent(event, fullTableName);

final long snapshotTime = progressState.getSnapshotTime();
final long eventVersionNumber = snapshotTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis();
Expand Down Expand Up @@ -162,4 +171,14 @@ public void run() {
exportRecordErrorCounter.increment(eventCount.get());
}
}

/**
 * Applies MySQL data-type transformation in place to every top-level field of the
 * export event, using the column-to-data-type map recorded for the table.
 *
 * @param event the export record to transform in place
 * @param fullTableName "database.table" key into the table metadata
 */
private void transformEvent(final Event event, final String fullTableName) {
    final Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
    if (columnDataTypeMap == null) {
        // No schema metadata recorded for this table; leave the event untouched
        // rather than failing with an NPE on every row.
        return;
    }
    for (Map.Entry<String, Object> entry : event.toMap().entrySet()) {
        // NOTE(review): TableMetadata argument is null on the export path — confirm the
        // handlers needing it (e.g. enum string lookup) are never hit from exports.
        final Object data = DataTypeHelper.getDataByColumnType(
                MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())),
                entry.getKey(), entry.getValue(), null);
        event.put(entry.getKey(), data);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.LoadStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -156,7 +157,8 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {

Runnable loader = DataFileLoader.create(
dataFilePartition, codec, buffer, objectReader, recordConverter, pluginMetrics,
sourceCoordinator, acknowledgementSet, sourceConfig.getDataFileAcknowledgmentTimeout());
sourceCoordinator, acknowledgementSet, sourceConfig.getDataFileAcknowledgmentTimeout(),
getDBTableMetadata());
CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor);

if (isAcknowledgmentsEnabled) {
Expand Down Expand Up @@ -220,4 +222,10 @@ private BiConsumer<Void, Throwable> completeDataLoader(DataFilePartition dataFil
numOfWorkers.decrementAndGet();
};
}

/**
 * Loads the database/table metadata published to the coordination store's global
 * state (keyed by the DB identifier) by the leader.
 *
 * @return the reconstructed DbTableMetadata
 * @throws IllegalStateException if the global state partition has not been created yet
 */
private DbTableMetadata getDBTableMetadata() {
    final Optional<EnhancedSourcePartition> globalStatePartition =
            sourceCoordinator.getPartition(sourceConfig.getDbIdentifier());
    // Fail with a diagnostic message instead of a bare NoSuchElementException from Optional.get().
    final GlobalState globalState = (GlobalState) globalStatePartition.orElseThrow(
            () -> new IllegalStateException(
                    "Global state not found for DB identifier: " + sourceConfig.getDbIdentifier()));
    return DbTableMetadata.fromMap(globalState.getProgressState().get());
}
}
Loading

0 comments on commit d3a1dd2

Please sign in to comment.