Skip to content

Commit

Permalink
[FLINK-36115][pipeline-connector][mysql] add scan.binlog.newly-…
Browse files Browse the repository at this point in the history
…added-table.enabled option to allow scanning newly added tables during the binlog reading stage.
  • Loading branch information
lvyanquan committed Aug 21, 2024
1 parent ea93328 commit 5de2c45
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 11 deletions.
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,17 @@ pipeline:
<td>Boolean</td>
<td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。 <br>
scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于: <br>
scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取; <br>
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,17 @@ pipeline:
<td>Boolean</td>
<td>Whether to enable scanning of newly added tables; the default is false. This option only takes effect when the job is started from a savepoint/checkpoint.</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to capture the DDL and DML statements of newly added tables during the binlog reading stage; the default is false. <br>
The difference between the scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: <br>
scan.newly-added-table.enabled: takes a snapshot of newly added tables and then reads their binlog when the job is restored from a savepoint/checkpoint; <br>
scan.binlog.newly-added-table.enabled: only reads the binlog of tables that are newly added during the binlog reading phase, without taking a snapshot.
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
Expand Down Expand Up @@ -132,7 +132,7 @@ public DataSource createDataSource(Context context) {
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanIncrementalNewlyAddedTableEnabled =
config.get(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED);
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -267,7 +267,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
return options;
}

Expand Down Expand Up @@ -442,7 +442,7 @@ private String validateTableAndReturnDebeziumStyle(String tables) {
"the `,` in "
+ tables
+ " is not supported when "
+ SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED
+ SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED
+ " was enabled.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,13 @@ public class MySqlDataSourceOptions {
+ "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.incremental.newly-added-table.enabled")
public static final ConfigOption<Boolean> SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.binlog.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to scan the ddl and dml statements of newly added tables or not in incremental reading stage, by default is false.");
"In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. \n"
+ "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n"
+ "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restore; \n"
+ "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -154,10 +154,10 @@ private MySqlConnection getConnection() {
}

@Test
public void testAddNewlyTablesInIncrementalStage() throws Exception {
public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
List<String> tables = Collections.singletonList("address_\\.*");
Map<String, String> options = new HashMap<>();
options.put(SCAN_INCREMENTAL_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(SCAN_STARTUP_MODE.key(), "timestamp");
options.put(
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
Expand All @@ -180,7 +180,7 @@ public void testAddNewlyTablesInIncrementalStage() throws Exception {
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectResultIterator<Event> iterator =
addCollector(env, source, resultBuffer, serializer, accumulatorName);
env.executeAsync("AddNewlyTablesInIncrementalStage");
env.executeAsync("AddNewlyTablesWhenReadingBinlog");
initialAddressTables(getConnection(), Collections.singletonList("address_beijing"));
List<Event> actual = fetchResults(iterator, 4);
assertThat(((ChangeEvent) actual.get(0)).tableId())
Expand Down

0 comments on commit 5de2c45

Please sign in to comment.