Skip to content

Commit

Permalink
add case
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 3, 2025
1 parent aa361e8 commit 8ebb7b9
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public boolean build() throws Exception {
streamSource.process(buildProcessFunction());
for (Tuple2<String, String> dbTbl : dorisTables) {
OutputTag<String> recordOutputTag =
ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
ParsingProcessFunction.createRecordOutputTag(dbTbl.f0, dbTbl.f1);
DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
int sinkParallel =
sinkConfig.getInteger(
Expand Down Expand Up @@ -230,7 +230,7 @@ public DorisSink<String> buildDorisSink() {
}

public ParsingProcessFunction buildProcessFunction() {
return new ParsingProcessFunction(converter);
return new ParsingProcessFunction(database, converter);
}

/** create doris sink. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.StringUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

Expand All @@ -32,8 +34,10 @@ public class ParsingProcessFunction extends ProcessFunction<String, Void> {
protected ObjectMapper objectMapper = new ObjectMapper();
private transient Map<String, OutputTag<String>> recordOutputTags;
private DatabaseSync.TableNameConverter converter;
private String database;

public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
public ParsingProcessFunction(String database, DatabaseSync.TableNameConverter converter) {
this.database = database;
this.converter = converter;
}

Expand All @@ -47,8 +51,17 @@ public void processElement(
String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector)
throws Exception {
String tableName = getRecordTableName(record);
String dorisName = converter.convert(tableName);
context.output(getRecordOutputTag(dorisName), record);
String dorisTableName = converter.convert(tableName);
String dorisDbName = database;
if (StringUtils.isNullOrWhitespaceOnly(database)) {
dorisDbName = getRecordDatabaseName(record);
}
context.output(getRecordOutputTag(dorisDbName, dorisTableName), record);
}

private String getRecordDatabaseName(String record) throws JsonProcessingException {
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
return extractJsonNode(recordRoot.get("source"), "db");
}

protected String getRecordTableName(String record) throws Exception {
Expand All @@ -60,12 +73,13 @@ protected String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null ? record.get(key).asText() : null;
}

private OutputTag<String> getRecordOutputTag(String tableName) {
private OutputTag<String> getRecordOutputTag(String databaseName, String tableName) {
String tableIdentifier = databaseName + "." + tableName;
return recordOutputTags.computeIfAbsent(
tableName, ParsingProcessFunction::createRecordOutputTag);
tableIdentifier, k -> createRecordOutputTag(databaseName, tableName));
}

public static OutputTag<String> createRecordOutputTag(String tableName) {
return new OutputTag<String>("record-" + tableName) {};
public static OutputTag<String> createRecordOutputTag(String databaseName, String tableName) {
return new OutputTag<String>(String.format("record-%s-%s", databaseName, tableName)) {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {

@Override
public ParsingProcessFunction buildProcessFunction() {
return new MongoParsingProcessFunction(converter);
return new MongoParsingProcessFunction(database, converter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
public class MongoParsingProcessFunction extends ParsingProcessFunction {
private static final Logger LOG = LoggerFactory.getLogger(MongoParsingProcessFunction.class);

public MongoParsingProcessFunction(TableNameConverter converter) {
super(converter);
public MongoParsingProcessFunction(String databaseName, TableNameConverter converter) {
super(databaseName, converter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ private List<String> setMysql2DorisDefaultConfig(List<String> argList) {
argList.add(USERNAME + "=" + getMySQLUsername());
argList.add(MYSQL_CONF);
argList.add(PASSWORD + "=" + getMySQLPassword());
// argList.add(MYSQL_CONF);
// argList.add("server-time-zone=UTC");
argList.add(MYSQL_CONF);
argList.add("server-time-zone=UTC");

setSinkConfDefaultConfig(argList);
return argList;
Expand Down Expand Up @@ -427,20 +427,35 @@ private String getCreateTableSQL(String database, String table) throws Exception
@Test
public void testMySQL2DorisMultiDatabaseSync() throws Exception {
String jobName = "testMySQL2DorisMultiDatabaseSync";
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS test_e2e_mysql_db1");
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS test_e2e_mysql_db2");
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql");
startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt");

// wait 2 times checkpoint
Thread.sleep(20000);
// wait 3 times checkpoint
Thread.sleep(30000);
LOG.info("Start to verify init result.");
List<String> initExpected =
Arrays.asList("1,db1_tb1,18", "1,db1_tb2,19", "1,db2_tb1,20", "1,db2_tb2,21");
String sql =
"SELECT * FROM test_e2e_mysql_db1.tbl1 UNION ALL \n"
+ "SELECT * FROM test_e2e_mysql_db1.tbl2 UNION ALL \n"
+ "SELECT * FROM test_e2e_mysql_db2.tbl1 UNION ALL \n"
+ "SELECT * FROM test_e2e_mysql_db2.tbl2 ";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected, sql, 3);
List<String> initExpected1 = Arrays.asList("1,db1_tb1,18");
String sql1 = "SELECT * FROM test_e2e_mysql_db1.tbl1";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected1, sql1, 3, false);

List<String> initExpected2 = Arrays.asList("1,db1_tb2,19");
String sql2 = "SELECT * FROM test_e2e_mysql_db1.tbl2";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected2, sql2, 3, false);

List<String> initExpected3 = Arrays.asList("1,db2_tb1,20");
String sql3 = "SELECT * FROM test_e2e_mysql_db2.tbl1";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected3, sql3, 3, false);

List<String> initExpected4 = Arrays.asList("1,db2_tb2,21");
String sql4 = "SELECT * FROM test_e2e_mysql_db2.tbl2";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected4, sql4, 3, false);

List<String> initExpected5 = Arrays.asList("1,db2_tb3,22");
String sql5 = "SELECT * FROM test_e2e_mysql_db2.tbl3";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected5, sql5, 3, false);

// add incremental data
ContainerUtils.executeSQLStatement(
Expand All @@ -449,20 +464,89 @@ public void testMySQL2DorisMultiDatabaseSync() throws Exception {
"insert into test_e2e_mysql_db1.tbl1 values (2,'db1_tb1',180)",
"insert into test_e2e_mysql_db1.tbl2 values (2,'db1_tb2',190)",
"insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',200)",
"insert into test_e2e_mysql_db2.tbl2 values (2,'db2_tb2',210)");
"insert into test_e2e_mysql_db2.tbl2 values (2,'db2_tb2',210)",
"insert into test_e2e_mysql_db2.tbl3 values (2,'db2_tb3',220)");

Thread.sleep(20000);
List<String> expected =
List<String> incrExpected1 = Arrays.asList("1,db1_tb1,18", "2,db1_tb1,180");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected1, sql1, 3, false);

List<String> incrExpected2 = Arrays.asList("1,db1_tb2,19", "2,db1_tb2,190");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected2, sql2, 3, false);

List<String> incrExpected3 = Arrays.asList("1,db2_tb1,20", "2,db2_tb1,200");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected3, sql3, 3, false);

List<String> incrExpected4 = Arrays.asList("1,db2_tb2,21", "2,db2_tb2,210");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected4, sql4, 3, false);

List<String> incrExpected5 = Arrays.asList("1,db2_tb3,22", "2,db2_tb3,220");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected5, sql5, 3, false);

cancelE2EJob(jobName);
}

/**
* Separate databases and tables to write to the same database and table
*
* @throws Exception
*/
@Test
public void testMySQL2DorisMultiDatabase2OneSync() throws Exception {
String jobName = "testMySQL2DorisMultiDatabase2OneSync";
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql");
startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt");

// wait 3 times checkpoint
Thread.sleep(30000);
LOG.info("Start to verify init result.");
List<String> initExpected = Arrays.asList("1,db1_tb1,18", "2,db2_tb1,20");
String sql1 = "SELECT * FROM test_e2e_mysql.tbl1";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected, sql1, 3, false);

List<String> initExpected2 =
Arrays.asList(
"1,db1_tb2_1,19", "2,db1_tb2_2,191", "3,db2_tb2_2,21", "4,db2_tbl2_2,211");
String sql2 = "SELECT * FROM test_e2e_mysql.tbl2_merge";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected2, sql2, 3, false);

List<String> initExpected3 = Arrays.asList("1,db2_tb3,22");
String sql3 = "SELECT * FROM test_e2e_mysql.tbl3";
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, initExpected3, sql3, 3, false);

// add incremental data
ContainerUtils.executeSQLStatement(
getMySQLQueryConnection(),
LOG,
"insert into test_e2e_mysql_db1.tbl1 values (3,'db1_tb1',180)",
"insert into test_e2e_mysql_db2.tbl1 values (4,'db2_tb1',200)",
"insert into test_e2e_mysql_db1.tbl2_1 values (5,'db1_tb2_1',1901)",
"insert into test_e2e_mysql_db1.tbl2_2 values (6,'db1_tb2_2',1902)",
"insert into test_e2e_mysql_db2.tbl2_1 values (7,'db2_tb2_1',2101)",
"insert into test_e2e_mysql_db2.tbl2_2 values (8,'db2_tb2_2',2102)",
"insert into test_e2e_mysql_db2.tbl3 values (3,'db2_tb3',220)");

Thread.sleep(20000);

List<String> incrExpected =
Arrays.asList("1,db1_tb1,18", "2,db2_tb1,20", "3,db1_tb1,180", "4,db2_tb1,200");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected, sql1, 3, false);

List<String> incrExpected2 =
Arrays.asList(
"1,db1_tb1,18",
"1,db1_tb2,19",
"1,db2_tb1,20",
"1,db2_tb2,21",
"2,db1_tb1,180",
"2,db1_tb2,190",
"2,db2_tb1,200",
"2,db2_tb2,210");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql, 3);
"1,db1_tb2_1,19",
"2,db1_tb2_2,191",
"3,db2_tb2_2,21",
"4,db2_tbl2_2,211",
"5,db1_tb2_1,1901",
"6,db1_tb2_2,1902",
"7,db2_tb2_1,2101",
"8,db2_tb2_2,2102");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected2, sql2, 3, false);

List<String> incrExpected3 = Arrays.asList("1,db2_tb3,22", "3,db2_tb3,220");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, incrExpected3, sql3, 3, false);

cancelE2EJob(jobName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void getRecordTableName() throws Exception {
String record =
"{\"_id\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}}\",\"operationType\":\"insert\",\"fullDocument\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}, \\\"key1\\\": \\\"value1\\\"}\",\"source\":{\"ts_ms\":0,\"snapshot\":\"true\"},\"ts_ms\":1717065582062,\"ns\":{\"db\":\"test\",\"coll\":\"cdc_test\"},\"to\":null,\"documentKey\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}}\",\"updateDescription\":null,\"clusterTime\":null,\"txnNumber\":null,\"lsid\":null}";
MongoParsingProcessFunction mongoParsingProcessFunction =
new MongoParsingProcessFunction(null);
new MongoParsingProcessFunction(null, null);
String recordTableName = mongoParsingProcessFunction.getRecordTableName(record);
assertEquals("cdc_test", recordTableName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mysql-sync-database
--database test_e2e_mysql
--mysql-conf database-name=test_e2e_mysql.*
--including-tables ".*"
--multi-to-one-origin "tbl2.*"
--multi-to-one-target "tbl2_merge"
--table-conf replication_num=1
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- tbl1
CREATE DATABASE if NOT EXISTS test_e2e_mysql_db1;
DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl1;
CREATE TABLE test_e2e_mysql_db1.tbl1 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db1.tbl1 values (1,'db1_tb1',18);

CREATE DATABASE if NOT EXISTS test_e2e_mysql_db2;
DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl1;
CREATE TABLE test_e2e_mysql_db2.tbl1 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',20);

-- tbl2_1 tbl2_2
DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_1;
CREATE TABLE test_e2e_mysql_db1.tbl2_1 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db1.tbl2_1 values (1,'db1_tb2_1',19);

DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_2;
CREATE TABLE test_e2e_mysql_db1.tbl2_2 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db1.tbl2_2 values (2,'db1_tb2_2',191);

-- db2
DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_1;
CREATE TABLE test_e2e_mysql_db2.tbl2_1 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db2.tbl2_1 values (3,'db2_tb2_2',21);

DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_2;
CREATE TABLE test_e2e_mysql_db2.tbl2_2 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db2.tbl2_2 values (4,'db2_tbl2_2',211);


DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3;
CREATE TABLE test_e2e_mysql_db2.tbl3 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22);
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,13 @@ CREATE TABLE test_e2e_mysql_db2.tbl2 (
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db2.tbl2 values (1,'db2_tb2',21);
insert into test_e2e_mysql_db2.tbl2 values (1,'db2_tb2',21);

DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3;
CREATE TABLE test_e2e_mysql_db2.tbl3 (
`id` int NOT NULL,
`name` varchar(255) DEFAULT NULL,
`age` bigint DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
);
insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22);

0 comments on commit 8ebb7b9

Please sign in to comment.