Skip to content

Commit

Permalink
[Refactor] Minor refactor update the latest dynamic options using the…
Browse files Browse the repository at this point in the history
… cdc action. (apache#3090)
  • Loading branch information
yuzelin authored Mar 25, 2024
1 parent d77b75d commit 1cf8b47
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ protected void beforeBuildingSourceSink() throws Exception {
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
fileStoreTable = copyOptionsWithoutBucket(fileStoreTable);
// Update the latest options to schema
toOptionsChange(identifier, fileStoreTable.options());
alterTableOptions(identifier, fileStoreTable);
try {
Schema retrievedSchema = retrieveSchema();
computedColumns =
Expand All @@ -151,7 +149,7 @@ protected void beforeBuildingSourceSink() throws Exception {
computedColumns = buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
Schema paimonSchema = buildPaimonSchema(retrievedSchema);
catalog.createTable(identifier, paimonSchema, false);
fileStoreTable = (FileStoreTable) catalog.getTable(identifier).copy(tableConfig);
fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,25 @@ protected abstract void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory);

protected FileStoreTable copyOptionsWithoutBucket(FileStoreTable table) {
Map<String, String> toCopy = new HashMap<>(tableConfig);
toCopy.remove(CoreOptions.BUCKET.key());
return table.copy(toCopy);
}
protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
// doesn't support altering bucket here
Map<String, String> withoutBucket = new HashMap<>(tableConfig);
withoutBucket.remove(CoreOptions.BUCKET.key());

protected void toOptionsChange(Identifier identifier, Map<String, String> options)
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> optionChanges =
options.entrySet().stream()
withoutBucket.entrySet().stream()
.map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
catalog.alterTable(identifier, optionChanges, false);

try {
catalog.alterTable(identifier, optionChanges, false);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
| Catalog.ColumnNotExistException e) {
throw new RuntimeException("This is unexpected.", e);
}

return table.copy(withoutBucket);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,12 @@ protected void beforeBuildingSourceSink() throws Exception {
true);
try {
table = (FileStoreTable) catalog.getTable(identifier);
table = copyOptionsWithoutBucket(table);
Supplier<String> errMsg =
incompatibleMessage(table.schema(), tableInfo, identifier);
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
table = alterTableOptions(identifier, table);
tables.add(table);
monitoredTables.addAll(tableInfo.identifiers());
// Update the latest options to schema
toOptionsChange(identifier, table.options());
} else {
excludedTables.addAll(tableInfo.identifiers());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -155,34 +154,6 @@ protected void waitForResult(
}
}

protected void waitForOptions(Map<String, String> expected, FileStoreTable table)
throws Exception {

// wait for table options to become our expected options
Map<String, String> expectedOptions = new HashMap<>(expected);
expectedOptions.put("path", table.options().get("path"));
while (true) {
if (table.options().size() == expectedOptions.size()) {
boolean result =
table.options().entrySet().stream()
.allMatch(
entry -> {
Object key = entry.getKey();
Object value1 = entry.getValue();
Object value2 = expectedOptions.get(key);
return expectedOptions.containsKey(key)
&& Objects.equals(value1, value2);
});
if (result) {
break;
}
}

table = table.copyWithLatestSchema();
Thread.sleep(1000);
}
}

protected Map<String, String> getBasicTableConfig() {
Map<String, String> config = new HashMap<>();
ThreadLocalRandom random = ThreadLocalRandom.current();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testTableOptionsChange() throws Exception {
runActionWithDefaultEnv(action2);

FileStoreTable table = getFileStoreTable(tableName);
waitForOptions(tableConfig, table);
assertThat(table.options()).containsAllEntriesOf(tableConfig);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void testOptionsChange() throws Exception {
runActionWithDefaultEnv(action2);

FileStoreTable table = getFileStoreTable(tableName);
waitForOptions(tableConfig, table);
assertThat(table.options()).containsAllEntriesOf(tableConfig);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ public void testOptionsChange() throws Exception {
runActionWithDefaultEnv(action2);

FileStoreTable table = getFileStoreTable();
waitForOptions(tableConfig, table);
assertThat(table.options()).containsAllEntriesOf(tableConfig);
}

@Test
Expand Down Expand Up @@ -1276,6 +1276,6 @@ public void testInvalidAlterBucket() throws Exception {
assertThatCode(action::build).doesNotThrowAnyException();

FileStoreTable table = getFileStoreTable();
waitForOptions(Collections.singletonMap(BUCKET.key(), "1"), table);
assertThat(table.options().get(BUCKET.key())).isEqualTo("1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ public void testOptionsChange() throws Exception {
runActionWithDefaultEnv(action2);

FileStoreTable table = getFileStoreTable();
waitForOptions(tableConfig, table);
assertThat(table.options()).containsAllEntriesOf(tableConfig);
}

@Test
Expand Down

0 comments on commit 1cf8b47

Please sign in to comment.