diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 62f8f210eb3..f3a11ea7cc8 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
 There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
 
 | Field              | Data Type | Description                                  |
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String    | Name of the namespace that contains the row. |
-| __schema_name__    | String    | Name of the schema that contains the row.    |
-| __table_name__     | String    | Name of the table that contains the row.    |
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__  | String    | Name of the namespace that contains the row. |
+| __schema_name__     | String    | Name of the schema that contains the row.    |
+| __table_name__      | String    | Name of the table that contains the row.     |
+| __data_event_type__ | String    | Operation type of the data change event.     |
 
 ## Metadata relationship
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 62f8f210eb3..f3a11ea7cc8 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
 There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
 
 | Field              | Data Type | Description                                  |
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String    | Name of the namespace that contains the row. |
-| __schema_name__    | String    | Name of the schema that contains the row.    |
-| __table_name__     | String    | Name of the table that contains the row.    |
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__  | String    | Name of the namespace that contains the row. |
+| __schema_name__     | String    | Name of the schema that contains the row.    |
+| __table_name__      | String    | Name of the table that contains the row.     |
+| __data_event_type__ | String    | Operation type of the data change event.     |
 
 ## Metadata relationship
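A note on values before the implementation below: `__data_event_type__` carries the changelog row kind of the record image being processed. A minimal standalone sketch (illustrative Java, not code from this patch; the class name is ours) of how the values pair up with event types, matching the test expectations further down:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: pairs each operation type with the __data_event_type__
// value visible on the before/after record images, as exercised by the tests
// in this patch.
public class DataEventTypeValues {
    public static void main(String[] args) {
        Map<String, String[]> rowKinds = new LinkedHashMap<>();
        rowKinds.put("INSERT", new String[] {null, "+I"}); // no before-image
        rowKinds.put("UPDATE", new String[] {"-U", "+U"}); // both images
        rowKinds.put("DELETE", new String[] {"-D", null}); // no after-image
        rowKinds.forEach(
                (op, kinds) ->
                        System.out.printf(
                                "%s -> before=%s, after=%s%n", op, kinds[0], kinds[1]));
    }
}
```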
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index eafb4035e65..45cdbe9e852 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -340,6 +340,66 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.table1",
+                        "*,concat(col1,'0') as col12,__data_event_type__ as rk",
+                        "col1 <> '3'",
+                        "col1",
+                        "col12",
+                        "key1=value1",
+                        "");
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        new ArrayList<>(Collections.singletonList(transformDef)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
+                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
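The test above references `__data_event_type__` in the projection; it can equally appear in the filter. A hypothetical `TransformDef` (not part of this patch; argument positions mirror the seven-argument constructor used in `testOpTypeMetadataColumn`, and the table/column names are invented) that drops delete events:

```java
// Hypothetical example: filters on the row kind that __data_event_type__
// exposes, discarding "-D" (delete) records before they reach the sink.
TransformDef keepNonDeletes =
        new TransformDef(
                "default_namespace.default_schema.table1",
                "*, __data_event_type__ AS op_kind", // projection
                "__data_event_type__ <> '-D'", // filter on the row kind
                "col1", // primary keys
                "", // partition keys
                "", // table options
                ""); // description
```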
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 0dad9f36330..e2797715336 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -442,9 +442,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
                         + "  type: values\n"
                         + "transform:\n"
                         + "  - source-table: %s.TABLEALPHA\n"
-                        + "    projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+                        + "    projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
                         + "  - source-table: %s.TABLEBETA\n"
-                        + "    projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+                        + "    projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
                         + "pipeline:\n"
                         + "  parallelism: 1",
                        INTER_CONTAINER_MYSQL_ALIAS,
@@ -462,25 +462,25 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
 
         waitUntilSpecificEvent(
                 String.format(
-                        "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
+                        "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
                         transformTestDatabase.getDatabaseName()),
                 60000L);
 
         waitUntilSpecificEvent(
                 String.format(
-                        "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
+                        "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
                         transformTestDatabase.getDatabaseName()),
                 60000L);
 
         validateEvents(
-                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA], op=INSERT, meta=()}");
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA, +I], op=INSERT, meta=()}");
 
         // generate binlogs
         String mysqlJdbcUrl =
@@ -492,9 +492,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
         insertBinlogEvents(mysqlJdbcUrl);
 
         validateEvents(
-                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA], op=UPDATE, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], after=[], op=DELETE, meta=()}");
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, -U], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA, +U], op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, -D], after=[], op=DELETE, meta=()}");
     }
 
     private static void insertBinlogEvents(String mysqlJdbcUrl) throws SQLException {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index a4f938d158a..36955f8e1b5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -23,6 +23,7 @@
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -389,13 +390,15 @@ private Optional<DataChangeEvent> processFilter(
         BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
         // insert and update event only process afterData, delete only process beforeData
         if (after != null) {
-            if (transformFilterProcessor.process(after, epochTime)) {
+            if (transformFilterProcessor.process(
+                    after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'))) {
                 return Optional.of(dataChangeEvent);
             } else {
                 return Optional.empty();
             }
         } else if (before != null) {
-            if (transformFilterProcessor.process(before, epochTime)) {
+            if (transformFilterProcessor.process(
+                    before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'))) {
                 return Optional.of(dataChangeEvent);
             } else {
                 return Optional.empty();
@@ -412,11 +415,14 @@ private Optional<DataChangeEvent> processProjection(
         BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
         if (before != null) {
             BinaryRecordData projectedBefore =
-                    postTransformProcessor.processData(before, epochTime);
+                    postTransformProcessor.processData(
+                            before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'));
             dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
         }
         if (after != null) {
-            BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
+            BinaryRecordData projectedAfter =
+                    postTransformProcessor.processData(
+                            after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'));
             dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
         }
         return Optional.of(dataChangeEvent);
@@ -499,4 +505,8 @@ private void destroyUdf() {
                 });
     }
+
+    private String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
+        return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
+    }
 }
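`opTypeToRowKind` is the heart of the change: it prefixes the first letter of the `OperationType` name with `+` or `-` depending on which record image is being evaluated. A self-contained sketch of that behavior (`OperationType` is stubbed as a plain enum here; the real one is `org.apache.flink.cdc.common.event.OperationType`, and the demo class name is ours):

```java
// Stub enum mirroring only the constants relevant to data change events.
enum OperationType {
    INSERT,
    UPDATE,
    DELETE
}

public class OpTypeToRowKindDemo {
    static String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
        // Same expression as the new private helper in PostTransformOperator.
        return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
    }

    public static void main(String[] args) {
        System.out.println(opTypeToRowKind(OperationType.INSERT, '+')); // +I
        System.out.println(opTypeToRowKind(OperationType.UPDATE, '-')); // -U
        System.out.println(opTypeToRowKind(OperationType.UPDATE, '+')); // +U
        System.out.println(opTypeToRowKind(OperationType.DELETE, '-')); // -D
    }
}
```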
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index cbe290dcb73..ee7740d9e2a 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -21,7 +21,7 @@
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
-import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
 import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
 
 import org.codehaus.janino.ExpressionEvaluator;
@@ -33,6 +33,8 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 
+import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
+
 /**
  * The processor of the projection column. It processes the data column and the user-defined
  * computed columns.
@@ -79,9 +81,9 @@ public ProjectionColumn getProjectionColumn() {
         return projectionColumn;
     }
 
-    public Object evaluate(BinaryRecordData after, long epochTime) {
+    public Object evaluate(BinaryRecordData record, long epochTime, String opType) {
         try {
-            return expressionEvaluator.evaluate(generateParams(after, epochTime));
+            return expressionEvaluator.evaluate(generateParams(record, epochTime, opType));
         } catch (InvocationTargetException e) {
             LOG.error(
                     "Table:{} column:{} projection:{} execute failed. {}",
@@ -93,7 +95,7 @@ public Object evaluate(BinaryRecordData after, long epochTime) {
         }
     }
 
-    private Object[] generateParams(BinaryRecordData after, long epochTime) {
+    private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) {
         List<Object> params = new ArrayList<>();
         List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
 
@@ -103,15 +105,18 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
                 new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
         for (String originalColumnName : originalColumnNames) {
             switch (originalColumnName) {
-                case TransformParser.DEFAULT_NAMESPACE_NAME:
+                case MetadataColumns.DEFAULT_NAMESPACE_NAME:
                     params.add(tableInfo.getNamespace());
                     continue;
-                case TransformParser.DEFAULT_SCHEMA_NAME:
+                case MetadataColumns.DEFAULT_SCHEMA_NAME:
                     params.add(tableInfo.getSchemaName());
                     continue;
-                case TransformParser.DEFAULT_TABLE_NAME:
+                case MetadataColumns.DEFAULT_TABLE_NAME:
                     params.add(tableInfo.getTableName());
                     continue;
+                case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
+                    params.add(opType);
+                    continue;
             }
 
             boolean argumentFound = false;
@@ -120,7 +125,7 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
                 if (column.getName().equals(originalColumnName)) {
                     params.add(
                             DataTypeConverter.convertToOriginal(
-                                    fieldGetters[i].getFieldOrNull(after), column.getType()));
+                                    fieldGetters[i].getFieldOrNull(record), column.getType()));
                     argumentFound = true;
                     break;
                 }
@@ -158,20 +163,14 @@ private TransformExpressionKey generateTransformExpressionKey() {
         }
 
         for (String originalColumnName : originalColumnNames) {
-            switch (originalColumnName) {
-                case TransformParser.DEFAULT_NAMESPACE_NAME:
-                    argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
-                    paramTypes.add(String.class);
-                    break;
-                case TransformParser.DEFAULT_SCHEMA_NAME:
-                    argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
-                    paramTypes.add(String.class);
-                    break;
-                case TransformParser.DEFAULT_TABLE_NAME:
-                    argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
-                    paramTypes.add(String.class);
-                    break;
-            }
+            METADATA_COLUMNS.stream()
+                    .filter(col -> col.f0.equals(originalColumnName))
+                    .findFirst()
+                    .ifPresent(
+                            col -> {
+                                argumentNames.add(col.f0);
+                                paramTypes.add(col.f2);
+                            });
         }
 
         argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
{}", @@ -93,7 +95,7 @@ public Object evaluate(BinaryRecordData after, long epochTime) { } } - private Object[] generateParams(BinaryRecordData after, long epochTime) { + private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) { List params = new ArrayList<>(); List columns = tableInfo.getPreTransformedSchema().getColumns(); @@ -103,15 +105,18 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) { new LinkedHashSet<>(projectionColumn.getOriginalColumnNames()); for (String originalColumnName : originalColumnNames) { switch (originalColumnName) { - case TransformParser.DEFAULT_NAMESPACE_NAME: + case MetadataColumns.DEFAULT_NAMESPACE_NAME: params.add(tableInfo.getNamespace()); continue; - case TransformParser.DEFAULT_SCHEMA_NAME: + case MetadataColumns.DEFAULT_SCHEMA_NAME: params.add(tableInfo.getSchemaName()); continue; - case TransformParser.DEFAULT_TABLE_NAME: + case MetadataColumns.DEFAULT_TABLE_NAME: params.add(tableInfo.getTableName()); continue; + case MetadataColumns.DEFAULT_DATA_EVENT_TYPE: + params.add(opType); + continue; } boolean argumentFound = false; @@ -120,7 +125,7 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) { if (column.getName().equals(originalColumnName)) { params.add( DataTypeConverter.convertToOriginal( - fieldGetters[i].getFieldOrNull(after), column.getType())); + fieldGetters[i].getFieldOrNull(record), column.getType())); argumentFound = true; break; } @@ -158,20 +163,14 @@ private TransformExpressionKey generateTransformExpressionKey() { } for (String originalColumnName : originalColumnNames) { - switch (originalColumnName) { - case TransformParser.DEFAULT_NAMESPACE_NAME: - argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME); - paramTypes.add(String.class); - break; - case TransformParser.DEFAULT_SCHEMA_NAME: - argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME); - paramTypes.add(String.class); - break; - case TransformParser.DEFAULT_TABLE_NAME: - argumentNames.add(TransformParser.DEFAULT_TABLE_NAME); - paramTypes.add(String.class); - break; - } + METADATA_COLUMNS.stream() + .filter(col -> col.f0.equals(originalColumnName)) + .findFirst() + .ifPresent( + col -> { + argumentNames.add(col.f0); + paramTypes.add(col.f2); + }); } argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index d1f67818b0c..430813d7fd8 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.runtime.parser.JaninoCompiler; +import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; import org.codehaus.janino.ExpressionEvaluator; @@ -32,11 +33,8 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; -import java.util.stream.Stream; -import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_NAMESPACE_NAME; -import static org.apache.flink.cdc.runtime.parser.TransformParser.DEFAULT_SCHEMA_NAME; -import static 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
index 452ac2238c4..45ea3577094 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
@@ -113,7 +113,7 @@ public Schema processSchemaChangeEvent(Schema schema) {
                         .collect(Collectors.toList()));
     }
 
-    public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
+    public BinaryRecordData processData(BinaryRecordData payload, long epochTime, String opType) {
         List<Object> valueList = new ArrayList<>();
         List<Column> columns =
                 postTransformChangeInfo.getPostTransformedSchema().getColumns();
 
@@ -124,7 +124,7 @@ public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
                 ProjectionColumn projectionColumn = projectionColumnProcessor.getProjectionColumn();
                 valueList.add(
                         DataTypeConverter.convert(
-                                projectionColumnProcessor.evaluate(payload, epochTime),
+                                projectionColumnProcessor.evaluate(payload, epochTime, opType),
                                 projectionColumn.getDataType()));
             } else {
                 Column column = columns.get(i);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index ea7bf4c5c13..c7a1b718e1c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -20,7 +20,6 @@
 import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
 import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
 import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory;
@@ -84,6 +83,7 @@
 import java.util.stream.IntStream;
 
 import static org.apache.flink.cdc.common.utils.StringUtils.isNullOrWhitespaceOnly;
+import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;
 import static org.apache.flink.cdc.runtime.typeutils.DataTypeConverter.convertCalciteType;
 
 /** Use Flink's calcite parser to parse the statement of flink cdc pipeline transform. */
@@ -91,9 +91,6 @@ public class TransformParser {
     private static final Logger LOG = LoggerFactory.getLogger(TransformParser.class);
     private static final String DEFAULT_SCHEMA = "default_schema";
     private static final String DEFAULT_TABLE = "TB";
-    public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
-    public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
-    public static final String DEFAULT_TABLE_NAME = "__table_name__";
 
     private static SqlParser getCalciteParser(String sql) {
         return SqlParser.create(
@@ -497,16 +494,14 @@ private static SqlSelect parseProjectionExpression(String projection) {
     private static List<Column> copyFillMetadataColumn(List<Column> columns) {
         // Add metaColumn for SQLValidator.validate
         List<Column> columnsWithMetadata = new ArrayList<>(columns);
-        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING()));
-        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING()));
-        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING()));
+        METADATA_COLUMNS.stream()
+                .map(col -> Column.physicalColumn(col.f0, col.f1))
+                .forEach(columnsWithMetadata::add);
         return columnsWithMetadata;
     }
 
     private static boolean isMetadataColumn(String columnName) {
-        return DEFAULT_TABLE_NAME.equals(columnName)
-                || DEFAULT_SCHEMA_NAME.equals(columnName)
-                || DEFAULT_NAMESPACE_NAME.equals(columnName);
+        return METADATA_COLUMNS.stream().anyMatch(col -> col.f0.equals(columnName));
    }
 
     public static SqlSelect parseFilterExpression(String filterExpression) {
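The parser-side effect of the refactor is that "is this a metadata column?" becomes a list-membership test. A trivial standalone rendering of the new predicate (column names copied from `MetadataColumns`; the class name is ours):

```java
import java.util.Arrays;
import java.util.List;

public class IsMetadataColumnDemo {
    private static final List<String> NAMES =
            Arrays.asList(
                    "__namespace_name__",
                    "__schema_name__",
                    "__table_name__",
                    "__data_event_type__");

    // Mirrors TransformParser#isMetadataColumn after this patch.
    static boolean isMetadataColumn(String columnName) {
        return NAMES.stream().anyMatch(name -> name.equals(columnName));
    }

    public static void main(String[] args) {
        System.out.println(isMetadataColumn("__data_event_type__")); // true
        System.out.println(isMetadataColumn("order_id")); // false
    }
}
```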
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
new file mode 100644
index 00000000000..f70e012f14c
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.parser.metadata;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Contains all supported metadata columns that could be used in transform expressions. */
+public class MetadataColumns {
+    public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
+    public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
+    public static final String DEFAULT_TABLE_NAME = "__table_name__";
+    public static final String DEFAULT_DATA_EVENT_TYPE = "__data_event_type__";
+
+    public static final List<Tuple3<String, DataType, Class<?>>> METADATA_COLUMNS =
+            Arrays.asList(
+                    Tuple3.of(DEFAULT_NAMESPACE_NAME, DataTypes.STRING(), String.class),
+                    Tuple3.of(DEFAULT_SCHEMA_NAME, DataTypes.STRING(), String.class),
+                    Tuple3.of(DEFAULT_TABLE_NAME, DataTypes.STRING(), String.class),
+                    Tuple3.of(DEFAULT_DATA_EVENT_TYPE, DataTypes.STRING(), String.class));
+}
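With this registry in place, every consumer above iterates `METADATA_COLUMNS` rather than hard-coding names, so registering a future metadata column should come down to one extra `Tuple3` entry. A quick usage sketch (assumes `flink-cdc-runtime` on the classpath; the class name is ours):

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.types.DataType;

import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;

public class MetadataColumnsDemo {
    public static void main(String[] args) {
        // Prints each column's name, CDC data type, and the Java class that
        // is handed to the Janino evaluator as the parameter type.
        for (Tuple3<String, DataType, Class<?>> col : METADATA_COLUMNS) {
            System.out.printf("%s: %s / %s%n", col.f0, col.f1, col.f2.getSimpleName());
        }
    }
}
```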