Skip to content

Commit

Permalink
[Fix] fix multi database sync npe error (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Jan 3, 2025
1 parent 1ced03e commit 1023235
Show file tree
Hide file tree
Showing 23 changed files with 325 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public boolean build() throws Exception {
streamSource.process(buildProcessFunction());
for (Tuple2<String, String> dbTbl : dorisTables) {
OutputTag<String> recordOutputTag =
ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
ParsingProcessFunction.createRecordOutputTag(dbTbl.f0, dbTbl.f1);
DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
int sinkParallel =
sinkConfig.getInteger(
Expand Down Expand Up @@ -230,7 +230,7 @@ public DorisSink<String> buildDorisSink() {
}

public ParsingProcessFunction buildProcessFunction() {
return new ParsingProcessFunction(converter);
return new ParsingProcessFunction(database, converter);
}

/** create doris sink. */
Expand Down Expand Up @@ -479,7 +479,7 @@ private void tryCreateTableIfAbsent(
}
TableSchema dorisSchema =
DorisSchemaFactory.createTableSchema(
database,
targetDb,
dorisTable,
schema.getFields(),
schema.getPrimaryKeys(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

Expand All @@ -32,8 +34,10 @@ public class ParsingProcessFunction extends ProcessFunction<String, Void> {
protected ObjectMapper objectMapper = new ObjectMapper();
private transient Map<String, OutputTag<String>> recordOutputTags;
private DatabaseSync.TableNameConverter converter;
private String database;

public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
public ParsingProcessFunction(String database, DatabaseSync.TableNameConverter converter) {
this.database = database;
this.converter = converter;
}

Expand All @@ -47,8 +51,17 @@ public void processElement(
String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector)
throws Exception {
String tableName = getRecordTableName(record);
String dorisName = converter.convert(tableName);
context.output(getRecordOutputTag(dorisName), record);
String dorisTableName = converter.convert(tableName);
String dorisDbName = database;
if (StringUtils.isNullOrWhitespaceOnly(database)) {
dorisDbName = getRecordDatabaseName(record);
}
context.output(getRecordOutputTag(dorisDbName, dorisTableName), record);
}

/**
 * Reads the database name carried by a change record.
 *
 * <p>The record is parsed as JSON and the {@code db} field of its top-level {@code source}
 * object is returned as text. {@code null} is returned when either {@code source} or
 * {@code db} is absent.
 *
 * @param record the raw JSON change record
 * @return the value of {@code source.db}, or {@code null} if it is not present
 * @throws JsonProcessingException if the record is not parseable JSON
 */
private String getRecordDatabaseName(String record) throws JsonProcessingException {
    JsonNode sourceNode = objectMapper.readValue(record, JsonNode.class).get("source");
    // extractJsonNode tolerates a null node, so a record without "source" yields null here.
    return extractJsonNode(sourceNode, "db");
}

protected String getRecordTableName(String record) throws Exception {
Expand All @@ -60,12 +73,13 @@ protected String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null ? record.get(key).asText() : null;
}

private OutputTag<String> getRecordOutputTag(String tableName) {
private OutputTag<String> getRecordOutputTag(String databaseName, String tableName) {
String tableIdentifier = databaseName + "." + tableName;
return recordOutputTags.computeIfAbsent(
tableName, ParsingProcessFunction::createRecordOutputTag);
tableIdentifier, k -> createRecordOutputTag(databaseName, tableName));
}

public static OutputTag<String> createRecordOutputTag(String tableName) {
return new OutputTag<String>("record-" + tableName) {};
public static OutputTag<String> createRecordOutputTag(String databaseName, String tableName) {
return new OutputTag<String>(String.format("record-%s-%s", databaseName, tableName)) {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {

@Override
public ParsingProcessFunction buildProcessFunction() {
return new MongoParsingProcessFunction(converter);
return new MongoParsingProcessFunction(database, converter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
public class MongoParsingProcessFunction extends ParsingProcessFunction {
private static final Logger LOG = LoggerFactory.getLogger(MongoParsingProcessFunction.class);

public MongoParsingProcessFunction(TableNameConverter converter) {
super(converter);
public MongoParsingProcessFunction(String databaseName, TableNameConverter converter) {
super(databaseName, converter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ public static void checkResult(
List<String> expected,
String query,
int columnSize) {
checkResult(connection, logger, expected, query, columnSize, true);
}

public static void checkResult(
Connection connection,
Logger logger,
List<String> expected,
String query,
int columnSize,
boolean ordered) {
List<String> actual = new ArrayList<>();
try (Statement statement = connection.createStatement()) {
ResultSet sinkResultSet = statement.executeQuery(query);
Expand Down Expand Up @@ -131,6 +141,13 @@ public static void checkResult(
"checking test result. expected={}, actual={}",
String.join(",", expected),
String.join(",", actual));
Assert.assertArrayEquals(expected.toArray(), actual.toArray());
if (ordered) {
Assert.assertArrayEquals(expected.toArray(), actual.toArray());
} else {
Assert.assertEquals(expected.size(), actual.size());
Assert.assertArrayEquals(
expected.stream().sorted().toArray(Object[]::new),
actual.stream().sorted().toArray(Object[]::new));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
private static final Logger LOG = LoggerFactory.getLogger(Mysql2DorisE2ECase.class);
private static final String DATABASE = "test_e2e_mysql";
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE;
private static final String DROP_DATABASE = "DROP DATABASE IF EXISTS " + DATABASE;
private static final String MYSQL_CONF = "--" + DatabaseSyncConfig.MYSQL_CONF;

@Before
Expand All @@ -56,13 +57,8 @@ private List<String> setMysql2DorisDefaultConfig(List<String> argList) {
argList.add(MYSQL_CONF);
argList.add(PASSWORD + "=" + getMySQLPassword());
argList.add(MYSQL_CONF);
argList.add(DATABASE_NAME + "=" + DATABASE);
// argList.add(MYSQL_CONF);
// argList.add("server-time-zone=UTC");
argList.add("server-time-zone=UTC");

// set doris database
argList.add(DORIS_DATABASE);
argList.add(DATABASE);
setSinkConfDefaultConfig(argList);
return argList;
}
Expand All @@ -82,15 +78,7 @@ private void initMysqlEnvironment(String sourcePath) {

private void initDorisEnvironment() {
LOG.info("Initializing Doris environment.");
ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, CREATE_DATABASE);
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
"DROP TABLE IF EXISTS test_e2e_mysql.tbl1",
"DROP TABLE IF EXISTS test_e2e_mysql.tbl2",
"DROP TABLE IF EXISTS test_e2e_mysql.tbl3",
"DROP TABLE IF EXISTS test_e2e_mysql.tbl4",
"DROP TABLE IF EXISTS test_e2e_mysql.tbl5");
ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, DROP_DATABASE);
}

private void initEnvironment(String jobName, String mysqlSourcePath) {
Expand Down Expand Up @@ -436,6 +424,132 @@ private String getCreateTableSQL(String database, String table) throws Exception
throw new RuntimeException("Table not exist " + table);
}

/**
 * End-to-end check for syncing several MySQL databases in one job.
 *
 * <p>Rows that exist in MySQL {@code test_e2e_mysql_db1} / {@code test_e2e_mysql_db2} before
 * the job starts, and rows inserted afterwards, are expected to show up in the Doris tables
 * with the same database and table names. Row order is not significant for the comparison.
 *
 * @throws Exception if environment setup, job submission, or a verification step fails
 */
@Test
public void testMySQL2DorisMultiDatabaseSync() throws Exception {
    final String job = "testMySQL2DorisMultiDatabaseSync";

    // Drop the Doris target databases so results from an earlier run cannot leak in.
    ContainerUtils.executeSQLStatement(
            getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS test_e2e_mysql_db1");
    ContainerUtils.executeSQLStatement(
            getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS test_e2e_mysql_db2");

    initEnvironment(job, "container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql");
    startMysql2DorisJob(job, "container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt");

    // One verification query per synced table; the row tables below use the same order.
    final String[] queries = {
        "SELECT * FROM test_e2e_mysql_db1.tbl1",
        "SELECT * FROM test_e2e_mysql_db1.tbl2",
        "SELECT * FROM test_e2e_mysql_db2.tbl1",
        "SELECT * FROM test_e2e_mysql_db2.tbl2",
        "SELECT * FROM test_e2e_mysql_db2.tbl3"
    };

    // Fixed pause before verifying (original note: wait 3 times checkpoint).
    Thread.sleep(30000);
    LOG.info("Start to verify init result.");
    final String[][] snapshotRows = {
        {"1,db1_tb1,18"},
        {"1,db1_tb2,19"},
        {"1,db2_tb1,20"},
        {"1,db2_tb2,21"},
        {"1,db2_tb3,22"}
    };
    for (int i = 0; i < queries.length; i++) {
        List<String> expectedRows = Arrays.asList(snapshotRows[i]);
        ContainerUtils.checkResult(
                getDorisQueryConnection(), LOG, expectedRows, queries[i], 3, false);
    }

    // Write one more row into every source table to exercise the incremental path.
    ContainerUtils.executeSQLStatement(
            getMySQLQueryConnection(),
            LOG,
            "insert into test_e2e_mysql_db1.tbl1 values (2,'db1_tb1',180)",
            "insert into test_e2e_mysql_db1.tbl2 values (2,'db1_tb2',190)",
            "insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',200)",
            "insert into test_e2e_mysql_db2.tbl2 values (2,'db2_tb2',210)",
            "insert into test_e2e_mysql_db2.tbl3 values (2,'db2_tb3',220)");

    Thread.sleep(20000);
    final String[][] incrementalRows = {
        {"1,db1_tb1,18", "2,db1_tb1,180"},
        {"1,db1_tb2,19", "2,db1_tb2,190"},
        {"1,db2_tb1,20", "2,db2_tb1,200"},
        {"1,db2_tb2,21", "2,db2_tb2,210"},
        {"1,db2_tb3,22", "2,db2_tb3,220"}
    };
    for (int i = 0; i < queries.length; i++) {
        List<String> expectedRows = Arrays.asList(incrementalRows[i]);
        ContainerUtils.checkResult(
                getDorisQueryConnection(), LOG, expectedRows, queries[i], 3, false);
    }

    cancelE2EJob(job);
}

/**
 * End-to-end check for merging tables from several MySQL databases into a single Doris
 * database.
 *
 * <p>Rows written to {@code test_e2e_mysql_db1} and {@code test_e2e_mysql_db2} are expected in
 * Doris under {@code test_e2e_mysql}: {@code tbl1} and {@code tbl3} keep their names, while the
 * {@code tbl2_*} source tables are expected together in {@code tbl2_merge}. Row order is not
 * significant for the comparison.
 *
 * @throws Exception if environment setup, job submission, or a verification step fails
 */
@Test
public void testMySQL2DorisMultiDatabase2OneSync() throws Exception {
    final String job = "testMySQL2DorisMultiDatabase2OneSync";
    initEnvironment(job, "container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql");
    startMysql2DorisJob(job, "container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt");

    // One verification query per Doris target table; the row tables below use the same order.
    final String[] queries = {
        "SELECT * FROM test_e2e_mysql.tbl1",
        "SELECT * FROM test_e2e_mysql.tbl2_merge",
        "SELECT * FROM test_e2e_mysql.tbl3"
    };

    // Fixed pause before verifying (original note: wait 3 times checkpoint).
    Thread.sleep(30000);
    LOG.info("Start to verify init result.");
    // NOTE(review): "db2_tbl2_2" is spelled differently from the neighbouring "db2_tb2_2"
    // values; presumably it mirrors the init SQL fixture - confirm against that file.
    final String[][] snapshotRows = {
        {"1,db1_tb1,18", "2,db2_tb1,20"},
        {"1,db1_tb2_1,19", "2,db1_tb2_2,191", "3,db2_tb2_2,21", "4,db2_tbl2_2,211"},
        {"1,db2_tb3,22"}
    };
    for (int i = 0; i < queries.length; i++) {
        List<String> expectedRows = Arrays.asList(snapshotRows[i]);
        ContainerUtils.checkResult(
                getDorisQueryConnection(), LOG, expectedRows, queries[i], 3, false);
    }

    // Insert fresh rows into every source table to exercise the incremental path.
    ContainerUtils.executeSQLStatement(
            getMySQLQueryConnection(),
            LOG,
            "insert into test_e2e_mysql_db1.tbl1 values (3,'db1_tb1',180)",
            "insert into test_e2e_mysql_db2.tbl1 values (4,'db2_tb1',200)",
            "insert into test_e2e_mysql_db1.tbl2_1 values (5,'db1_tb2_1',1901)",
            "insert into test_e2e_mysql_db1.tbl2_2 values (6,'db1_tb2_2',1902)",
            "insert into test_e2e_mysql_db2.tbl2_1 values (7,'db2_tb2_1',2101)",
            "insert into test_e2e_mysql_db2.tbl2_2 values (8,'db2_tb2_2',2102)",
            "insert into test_e2e_mysql_db2.tbl3 values (3,'db2_tb3',220)");

    Thread.sleep(20000);

    final String[][] incrementalRows = {
        {"1,db1_tb1,18", "2,db2_tb1,20", "3,db1_tb1,180", "4,db2_tb1,200"},
        {
            "1,db1_tb2_1,19",
            "2,db1_tb2_2,191",
            "3,db2_tb2_2,21",
            "4,db2_tbl2_2,211",
            "5,db1_tb2_1,1901",
            "6,db1_tb2_2,1902",
            "7,db2_tb2_1,2101",
            "8,db2_tb2_2,2102"
        },
        {"1,db2_tb3,22", "3,db2_tb3,220"}
    };
    for (int i = 0; i < queries.length; i++) {
        List<String> expectedRows = Arrays.asList(incrementalRows[i]);
        ContainerUtils.checkResult(
                getDorisQueryConnection(), LOG, expectedRows, queries[i], 3, false);
    }

    cancelE2EJob(job);
}

@After
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void getRecordTableName() throws Exception {
String record =
"{\"_id\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}}\",\"operationType\":\"insert\",\"fullDocument\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}, \\\"key1\\\": \\\"value1\\\"}\",\"source\":{\"ts_ms\":0,\"snapshot\":\"true\"},\"ts_ms\":1717065582062,\"ns\":{\"db\":\"test\",\"coll\":\"cdc_test\"},\"to\":null,\"documentKey\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}}\",\"updateDescription\":null,\"clusterTime\":null,\"txnNumber\":null,\"lsid\":null}";
MongoParsingProcessFunction mongoParsingProcessFunction =
new MongoParsingProcessFunction(null);
new MongoParsingProcessFunction(null, null);
String recordTableName = mongoParsingProcessFunction.getRecordTableName(record);
assertEquals("cdc_test", recordTableName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql
--including-tables "tbl.*|auto_add"
--table-conf replication_num=1
--single-sink true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql
--including-tables "tbl.*"
--table-conf replication_num=1
--single-sink true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql
--including-tables "tbl1|tbl2|tbl3|tbl5"
--table-conf replication_num=1
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql
--including-tables "create_tbl_.*"
--create-table-only
--table-conf table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_uniq;
CREATE TABLE test_e2e_mysql.create_tbl_uniq (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql
--including-tables "tbl1|tbl2|tbl3|tbl5"
--table-conf replication_num=1
--sink-conf sink.enable-delete=false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql_db.*
--including-tables ".*"
--multi-to-one-origin "tbl2.*"
--multi-to-one-target "tbl2_merge"
--table-conf replication_num=1
Loading

0 comments on commit 1023235

Please sign in to comment.