Skip to content

Commit

Permalink
[FLINK-36128] Promote LENIENT as the default schema change behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Aug 25, 2024
1 parent 060d203 commit 69ca252
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
Expand All @@ -35,11 +37,14 @@

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

Expand Down Expand Up @@ -99,6 +104,19 @@ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineCon

private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {

// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
// it's not of plain data types and must be removed before calling toPipelineConfig.
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));

// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));

SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

// Source is required
SourceDef sourceDef =
toSourceDef(
Expand All @@ -113,7 +131,8 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY));
SINK_KEY),
schemaChangeBehavior);

// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
Expand All @@ -128,14 +147,6 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));

// UDFs are optional
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));

// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));

// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
Expand All @@ -162,7 +173,7 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
return new SourceDef(type, name, Configuration.fromMap(sourceMap));
}

private SinkDef toSinkDef(JsonNode sinkNode) {
private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();

Expand All @@ -172,6 +183,23 @@ private SinkDef toSinkDef(JsonNode sinkNode) {
Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText())));

if (includedSETypes.isEmpty()) {
// If no schema evolution types are specified, include all schema evolution types by
// default.
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
.map(SchemaChangeEventType::getTag)
.forEach(includedSETypes::add);
}

if (excludedSETypes.isEmpty()
&& SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
// In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be
// overridden by manually specifying excluded types.
Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
.map(SchemaChangeEventType::getTag)
.forEach(excludedSETypes::add);
}

Set<SchemaChangeEventType> declaredSETypes =
resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.composer.definition.UdfDef;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;

import org.junit.jupiter.api.Test;
Expand All @@ -37,6 +38,11 @@
import java.util.Arrays;
import java.util.Collections;

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
Expand Down Expand Up @@ -384,7 +390,13 @@ void testParsingFullDefinitionFromString() throws Exception {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("bootstrap-servers", "localhost:9092")
.build())),
.build()),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.singletonList(
new RouteDef(
"mydb.default.app_order_.*",
Expand All @@ -401,7 +413,16 @@ void testParsingFullDefinitionFromString() throws Exception {
private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
new SinkDef("kafka", null, new Configuration()),
new SinkDef(
"kafka",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Expand Down Expand Up @@ -474,7 +495,16 @@ void testParsingFullDefinitionFromString() throws Exception {
private final PipelineDef pipelineDefWithUdf =
new PipelineDef(
new SourceDef("values", null, new Configuration()),
new SinkDef("values", null, new Configuration()),
new SinkDef(
"values",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,23 @@
/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */
@PublicEvolving
public enum SchemaChangeEventType {
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE;
ADD_COLUMN("add.column"),
ALTER_COLUMN_TYPE("alter.column.type"),
CREATE_TABLE("create.table"),
DROP_COLUMN("drop.column"),
DROP_TABLE("drop.table"),
RENAME_COLUMN("rename.column"),
TRUNCATE_TABLE("truncate.table");

private final String tag;

SchemaChangeEventType(String tag) {
this.tag = tag;
}

public String getTag() {
return tag;
}

public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class PipelineOptions {
public static final ConfigOption<SchemaChangeBehavior> PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
ConfigOptions.key("schema.change.behavior")
.enumType(SchemaChangeBehavior.class)
.defaultValue(SchemaChangeBehavior.EVOLVE)
.defaultValue(SchemaChangeBehavior.LENIENT)
.withDescription(
Description.builder()
.text("Behavior for handling schema change events. ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,8 @@ public static Set<SchemaChangeEventType> resolveSchemaEvolutionOptions(
List<String> includedSchemaEvolutionTypes, List<String> excludedSchemaEvolutionTypes) {
List<SchemaChangeEventType> resultTypes = new ArrayList<>();

if (includedSchemaEvolutionTypes.isEmpty()) {
resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL));
} else {
for (String includeTag : includedSchemaEvolutionTypes) {
resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}
for (String includeTag : includedSchemaEvolutionTypes) {
resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}

for (String excludeTag : excludedSchemaEvolutionTypes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

package org.apache.flink.cdc.common.utils;

import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;

import org.assertj.core.util.Sets;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
Expand All @@ -36,9 +41,12 @@
public class ChangeEventUtilsTest {
@Test
public void testResolveSchemaEvolutionOptions() {
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.emptyList()))

List<String> allTags =
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
.map(SchemaChangeEventType::getTag)
.collect(Collectors.toList());
assertThat(ChangeEventUtils.resolveSchemaEvolutionOptions(allTags, Collections.emptyList()))
.isEqualTo(
Sets.set(
TRUNCATE_TABLE,
Expand All @@ -51,7 +59,7 @@ public void testResolveSchemaEvolutionOptions() {

assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop")))
allTags, Collections.singletonList("drop")))
.isEqualTo(
Sets.set(
ADD_COLUMN,
Expand All @@ -73,7 +81,7 @@ public void testResolveSchemaEvolutionOptions() {

assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop.column")))
allTags, Collections.singletonList("drop.column")))
.isEqualTo(
Sets.set(
ADD_COLUMN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
Expand Down Expand Up @@ -134,6 +135,8 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -191,6 +194,8 @@ void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Except
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -313,6 +318,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -373,6 +380,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -441,6 +450,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -496,6 +507,8 @@ void testOneToOneRouting() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -569,6 +582,8 @@ void testIdenticalOneToOneRouting() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -766,6 +781,8 @@ void testMergingWithRoute() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -972,6 +989,8 @@ void testTransformMergingWithRoute() throws Exception {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down Expand Up @@ -1035,6 +1054,8 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
Expand Down
Loading

0 comments on commit 69ca252

Please sign in to comment.