[Flink] Fix handling of null values and add usage documentation #471

Open · wants to merge 5 commits into main
@@ -313,6 +313,5 @@ private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Config
DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From mongo Database " + dbName);

}
}
@@ -45,10 +45,28 @@ public class SyncDatabase {
static String password;
static boolean useBatch;
static int sinkParallelism;
+static String jdbcOrDorisOptions;

public static void main(String[] args) throws Exception {
+// Initialize dbType before the prefix scan below; the ParameterTool assignments
+// further down run after this block, so dbType would otherwise still be null here.
+dbType = ParameterTool.fromArgs(args).get(TARGET_DATABASE_TYPE.key());
+StringBuilder connectorOptions = new StringBuilder();
+if ("mysql".equals(dbType) || "postgresql".equals(dbType) || "doris".equals(dbType)) {
+// Pass-through options are assumed to be given as "--jdbc.<key> <value>" or
+// "--doris.<key> <value>"; they are collected into a single 'key'='value' list.
+for (int i = 0; i < args.length - 1; i++) {
+String prefix = args[i].startsWith("--jdbc.") ? "--jdbc." : args[i].startsWith("--doris.") ? "--doris." : null;
+if (prefix != null) {
+connectorOptions.append("'").append(args[i].substring(prefix.length())).append("'='").append(args[i + 1]).append("',");
+}
+}
+if (connectorOptions.length() > 0) {
+// Drop the trailing comma.
+jdbcOrDorisOptions = connectorOptions.substring(0, connectorOptions.length() - 1);
+}
+}
ParameterTool parameter = ParameterTool.fromArgs(args);

sourceDatabase = parameter.get(SOURCE_DB_DB_NAME.key());
sourceTableName = parameter.get(SOURCE_DB_LAKESOUL_TABLE.key()).toLowerCase();
dbType = parameter.get(TARGET_DATABASE_TYPE.key());
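With this change, extra sink options can be passed straight through to the generated sink table. As a usage sketch (the option keys below are standard Flink JDBC connector options, used here purely for illustration):

--jdbc.sink.max-retries 3 --jdbc.sink.buffer-flush.interval 5s

is collected into jdbcOrDorisOptions as 'sink.max-retries'='3','sink.buffer-flush.interval'='5s' and appended to the WITH clause of the CREATE TABLE statements emitted below.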
@@ -61,12 +79,10 @@ public static void main(String[] args) throws Exception {
}
sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
-//int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue());

String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081-8089");
-//StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(sinkParallelism);

@@ -118,7 +134,7 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field
String mysqlType = "TEXT";
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
mysqlType = "VARCHAR(255)";
}
}
stringFieldTypes[i] = mysqlType;
@@ -147,7 +163,7 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
String mysqlType = "TEXT";
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
mysqlType = "VARCHAR(255)";
}
}
stringFieldTypes[i] = mysqlType;
@@ -175,7 +191,7 @@ public static String[] getDorisFieldTypes(DataType[] fieldTypes) {
stringFieldTypes[i] = "DATETIME";
} else if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
stringFieldTypes[i] = "VARCHAR";
-} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
+} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType) {
stringFieldTypes[i] = "TIMESTAMP";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
@@ -212,8 +228,6 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
-tEnvs.useCatalog("lakeSoul");
-tEnvs.useDatabase(sourceDatabase);
String jdbcUrl = url + targetDatabase;
Connection conn = DriverManager.getConnection(jdbcUrl, username, password);

@@ -229,15 +243,43 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException
Statement statement = conn.createStatement();
// Create the target table in MySQL
statement.executeUpdate(createTableSql.toString());
-String createCatalog = "create catalog postgres_catalog with('type'='jdbc','default-database'=" + "'" + targetDatabase + "'" + "," + "'username'=" +
-"'" + username + "'" + "," + "'password'=" + "'" + password + "'" + "," + "'base-url'=" + "'" + url + "'" + ")";
-// Move data from LakeSoul to MySQL
-tEnvs.executeSql(createCatalog);
-String insertQuery = "INSERT INTO postgres_catalog.`" + targetDatabase + "`.`" + targetTableName +
-"` SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";
-tEnvs.executeSql(insertQuery);
+StringBuilder coulmns = new StringBuilder();
+for (int i = 0; i < fieldTypes.length; i++) {
+if (stringFieldsTypes[i].equals("BYTEA")) {
+coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
+} else if (stringFieldsTypes[i].equals("TEXT")) {
+coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR");
+} else if (stringFieldsTypes[i].equals("FLOAT8")) {
+coulmns.append("`").append(fieldNames[i]).append("` ").append("DOUBLE");
+} else {
+coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]);
+}
+if (i < fieldTypes.length - 1) {
+coulmns.append(",");
+}
+}
+String sql;
+if (jdbcOrDorisOptions == null) {
+if (tablePk != null) {
+sql = String.format(
+"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
+targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
+} else {
+sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
+targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
+}
+} else {
+if (tablePk != null) {
+sql = String.format(
+"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
+targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
+} else {
+sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
+targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
+}
+}
+tEnvs.executeSql(sql);
+tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
statement.close();
conn.close();
}
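For reference, a hypothetical run of xsyncToPg with target table t_user, primary key id, sink parallelism 2, and a pass-through option --jdbc.sink.max-retries 3 would emit DDL of roughly this shape (all names illustrative, not taken from the PR):

create table t_user(`id` INT,`name` VARCHAR ,PRIMARY KEY (id) NOT ENFORCED) with ('connector' = 'jdbc', 'url' = 'jdbc:postgresql://localhost:5432/target_db', 'table-name' = 't_user', 'username' = 'postgres', 'password' = '***', 'sink.parallelism' = '2', 'sink.max-retries'='3')

followed by insert into t_user select * from lakeSoul.`source_db`.t_user.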
@@ -271,6 +313,9 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
for (int i = 0; i < fieldDataTypes.length; i++) {
if (stringFieldsTypes[i].equals("BLOB")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
} else if (stringFieldsTypes[i].equals("TEXT")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR");

} else {
coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]);
}
@@ -279,14 +324,26 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
}
}
String sql;
-if (tablePk != null) {
-sql = String.format(
-"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
-targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
+if (jdbcOrDorisOptions == null) {
+if (tablePk != null) {
+sql = String.format(
+"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
+targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
+} else {
+sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')",
+targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
+}
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s' , 'sink.parallelism' = '%s')",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism, jdbcOrDorisOptions);
}
}

tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);

@@ -319,9 +376,17 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
coulmns.append(",");
}
}
-String sql = String.format(
-"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
-targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
+String sql;
+if (jdbcOrDorisOptions == null) {
+sql = String.format(
+"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
+targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
+} else {
+sql = String.format(
+"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s', %s)",
+targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password, jdbcOrDorisOptions);
+}

tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
}
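Likewise for Doris: supplying, say, --doris.sink.enable-2pc false (a standard Doris Flink connector option, used here for illustration) appends 'sink.enable-2pc'='false' to the generated WITH clause after the 'password' entry.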
@@ -224,7 +224,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
throw new IOException(equalOrCanCast);
}
for (LakeSoulMultiTableSinkCommittable committable : lakeSoulMultiTableSinkCommittable) {
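// The timing check is bypassed for mongodb sources, whose documents carry no
// fixed table schema and may legitimately have newer per-record timestamps.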
-if (committable.getTsMs() > schemaLastChangeTime) {
+if (committable.getTsMs() > schemaLastChangeTime && !dbType.equals("mongodb")) {
LOG.error("incompatible cast data created and delayThreshold time: {}, dml create time: {}", schemaLastChangeTime, committable.getTsMs());
throw new IOException(equalOrCanCast);
}
@@ -67,6 +67,8 @@ public class LakeSoulRecordConvert implements Serializable {

List<String> partitionFields;

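// Last Connect schema seen for each field name, shared across records so that
// fields missing or null in later MongoDB documents keep a usable schema.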
+static HashMap<String, Object> hashMap = new HashMap<>();


public LakeSoulRecordConvert(Configuration conf, String serverTimeZone) {
this(conf, serverTimeZone, Collections.emptyList());
@@ -12,32 +12,55 @@

import java.math.BigDecimal;
import java.util.Date;
+import java.util.HashMap;
import java.util.Map;
import java.util.List;

+import static org.apache.flink.lakesoul.types.LakeSoulRecordConvert.hashMap;

public class ParseDocument {


public static Struct convertBSONToStruct(String value) {
+HashMap<String, Object> tempHashMap = new HashMap<>(hashMap);
Document bsonDocument = Document.parse(value);
SchemaBuilder structSchemaBuilder = SchemaBuilder.struct();
-Struct struct = new Struct(buildSchema(bsonDocument, structSchemaBuilder));
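// Replay fields that earlier documents defined but this document omits,
// so the emitted schema does not shrink when fields are missing or null.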
+bsonDocument.keySet().forEach(tempHashMap::remove);
+buildSchema(bsonDocument, structSchemaBuilder);
+tempHashMap.forEach((k, v) -> {
+if (v instanceof Schema) {
+structSchemaBuilder.field(k, (Schema) v);
+}
+});
+Struct struct = new Struct(structSchemaBuilder.build());
fillStructValues(bsonDocument, struct);

return struct;
}

private static Schema buildSchema(Document bsonDocument, SchemaBuilder structSchemaBuilder) {
for (Map.Entry<String, Object> entry : bsonDocument.entrySet()) {
String fieldName = entry.getKey();
Object value = entry.getValue();
-if (value instanceof Document) {
-SchemaBuilder nestedStructSchemaBuilder = SchemaBuilder.struct();
-structSchemaBuilder.field(fieldName, buildSchema((Document) value, nestedStructSchemaBuilder));
-} else if (value instanceof List) {
-List<?> arrayList = (List<?>) value;
-Schema arraySchema = getSchemaForArrayList(arrayList);
-structSchemaBuilder.field(fieldName, arraySchema);
+if (value == null) {
+hashMap.putIfAbsent(fieldName, null);
+if (hashMap.get(fieldName) != null) {
+Schema schemaFromMap = (Schema) hashMap.get(fieldName);
+structSchemaBuilder.field(fieldName, schemaFromMap);
+}
} else {
-structSchemaBuilder.field(fieldName, getSchemaForValue(value));
+if (value instanceof Document) {
+SchemaBuilder nestedStructSchemaBuilder = SchemaBuilder.struct();
+Schema schema = buildSchema((Document) value, nestedStructSchemaBuilder);
+structSchemaBuilder.field(fieldName, schema);
+hashMap.put(fieldName, schema);
+} else if (value instanceof List) {
+List<?> arrayList = (List<?>) value;
+Schema arraySchema = getSchemaForArrayList(arrayList);
+structSchemaBuilder.field(fieldName, arraySchema);
+hashMap.put(fieldName, arraySchema);
+} else {
+Schema valueSchema = getSchemaForValue(value);
+structSchemaBuilder.field(fieldName, valueSchema);
+hashMap.put(fieldName, valueSchema);
+}
+}
}
return structSchemaBuilder.build();
@@ -56,24 +79,26 @@ private static void fillStructValues(Document bsonDocument, Struct struct) {
for (Map.Entry<String, Object> entry : bsonDocument.entrySet()) {
String fieldName = entry.getKey();
Object value = entry.getValue();
-if (value instanceof Document) {
-Struct nestedStruct = new Struct(struct.schema().field(fieldName).schema());
-fillStructValues((Document) value, nestedStruct);
-struct.put(fieldName, nestedStruct);
-} else if (value instanceof List) {
-List<?> arrayList = (List<?>) value;
-struct.put(fieldName, arrayList);
-} else if (value instanceof Decimal128) {
-BigDecimal decimalValue = new BigDecimal(value.toString());
-struct.put(fieldName, decimalValue);
-} else if (value instanceof Binary) {
-Binary binaryData = (Binary) value;
-struct.put(fieldName, binaryData.getData());
-} else if (value instanceof BsonTimestamp) {
-BsonTimestamp bsonTimestamp = (BsonTimestamp) value;
-struct.put(fieldName, bsonTimestamp.getValue());
-} else {
-struct.put(fieldName, value);
+if (value != null) {
+if (value instanceof Document) {
+Struct nestedStruct = new Struct(struct.schema().field(fieldName).schema());
+fillStructValues((Document) value, nestedStruct);
+struct.put(fieldName, nestedStruct);
+} else if (value instanceof List) {
+List<?> arrayList = (List<?>) value;
+struct.put(fieldName, arrayList);
+} else if (value instanceof Decimal128) {
+BigDecimal decimalValue = new BigDecimal(value.toString());
+struct.put(fieldName, decimalValue);
+} else if (value instanceof Binary) {
+Binary binaryData = (Binary) value;
+struct.put(fieldName, binaryData.getData());
+} else if (value instanceof BsonTimestamp) {
+BsonTimestamp bsonTimestamp = (BsonTimestamp) value;
+struct.put(fieldName, bsonTimestamp.getValue());
+} else {
+struct.put(fieldName, value);
+}
+}
}
}
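Taken together, the ParseDocument changes mean a field stays in the derived schema once it has been seen, even when later documents omit it or carry null. A minimal sketch of the intended behavior (document contents are illustrative; Struct and Schema are the Kafka Connect classes used above):

// First document establishes the schema for "age".
Struct first = ParseDocument.convertBSONToStruct("{\"_id\": 1, \"age\": 30}");
// The second document omits "age": buildSchema re-adds the cached field schema,
// and fillStructValues simply leaves the value unset.
Struct second = ParseDocument.convertBSONToStruct("{\"_id\": 2}");
// second.schema() should still contain an "age" field whose value reads as null.

Note that the cache is a static map keyed only by field name, so it is shared by every collection handled in the same JVM.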