Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 #3409

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ public SelectorsBuilder includeTables(String tableInclusions) {
Predicates.setOf(
tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str);
for (String tableSplit : tableSplitSet) {
Set<String> tableIdSet =
Predicates.setOf(
List<String> tableIdList =
Predicates.listOf(
tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str);
Iterator<String> iterator = tableIdSet.iterator();
if (tableIdSet.size() == 1) {
Iterator<String> iterator = tableIdList.iterator();
if (tableIdList.size() == 1) {
selectors.add(new Selector(null, null, iterator.next()));
} else if (tableIdSet.size() == 2) {
} else if (tableIdList.size() == 2) {
selectors.add(new Selector(null, iterator.next(), iterator.next()));
} else if (tableIdSet.size() == 3) {
} else if (tableIdList.size() == 3) {
selectors.add(new Selector(iterator.next(), iterator.next(), iterator.next()));
} else {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -78,6 +79,21 @@ public static <T> Set<T> setOf(
return matches;
}

public static <T> List<T> listOf(
String input, Function<String, String[]> splitter, Function<String, T> factory) {
if (input == null) {
return Collections.emptyList();
}
List<T> matches = new LinkedList<>();
for (String item : splitter.apply(input)) {
T obj = factory.apply(item);
if (obj != null) {
matches.add(obj);
}
}
return matches;
}

protected static <T> Function<T, Optional<Pattern>> matchedByPattern(
Collection<Pattern> patterns, Function<T, String> conversion) {
return (t) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ public void testTableSelector() {
// nameSpace, schemaName, tableName
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+")
.includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+,db.sc1.sc1")
.build();

assertAllowed(selectors, "db", "sc1", "sc1");
assertAllowed(selectors, "db", "sc1", "A1");
assertAllowed(selectors, "db", "sc1", "A2");
assertAllowed(selectors, "db", "sc2", "B0");
Expand All @@ -50,9 +51,12 @@ public void testTableSelector() {

selectors =
new Selectors.SelectorsBuilder()
.includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+")
.includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+,db\\..sc1.sc1,db.sc1.sc1")
.build();

assertAllowed(selectors, "db", "sc1", "sc1");
assertAllowed(selectors, "db1", "sc1", "sc1");
assertAllowed(selectors, "dba", "sc1", "sc1");
assertAllowed(selectors, "db1", "sc1", "A1");
assertAllowed(selectors, "dba", "sc1", "A2");
assertAllowed(selectors, "db", "sc2", "B0");
Expand All @@ -68,8 +72,11 @@ public void testTableSelector() {

// schemaName, tableName
selectors =
new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build();
new Selectors.SelectorsBuilder()
.includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1")
.build();

assertAllowed(selectors, null, "sc1", "sc1");
assertAllowed(selectors, null, "sc1", "A1");
assertAllowed(selectors, null, "sc1", "A2");
assertAllowed(selectors, null, "sc2", "B0");
Expand All @@ -82,8 +89,12 @@ public void testTableSelector() {
assertNotAllowed(selectors, null, "sc1A", "A1");

// tableName
selectors = new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+").build();
selectors =
new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+,sc1").build();

assertAllowed(selectors, null, null, "sc1");
assertNotAllowed(selectors, "db", "sc1", "sc1");
assertNotAllowed(selectors, null, "sc1", "sc1");
assertAllowed(selectors, null, null, "1A1");
assertAllowed(selectors, null, null, "AA2");
assertAllowed(selectors, null, null, "B0");
Expand All @@ -94,8 +105,11 @@ public void testTableSelector() {
assertNotAllowed(selectors, null, null, "2B");

selectors =
new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build();
new Selectors.SelectorsBuilder()
.includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1")
.build();

assertAllowed(selectors, null, "sc1", "sc1");
assertAllowed(selectors, null, "sc1", "A1");
assertAllowed(selectors, null, "sc1", "A2");
assertAllowed(selectors, null, "sc1", "A2");
Expand All @@ -107,6 +121,15 @@ public void testTableSelector() {
assertNotAllowed(selectors, null, "sc2", "B2");
assertNotAllowed(selectors, null, "sc11", "A1");
assertNotAllowed(selectors, null, "sc1A", "A1");

selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc1").build();
assertAllowed(selectors, null, "sc1", "sc1");

selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc[0-9]+").build();
assertAllowed(selectors, null, "sc1", "sc1");

selectors = new Selectors.SelectorsBuilder().includeTables("sc1.\\.*").build();
assertAllowed(selectors, null, "sc1", "sc1");
}

protected void assertAllowed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import org.junit.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -124,6 +127,44 @@ public void testExcludeAllTable() {
+ tableExclude);
}

@Test
public void testDatabaseAndTableWithTheSameName() throws SQLException {
inventoryDatabase.createAndInitialize();
// create a table with the same name of database
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
String createSameNameTableSql =
String.format(
"CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
+ " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ " description VARCHAR(512)\n"
+ ");",
inventoryDatabase.getDatabaseName(),
inventoryDatabase.getDatabaseName());

statement.execute(createSameNameTableSql);
}
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(
TABLES.key(),
inventoryDatabase.getDatabaseName() + "." + inventoryDatabase.getDatabaseName());
Factory.Context context = new MockContext(Configuration.fromMap(options));

MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
assertThat(dataSource.getSourceConfig().getTableList())
.isEqualTo(
Arrays.asList(
inventoryDatabase.getDatabaseName()
+ "."
+ inventoryDatabase.getDatabaseName()));
}

class MockContext implements Factory.Context {

Configuration factoryConfiguration;
Expand Down
Loading