From b986dcb9f10af8b4ebf2185c79e2c9160842c4a4 Mon Sep 17 00:00:00 2001 From: zhourui999 <1459939299@qq.com> Date: Mon, 19 Aug 2024 10:06:33 +0800 Subject: [PATCH] feat:Mysql-CDC supports configuring parameter scan.incremental.snapshot.backfill.skip,Postgres-CDC supports scan.incremental.snapshot.backfill.skip and scan.incremental.close-idle-reader.enabled parameters --- .../paimon/flink/action/cdc/mysql/MySqlActionUtils.java | 4 +++- .../flink/action/cdc/postgres/PostgresActionUtils.java | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java index 309471cbbc70..792b763117f9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java @@ -176,10 +176,12 @@ public static MySqlSource buildMySqlSource( mySqlConfig .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL) .ifPresent(sourceBuilder::heartbeatInterval); - mySqlConfig .getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED) .ifPresent(sourceBuilder::closeIdleReaders); + mySqlConfig + .getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP) + .ifPresent(sourceBuilder::skipSnapshotBackfill); String startupMode = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE); // see diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java index 165a77eb3f1c..027fef8fd3a6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java @@ -154,6 +154,12 @@ public static JdbcIncrementalSource buildPostgresSource( postgresConfig .getOptional(PostgresSourceOptions.HEARTBEAT_INTERVAL) .ifPresent(sourceBuilder::heartbeatInterval); + postgresConfig + .getOptional(PostgresSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED) + .ifPresent(sourceBuilder::closeIdleReaders); + postgresConfig + .getOptional(PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP) + .ifPresent(sourceBuilder::skipSnapshotBackfill); String startupMode = postgresConfig.get(PostgresSourceOptions.SCAN_STARTUP_MODE);