diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index e45dd86b82a3..b41ecd7f4be5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -122,9 +122,7 @@ protected void beforeBuildingSourceSink() throws Exception { // Check if table exists before trying to get or create it if (catalog.tableExists(identifier)) { fileStoreTable = (FileStoreTable) catalog.getTable(identifier); - fileStoreTable = copyOptionsWithoutBucket(fileStoreTable); - // Update the latest options to schema - toOptionsChange(identifier, fileStoreTable.options()); + alterTableOptions(identifier, fileStoreTable); try { Schema retrievedSchema = retrieveSchema(); computedColumns = @@ -151,7 +149,7 @@ protected void beforeBuildingSourceSink() throws Exception { computedColumns = buildComputedColumns(computedColumnArgs, retrievedSchema.fields()); Schema paimonSchema = buildPaimonSchema(retrievedSchema); catalog.createTable(identifier, paimonSchema, false); - fileStoreTable = (FileStoreTable) catalog.getTable(identifier).copy(tableConfig); + fileStoreTable = (FileStoreTable) catalog.getTable(identifier); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 78af9050ee83..684408f2438e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -176,20 +176,25 @@ protected abstract void buildSink( DataStream input, EventParser.Factory parserFactory); - protected FileStoreTable copyOptionsWithoutBucket(FileStoreTable table) { - Map toCopy = new HashMap<>(tableConfig); - toCopy.remove(CoreOptions.BUCKET.key()); - return table.copy(toCopy); - } + protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) { + // doesn't support altering bucket here + Map withoutBucket = new HashMap<>(tableConfig); + withoutBucket.remove(CoreOptions.BUCKET.key()); - protected void toOptionsChange(Identifier identifier, Map options) - throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, - Catalog.ColumnNotExistException { List optionChanges = - options.entrySet().stream() + withoutBucket.entrySet().stream() .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); - catalog.alterTable(identifier, optionChanges, false); + + try { + catalog.alterTable(identifier, optionChanges, false); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new RuntimeException("This is unexpected.", e); + } + + return table.copy(withoutBucket); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index ecd9921d116c..da3b80c5f09a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -155,14 +155,12 @@ protected void beforeBuildingSourceSink() throws Exception { true); try { table = (FileStoreTable) catalog.getTable(identifier); - table = copyOptionsWithoutBucket(table); Supplier errMsg = incompatibleMessage(table.schema(), tableInfo, identifier); if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) { + table = alterTableOptions(identifier, table); tables.add(table); monitoredTables.addAll(tableInfo.identifiers()); - // Update the latest options to schema - toOptionsChange(identifier, table.options()); } else { excludedTables.addAll(tableInfo.identifiers()); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index ae726fe0ee02..7a621030217a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -52,7 +52,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import static org.assertj.core.api.Assertions.assertThat; @@ -155,34 +154,6 @@ protected void waitForResult( } } - protected void waitForOptions(Map expected, FileStoreTable table) - throws Exception { - - // wait for table options to become our expected options - Map expectedOptions = new HashMap<>(expected); - expectedOptions.put("path", table.options().get("path")); - while (true) { - if (table.options().size() == expectedOptions.size()) { - boolean result = - table.options().entrySet().stream() - .allMatch( - entry -> { - Object key = entry.getKey(); - Object value1 = entry.getValue(); - Object value2 = expectedOptions.get(key); - return expectedOptions.containsKey(key) - && Objects.equals(value1, value2); - }); - if (result) { - break; - } - } - - table = table.copyWithLatestSchema(); - Thread.sleep(1000); - } - } - protected Map getBasicTableConfig() { Map config = new HashMap<>(); ThreadLocalRandom random = ThreadLocalRandom.current(); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java index e95b204b2398..5c998c33cf6b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java @@ -111,7 +111,7 @@ public void testTableOptionsChange() throws Exception { runActionWithDefaultEnv(action2); FileStoreTable table = getFileStoreTable(tableName); - waitForOptions(tableConfig, table); + assertThat(table.options()).containsAllEntriesOf(tableConfig); } @Test diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java index 66e31b170ef2..48c6cd481103 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java @@ -201,7 +201,7 @@ public void testOptionsChange() throws Exception { runActionWithDefaultEnv(action2); FileStoreTable table = getFileStoreTable(tableName); - waitForOptions(tableConfig, table); + assertThat(table.options()).containsAllEntriesOf(tableConfig); } @Test diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 1874a93a37fc..2ed3b6f466c5 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1094,7 +1094,7 @@ public void testOptionsChange() throws Exception { runActionWithDefaultEnv(action2); FileStoreTable table = getFileStoreTable(); - waitForOptions(tableConfig, table); + assertThat(table.options()).containsAllEntriesOf(tableConfig); } @Test @@ -1276,6 +1276,6 @@ public void testInvalidAlterBucket() throws Exception { assertThatCode(action::build).doesNotThrowAnyException(); FileStoreTable table = getFileStoreTable(); - waitForOptions(Collections.singletonMap(BUCKET.key(), "1"), table); + assertThat(table.options().get(BUCKET.key())).isEqualTo("1"); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java index f7f0c7b06818..fbd441125cf6 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java @@ -723,7 +723,7 @@ public void testOptionsChange() throws Exception { runActionWithDefaultEnv(action2); FileStoreTable table = getFileStoreTable(); - waitForOptions(tableConfig, table); + assertThat(table.options()).containsAllEntriesOf(tableConfig); } @Test