Skip to content

Commit

Permalink
[Feature][MySQL-CDC] Support database/table wildcards scan read (#8323)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Jan 7, 2025
1 parent a4a38cc commit 2116843
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 3 deletions.
30 changes: 30 additions & 0 deletions docs/en/connector-v2/source/MySQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ When an initial consistent snapshot is made for large databases, your establishe
| username | String | Yes | - | Name of the database to use when connecting to the database server. |
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | No | - | Database name of the database to monitor. |
| database-pattern | String | No | .* | Regular expression matching the names of the databases to capture, for example: `database_prefix.*`. |
| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` |
| table-pattern | String | Yes | - | Regular expression matching the names of the tables to capture. The pattern must include the database name, for example: `database.*\\.table_.*`. Mutually exclusive with `table-names`. |
| table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}] |
| startup.mode | Enum | No | INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize historical data at startup, and then synchronize incremental data.<br/> `earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup from the latest offset.<br/> `specific`: Startup from user-supplied specific offsets. |
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note, This option is required when the `startup.mode` option used `specific`.** |
Expand Down Expand Up @@ -303,6 +305,34 @@ sink {
}
```
### Support table-pattern for multi-table reading
> `table-pattern` and `table-names` are mutually exclusive
```hocon
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
MySQL-CDC {
server-id = 5652
username = "st_user_source"
password = "mysqlpw"
database-pattern = "source.*"
table-pattern = "source.*\\..*"
base-url = "jdbc:mysql://mysql_cdc_e2e:3306"
}
}
sink {
Console {
}
}
```


## Changelog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
protected String originUrl;
protected List<String> databaseList;
protected List<String> tableList;
protected String databasePattern;
protected String tablePattern;
protected StartupConfig startupConfig;
protected StopConfig stopConfig;
protected double distributionFactorUpper =
Expand Down Expand Up @@ -243,6 +245,8 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
this.password = config.get(JdbcSourceOptions.PASSWORD);
this.databaseList = config.get(JdbcSourceOptions.DATABASE_NAMES);
this.tableList = config.get(CatalogOptions.TABLE_NAMES);
this.databasePattern = config.get(CatalogOptions.DATABASE_PATTERN);
this.tablePattern = config.get(CatalogOptions.TABLE_PATTERN);
this.distributionFactorUpper =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
this.distributionFactorLower =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ public MySqlSourceConfig create(int subtaskId) {
}
if (databaseList != null) {
props.setProperty("database.include.list", String.join(",", databaseList));
} else if (databasePattern != null) {
props.setProperty("database.include.list", databasePattern);
}
if (tableList != null) {
props.setProperty("table.include.list", String.join(",", tableList));
} else if (tablePattern != null) {
props.setProperty("table.include.list", tablePattern);
}
if (serverTimeZone != null) {
props.setProperty("database.serverTimezone", serverTimeZone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
Expand All @@ -38,12 +39,14 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;

@AutoService(Factory.class)
@Slf4j
public class MySqlIncrementalSourceFactory extends BaseChangeStreamTableSourceFactory {
@Override
public String factoryIdentifier() {
Expand Down Expand Up @@ -99,9 +102,9 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
TableSource<T, SplitT, StateT> restoreSource(
TableSourceFactoryContext context, List<CatalogTable> restoreTables) {
return () -> {
ReadonlyConfig config = context.getOptions();
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
CatalogTableUtil.getCatalogTables(config, context.getClassLoader());
boolean enableSchemaChange =
context.getOptions()
.getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED)
Expand Down Expand Up @@ -137,7 +140,7 @@ TableSource<T, SplitT, StateT> restoreSource(
text -> TablePath.of(text, false));
}
return (SeaTunnelSource<T, SplitT, StateT>)
new MySqlIncrementalSource<>(context.getOptions(), catalogTables);
new MySqlIncrementalSource<>(config, catalogTables);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
import static org.testcontainers.shaded.org.awaitility.Awaitility.given;

@Slf4j
@DisabledOnContainer(
Expand All @@ -72,6 +73,7 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(
MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE);
// Plain "select everything" query template; the placeholders are the database and the table name.
// Declared static like the other SQL templates in this class: it is a constant, not per-instance state.
private static final String QUERY_SQL = "select * from %s.%s";

// mysql source table query sql
private static final String SOURCE_SQL_TEMPLATE =
Expand Down Expand Up @@ -539,6 +541,59 @@ public void testMysqlCdcMultiTableWithCustomPrimaryKey(TestContainer container)
SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY)))));
}

/**
 * Runs the wildcard-configured MySQL-CDC job and checks that each source table ends up
 * mirrored in the {@code sink} database.
 *
 * <p>The databases are initialized from the {@code wildcards} template, the job defined in
 * {@code /mysqlcdc_wildcards_to_mysql.conf} is started in the background, and the
 * {@code wildcards_dml} template is then applied. The test polls until the rows of
 * {@code source.products}, {@code source.customers} and {@code source1.orders} equal the rows
 * of {@code sink.source_products}, {@code sink.source_customers} and
 * {@code sink.source1_orders}.
 *
 * @param container the engine container used to submit the job
 * @throws IOException declared for the test template signature
 * @throws InterruptedException if the thread is interrupted while waiting before the DML step
 */
@TestTemplate
@DisabledOnContainer(
        value = {},
        type = {EngineType.SPARK},
        disabledReason = "Currently SPARK do not support cdc")
public void testMysqlCdcByWildcardsConfig(TestContainer container)
        throws IOException, InterruptedException {
    inventoryDatabase.setTemplateName("wildcards").createAndInitialize();
    // Submit the job on another thread so this thread can apply DML and poll the sink.
    CompletableFuture.runAsync(
            () -> {
                try {
                    container.executeJob("/mysqlcdc_wildcards_to_mysql.conf");
                } catch (Exception e) {
                    // Pass the throwable as well so the stack trace is not lost in the log.
                    log.error("Commit task exception :" + e.getMessage(), e);
                    throw new RuntimeException(e);
                }
            });
    // Give the job a moment to start before changing the source data.
    TimeUnit.SECONDS.sleep(5);
    inventoryDatabase.setTemplateName("wildcards_dml").createAndInitialize();
    given().pollDelay(20, TimeUnit.SECONDS)
            .pollInterval(2000, TimeUnit.MILLISECONDS)
            .await()
            .atMost(60000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () -> {
                        // assertAll evaluates every table comparison and reports all failures.
                        Assertions.assertAll(
                                () -> {
                                    log.info(
                                            query(getQuerySQL("sink", "source_products"))
                                                    .toString());
                                    Assertions.assertIterableEquals(
                                            query(getQuerySQL("source", "products")),
                                            query(getQuerySQL("sink", "source_products")));
                                },
                                () -> {
                                    log.info(
                                            query(getQuerySQL("sink", "source_customers"))
                                                    .toString());
                                    Assertions.assertIterableEquals(
                                            query(getQuerySQL("source", "customers")),
                                            query(getQuerySQL("sink", "source_customers")));
                                },
                                () -> {
                                    log.info(
                                            query(getQuerySQL("sink", "source1_orders"))
                                                    .toString());
                                    Assertions.assertIterableEquals(
                                            query(getQuerySQL("source1", "orders")),
                                            query(getQuerySQL("sink", "source1_orders")));
                                });
                    });
}

private Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
Expand Down Expand Up @@ -703,4 +758,8 @@ private String getSourceQuerySQL(String database, String tableName) {
/**
 * Fills {@code SINK_SQL_TEMPLATE} with the given database and table name.
 *
 * @param database database name substituted into the template
 * @param tableName table name substituted into the template
 * @return the formatted SQL text
 */
private String getSinkQuerySQL(String database, String tableName) {
    final String sinkQuery = String.format(SINK_SQL_TEMPLATE, database, tableName);
    return sinkQuery;
}

/**
 * Fills {@code QUERY_SQL} with the given database and table name.
 *
 * @param database database name substituted into the template
 * @param tableName table name substituted into the template
 * @return the formatted SQL text
 */
private String getQuerySQL(String database, String tableName) {
    final String selectAll = String.format(QUERY_SQL, database, tableName);
    return selectAll;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: source
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE IF NOT EXISTS `source`;
use `source`;

-- Drop any previous copy so the script can be re-run against an existing database.
DROP TABLE IF EXISTS `source`.`products`;
-- Create and populate our products using a single insert with many rows.
-- The table name is fully qualified so the statement does not depend on the current default database.
CREATE TABLE `source`.`products` (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
  description VARCHAR(512),
  weight FLOAT
);

ALTER TABLE `source`.`products` AUTO_INCREMENT = 101;

INSERT INTO `source`.`products`
VALUES (101,"scooter","Small 2-wheel scooter",3.14),
(102,"car battery","12V car battery",8.1),
(103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
(104,"hammer","12oz carpenter's hammer",0.75),
(105,"hammer","14oz carpenter's hammer",0.875),
(106,"hammer","16oz carpenter's hammer",1.0),
(107,"rocks","box of assorted rocks",5.3),
(108,"jacket","water resistent black wind breaker",0.1),
(109,"spare tire","24 inch spare tire",22.2);


DROP TABLE IF EXISTS `source`.`customers`;
CREATE TABLE `source`.`customers` (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;


-- Seed rows for the customers table.
-- `email` is declared UNIQUE KEY, so every row must carry a distinct address.
INSERT INTO `source`.`customers`
VALUES (1001,"Sally","Thomas","sally.thomas@acme.com"),
       (1002,"George","Bailey","gbailey@foobar.com"),
       (1003,"Edward","Walker","ed@walker.com"),
       (1004,"Anne","Kretchmar","annek@noanswer.org");


-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: source1
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE IF NOT EXISTS `source1`;
use `source1`;

DROP TABLE IF EXISTS `source1`.`orders`;
CREATE TABLE `source1`.`orders` (
order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL
) AUTO_INCREMENT = 10001;


-- Seed rows for the orders table.
-- All dates use a four-digit year so the stored value does not rely on MySQL's two-digit-year conversion.
INSERT INTO `source1`.`orders`
VALUES (10001, '2016-01-16', 1001, 1, 102),
       (10002, '2016-01-17', 1002, 2, 105),
       (10003, '2016-02-18', 1004, 3, 109),
       (10004, '2016-02-19', 1002, 2, 106),
       (10005, '2016-02-21', 1003, 1, 107);

CREATE DATABASE IF NOT EXISTS `sink`;

use `sink`;

DROP TABLE IF EXISTS `source_products`;
DROP TABLE IF EXISTS `source_customers`;
DROP TABLE IF EXISTS `source1_orders`;

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: source
-- ----------------------------------------------------------------------------------------------------------------

-- Incremental changes applied to the source tables: one UPDATE, one INSERT and one DELETE.
use `source`;

-- Rename product 102.
UPDATE `source`.`products` SET name = 'Illustrated new quality productivity' WHERE id = 102;
-- Add customer 1005 (last_name is an empty string, not NULL).
INSERT INTO `source`.`customers` VALUES (1005,"Zhangdonghao","","[email protected]");

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: source1
-- ----------------------------------------------------------------------------------------------------------------

use `source1`;
-- Remove every order whose order_number is below 10004.
DELETE FROM `source1`.`orders` where order_number < 10004;


Loading

0 comments on commit 2116843

Please sign in to comment.