From 0ad032cf656e7488df067ffdf8dfbf12b2438c39 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 12 Jul 2024 18:14:19 +0800 Subject: [PATCH] Resolve conflicts --- .../flink/translator/SchemaOperatorTranslator.java | 2 -- .../cdc/runtime/operators/schema/SchemaOperator.java | 5 +++-- .../runtime/operators/schema/SchemaOperatorFactory.java | 1 - .../operators/schema/coordinator/SchemaRegistry.java | 3 +-- .../schema/coordinator/SchemaRegistryProvider.java | 8 ++------ 5 files changed, 6 insertions(+), 13 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java index fedeee11ebd..c965c88a623 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java @@ -17,11 +17,9 @@ package org.apache.flink.cdc.composer.flink.translator; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 803186e330d..fe6c7a3bfcf 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -248,7 +248,7 @@ public void processElement(StreamRecord streamRecord) } private void processSchemaChangeEvents(SchemaChangeEvent event) - throws InterruptedException, TimeoutException { + throws InterruptedException, TimeoutException, ExecutionException { TableId tableId = event.tableId(); LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId); handleSchemaChangeEvent(tableId, event); @@ -258,7 +258,8 @@ private void processSchemaChangeEvents(SchemaChangeEvent event) List optionalRoutedTable = getRoutedTables(tableId); if (!optionalRoutedTable.isEmpty()) { - getRoutedTables(tableId) + tableIdMappingCache + .get(tableId) .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed))); } else { evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId)); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java index 5f263bb2d42..7cd35a20d3e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java @@ -19,7 +19,6 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index dbeaae137b2..bbf7bc5ac0e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -19,7 +19,6 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; -import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; @@ -269,7 +268,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData metadataApplier, schemaManager, schemaDerivation, - schemaManager.getBehavior()); + schemaChangeBehavior); break; } default: diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java index c965ed2afd0..dd7f2dc36ca 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java @@ -17,18 +17,14 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; -import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; -import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import java.util.List; -import java.util.stream.Collectors; /** Provider of {@link SchemaRegistry}. */ @Internal @@ -61,7 +57,7 @@ public OperatorID getOperatorId() { @Override public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception { - return new SchemaRegistry(operatorName, context, metadataApplier, routingRules); - + return new SchemaRegistry( + operatorName, context, metadataApplier, routingRules, schemaChangeBehavior); } }