diff --git a/docs/content.zh/docs/core-concept/route.md b/docs/content.zh/docs/core-concept/route.md
index 5e69205794..ca5855d586 100644
--- a/docs/content.zh/docs/core-concept/route.md
+++ b/docs/content.zh/docs/core-concept/route.md
@@ -30,11 +30,12 @@ under the License.
 # Parameters
 To describe a route, the following are required:
-| parameter    | meaning                                            | optional/required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions      | required          |
-| sink-table   | Sink table id, supports regular expressions        | required          |
-| description  | Routing rule description(a default value provided) | optional          |
+| parameter      | meaning                                                                        | optional/required |
+|----------------|--------------------------------------------------------------------------------|-------------------|
+| source-table   | Source table id, supports regular expressions                                  | required          |
+| sink-table     | Sink table id, supports symbol replacement                                     | required          |
+| replace-symbol | Special string in sink-table that will be replaced by the original table name  | optional          |
+| description    | Routing rule description (a default value provided)                            | optional          |
 
 A route module can contain a list of source-table/sink-table rules.
@@ -71,4 +72,18 @@ route:
   - source-table: mydb.products
     sink-table: ods_db.ods_products
     description: sync products table to ods_products
-```
\ No newline at end of file
+```
+
+## Pattern Replacement in routing rules
+
+If you'd like to route source tables and rename them to sink tables with a specific pattern, `replace-symbol` can be used to stand in for the original table name, like this:
+
+```yaml
+route:
+  - source-table: source_db.\.*
+    sink-table: sink_db.<>
+    replace-symbol: <>
+    description: route all tables in source_db to sink_db
+```
+
+Then, every table matching `source_db.XXX` will be routed to the corresponding `sink_db.XXX`.
\ No newline at end of file
diff --git a/docs/content/docs/core-concept/route.md b/docs/content/docs/core-concept/route.md
index 5e69205794..ca5855d586 100644
--- a/docs/content/docs/core-concept/route.md
+++ b/docs/content/docs/core-concept/route.md
@@ -30,11 +30,12 @@ under the License.
 # Parameters
 To describe a route, the following are required:
-| parameter    | meaning                                            | optional/required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions      | required          |
-| sink-table   | Sink table id, supports regular expressions        | required          |
-| description  | Routing rule description(a default value provided) | optional          |
+| parameter      | meaning                                                                        | optional/required |
+|----------------|--------------------------------------------------------------------------------|-------------------|
+| source-table   | Source table id, supports regular expressions                                  | required          |
+| sink-table     | Sink table id, supports symbol replacement                                     | required          |
+| replace-symbol | Special string in sink-table that will be replaced by the original table name  | optional          |
+| description    | Routing rule description (a default value provided)                            | optional          |
 
 A route module can contain a list of source-table/sink-table rules.
@@ -71,4 +72,18 @@ route:
   - source-table: mydb.products
     sink-table: ods_db.ods_products
     description: sync products table to ods_products
-```
\ No newline at end of file
+```
+
+## Pattern Replacement in routing rules
+
+If you'd like to route source tables and rename them to sink tables with a specific pattern, `replace-symbol` can be used to stand in for the original table name, like this:
+
+```yaml
+route:
+  - source-table: source_db.\.*
+    sink-table: sink_db.<>
+    replace-symbol: <>
+    description: route all tables in source_db to sink_db
+```
+
+Then, every table matching `source_db.XXX` will be routed to the corresponding `sink_db.XXX`.
\ No newline at end of file
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 23b2c63ff7..7ad07af1a4 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -55,6 +55,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     // Route keys
     private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
     private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
+    private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
     private static final String ROUTE_DESCRIPTION_KEY = "description";
 
     // Transform keys
@@ -164,11 +165,15 @@ private RouteDef toRouteDef(JsonNode routeNode) {
                                 "Missing required field \"%s\" in route configuration",
                                 ROUTE_SINK_TABLE_KEY)
                         .asText();
+        String replaceSymbol =
+                Optional.ofNullable(routeNode.get(ROUTE_REPLACE_SYMBOL))
+                        .map(JsonNode::asText)
+                        .orElse(null);
         String description =
                 Optional.ofNullable(routeNode.get(ROUTE_DESCRIPTION_KEY))
                         .map(JsonNode::asText)
                         .orElse(null);
-        return new RouteDef(sourceTable, sinkTable, description);
+        return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
     }
 
     private TransformDef toTransformDef(JsonNode transformNode) {
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 2d05bcbde3..75dc5bd628 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -166,6 +166,15 @@ void testInvalidTimeZone() throws Exception {
                                 + "Or use 'UTC' without time zone and daylight saving time.");
     }
 
+    @Test
+    void testRouteWithReplacementSymbol() throws Exception {
+        URL resource =
+                Resources.getResource("definitions/pipeline-definition-full-with-repsym.yaml");
+        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        assertThat(pipelineDef).isEqualTo(fullDefWithRouteRepSym);
+    }
+
     private final PipelineDef fullDef =
             new PipelineDef(
                     new SourceDef(
@@ -197,10 +206,12 @@ void testInvalidTimeZone() throws Exception {
                             new RouteDef(
                                     "mydb.default.app_order_.*",
                                     "odsdb.default.app_order",
+                                    null,
                                     "sync all sharding tables to one"),
                             new RouteDef(
                                     "mydb.default.web_order",
                                     "odsdb.default.ods_web_order",
+                                    null,
                                     "sync table to with given prefix ods_")),
                     Arrays.asList(
                             new TransformDef(
@@ -258,10 +269,12 @@ void testInvalidTimeZone() throws Exception {
                             new RouteDef(
                                     "mydb.default.app_order_.*",
                                     "odsdb.default.app_order",
+                                    null,
                                     "sync all sharding tables to one"),
                             new RouteDef(
                                     "mydb.default.web_order",
                                     "odsdb.default.ods_web_order",
+                                    null,
                                     "sync table to with given prefix ods_")),
                     Arrays.asList(
                             new TransformDef(
@@ -312,7 +325,10 @@ void testInvalidTimeZone() throws Exception {
                                     .build())),
                     Collections.singletonList(
                             new RouteDef(
-                                    "mydb.default.app_order_.*", "odsdb.default.app_order", null)),
+                                    "mydb.default.app_order_.*",
+                                    "odsdb.default.app_order",
+                                    null,
+                                    null)),
                     Collections.emptyList(),
                     Configuration.fromMap(
                             ImmutableMap.<String, String>builder()
@@ -326,4 +342,67 @@ void testInvalidTimeZone() throws Exception {
                     Collections.emptyList(),
                     Collections.emptyList(),
                     Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
+
+    private final PipelineDef fullDefWithRouteRepSym =
+            new PipelineDef(
+                    new SourceDef(
+                            "mysql",
+                            "source-database",
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("host", "localhost")
+                                            .put("port", "3306")
+                                            .put("username", "admin")
+                                            .put("password", "pass")
+                                            .put(
+                                                    "tables",
+                                                    "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+                                            .put(
+                                                    "chunk-column",
+                                                    "app_order_.*:id,web_order:product_id")
+                                            .put("capture-new-tables", "true")
+                                            .build())),
+                    new SinkDef(
+                            "kafka",
+                            "sink-queue",
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("bootstrap-servers", "localhost:9092")
+                                            .put("auto-create-table", "true")
+                                            .build())),
+                    Arrays.asList(
+                            new RouteDef(
+                                    "mydb.default.app_order_.*",
+                                    "odsdb.default.app_order_<>",
+                                    "<>",
+                                    "sync all sharding tables to one"),
+                            new RouteDef(
+                                    "mydb.default.web_order",
+                                    "odsdb.default.ods_web_order_>_<",
+                                    ">_<",
+                                    "sync table to with given prefix ods_")),
+                    Arrays.asList(
+                            new TransformDef(
+                                    "mydb.app_order_.*",
+                                    "id, order_id, TO_UPPER(product_name)",
+                                    "id > 10 AND order_id > 100",
+                                    "id",
+                                    "product_name",
+                                    "comment=app order",
+                                    "project fields from source table"),
+                            new TransformDef(
+                                    "mydb.web_order_.*",
+                                    "CONCAT(id, order_id) as uniq_id, *",
+                                    "uniq_id > 10",
+                                    null,
+                                    null,
+                                    null,
+                                    "add new uniq_id for each row")),
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("name", "source-database-sync-pipe")
+                                    .put("parallelism", "4")
+                                    .put("schema.change.behavior", "evolve")
+                                    .put("schema-operator.rpc-timeout", "1 h")
+                                    .build()));
 }
diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
new file mode 100644
index 0000000000..265358fb41
--- /dev/null
+++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
@@ -0,0 +1,61 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+source:
+  type: mysql
+  name: source-database
+  host: localhost
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
+  chunk-column: app_order_.*:id,web_order:product_id
+  capture-new-tables: true
+
+sink:
+  type: kafka
+  name: sink-queue
+  bootstrap-servers: localhost:9092
+  auto-create-table: true
+
+route:
+  - source-table: mydb.default.app_order_.*
+    sink-table: odsdb.default.app_order_<>
+    replace-symbol: "<>"
+    description: sync all sharding tables to one
+  - source-table: mydb.default.web_order
+    sink-table: odsdb.default.ods_web_order_>_<
+    replace-symbol: ">_<"
+    description: sync table to with given prefix ods_
+
+transform:
+  - source-table: mydb.app_order_.*
+    projection: id, order_id, TO_UPPER(product_name)
+    filter: id > 10 AND order_id > 100
+    primary-keys: id
+    partition-keys: product_name
+    table-options: comment=app order
+    description: project fields from source table
+  - source-table: mydb.web_order_.*
+    projection: CONCAT(id, order_id) as uniq_id, *
+    filter: uniq_id > 10
+    description: add new uniq_id for each row
+
+pipeline:
+  name: source-database-sync-pipe
+  parallelism: 4
+  schema.change.behavior: evolve
+  schema-operator.rpc-timeout: 1 h
\ No newline at end of file
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
new file mode 100644
index 0000000000..4fbfb61b66
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.route;
+
+import java.io.Serializable;
+
+/** Definition of a routing rule with replacement symbol. */
+public class RouteRule implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public RouteRule(String sourceTable, String sinkTable, String replaceSymbol) {
+        this.sourceTable = sourceTable;
+        this.sinkTable = sinkTable;
+        this.replaceSymbol = replaceSymbol;
+    }
+
+    public String sourceTable;
+    public String sinkTable;
+    public String replaceSymbol;
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
index ee55eb366b..868c5c67fd 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/RouteDef.java
@@ -36,11 +36,17 @@ public class RouteDef {
 
     private final String sourceTable;
     private final String sinkTable;
+    private final String replaceSymbol;
     @Nullable private final String description;
 
-    public RouteDef(String sourceTable, String sinkTable, @Nullable String description) {
+    public RouteDef(
+            String sourceTable,
+            String sinkTable,
+            @Nullable String replaceSymbol,
+            @Nullable String description) {
         this.sourceTable = sourceTable;
         this.sinkTable = sinkTable;
+        this.replaceSymbol = replaceSymbol;
         this.description = description;
     }
 
@@ -52,6 +58,10 @@ public String getSinkTable() {
         return sinkTable;
     }
 
+    public Optional<String> getReplaceSymbol() {
+        return Optional.ofNullable(replaceSymbol);
+    }
+
     public Optional<String> getDescription() {
         return Optional.ofNullable(description);
     }
@@ -63,6 +73,8 @@ public String toString() {
                 + sourceTable
                 + ", sinkTable="
                 + sinkTable
+                + ", replaceSymbol="
+                + replaceSymbol
                 + ", description='"
                 + description
                 + '\''
@@ -80,11 +92,12 @@ public boolean equals(Object o) {
         RouteDef routeDef = (RouteDef) o;
         return Objects.equals(sourceTable, routeDef.sourceTable)
                 && Objects.equals(sinkTable, routeDef.sinkTable)
                && Objects.equals(replaceSymbol, routeDef.replaceSymbol)
                 && Objects.equals(description, routeDef.description);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(sourceTable, sinkTable, description);
+        return Objects.hash(sourceTable, sinkTable, replaceSymbol, description);
     }
 }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index a69741c7b4..1f5dc44bdc 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -17,12 +17,11 @@
 
 package org.apache.flink.cdc.composer.flink.translator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.composer.definition.RouteDef;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
@@ -80,10 +79,13 @@ private DataStream<Event> addSchemaOperator(
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
+        List<RouteRule> routingRules = new ArrayList<>();
         for (RouteDef route : routes) {
             routingRules.add(
-                    Tuple2.of(route.getSourceTable(), TableId.parse(route.getSinkTable())));
+                    new RouteRule(
+                            route.getSourceTable(),
+                            route.getSinkTable(),
+                            route.getReplaceSymbol().orElse(null)));
         }
         SingleOutputStreamOperator<Event> stream =
                 input.transform(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index b59f4ead64..26c9c91875 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -424,8 +424,8 @@ void testOneToOneRouting() throws Exception {
         TableId routedTable2 = TableId.tableId("default_namespace", "default_schema", "routed2");
         List<RouteDef> routeDef =
                 Arrays.asList(
-                        new RouteDef(TABLE_1.toString(), routedTable1.toString(), null),
-                        new RouteDef(TABLE_2.toString(), routedTable2.toString(), null));
+                        new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null),
+                        new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null));
 
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
@@ -616,6 +616,7 @@ void testMergingWithRoute() throws Exception {
                         new RouteDef(
                                 "default_namespace.default_schema.mytable[0-9]",
                                 mergedTable.toString(),
+                                null,
                                 null));
 
         // Setup pipeline
@@ -657,4 +658,62 @@ void testMergingWithRoute() throws Exception {
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
     }
+
+    @ParameterizedTest
+    @EnumSource
+    void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        "default_namespace.default_schema.table[0-9]",
+                                        "replaced_namespace.replaced_schema.__$__",
+                                        "__$__",
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", + "AddColumnEvent{tableId=replaced_namespace.replaced_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=replaced_namespace.replaced_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", + "DropColumnEvent{tableId=replaced_namespace.replaced_schema.table1, droppedColumnNames=[newCol2]}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); + } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index 4ff001e79e..e674d57d05 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -624,6 +624,102 @@ public void testOneToManyRoute() throws Exception { "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); } + @Test + public void testReplacementSymbol() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "route:\n" + + " - source-table: %s.\\.*\n" + + " sink-table: NEW_%s.NEW_<>\n" + + " replace-symbol: <>\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + routeTestDatabase.getDatabaseName(), + routeTestDatabase.getDatabaseName(), + routeTestDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + waitUntilSpecificEvent( + String.format( + "CreateTableEvent{tableId=NEW_%s.NEW_TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + routeTestDatabase.getDatabaseName())); + waitUntilSpecificEvent( + String.format( + 
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + routeTestDatabase.getDatabaseName())); + waitUntilSpecificEvent( + String.format( + "CreateTableEvent{tableId=NEW_%s.NEW_TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + routeTestDatabase.getDatabaseName())); + waitUntilSpecificEvent( + String.format( + "CreateTableEvent{tableId=NEW_%s.NEW_TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + routeTestDatabase.getDatabaseName())); + + validateResult( + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1008, 8], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1009, 8.1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1010, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[1011, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2011, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2012, 12], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2013, 13], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[2014, 14], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3015, Amber], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3016, Black], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3017, Cyan], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3018, Denim], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4019, Yosemite], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4020, El Capitan], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4021, Sierra], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4022, High Sierra], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4023, Mojave], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[4024, Catalina], op=INSERT, meta=()}"); + + LOG.info("Begin incremental reading stage."); + + generateIncrementalChanges(); + + validateResult( + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[3007, 7], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[2014, 14], after=[2014, 2014], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[3019, Emerald], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[4024, Catalina], after=[], op=DELETE, meta=()}"); + + generateSchemaChanges(); + validateResult( + "AddColumnEvent{tableId=NEW_%s.NEW_TABLEALPHA, addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", + 
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VARCHAR(19)}}", + "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", + "DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}", + "DataChangeEvent{tableId=NEW_%s.NEW_TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); + } + private void validateResult(String... expectedEvents) throws Exception { for (String event : expectedEvents) { waitUntilSpecificEvent(String.format(event, routeTestDatabase.getDatabaseName())); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 430e7f263f..d1f468bfe7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -17,7 +17,7 @@ package org.apache.flink.cdc.runtime.operators.schema; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.StringData; @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; @@ -70,6 +71,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -88,22 +90,33 @@ public class SchemaOperator extends AbstractStreamOperator private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class); private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1); - private final List> routingRules; + private final List routingRules; + + /** + * Storing route source table selector, sink table name (before symbol replacement), and replace + * symbol in a tuple. + */ + private transient List> routes; - private transient List> routes; private transient TaskOperatorEventGateway toCoordinator; private transient SchemaEvolutionClient schemaEvolutionClient; private transient LoadingCache cachedSchemas; + /** + * Storing mapping relations between upstream tableId (source table) mapping to downstream + * tableIds (sink tables). 
+     */
+    private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache;
+
     private final long rpcTimeOutInMillis;
 
-    public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
+    public SchemaOperator(List<RouteRule> routingRules) {
         this.routingRules = routingRules;
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
     }
 
-    public SchemaOperator(List<Tuple2<String, TableId>> routingRules, Duration rpcTimeOut) {
+    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
         this.routingRules = routingRules;
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
@@ -119,14 +132,14 @@ public void setup(
         routes =
                 routingRules.stream()
                         .map(
-                                tuple2 -> {
-                                    String tableInclusions = tuple2.f0;
-                                    TableId replaceBy = tuple2.f1;
+                                rule -> {
+                                    String tableInclusions = rule.sourceTable;
                                     Selectors selectors =
                                             new Selectors.SelectorsBuilder()
                                                     .includeTables(tableInclusions)
                                                     .build();
-                                    return new Tuple2<>(selectors, replaceBy);
+                                    return new Tuple3<>(
+                                            selectors, rule.sinkTable, rule.replaceSymbol);
                                 })
                         .collect(Collectors.toList());
         schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, getOperatorID());
@@ -140,6 +153,16 @@ public Schema load(TableId tableId) {
                                         return getLatestSchema(tableId);
                                     }
                                 });
+        tableIdMappingCache =
+                CacheBuilder.newBuilder()
+                        .expireAfterAccess(CACHE_EXPIRE_DURATION)
+                        .build(
+                                new CacheLoader<TableId, List<TableId>>() {
+                                    @Override
+                                    public List<TableId> load(TableId tableId) {
+                                        return getRoutedTables(tableId);
+                                    }
+                                });
     }
 
     @Override
@@ -158,7 +181,7 @@ public void initializeState(StateInitializationContext context) throws Exception
      */
     @Override
     public void processElement(StreamRecord<Event> streamRecord)
-            throws InterruptedException, TimeoutException {
+            throws InterruptedException, TimeoutException, ExecutionException {
         Event event = streamRecord.getValue();
         // Schema changes
         if (event instanceof SchemaChangeEvent) {
@@ -169,15 +192,15 @@ public void processElement(StreamRecord<Event> streamRecord)
             handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
             // Update caches
             cachedSchemas.put(tableId, getLatestSchema(tableId));
-            getRoutedTables(tableId)
+            tableIdMappingCache
+                    .get(tableId)
                     .forEach(routed -> cachedSchemas.put(routed, getLatestSchema(routed)));
             return;
         }
 
         // Data changes
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        TableId tableId = dataChangeEvent.tableId();
-        List<TableId> optionalRoutedTable = getRoutedTables(tableId);
+        List<TableId> optionalRoutedTable = tableIdMappingCache.get(dataChangeEvent.tableId());
         if (optionalRoutedTable.isEmpty()) {
             output.collect(streamRecord);
         } else {
@@ -270,10 +293,18 @@ private RecordData regenerateRecordData(
     private List<TableId> getRoutedTables(TableId originalTableId) {
         return routes.stream()
                 .filter(route -> route.f0.isMatch(originalTableId))
-                .map(route -> route.f1)
+                .map(route -> resolveReplacement(originalTableId, route))
                 .collect(Collectors.toList());
     }
 
+    private TableId resolveReplacement(
+            TableId originalTable, Tuple3<Selectors, String, String> route) {
+        if (route.f2 != null) {
+            return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
+        }
+        return TableId.parse(route.f1);
+    }
+
     private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
             throws InterruptedException, TimeoutException {
         // The request will need to send a FlushEvent or block until flushing finished
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
index ecf5001715..eba7c77122 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
@@ -17,10 +17,9 @@
 
 package org.apache.flink.cdc.runtime.operators.schema;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -40,12 +39,10 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
     private static final long serialVersionUID = 1L;
 
     private final MetadataApplier metadataApplier;
-    private final List<Tuple2<String, TableId>> routingRules;
+    private final List<RouteRule> routingRules;
 
     public SchemaOperatorFactory(
-            MetadataApplier metadataApplier,
-            List<Tuple2<String, TableId>> routingRules,
-            Duration rpcTimeOut) {
+            MetadataApplier metadataApplier, List<RouteRule> routingRules, Duration rpcTimeOut) {
         super(new SchemaOperator(routingRules, rpcTimeOut));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
index 7b85383824..b936da6080 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -25,6 +25,7 @@
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.PhysicalColumn;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -48,19 +49,37 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Derive schema changes based on the routing rules. */
 public class SchemaDerivation {
     private final SchemaManager schemaManager;
-    private final List<Tuple2<Selectors, TableId>> routes;
     private final Map<TableId, Set<TableId>> derivationMapping;
 
+    /**
+     * Storing route source table selector, sink table name (before symbol replacement), and
+     * replace symbol in a tuple.
+     */
+    private transient List<Tuple3<Selectors, String, String>> routes;
+
     public SchemaDerivation(
             SchemaManager schemaManager,
-            List<Tuple2<Selectors, TableId>> routes,
+            List<RouteRule> routeRules,
             Map<TableId, Set<TableId>> derivationMapping) {
         this.schemaManager = schemaManager;
-        this.routes = routes;
+        this.routes =
+                routeRules.stream()
+                        .map(
+                                rule -> {
+                                    String tableInclusions = rule.sourceTable;
+                                    Selectors selectors =
+                                            new Selectors.SelectorsBuilder()
+                                                    .includeTables(tableInclusions)
+                                                    .build();
+                                    return new Tuple3<>(
+                                            selectors, rule.sinkTable, rule.replaceSymbol);
+                                })
+                        .collect(Collectors.toList());
         this.derivationMapping = derivationMapping;
     }
 
@@ -69,7 +88,7 @@ public List<SchemaChangeEvent> applySchemaChange(SchemaChangeEvent schemaChangeE
         TableId originalTable = schemaChangeEvent.tableId();
         boolean noRouteMatched = true;
 
-        for (Tuple2<Selectors, TableId> route : routes) {
+        for (Tuple3<Selectors, String, String> route : routes) {
             // Check routing table
             if (!route.f0.isMatch(originalTable)) {
                 continue;
@@ -78,7 +97,7 @@ public List<SchemaChangeEvent> applySchemaChange(SchemaChangeEvent schemaChangeE
             noRouteMatched = false;
 
             // Matched a routing rule
-            TableId derivedTable = route.f1;
+            TableId derivedTable = resolveReplacement(originalTable, route);
             Set<TableId> originalTables =
                     derivationMapping.computeIfAbsent(derivedTable, t -> new HashSet<>());
             originalTables.add(originalTable);
@@ -134,6 +153,14 @@ public List<SchemaChangeEvent> applySchemaChange(SchemaChangeEvent schemaChangeE
         }
     }
 
+    private TableId resolveReplacement(
+            TableId originalTable, Tuple3<Selectors, String, String> route) {
+        if (route.f2 != null) {
+            return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
+        }
+        return TableId.parse(route.f1);
+    }
+
     public Map<TableId, Set<TableId>> getDerivationMapping() {
         return derivationMapping;
     }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 2c718dcec4..02abb8903e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -17,9 +17,8 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
@@ -90,7 +89,7 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
     /** Metadata applier for applying schema changes to external system. */
     private final MetadataApplier metadataApplier;
 
-    private final List<Tuple2<Selectors, TableId>> routes;
+    private final List<RouteRule> routes;
 
     /** The request handler that handles all requests and events. */
     private SchemaRegistryRequestHandler requestHandler;
 
@@ -104,7 +103,7 @@ public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
             MetadataApplier metadataApplier,
-            List<Tuple2<Selectors, TableId>> routes) {
+            List<RouteRule> routes) {
         this.context = context;
         this.operatorName = operatorName;
         this.failedReasons = new HashMap<>();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
index 1f6e7aaf57..0db0cf3a7e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
@@ -17,16 +17,13 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** Provider of {@link SchemaRegistry}. */
 @Internal
@@ -36,13 +33,13 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
     private final OperatorID operatorID;
     private final String operatorName;
     private final MetadataApplier metadataApplier;
-    private final List<Tuple2<String, TableId>> routingRules;
+    private final List<RouteRule> routingRules;
 
     public SchemaRegistryProvider(
             OperatorID operatorID,
             String operatorName,
             MetadataApplier metadataApplier,
-            List<Tuple2<String, TableId>> routingRules) {
+            List<RouteRule> routingRules) {
         this.operatorID = operatorID;
         this.operatorName = operatorName;
         this.metadataApplier = metadataApplier;
@@ -56,19 +53,6 @@ public OperatorID getOperatorId() {
 
     @Override
     public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
-        List<Tuple2<Selectors, TableId>> routes =
-                routingRules.stream()
-                        .map(
-                                tuple2 -> {
-                                    String tableInclusions = tuple2.f0;
-                                    TableId replaceBy = tuple2.f1;
-                                    Selectors selectors =
-                                            new Selectors.SelectorsBuilder()
-                                                    .includeTables(tableInclusions)
-                                                    .build();
-                                    return new Tuple2<>(selectors, replaceBy);
-                                })
-                        .collect(Collectors.toList());
-        return new SchemaRegistry(operatorName, context, metadataApplier, routes);
+        return new SchemaRegistry(operatorName, context, metadataApplier, routingRules);
     }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
index 19fd5a4a13..adaf3b1400 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.runtime.operators.schema.coordinator;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -25,10 +24,10 @@
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.PhysicalColumn;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 
@@ -81,13 +80,9 @@ class SchemaDerivationTest {
                     .column(Column.physicalColumn("gender", DataTypes.STRING()))
                     .build();
 
-    private static final List<Tuple2<Selectors, TableId>> ROUTES =
+    private static final List<RouteRule> ROUTES =
             Collections.singletonList(
-                    Tuple2.of(
-                            new Selectors.SelectorsBuilder()
-                                    .includeTables("mydb.myschema.mytable[0-9]")
-                                    .build(),
-                            MERGED_TABLE));
+                    new RouteRule("mydb.myschema.mytable[0-9]", MERGED_TABLE.toString(), null));
 
     @Test
     void testOneToOneMapping() {
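
---

The replacement semantics above reduce to a plain `String.replace` of the configured symbol inside `sink-table`, applied identically in `SchemaOperator.resolveReplacement(...)` and `SchemaDerivation.resolveReplacement(...)`. Below is a minimal self-contained sketch of that behavior; the class and method names (`ReplaceSymbolSketch`, `resolve`) are hypothetical and carry no Flink CDC dependencies:

```java
/**
 * Illustrative sketch of the replace-symbol semantics introduced by this patch:
 * every occurrence of the symbol in the sink-table pattern is substituted with
 * the source table's simple name; without a symbol, the sink table is literal.
 */
public class ReplaceSymbolSketch {

    // Mirrors resolveReplacement(...) above, but on plain strings.
    static String resolve(String sinkTablePattern, String replaceSymbol, String sourceTableName) {
        if (replaceSymbol == null) {
            // No symbol configured: behaves like the old static (merge-style) routing.
            return sinkTablePattern;
        }
        // String.replace substitutes ALL occurrences of the symbol.
        return sinkTablePattern.replace(replaceSymbol, sourceTableName);
    }

    public static void main(String[] args) {
        // "source_db.table1" routed by sink-table "sink_db.<>" and replace-symbol "<>":
        System.out.println(resolve("sink_db.<>", "<>", "table1")); // sink_db.table1
        // The symbol may sit inside a longer name, as in the e2e test's NEW_%s.NEW_<>:
        System.out.println(resolve("NEW_db.NEW_<>", "<>", "TABLEALPHA")); // NEW_db.NEW_TABLEALPHA
        // Without a symbol, the sink table is taken literally:
        System.out.println(resolve("ods_db.ods_products", null, "products")); // ods_db.ods_products
    }
}
```

Since `String.replace` substitutes every occurrence, a symbol that collides with literal text in `sink-table` would be replaced everywhere; the tests above deliberately pick collision-proof symbols such as `<>`, `>_<`, and `__$__`.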