Skip to content

Commit

Permalink
[cdc-connector][mysql] Support metadata 'operation' virtual column fo…
Browse files Browse the repository at this point in the history
…r MySql Connector. (apache#2913)
  • Loading branch information
YesOrNo828 authored Dec 22, 2023
1 parent 6b6d9fc commit 8eba910
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 10 deletions.
7 changes: 7 additions & 0 deletions docs/content/connectors/mysql-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ Flink SQL> SELECT * FROM orders;
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>当前记录表在数据库中更新的时间。 <br>如果从表的快照而不是 binlog 读取记录,该值将始终为0。</td>
</tr>
<tr>
<td>op</td>
<td>STRING NOT NULL</td>
<td>当前记录对应的操作类型。 <br>'+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。</td>
</tr>
</tbody>
</table>

Expand All @@ -378,6 +383,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
Expand All @@ -402,6 +408,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
Expand Down
7 changes: 7 additions & 0 deletions docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the binlog, the value is always 0.</td>
</tr>
<tr>
<td>op</td>
<td>STRING NOT NULL</td>
<td>It indicates the operation type of the row. <br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
</tr>
</tbody>
</table>

Expand All @@ -386,6 +391,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
Expand All @@ -410,6 +416,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
public void collect(RowData physicalRow) {
GenericRowData metaRow = new GenericRowData(metadataConverters.length);
for (int i = 0; i < metadataConverters.length; i++) {
Object meta = metadataConverters[i].read(inputRecord);
MetadataConverter metadataConverter = metadataConverters[i];
Object meta;
if (metadataConverter instanceof MetadataWithRowDataConverter) {
meta =
((MetadataWithRowDataConverter) metadataConverter)
.read(inputRecord, physicalRow);
} else {
meta = metadataConverter.read(inputRecord);
}

metaRow.setField(i, meta);
}
RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.debezium.table;

import org.apache.flink.table.data.RowData;

import org.apache.kafka.connect.source.SourceRecord;

/**
* A converter converts {@link SourceRecord} metadata and {@link RowData} into Flink internal data
* structures.
*/
public interface MetadataWithRowDataConverter extends MetadataConverter {
Object read(SourceRecord record, RowData rowData);

default Object read(SourceRecord record) {
throw new UnsupportedOperationException(
"This method should never be called, please call the read(SourceRecord, RowData) method instead.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.ververica.cdc.connectors.mysql.table;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;

import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.MetadataWithRowDataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -78,6 +80,22 @@ public Object read(SourceRecord record) {
return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
}),

/**
* It indicates the operation type of the row. '+I' means INSERT message, '-D' means DELETE
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
*/
OP_TYPE(
"op",
DataTypes.STRING().notNull(),
new MetadataWithRowDataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object read(SourceRecord record, RowData rowData) {
return StringData.fromString(rowData.getRowKind().shortString());
}
});

private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ public void testMetadataColumns() throws Exception {
"CREATE TABLE mysql_users ("
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL,"
+ " op STRING METADATA FROM 'op' VIRTUAL,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
Expand Down Expand Up @@ -967,6 +968,7 @@ public void testMetadataColumns() throws Exception {
"CREATE TABLE sink ("
+ " database_name STRING,"
+ " table_name STRING,"
+ " op STRING,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
Expand Down Expand Up @@ -1004,15 +1006,15 @@ public void testMetadataColumns() throws Exception {

List<String> expected =
Stream.of(
"+I[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, [email protected], null]",
"+I[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]",
"+I[%s, user_table_1_2, 200, user_200, Wuhan, 123567891234, null, null]",
"+I[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, [email protected], null]",
"+U[%s, user_table_1_1, 300, user_300, Beijing, 123567891234, [email protected], null]",
"+U[%s, user_table_1_2, 121, user_121, Shanghai, 88888888, null, null]",
"-D[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, [email protected], null]",
"-U[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, [email protected], null]",
"-U[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]")
"+I[%s, user_table_1_1, +I, 111, user_111, Shanghai, 123567891234, [email protected], null]",
"+I[%s, user_table_1_2, +I, 121, user_121, Shanghai, 123567891234, null, null]",
"+I[%s, user_table_1_2, +I, 200, user_200, Wuhan, 123567891234, null, null]",
"+I[%s, user_table_1_1, +I, 300, user_300, Hangzhou, 123567891234, [email protected], null]",
"+U[%s, user_table_1_1, +U, 300, user_300, Beijing, 123567891234, [email protected], null]",
"+U[%s, user_table_1_2, +U, 121, user_121, Shanghai, 88888888, null, null]",
"-D[%s, user_table_1_1, -D, 111, user_111, Shanghai, 123567891234, [email protected], null]",
"-U[%s, user_table_1_1, -U, 300, user_300, Hangzhou, 123567891234, [email protected], null]",
"-U[%s, user_table_1_2, -U, 121, user_121, Shanghai, 123567891234, null, null]")
.map(s -> String.format(s, userDatabase1.getDatabaseName()))
.sorted()
.collect(Collectors.toList());
Expand Down

0 comments on commit 8eba910

Please sign in to comment.