Commit 565942b

[FLINK-35805][transform] Add __data_event_type__ metadata column

yuxiqian committed Aug 13, 2024
1 parent 1042095 · commit 565942b
Showing 10 changed files with 186 additions and 78 deletions.
9 changes: 5 additions & 4 deletions docs/content.zh/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.

| Field               | Data Type | Description                                   |
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String    | Name of the namespace that contains the row. |
-| __schema_name__    | String    | Name of the schema that contains the row.    |
-| __table_name__     | String    | Name of the table that contains the row.     |
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__  | String    | Name of the namespace that contains the row. |
+| __schema_name__     | String    | Name of the schema that contains the row.    |
+| __table_name__      | String    | Name of the table that contains the row.     |
+| __data_event_type__ | String    | Operation type of data change event.         |

## Metadata relationship
9 changes: 5 additions & 4 deletions docs/content/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.

| Field               | Data Type | Description                                   |
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String    | Name of the namespace that contains the row. |
-| __schema_name__    | String    | Name of the schema that contains the row.    |
-| __table_name__     | String    | Name of the table that contains the row.     |
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__  | String    | Name of the namespace that contains the row. |
+| __schema_name__     | String    | Name of the schema that contains the row.    |
+| __table_name__      | String    | Name of the table that contains the row.     |
+| __data_event_type__ | String    | Operation type of data change event.         |

## Metadata relationship
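For context, the new column can be referenced in a pipeline definition like any other metadata field. A minimal sketch of a transform rule (the table name and column alias are hypothetical; the projection syntax follows the table above and the E2E test below):

transform:
  - source-table: app_db.orders
    projection: \*, __data_event_type__ AS op_type
    description: expose the change-event operation type (+I, -U, +U, -D) as a regular column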
@@ -340,6 +340,66 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
}

@ParameterizedTest
@EnumSource
void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup transform
TransformDef transformDef =
new TransformDef(
"default_namespace.default_schema.table1",
"*,concat(col1,'0') as col12,__data_event_type__ as rk",
"col1 <> '3'",
"col1",
"col12",
"key1=value1",
"");

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
new ArrayList<>(Collections.singletonList(transformDef)),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
}

@ParameterizedTest
@EnumSource
void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
@@ -442,9 +442,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
+ " type: values\n"
+ "transform:\n"
+ " - source-table: %s.TABLEALPHA\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
@@ -462,25 +462,25 @@

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);

validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA], op=INSERT, meta=()}");
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA, +I], op=INSERT, meta=()}");

// generate binlogs
String mysqlJdbcUrl =
Expand All @@ -492,9 +492,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
insertBinlogEvents(mysqlJdbcUrl);

validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], after=[], op=DELETE, meta=()}");
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, -U], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA, +U], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, -D], after=[], op=DELETE, meta=()}");
}

private static void insertBinlogEvents(String mysqlJdbcUrl) throws SQLException {
@@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
+ import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -389,13 +390,15 @@ private Optional<DataChangeEvent> processFilter(
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
// insert and update event only process afterData, delete only process beforeData
if (after != null) {
- if (transformFilterProcessor.process(after, epochTime)) {
+ if (transformFilterProcessor.process(
+ after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
}
} else if (before != null) {
- if (transformFilterProcessor.process(before, epochTime)) {
+ if (transformFilterProcessor.process(
+ before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
@@ -412,11 +415,14 @@ private Optional<DataChangeEvent> processProjection(
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
if (before != null) {
BinaryRecordData projectedBefore =
- postTransformProcessor.processData(before, epochTime);
+ postTransformProcessor.processData(
+ before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'));
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
}
if (after != null) {
- BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
+ BinaryRecordData projectedAfter =
+ postTransformProcessor.processData(
+ after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'));
dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
}
return Optional.of(dataChangeEvent);
@@ -499,4 +505,8 @@ private void destroyUdf() {
}
});
}

private String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
}
}
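The new helper above derives a RowKind-style tag from the event's operation type: '-' marks a before-image, '+' an after-image, followed by the operation's initial. A small self-contained sketch of the mapping (the helper is inlined here because the original is private; it assumes OperationType enumerates INSERT, UPDATE, and DELETE, as the test expectations above show):

import org.apache.flink.cdc.common.event.OperationType;

class OpTypeToRowKindDemo {
    // Mirrors the private helper above: before-image gets '-', after-image gets '+',
    // followed by the first letter of the operation name.
    static String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
        return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
    }

    public static void main(String[] args) {
        System.out.println(opTypeToRowKind(OperationType.INSERT, '+')); // +I (insert after-image)
        System.out.println(opTypeToRowKind(OperationType.UPDATE, '-')); // -U (update before-image)
        System.out.println(opTypeToRowKind(OperationType.UPDATE, '+')); // +U (update after-image)
        System.out.println(opTypeToRowKind(OperationType.DELETE, '-')); // -D (delete before-image)
    }
}

This is exactly the value set the __data_event_type__ column exposes in the tests above.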
@@ -21,7 +21,7 @@
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
- import org.apache.flink.cdc.runtime.parser.TransformParser;
+ import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;

import org.codehaus.janino.ExpressionEvaluator;
@@ -33,6 +33,8 @@
import java.util.LinkedHashSet;
import java.util.List;

+ import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;

/**
* The processor of the projection column. It processes the data column and the user-defined
* computed columns.
@@ -79,9 +81,9 @@ public ProjectionColumn getProjectionColumn() {
return projectionColumn;
}

- public Object evaluate(BinaryRecordData after, long epochTime) {
+ public Object evaluate(BinaryRecordData record, long epochTime, String opType) {
try {
- return expressionEvaluator.evaluate(generateParams(after, epochTime));
+ return expressionEvaluator.evaluate(generateParams(record, epochTime, opType));
} catch (InvocationTargetException e) {
LOG.error(
"Table:{} column:{} projection:{} execute failed. {}",
Expand All @@ -93,7 +95,7 @@ public Object evaluate(BinaryRecordData after, long epochTime) {
}
}

- private Object[] generateParams(BinaryRecordData after, long epochTime) {
+ private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) {
List<Object> params = new ArrayList<>();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();

@@ -103,15 +105,18 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
- case TransformParser.DEFAULT_NAMESPACE_NAME:
+ case MetadataColumns.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
continue;
- case TransformParser.DEFAULT_SCHEMA_NAME:
+ case MetadataColumns.DEFAULT_SCHEMA_NAME:
params.add(tableInfo.getSchemaName());
continue;
- case TransformParser.DEFAULT_TABLE_NAME:
+ case MetadataColumns.DEFAULT_TABLE_NAME:
params.add(tableInfo.getTableName());
continue;
+ case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
+ params.add(opType);
+ continue;
}

boolean argumentFound = false;
Expand All @@ -120,7 +125,7 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
if (column.getName().equals(originalColumnName)) {
params.add(
DataTypeConverter.convertToOriginal(
- fieldGetters[i].getFieldOrNull(after), column.getType()));
+ fieldGetters[i].getFieldOrNull(record), column.getType()));
argumentFound = true;
break;
}
@@ -158,20 +163,14 @@ private TransformExpressionKey generateTransformExpressionKey() {
}

for (String originalColumnName : originalColumnNames) {
- switch (originalColumnName) {
- case TransformParser.DEFAULT_NAMESPACE_NAME:
- argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
- paramTypes.add(String.class);
- break;
- case TransformParser.DEFAULT_SCHEMA_NAME:
- argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
- paramTypes.add(String.class);
- break;
- case TransformParser.DEFAULT_TABLE_NAME:
- argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
- paramTypes.add(String.class);
- break;
- }
+ METADATA_COLUMNS.stream()
+ .filter(col -> col.f0.equals(originalColumnName))
+ .findFirst()
+ .ifPresent(
+ col -> {
+ argumentNames.add(col.f0);
+ paramTypes.add(col.f2);
+ });
}

argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
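The refactor above swaps the hard-coded switch for a lookup into the shared METADATA_COLUMNS registry, so a newly added metadata column such as __data_event_type__ needs only one registration point. Judging from the col.f0 and col.f2 accesses above, the registry is presumably a list of (column name, CDC data type, Java parameter class) tuples; a hypothetical sketch of its shape, which may differ from the actual MetadataColumns source:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

// Hypothetical sketch of the MetadataColumns registry, inferred from its usage above.
public class MetadataColumns {
    public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
    public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
    public static final String DEFAULT_TABLE_NAME = "__table_name__";
    public static final String DEFAULT_DATA_EVENT_TYPE = "__data_event_type__";

    // f0 = column name, f1 = CDC data type, f2 = Java class used for Janino evaluator parameters
    public static final List<Tuple3<String, DataType, Class<?>>> METADATA_COLUMNS =
            Arrays.asList(
                    Tuple3.of(DEFAULT_NAMESPACE_NAME, DataTypes.STRING(), String.class),
                    Tuple3.of(DEFAULT_SCHEMA_NAME, DataTypes.STRING(), String.class),
                    Tuple3.of(DEFAULT_TABLE_NAME, DataTypes.STRING(), String.class),
                    Tuple3.of(DEFAULT_DATA_EVENT_TYPE, DataTypes.STRING(), String.class));
}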