Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Jul 12, 2024
1 parent 2633019 commit 0ad032c
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

package org.apache.flink.cdc.composer.flink.translator;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void processElement(StreamRecord<Event> streamRecord)
}

private void processSchemaChangeEvents(SchemaChangeEvent event)
throws InterruptedException, TimeoutException {
throws InterruptedException, TimeoutException, ExecutionException {
TableId tableId = event.tableId();
LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId);
handleSchemaChangeEvent(tableId, event);
Expand All @@ -258,7 +258,8 @@ private void processSchemaChangeEvents(SchemaChangeEvent event)

List<TableId> optionalRoutedTable = getRoutedTables(tableId);
if (!optionalRoutedTable.isEmpty()) {
getRoutedTables(tableId)
tableIdMappingCache
.get(tableId)
.forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed)));
} else {
evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
Expand Down Expand Up @@ -269,7 +268,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
metadataApplier,
schemaManager,
schemaDerivation,
schemaManager.getBehavior());
schemaChangeBehavior);
break;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@

package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

import java.util.List;
import java.util.stream.Collectors;

/** Provider of {@link SchemaRegistry}. */
@Internal
Expand Down Expand Up @@ -61,7 +57,7 @@ public OperatorID getOperatorId() {

@Override
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
return new SchemaRegistry(operatorName, context, metadataApplier, routingRules);

return new SchemaRegistry(
operatorName, context, metadataApplier, routingRules, schemaChangeBehavior);
}
}

0 comments on commit 0ad032c

Please sign in to comment.