From 72b54ec721d35fd0c9a04e687f6efde5ca457f1a Mon Sep 17 00:00:00 2001 From: humengyu Date: Wed, 31 May 2023 13:39:15 +0800 Subject: [PATCH] Support native canal format (#265) * Support native canal format --- .github/workflows/flink-stream-test.yml | 4 +- README.md | 4 +- README_zh_CN.md | 4 +- flink/README_unified_batch_streaming.md | 27 +- flink/README_unified_batch_streaming_zh_CN.md | 27 +- flink/flink-1.13/pom.xml | 29 ++ .../connector/source/CDCSourceBuilder.java | 30 +- .../connector/source/SnapshotSource.java | 3 +- .../flink/connector/source/TiDBOptions.java | 13 +- .../connector/source/TiDBSourceBuilder.java | 14 +- .../canal/CanalDeserializationSchema.java | 261 ++++++++++++++++++ .../canal/CanalJsonDeserializationSchema.java | 48 ++++ .../canal/CanalMessageDeserializer.java | 77 ++++++ .../CanalProtobufDeserializationSchema.java | 46 +++ .../flink/format/canal/FlatMessage.java | 243 ++++++++++++++++ .../flink/format/canal/MQMessageUtils.java | 206 ++++++++++++++ .../cdc/CDCDeserializationSchemaBuilder.java | 22 +- .../TiDBCanalJsonDeserializationSchema.java | 2 +- .../canal/CanalDeserializationSchemaTest.java | 240 ++++++++++++++++ flink/flink-1.14/pom.xml | 29 ++ .../bigdata/flink/connector/TiDBOptions.java | 13 +- .../connector/source/CDCSourceBuilder.java | 30 +- .../connector/source/SnapshotSource.java | 3 +- .../connector/source/TiDBSourceBuilder.java | 15 +- .../canal/CanalDeserializationSchema.java | 259 +++++++++++++++++ .../canal/CanalJsonDeserializationSchema.java | 48 ++++ .../canal/CanalMessageDeserializer.java | 77 ++++++ .../CanalProtobufDeserializationSchema.java | 46 +++ .../flink/format/canal/FlatMessage.java | 243 ++++++++++++++++ .../flink/format/canal/MQMessageUtils.java | 206 ++++++++++++++ .../cdc/CDCDeserializationSchemaBuilder.java | 22 +- .../TiDBCanalJsonDeserializationSchema.java | 1 + .../canal/CanalDeserializationSchemaTest.java | 240 ++++++++++++++++ .../io/tidb/bigdata/tidb/ClientSession.java | 11 + 34 files changed, 2469 insertions(+), 74 deletions(-) create mode 100644 flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java create mode 100644 flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java create mode 100644 flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java create mode 100644 flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java create mode 100644 flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java create mode 100644 flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java create mode 100644 flink/flink-1.13/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java create mode 100644 flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java create mode 100644 flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java create mode 100644 flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java create mode 100644 flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java create mode 100644 flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java create mode 100644 flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java create mode 100644 
flink/flink-1.14/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java diff --git a/.github/workflows/flink-stream-test.yml b/.github/workflows/flink-stream-test.yml index f4fc4194..885e43ca 100644 --- a/.github/workflows/flink-stream-test.yml +++ b/.github/workflows/flink-stream-test.yml @@ -37,7 +37,9 @@ jobs: kafka/bin/kafka-server-start.sh kafka/config/server.properties & - name: deploy ticdc - run: /home/runner/.tiup/bin/tiup cdc server --pd=http://localhost:2379 --log-file=/tmp/ticdc/ticdc.log --addr=0.0.0.0:8301 --advertise-addr=127.0.0.1:8301 --data-dir=/tmp/log/ticdc & + run: | + /home/runner/.tiup/bin/tiup cdc server --pd=http://localhost:2379 --log-file=/tmp/ticdc/ticdc.log --addr=0.0.0.0:8301 --advertise-addr=127.0.0.1:8301 --data-dir=/tmp/log/ticdc & + sleep 10 - name: create cdc changefeed run: | diff --git a/README.md b/README.md index 3590846b..179bfcb1 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,9 @@ mvn com.coveo:fmt-maven-plugin:format **Compiling TiBigData requires git and downloading source code directly is not supported.** -[Flink-TiDB-Connector](./flink/README.md) +[Flink-TiDB-Connector(Batch)](./flink/README.md) + +[Flink-TiDB-Connector(Unified Batch & Streaming)](./flink/README_unified_batch_streaming.md) [PrestoSQL-TiDB-Connector](./prestosql/README.md) diff --git a/README_zh_CN.md b/README_zh_CN.md index 5632f074..6924982f 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -26,7 +26,9 @@ mvn com.coveo:fmt-maven-plugin:format **编译 TiBigData 需要 git, 直接下载源码编译暂不支持。** -[TiDB 与 Flink 集成](./flink/README_zh_CN.md) +[TiDB 与 Flink 集成(批处理)](./flink/README_zh_CN.md) + +[TiDB 与 Flink 集成(流批一体)](./flink/README_unified_batch_streaming_zh_CN.md) [TiDB 与 PrestoSQL 集成 ***- 已废弃***](./prestosql/README_zh_CN.md) diff --git a/flink/README_unified_batch_streaming.md b/flink/README_unified_batch_streaming.md index 3c67868f..cbc85ecf 100644 --- a/flink/README_unified_batch_streaming.md +++ b/flink/README_unified_batch_streaming.md @@ -166,25 +166,26 @@ Keypoints In addition to supporting the configuration in [TiDB Batch Mode](./README.md), the streaming mode adds the following configuration: -| Configuration | Default Value | Description | -|:---------------------------------------|:--------------|:--------------------------------------------------------------------------------------------------------------------------------------------------| -| tidb.source.semantic | at-least-once | TiDB batch stage consumption semantics, which takes effect when the read data fails, optionally at-least-once and exactly-once. | -| tidb.streaming.source | - | The data source(messaging system) where TiDB's change logs are stored, currently only supports Kafka and Pulsar will be supported later. | -| tidb.streaming.codec | craft | TiDB's change log encoding method, currently supports json(called default in the lower version of tidb), craft, canal-json. 
See [Codec](#8-Codec) |
-| tidb.streaming.kafka.bootstrap.servers | - | Kafka server address |
-| tidb.streaming.kafka.topic | - | Kafka topic |
-| tidb.streaming.kafka.group.id | - | Kafka group id |
-| tidb.streaming.ignore-parse-errors | false | Whether to ignore exceptions in case of decoding failure |
-| tidb.metadata.included | - | TiDB Metadata, see [TiDB Metadata](#9-TiDB-Metadata) |
-| tikv.sink.delete_enable | false | Whether enable delete in streaming, this config only works in `tidb.sink.impl=TIKV` |
+| Configuration | Default Value | Description |
+|:---------------------------------------|:--------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| tidb.source.semantic | at-least-once | TiDB batch stage consumption semantics, which takes effect when reading data fails; optionally at-least-once or exactly-once. |
+| tidb.streaming.source | - | The data source (messaging system) where TiDB's change logs are stored. Currently only Kafka is supported; Pulsar will be supported later. |
+| tidb.streaming.codec | craft | TiDB's change log encoding method. Currently supports json (called default in lower versions of TiDB), craft, canal-json, and canal-protobuf. See [Codec](#8-Codec) |
+| tidb.streaming.kafka.bootstrap.servers | - | Kafka server address |
+| tidb.streaming.kafka.topic | - | Kafka topic |
+| tidb.streaming.kafka.group.id | - | Kafka group id |
+| tidb.streaming.ignore-parse-errors | false | Whether to ignore exceptions in case of decoding failure |
+| tidb.metadata.included | - | TiDB Metadata, see [TiDB Metadata](#9-TiDB-Metadata) |
+| tikv.sink.delete_enable | false | Whether to enable delete in streaming; this config only works with `tidb.sink.impl=TIKV` |

## 8 Codec

-TiBigData supports several TiCDC encoding types, namely json(called default in the lower version of tidb), craft, and canal-json.
+TiBigData supports several TiCDC encoding types, namely json (called default in lower versions of TiDB), craft, canal-json, and canal-protobuf.

1. json is the default implementation of TiCDC and is highly readable;
2. craft sacrifices readability, is fully binary encoded, has higher compression, and requires a high version of TiDB(5.x). Currently, craft is still incubating, but it is working properly;
-3. canal-json is compatible with canal and must be used with the TiDB extension field enabled to read `commitTs`. Lower versions of TiDB do not have `commitTs`, so it cannot be used.
+3. canal-json is compatible with the canal JSON format;
+4. canal-protobuf is compatible with the canal protobuf format.
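+
+A minimal sketch of the streaming properties for the canal codecs (illustrative values; the keys are the options listed above, and `tidb.streaming.codec` accepts `canal-json` or `canal-protobuf`):
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+public class CanalCodecConfig {
+
+  // Build connector properties for consuming TiCDC change logs from Kafka
+  // encoded with the native canal protobuf format.
+  public static Map<String, String> streamingProperties() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("tidb.streaming.source", "kafka");
+    properties.put("tidb.streaming.codec", "canal-protobuf"); // or "canal-json"
+    properties.put("tidb.streaming.kafka.bootstrap.servers", "localhost:9092"); // illustrative
+    properties.put("tidb.streaming.kafka.topic", "test_cdc"); // illustrative
+    properties.put("tidb.streaming.kafka.group.id", "test_group"); // illustrative
+    properties.put("tidb.streaming.ignore-parse-errors", "false");
+    return properties;
+  }
+}
+```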
## 9 TiDB Metadata

diff --git a/flink/README_unified_batch_streaming_zh_CN.md b/flink/README_unified_batch_streaming_zh_CN.md
index 2728603b..56a0988f 100644
--- a/flink/README_unified_batch_streaming_zh_CN.md
+++ b/flink/README_unified_batch_streaming_zh_CN.md
@@ -168,25 +168,26 @@ DELETE FROM `test`.`source_table` WHERE id = 1 or id = 2;

除了支持 [TiDB 批模式](./README_zh_CN.md) 中的配置外,流模式新增了以下配置:

-| Configuration | Default Value | Description |
-|:---------------------------------------|:--------------|:-----------------------------------------------------------------------------------------------|
-| tidb.source.semantic | at-least-once | TiDB 批阶段的消费语义,读取数据失败时才会生效,可选 at-least-once 与 exactly-once。 |
-| tidb.streaming.source | - | TiDB 的变更日志存放的数据源(消息系统),当前只支持配置 Kafka,后续会支持 Pulsar。 |
-| tidb.streaming.codec | craft | TiDB 的变更日志选取的编码方式,当前支持 json(低版本 TiDB 叫 default),craft,canal-json 三种格式,详细信息参考 [Codec](#8-Codec) |
-| tidb.streaming.kafka.bootstrap.servers | - | Kafka server 地址 |
-| tidb.streaming.kafka.topic | - | Kafka topic |
-| tidb.streaming.kafka.group.id | - | Kafka group id |
-| tidb.streaming.ignore-parse-errors | false | 在解码失败时,是否忽略异常 |
-| tidb.metadata.included | - | TiDB 元数据列,详细信息参考 [TiDB Metadata](#9-TiDB-Metadata) |
-| tidb.sink.delete_enable | false | 是否在流模式中开启删除,这个配置只有当 `tidb.sink.impl=TIKV` 时才会生效 |
+| Configuration | Default Value | Description |
+|:---------------------------------------|:--------------|:--------------------------------------------------------------------------------------------------------------|
+| tidb.source.semantic | at-least-once | TiDB 批阶段的消费语义,读取数据失败时才会生效,可选 at-least-once 与 exactly-once。 |
+| tidb.streaming.source | - | TiDB 的变更日志存放的数据源(消息系统),当前只支持配置 Kafka,后续会支持 Pulsar。 |
+| tidb.streaming.codec | craft | TiDB 的变更日志选取的编码方式,当前支持 json(低版本 TiDB 叫 default),craft,canal-json,canal-protobuf 四种格式,详细信息参考 [Codec](#8-Codec) |
+| tidb.streaming.kafka.bootstrap.servers | - | Kafka server 地址 |
+| tidb.streaming.kafka.topic | - | Kafka topic |
+| tidb.streaming.kafka.group.id | - | Kafka group id |
+| tidb.streaming.ignore-parse-errors | false | 在解码失败时,是否忽略异常 |
+| tidb.metadata.included | - | TiDB 元数据列,详细信息参考 [TiDB Metadata](#9-TiDB-Metadata) |
+| tidb.sink.delete_enable | false | 是否在流模式中开启删除,这个配置只有当 `tidb.sink.impl=TIKV` 时才会生效 |

## 8 Codec

-TiBigData 支持多种 TiCDC 的编码类型,分别是 json(低版本 TiDB 叫 default),craft,canal-json.
+TiBigData 支持多种 TiCDC 的编码类型,分别是 json(低版本 TiDB 叫 default),craft,canal-json,canal-protobuf.

1. json 是 TiCDC 的默认实现,具有很强的可读性;
2. craft 牺牲了可读性,是完全二进制的编码方式,具有更高的压缩率,需要高版本 TiDB(5.x),当前还在孵化中,但是已经能够正常使用;
-3. canal-json 是对 canal 的兼容,使用时必须开启 TiDB 扩展字段以读取 commitTs,低版本的 TiDB 没有这个字段,所以不能使用。
+3. canal-json 是对 canal json 格式的兼容;
+4.
canal-protobuf 是对 canal protobuf 格式的兼容。 ## 9 TiDB Metadata diff --git a/flink/flink-1.13/pom.xml b/flink/flink-1.13/pom.xml index b22b10ba..73a0acbd 100644 --- a/flink/flink-1.13/pom.xml +++ b/flink/flink-1.13/pom.xml @@ -16,6 +16,7 @@ 1.13.0 + 1.1.6 @@ -43,6 +44,26 @@ commons-beanutils ${dep.apache.commons.version} + + + com.alibaba.otter + canal.protocol + ${dep.canal.protocol.version} + + + fastjson + com.alibaba + + + logback-classic + ch.qos.logback + + + logback-core + ch.qos.logback + + + org.apache.flink @@ -132,6 +153,14 @@ oshi io.tidb.bigdata.telemetry.shade.oshi + + com.google.protobuf + io.tidb.bigdata.shade.com.google.protobuf + + + com.alibaba.otter.canal + io.tidb.bigdata.shade.com.alibaba.otter.canal + ${project.artifactId}-${project.version} diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java index a42d85e1..7608d73b 100644 --- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableSet; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.table.data.RowData; @@ -55,6 +56,10 @@ public CDCSource canalJson() { return doBuild(builder.canalJson()); } + public CDCSource canalProtobuf() { + return doBuild(builder.canalProtobuf()); + } + private final CDCDeserializationSchemaBuilder builder; protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) { @@ -62,14 +67,23 @@ protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) { } public static KafkaCDCSourceBuilder kafka( - String database, String table, TiTimestamp ts, TiDBSchemaAdapter schema) { - return new KafkaCDCSourceBuilder( - new CDCDeserializationSchemaBuilder( - schema.getPhysicalRowDataType(), schema.getCDCMetadata()) - .startTs(ts.getVersion()) - .types(ROW_CHANGED_EVENT) - .schemas(ImmutableSet.of(database)) - .tables(ImmutableSet.of(table))); + String database, + String table, + TiTimestamp ts, + TiDBSchemaAdapter schema, + long startingOffsetTs) { + KafkaCDCSourceBuilder kafkaCDCSourceBuilder = + new KafkaCDCSourceBuilder( + new CDCDeserializationSchemaBuilder( + schema.getPhysicalRowDataType(), schema.getCDCMetadata()) + .startTs(ts.getVersion()) + .types(ROW_CHANGED_EVENT) + .schemas(ImmutableSet.of(database)) + .tables(ImmutableSet.of(table))); + if (startingOffsetTs > 0) { + kafkaCDCSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startingOffsetTs)); + } + return kafkaCDCSourceBuilder; } @SuppressWarnings("unchecked") diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java index 13c0988e..fccf4ac8 100644 --- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java @@ -85,7 +85,8 @@ public SnapshotSource( tiTableInfo, 
Arrays.asList(schema.getPhysicalFieldNamesWithoutMeta())); this.timestamp = getOptionalVersion() - .orElseGet(() -> getOptionalTimestamp().orElseGet(session::getSnapshotVersion)); + .orElseGet( + () -> getOptionalTimestamp().orElseGet(session::getApproximateSnapshotVersion)); TableHandleInternal tableHandleInternal = new TableHandleInternal(this.databaseName, tiTableInfo); this.splits = diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java index 316b9d7a..8a618731 100644 --- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java @@ -83,8 +83,13 @@ private static ConfigOption optional(String key) { public static final String STREAMING_CODEC_JSON = "json"; public static final String STREAMING_CODEC_CRAFT = "craft"; public static final String STREAMING_CODEC_CANAL_JSON = "canal-json"; + public static final String STREAMING_CODEC_CANAL_PROTOBUF = "canal-protobuf"; public static final Set VALID_STREAMING_CODECS = - ImmutableSet.of(STREAMING_CODEC_CRAFT, STREAMING_CODEC_JSON, STREAMING_CODEC_CANAL_JSON); + ImmutableSet.of( + STREAMING_CODEC_CRAFT, + STREAMING_CODEC_JSON, + STREAMING_CODEC_CANAL_JSON, + STREAMING_CODEC_CANAL_PROTOBUF); // Options for catalog public static final ConfigOption IGNORE_PARSE_ERRORS = @@ -118,6 +123,12 @@ private static ConfigOption optional(String key) { public static final String METADATA_INCLUDED = "tidb.metadata.included"; public static final String METADATA_INCLUDED_ALL = "*"; + // During the streaming phase, set a timestamp to avoid consuming expired data, + // with the default being one minute subtracted from the snapshot time. + // Additionally, we can set this value to less than or equal to 0 to disable this feature. 
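+  // For example (illustrative values): with a snapshot physical time of
+  // 1_685_000_000_000 ms and no explicit setting, the Kafka source starts
+  // from the offsets at timestamp 1_685_000_000_000 - 60_000; setting
+  // "tidb.streaming.source.starting.offsets.ts" to a value <= 0 disables
+  // the feature and keeps the source's default starting offsets.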
+ public static final String STARTING_OFFSETS_TS = "tidb.streaming.source.starting.offsets.ts"; + public static final long MINUTE = 60 * 1000L; + public static Set> requiredOptions() { return withMoreRequiredOptions(); } diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java index 94193cdb..49ef06d8 100644 --- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java @@ -20,6 +20,7 @@ import static io.tidb.bigdata.flink.connector.source.TiDBOptions.IGNORE_PARSE_ERRORS; import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC; import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_CANAL_JSON; +import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_CANAL_PROTOBUF; import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_CRAFT; import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_JSON; import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_SOURCE; @@ -35,6 +36,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.connector.base.source.hybrid.HybridSource; @@ -102,9 +104,17 @@ private TiDBSourceBuilder setProperties(Map properties) { return this; } + private long getStartingOffsetTs(TiTimestamp timestamp) { + return Optional.ofNullable(properties.get(TiDBOptions.STARTING_OFFSETS_TS)) + .filter(StringUtils::isNotEmpty) + .map(Long::parseLong) + .orElse(timestamp.getPhysical() - TiDBOptions.MINUTE); + } + private CDCSourceBuilder createCDCBuilder(TiTimestamp timestamp) { if (streamingSource.equals(STREAMING_SOURCE_KAFKA)) { - return CDCSourceBuilder.kafka(databaseName, tableName, timestamp, schema) + return CDCSourceBuilder.kafka( + databaseName, tableName, timestamp, schema, getStartingOffsetTs(timestamp)) .ignoreParseErrors(ignoreParseErrors) .setProperties(properties); } else { @@ -150,6 +160,8 @@ private TiDBSourceBuilder setProperties(Map properties) { return cdcBuilder.json(); case STREAMING_CODEC_CANAL_JSON: return cdcBuilder.canalJson(); + case STREAMING_CODEC_CANAL_PROTOBUF: + return cdcBuilder.canalProtobuf(); default: throw new IllegalArgumentException( "Invalid streaming codec: '" + streamingCodec + "'"); diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java new file mode 100644 index 00000000..5399ec77 --- /dev/null +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java @@ -0,0 +1,261 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.tidb.bigdata.flink.format.canal; + +import io.tidb.bigdata.flink.connector.source.TiDBMetadata; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.meta.TiTimestamp; + +public abstract class CanalDeserializationSchema implements DeserializationSchema { + + private static final Logger LOG = LoggerFactory.getLogger(CanalDeserializationSchema.class); + + private static final String INSERT = "INSERT"; + private static final String UPDATE = "UPDATE"; + private static final String DELETE = "DELETE"; + private static final String CREATE = "CREATE"; + private static final String QUERY = "QUERY"; + private static final String TIDB_WATERMARK = "TIDB_WATERMARK"; + + public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + protected final DataType physicalDataType; + protected final Set schemas; + protected final Set tables; + protected final CDCMetadata[] metadata; + protected final long startTs; + protected final TypeInformation producedTypeInfo; + protected final boolean ignoreParseErrors; + + protected final List fieldNames; + protected final List dataTypes; + + protected CanalDeserializationSchema( + DataType physicalDataType, + Set schemas, + Set tables, + CDCMetadata[] metadata, + long startTs, + TypeInformation producedTypeInfo, + boolean ignoreParseErrors) { + this.physicalDataType = physicalDataType; + this.schemas = schemas; + this.tables = tables; + this.metadata = metadata; + this.startTs = startTs; + Preconditions.checkArgument( + createTiTimestampFromVersion(startTs).getLogical() == 0, + "The logical ts must be 0 for canal format"); + this.producedTypeInfo = producedTypeInfo; + this.ignoreParseErrors = ignoreParseErrors; + 
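// Flatten the physical row type into parallel name/type lists; the
+    // metadata columns declared above are appended after these physical
+    // fields when rows are produced in toRowData.
+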
RowType physicalRowType = ((RowType) physicalDataType.getLogicalType()); + this.fieldNames = new ArrayList<>(physicalRowType.getFieldNames()); + this.dataTypes = new ArrayList<>(physicalDataType.getChildren()); + } + + @Override + public final RowData deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException( + "Please invoke deserialize(byte[] message, Collector out)"); + } + + @Override + public void deserialize(byte[] message, Collector out) throws IOException { + List flatMessages; + try { + flatMessages = decodeToFlatMessage(message); + } catch (Exception e) { + if (ignoreParseErrors) { + LOG.warn("Can not decode to flatMessage", e); + return; + } else { + throw new IOException(e); + } + } + for (FlatMessage flatMessage : flatMessages) { + deserialize(flatMessage, out); + } + } + + @Override + public boolean isEndOfStream(RowData nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return producedTypeInfo; + } + + public abstract List decodeToFlatMessage(byte[] data) throws Exception; + + public void deserialize(FlatMessage flatMessage, Collector out) { + if (!schemas.contains(flatMessage.getDatabase())) { + return; + } + if (!tables.contains(flatMessage.getTable())) { + return; + } + TiTimestamp tiTimestamp = createTiTimestampFromTs(flatMessage.getEs()); + if (startTs > tiTimestamp.getVersion()) { + return; + } + List> data = flatMessage.getData(); + List> old = flatMessage.getOld(); + String type = flatMessage.getType(); + switch (type) { + case CREATE: + case QUERY: + case TIDB_WATERMARK: + break; + case INSERT: + data.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.INSERT)) + .forEach(out::collect); + break; + case DELETE: + data.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.DELETE)) + .forEach(out::collect); + break; + case UPDATE: + old.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.UPDATE_BEFORE)) + .forEach(out::collect); + data.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.UPDATE_AFTER)) + .forEach(out::collect); + break; + default: + if (!ignoreParseErrors) { + throw new IllegalArgumentException( + String.format( + "Unknown \"type\" value \"%s\". 
The Canal message is '%s'", type, flatMessage)); + } + } + } + + protected RowData toRowData(TiTimestamp timestamp, Map values, RowKind rowKind) { + // ignore case + values = + values.entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap(e -> e.getKey().toLowerCase(), Entry::getValue)); + GenericRowData rowData = new GenericRowData(rowKind, fieldNames.size() + metadata.length); + for (int i = 0; i < fieldNames.size(); i++) { + String name = fieldNames.get(i); + DataType dataType = dataTypes.get(i); + String value = values.get(name); + rowData.setField(i, convertValue(value, dataType)); + } + // concat metadata + for (int i = 0; i < metadata.length; i++) { + CDCMetadata cdcMetadata = metadata[i]; + TiDBMetadata tiDBMetadata = + cdcMetadata + .toTiDBMetadata() + .orElseThrow( + () -> new IllegalArgumentException("Unsupported metadata: " + cdcMetadata)); + if (cdcMetadata == CDCMetadata.SOURCE_EVENT) { + rowData.setField(fieldNames.size() + i, StringData.fromString(CDCMetadata.STREAMING)); + continue; + } + rowData.setField(fieldNames.size() + i, tiDBMetadata.extract(timestamp)); + } + return rowData; + } + + public static Object convertValue(String value, DataType dataType) { + if (value == null) { + return null; + } + LogicalType logicalType = dataType.getLogicalType(); + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return Boolean.parseBoolean(value); + case TINYINT: + return Byte.parseByte(value); + case SMALLINT: + return Short.parseShort(value); + case INTEGER: + return Integer.parseInt(value); + case BIGINT: + return Long.parseLong(value); + case VARCHAR: + return StringData.fromString(value); + case VARBINARY: + return value.getBytes(); + case FLOAT: + return Float.parseFloat(value); + case DOUBLE: + return Double.parseDouble(value); + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return DecimalData.fromBigDecimal( + new BigDecimal(value), decimalType.getPrecision(), decimalType.getScale()); + case DATE: + return (int) LocalDate.parse(value, DATE_FORMATTER).toEpochDay(); + case TIME_WITHOUT_TIME_ZONE: + return (int) (LocalTime.parse(value, TIME_FORMATTER).toNanoOfDay() / 1000 / 1000); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return TimestampData.fromTimestamp( + Timestamp.valueOf(LocalDateTime.parse(value, DATE_TIME_FORMATTER))); + default: + throw new IllegalStateException("Unsupported type root: " + logicalType.getTypeRoot()); + } + } + + public static TiTimestamp createTiTimestampFromVersion(long version) { + return new TiTimestamp(version >> 18, version & 0x3FFFF); + } + + public static TiTimestamp createTiTimestampFromTs(long ts) { + return new TiTimestamp(ts, 0); + } +} diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java new file mode 100644 index 00000000..093336d8 --- /dev/null +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.tidb.bigdata.flink.format.canal; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.util.List; +import java.util.Set; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +public class CanalJsonDeserializationSchema extends CanalDeserializationSchema { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public CanalJsonDeserializationSchema( + DataType physicalDataType, + Set schemas, + Set tables, + CDCMetadata[] metadata, + long startTs, + TypeInformation producedTypeInfo, + boolean ignoreParseErrors) { + super( + physicalDataType, schemas, tables, metadata, startTs, producedTypeInfo, ignoreParseErrors); + } + + @Override + public List decodeToFlatMessage(byte[] data) throws Exception { + return ImmutableList.of(MAPPER.readValue(data, FlatMessage.class)); + } +} diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java new file mode 100644 index 00000000..c3cfcfbe --- /dev/null +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java @@ -0,0 +1,77 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import com.alibaba.otter.canal.protocol.CanalEntry.Entry; +import com.alibaba.otter.canal.protocol.CanalPacket.Ack; +import com.alibaba.otter.canal.protocol.CanalPacket.Compression; +import com.alibaba.otter.canal.protocol.CanalPacket.Messages; +import com.alibaba.otter.canal.protocol.CanalPacket.Packet; +import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.google.protobuf.ByteString; +import java.util.Iterator; + +public class CanalMessageDeserializer { + + public static Message deserializer(byte[] data) { + return deserializer(data, false); + } + + public static Message deserializer(byte[] data, boolean lazyParseEntry) { + try { + if (data == null) { + return null; + } else { + Packet p = Packet.parseFrom(data); + switch (p.getType()) { + case MESSAGES: + if (!p.getCompression().equals(Compression.NONE) + && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) { + throw new CanalClientException("compression is not supported in this connector"); + } + + Messages messages = Messages.parseFrom(p.getBody()); + Message result = new Message(messages.getBatchId()); + if (lazyParseEntry) { + result.setRawEntries(messages.getMessagesList()); + result.setRaw(true); + } else { + Iterator var5 = messages.getMessagesList().iterator(); + + while (var5.hasNext()) { + ByteString byteString = (ByteString) var5.next(); + result.addEntry(Entry.parseFrom(byteString)); + } + + result.setRaw(false); + } + + return result; + case ACK: + Ack ack = Ack.parseFrom(p.getBody()); + throw new CanalClientException( + "something goes wrong with reason: " + ack.getErrorMessage()); + default: + throw new CanalClientException("unexpected packet type: " + p.getType()); + } + } + } catch (Exception var7) { + throw new CanalClientException("deserializer failed", var7); + } + } +} diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java new file mode 100644 index 00000000..bc536453 --- /dev/null +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import com.alibaba.otter.canal.protocol.Message; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.util.List; +import java.util.Set; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +public class CanalProtobufDeserializationSchema extends CanalDeserializationSchema { + + public CanalProtobufDeserializationSchema( + DataType physicalDataType, + Set schemas, + Set tables, + CDCMetadata[] metadata, + long startTs, + TypeInformation producedTypeInfo, + boolean ignoreParseErrors) { + super( + physicalDataType, schemas, tables, metadata, startTs, producedTypeInfo, ignoreParseErrors); + } + + @Override + public List decodeToFlatMessage(byte[] data) throws Exception { + Message message = CanalMessageDeserializer.deserializer(data); + return MQMessageUtils.getFlatMessages(message); + } +} diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java new file mode 100644 index 00000000..822da53a --- /dev/null +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java @@ -0,0 +1,243 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class FlatMessage implements Serializable { + + private long id; + private String database; + private String table; + private List pkNames; + private Boolean isDdl; + private String type; + // binlog executeTime + private Long es; + // dml build timeStamp + private Long ts; + private String sql; + private Map sqlType; + private Map mysqlType; + private List> data; + private List> old; + + @JsonProperty("_tidb") + private TiDB tidb; + + public FlatMessage() {} + + public FlatMessage(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public List getPkNames() { + return pkNames; + } + + public void addPkName(String pkName) { + if (this.pkNames == null) { + this.pkNames = new ArrayList<>(); + } + this.pkNames.add(pkName); + } + + public void setPkNames(List pkNames) { + this.pkNames = pkNames; + } + + public Boolean getIsDdl() { + return isDdl; + } + + public void setIsDdl(Boolean isDdl) { + this.isDdl = isDdl; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Long getTs() { + return ts; + } + + public void setTs(Long ts) { + this.ts = ts; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public Map getSqlType() { + return sqlType; + } + + public void setSqlType(Map sqlType) { + this.sqlType = sqlType; + } + + public Map getMysqlType() { + return mysqlType; + } + + public void setMysqlType(Map mysqlType) { + this.mysqlType = mysqlType; + } + + public List> getData() { + return data; + } + + public void setData(List> data) { + this.data = data; + } + + public List> getOld() { + return old; + } + + public void setOld(List> old) { + this.old = old; + } + + public Long getEs() { + return es; + } + + public void setEs(Long es) { + this.es = es; + } + + public Boolean getDdl() { + return isDdl; + } + + public void setDdl(Boolean ddl) { + isDdl = ddl; + } + + public TiDB getTidb() { + return tidb; + } + + public void setTidb(TiDB tidb) { + this.tidb = tidb; + } + + @Override + public String toString() { + return "FlatMessage{" + + "id=" + + id + + ", database='" + + database + + '\'' + + ", table='" + + table + + '\'' + + ", pkNames=" + + pkNames + + ", isDdl=" + + isDdl + + ", type='" + + type + + '\'' + + ", es=" + + es + + ", ts=" + + ts + + ", sql='" + + sql + + '\'' + + ", sqlType=" + + sqlType + + ", mysqlType=" + + mysqlType + + ", data=" + + data + + ", old=" + + old + + ", tidb=" + + tidb + + '}'; + } + + public static class TiDB { + + private Long commitTs; + private Long watermarkTs; + + public Long getCommitTs() { + return commitTs; + } + + public void setCommitTs(Long commitTs) { + this.commitTs = commitTs; + } + + public Long getWatermarkTs() { + return watermarkTs; + } + + public void setWatermarkTs(Long watermarkTs) { + this.watermarkTs = watermarkTs; + } + + @Override + public String toString() { + return "TiDB{" + "commitTs=" + commitTs + ", watermarkTs=" + watermarkTs + '}'; + } + } +} diff 
--git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java
new file mode 100644
index 00000000..f5cbbda2
--- /dev/null
+++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
+import javax.annotation.Nullable;
+
+public class MQMessageUtils {
+
+  public static List<FlatMessage> getFlatMessages(Message message) {
+    EntryRowData[] datas = MQMessageUtils.buildMessageData(message, null);
+    return MQMessageUtils.messageConverter(datas, message.getId());
+  }
+
+  public static EntryRowData[] buildMessageData(
+      Message message, @Nullable ThreadPoolExecutor executor) {
+    Optional<ExecutorTemplate> template = Optional.ofNullable(executor).map(ExecutorTemplate::new);
+    if (message.isRaw()) {
+      List<ByteString> rawEntries = message.getRawEntries();
+      final EntryRowData[] datas = new EntryRowData[rawEntries.size()];
+      int i = 0;
+      for (ByteString byteString : rawEntries) {
+        final int index = i;
+        Runnable task =
+            () -> {
+              try {
+                Entry entry = Entry.parseFrom(byteString);
+                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
+                datas[index] = new EntryRowData();
+                datas[index].entry = entry;
+                datas[index].rowChange = rowChange;
+              } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+              }
+            };
+        if (template.isPresent()) {
+          template.get().submit(task);
+        } else {
+          task.run();
+        }
+        i++;
+      }
+      template.ifPresent(ExecutorTemplate::waitForResult);
+      return datas;
+    } else {
+      final EntryRowData[] datas = new EntryRowData[message.getEntries().size()];
+      int i = 0;
+      for (Entry entry : message.getEntries()) {
+        final int index = i;
+        Runnable task =
+            () -> {
+              try {
+                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
+                datas[index] = new EntryRowData();
+                datas[index].entry = entry;
+                datas[index].rowChange = rowChange;
+              } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+              }
+            };
+        if (template.isPresent()) {
+          template.get().submit(task);
+        } else {
+          task.run();
+        }
+        i++;
+      }
+      template.ifPresent(ExecutorTemplate::waitForResult);
+      return datas;
+    }
+  }
+
+  public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id)
{ + List flatMessages = new ArrayList<>(); + for (EntryRowData entryRowData : datas) { + Entry entry = entryRowData.entry; + RowChange rowChange = entryRowData.rowChange; + if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN + || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { + continue; + } + + // build flatMessage + CanalEntry.EventType eventType = rowChange.getEventType(); + FlatMessage flatMessage = new FlatMessage(id); + flatMessages.add(flatMessage); + flatMessage.setDatabase(entry.getHeader().getSchemaName()); + flatMessage.setTable(entry.getHeader().getTableName()); + flatMessage.setIsDdl(rowChange.getIsDdl()); + flatMessage.setType(eventType.toString()); + flatMessage.setEs(entry.getHeader().getExecuteTime()); + flatMessage.setTs(System.currentTimeMillis()); + flatMessage.setSql(rowChange.getSql()); + + if (!rowChange.getIsDdl()) { + Map sqlType = new LinkedHashMap<>(); + Map mysqlType = new LinkedHashMap<>(); + List> data = new ArrayList<>(); + List> old = new ArrayList<>(); + + Set updateSet = new HashSet<>(); + boolean hasInitPkNames = false; + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + if (eventType != CanalEntry.EventType.INSERT + && eventType != CanalEntry.EventType.UPDATE + && eventType != CanalEntry.EventType.DELETE) { + continue; + } + + Map row = new LinkedHashMap<>(); + List columns; + + if (eventType == CanalEntry.EventType.DELETE) { + columns = rowData.getBeforeColumnsList(); + } else { + columns = rowData.getAfterColumnsList(); + } + + for (CanalEntry.Column column : columns) { + if (!hasInitPkNames && column.getIsKey()) { + flatMessage.addPkName(column.getName()); + } + sqlType.put(column.getName(), column.getSqlType()); + mysqlType.put(column.getName(), column.getMysqlType()); + if (column.getIsNull()) { + row.put(column.getName(), null); + } else { + row.put(column.getName(), column.getValue()); + } + if (column.getUpdated()) { + updateSet.add(column.getName()); + } + } + + hasInitPkNames = true; + if (!row.isEmpty()) { + data.add(row); + } + + if (eventType == CanalEntry.EventType.UPDATE) { + Map rowOld = new LinkedHashMap<>(); + for (CanalEntry.Column column : rowData.getBeforeColumnsList()) { + if (updateSet.contains(column.getName())) { + if (column.getIsNull()) { + rowOld.put(column.getName(), null); + } else { + rowOld.put(column.getName(), column.getValue()); + } + } + } + if (!rowOld.isEmpty()) { + old.add(rowOld); + } + } + } + if (!sqlType.isEmpty()) { + flatMessage.setSqlType(sqlType); + } + if (!mysqlType.isEmpty()) { + flatMessage.setMysqlType(mysqlType); + } + if (!data.isEmpty()) { + flatMessage.setData(data); + } + if (!old.isEmpty()) { + flatMessage.setOld(old); + } + } + } + return flatMessages; + } + + public static class EntryRowData { + + public Entry entry; + public RowChange rowChange; + } +} diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java index a52838fc..eecf6058 100644 --- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java @@ -18,9 +18,11 @@ import com.google.common.base.Preconditions; import io.tidb.bigdata.cdc.Key; +import io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema; +import io.tidb.bigdata.flink.format.canal.CanalJsonDeserializationSchema; 
+import io.tidb.bigdata.flink.format.canal.CanalProtobufDeserializationSchema; import java.util.Set; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; @@ -87,15 +89,13 @@ public JsonDeserializationSchema json() { } // We use a new decoder, since canal does not follow TiCDC open protocol. - public TiDBCanalJsonDeserializationSchema canalJson() { - return new TiDBCanalJsonDeserializationSchema( - physicalDataType, - schemas, - tables, - metadata, - startTs, - typeInformation, - ignoreParseErrors, - TimestampFormat.SQL); + public CanalDeserializationSchema canalJson() { + return new CanalJsonDeserializationSchema( + physicalDataType, schemas, tables, metadata, startTs, typeInformation, ignoreParseErrors); + } + + public CanalDeserializationSchema canalProtobuf() { + return new CanalProtobufDeserializationSchema( + physicalDataType, schemas, tables, metadata, startTs, typeInformation, ignoreParseErrors); } } diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java index 41f229c0..41cec3a7 100644 --- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java +++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java @@ -47,7 +47,7 @@ import org.apache.flink.util.Collector; import org.tikv.common.meta.TiTimestamp; -// TODO: use canal native format, rather than json format, to support canal-json/canal-protobuf +@Deprecated public final class TiDBCanalJsonDeserializationSchema implements DeserializationSchema { private static final long serialVersionUID = 1L; diff --git a/flink/flink-1.13/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java b/flink/flink-1.13/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java new file mode 100644 index 00000000..aede34c3 --- /dev/null +++ b/flink/flink-1.13/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java @@ -0,0 +1,240 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import static io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema.DATE_FORMATTER; +import static io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema.TIME_FORMATTER; + +import com.google.common.collect.ImmutableSet; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Set; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +public class CanalDeserializationSchemaTest { + + public static final DataType TYPES = + DataTypes.ROW( + DataTypes.FIELD("c1", DataTypes.TINYINT()), + DataTypes.FIELD("c2", DataTypes.SMALLINT()), + DataTypes.FIELD("c3", DataTypes.INT()), + DataTypes.FIELD("c4", DataTypes.INT()), + DataTypes.FIELD("c5", DataTypes.BIGINT()), + DataTypes.FIELD("c6", DataTypes.STRING()), + DataTypes.FIELD("c7", DataTypes.STRING()), + DataTypes.FIELD("c8", DataTypes.STRING()), + DataTypes.FIELD("c9", DataTypes.STRING()), + DataTypes.FIELD("c10", DataTypes.STRING()), + DataTypes.FIELD("c11", DataTypes.STRING()), + DataTypes.FIELD("c12", DataTypes.BYTES()), + DataTypes.FIELD("c13", DataTypes.BYTES()), + DataTypes.FIELD("c14", DataTypes.BYTES()), + DataTypes.FIELD("c15", DataTypes.BYTES()), + DataTypes.FIELD("c16", DataTypes.BYTES()), + DataTypes.FIELD("c17", DataTypes.BYTES()), + DataTypes.FIELD("c18", DataTypes.FLOAT()), + DataTypes.FIELD("c19", DataTypes.DOUBLE()), + DataTypes.FIELD("c20", DataTypes.DECIMAL(6, 3)), + DataTypes.FIELD("c21", DataTypes.DATE()), + DataTypes.FIELD("c22", DataTypes.TIME(0)), + DataTypes.FIELD("c23", DataTypes.TIMESTAMP(6)), + DataTypes.FIELD("c24", DataTypes.TIMESTAMP(6)), + DataTypes.FIELD("c25", DataTypes.INT()), + DataTypes.FIELD("c26", DataTypes.TINYINT()), + DataTypes.FIELD("c27", DataTypes.STRING()), + DataTypes.FIELD("c28", DataTypes.STRING()), + DataTypes.FIELD("c29", DataTypes.STRING())); + + public static final byte[] JSON_DATA = + 
"{\"id\":0,\"database\":\"test\",\"table\":\"test_tidb_type\",\"pkNames\":[\"c1\"],\"isDdl\":false,\"type\":\"INSERT\",\"es\":1685085356868,\"ts\":1685085357997,\"sql\":\"\",\"sqlType\":{\"c1\":-6,\"c10\":2005,\"c11\":2005,\"c12\":2004,\"c13\":2004,\"c14\":2004,\"c15\":2004,\"c16\":2004,\"c17\":2004,\"c18\":7,\"c19\":8,\"c2\":5,\"c20\":3,\"c21\":91,\"c22\":92,\"c23\":93,\"c24\":93,\"c25\":12,\"c26\":-6,\"c27\":12,\"c28\":4,\"c29\":-7,\"c3\":4,\"c4\":4,\"c5\":-5,\"c6\":1,\"c7\":12,\"c8\":2005,\"c9\":2005},\"mysqlType\":{\"c1\":\"tinyint\",\"c10\":\"text\",\"c11\":\"longtext\",\"c12\":\"binary\",\"c13\":\"varbinary\",\"c14\":\"tinyblob\",\"c15\":\"mediumblob\",\"c16\":\"blob\",\"c17\":\"longblob\",\"c18\":\"float\",\"c19\":\"double\",\"c2\":\"smallint\",\"c20\":\"decimal\",\"c21\":\"date\",\"c22\":\"time\",\"c23\":\"datetime\",\"c24\":\"timestamp\",\"c25\":\"year\",\"c26\":\"tinyint\",\"c27\":\"json\",\"c28\":\"enum\",\"c29\":\"set\",\"c3\":\"mediumint\",\"c4\":\"int\",\"c5\":\"bigint\",\"c6\":\"char\",\"c7\":\"varchar\",\"c8\":\"tinytext\",\"c9\":\"mediumtext\"},\"data\":[{\"c1\":\"1\",\"c10\":\"texttype\",\"c11\":\"longtexttype\",\"c12\":\"binarytype\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\",\"c13\":\"varbinarytype\",\"c14\":\"tinyblobtype\",\"c15\":\"mediumblobtype\",\"c16\":\"blobtype\",\"c17\":\"longblobtype\",\"c18\":\"1.2339999675750732\",\"c19\":\"2.456789\",\"c2\":\"2\",\"c20\":\"123.456\",\"c21\":\"2023-05-26\",\"c22\":\"15:15:56\",\"c23\":\"2023-05-26 15:15:56\",\"c24\":\"2023-05-26 15:15:56\",\"c25\":\"2020\",\"c26\":\"1\",\"c27\":\"{\\\"a\\\": 1, \\\"b\\\": 2}\",\"c28\":\"1\",\"c29\":\"1\",\"c3\":\"3\",\"c4\":\"4\",\"c5\":\"5\",\"c6\":\"chartype\",\"c7\":\"varchartype\",\"c8\":\"tinytexttype\",\"c9\":\"mediumtexttype\"}],\"old\":null}" + .getBytes(); + public static final RowData JSON_DECODE_RESULT = + GenericRowData.ofKind( + RowKind.INSERT, + (byte) 1, + (short) 2, + 3, + 4, + 5L, + StringData.fromString("chartype"), + StringData.fromString("varchartype"), + StringData.fromString("tinytexttype"), + StringData.fromString("mediumtexttype"), + StringData.fromString("texttype"), + StringData.fromString("longtexttype"), + ("binarytype" + new String(new byte[10])).getBytes(), + "varbinarytype".getBytes(), + "tinyblobtype".getBytes(), + "mediumblobtype".getBytes(), + "blobtype".getBytes(), + "longblobtype".getBytes(), + 1.234F, + 2.456789, + DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6, 3), + (int) LocalDate.parse("2023-05-26", DATE_FORMATTER).toEpochDay(), + (int) (LocalTime.parse("15:15:56", TIME_FORMATTER).toNanoOfDay() / 1000 / 1000), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 15, 15, 56)), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 15, 15, 56)), + 2020, + (byte) 1, + StringData.fromString("{\"a\": 1, \"b\": 2}"), + StringData.fromString("1"), + StringData.fromString("1"), + 441735015790804992L, + TimestampData.fromEpochMillis(441735015790804992L >> 18), + StringData.fromString(CDCMetadata.STREAMING)); + + public static final byte[] PROTOBUF_DATA = + Base64.getDecoder() + .decode( + 
"EAEYByr8DRKZCAo6CAEqBVVURi04MP2S87iFMTgCQgR0ZXN0Sg50ZXN0X3RpZGJfdHlwZVgBYg4KCXJvd3NDb3VudBIBMRACGtgHEAFQAGLRBxIhEPr//////////wEaAmMxIAEoATAAQgExUgd0aW55aW50EhcQBRoCYzIoATAAQgEyUghzbWFsbGludBIYEAQaAmMzKAEwAEIBM1IJbWVkaXVtaW50EhIQBBoCYzQoATAAQgE0UgNpbnQSHhD7//////////8BGgJjNSgBMABCATVSBmJpZ2ludBIaEAEaAmM2KAEwAEIIY2hhcnR5cGVSBGNoYXISIBAMGgJjNygBMABCC3ZhcmNoYXJ0eXBlUgd2YXJjaGFyEiMQ1Q8aAmM4KAEwAEIMdGlueXRleHR0eXBlUgh0aW55dGV4dBInENUPGgJjOSgBMABCDm1lZGl1bXRleHR0eXBlUgptZWRpdW10ZXh0EhwQ1Q8aA2MxMCgBMABCCHRleHR0eXBlUgR0ZXh0EiQQ1Q8aA2MxMSgBMABCDGxvbmd0ZXh0dHlwZVIIbG9uZ3RleHQSKhDUDxoDYzEyKAEwAEIUYmluYXJ5dHlwZQAAAAAAAAAAAABSBmJpbmFyeRImENQPGgNjMTMoATAAQg12YXJiaW5hcnl0eXBlUgl2YXJiaW5hcnkSJBDUDxoDYzE0KAEwAEIMdGlueWJsb2J0eXBlUgh0aW55YmxvYhIoENQPGgNjMTUoATAAQg5tZWRpdW1ibG9idHlwZVIKbWVkaXVtYmxvYhIcENQPGgNjMTYoATAAQghibG9idHlwZVIEYmxvYhIkENQPGgNjMTcoATAAQgxsb25nYmxvYnR5cGVSCGxvbmdibG9iEiYQBxoDYzE4KAEwAEISMS4yMzM5OTk5Njc1NzUwNzMyUgVmbG9hdBIdEAgaA2MxOSgBMABCCDIuNDU2Nzg5UgZkb3VibGUSHRADGgNjMjAoATAAQgcxMjMuNDU2UgdkZWNpbWFsEh0QWxoDYzIxKAEwAEIKMjAyMy0wNS0yNlIEZGF0ZRIbEFwaA2MyMigBMABCCDE2OjExOjI0UgR0aW1lEioQXRoDYzIzKAEwAEITMjAyMy0wNS0yNiAxNjoxMToyNFIIZGF0ZXRpbWUSKxBdGgNjMjQoATAAQhMyMDIzLTA1LTI2IDE2OjExOjI0Ugl0aW1lc3RhbXASFxAMGgNjMjUoATAAQgQyMDIwUgR5ZWFyEiAQ+v//////////ARoDYzI2KAEwAEIBMVIHdGlueWludBIjEAwaA2MyNygBMABCEHsiYSI6IDEsICJiIjogMn1SBGpzb24SFBAEGgNjMjgoATAAQgExUgRlbnVtEhwQ+f//////////ARoDYzI5KAEwAEIBMVIDc2V0Et0FCjoIASoFVVRGLTgw/ZLzuIUxOAJCBHRlc3RKDnRlc3RfdGlkYl90eXBlWAFiDgoJcm93c0NvdW50EgExEAIanAUQAVAAYpUFEiEQ+v//////////ARoCYzEgASgBMABCATJSB3RpbnlpbnQSFBAFGgJjMigBMAFSCHNtYWxsaW50EhUQBBoCYzMoATABUgltZWRpdW1pbnQSDxAEGgJjNCgBMAFSA2ludBIbEPv//////////wEaAmM1KAEwAVIGYmlnaW50EhAQARoCYzYoATABUgRjaGFyEhMQDBoCYzcoATABUgd2YXJjaGFyEhUQ1Q8aAmM4KAEwAVIIdGlueXRleHQSFxDVDxoCYzkoATABUgptZWRpdW10ZXh0EhIQ1Q8aA2MxMCgBMAFSBHRleHQSFhDVDxoDYzExKAEwAVIIbG9uZ3RleHQSFBDUDxoDYzEyKAEwAVIGYmluYXJ5EhcQ1A8aA2MxMygBMAFSCXZhcmJpbmFyeRIWENQPGgNjMTQoATABUgh0aW55YmxvYhIYENQPGgNjMTUoATABUgptZWRpdW1ibG9iEhIQ1A8aA2MxNigBMAFSBGJsb2ISFhDUDxoDYzE3KAEwAVIIbG9uZ2Jsb2ISEhAHGgNjMTgoATABUgVmbG9hdBITEAgaA2MxOSgBMAFSBmRvdWJsZRIUEAMaA2MyMCgBMAFSB2RlY2ltYWwSERBbGgNjMjEoATABUgRkYXRlEhEQXBoDYzIyKAEwAVIEdGltZRIVEF0aA2MyMygBMAFSCGRhdGV0aW1lEhYQXRoDYzI0KAEwAVIJdGltZXN0YW1wEhEQDBoDYzI1KAEwAVIEeWVhchIdEPr//////////wEaA2MyNigBMAFSB3RpbnlpbnQSERAMGgNjMjcoATABUgRqc29uEhEQBBoDYzI4KAEwAVIEZW51bRIZEPn//////////wEaA2MyOSgBMAFSA3NldA=="); + + public static final RowData PROTOBUF_DECODE_RESULT0 = + GenericRowData.ofKind( + RowKind.INSERT, + (byte) 1, + (short) 2, + 3, + 4, + 5L, + StringData.fromString("chartype"), + StringData.fromString("varchartype"), + StringData.fromString("tinytexttype"), + StringData.fromString("mediumtexttype"), + StringData.fromString("texttype"), + StringData.fromString("longtexttype"), + ("binarytype" + new String(new byte[10])).getBytes(), + "varbinarytype".getBytes(), + "tinyblobtype".getBytes(), + "mediumblobtype".getBytes(), + "blobtype".getBytes(), + "longblobtype".getBytes(), + 1.234F, + 2.456789, + DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6, 3), + (int) LocalDate.parse("2023-05-26", DATE_FORMATTER).toEpochDay(), + (int) (LocalTime.parse("16:11:24", TIME_FORMATTER).toNanoOfDay() / 1000 / 1000), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 16, 11, 24)), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 16, 11, 24)), + 2020, + (byte) 1, + StringData.fromString("{\"a\": 1, \"b\": 2}"), + StringData.fromString("1"), + StringData.fromString("1"), + 441735888086761472L, + 
TimestampData.fromEpochMillis(441735888086761472L >> 18), + StringData.fromString(CDCMetadata.STREAMING)); + + public static final RowData PROTOBUF_DECODE_RESULT1 = + GenericRowData.ofKind( + RowKind.INSERT, + (byte) 2, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 441735888086761472L, + TimestampData.fromEpochMillis(441735888086761472L >> 18), + StringData.fromString(CDCMetadata.STREAMING)); + + public static final Set SCHEMAS = ImmutableSet.of("test"); + + public static final Set TABLES = ImmutableSet.of("test_tidb_type"); + + public static final CDCMetadata[] METADATA = + new CDCMetadata[] { + CDCMetadata.COMMIT_VERSION, CDCMetadata.COMMIT_TIMESTAMP, CDCMetadata.SOURCE_EVENT + }; + + @Test + public void testJsonDecode() throws IOException { + ListCollector collector = new ListCollector(); + CanalJsonDeserializationSchema canalJsonDeserializationSchema = + new CanalJsonDeserializationSchema(TYPES, SCHEMAS, TABLES, METADATA, 0, null, false); + canalJsonDeserializationSchema.deserialize(JSON_DATA, collector); + RowData row = collector.getRows().get(0); + Assert.assertEquals(row, JSON_DECODE_RESULT); + } + + @Test + public void testProtobufDecode() throws IOException { + ListCollector collector = new ListCollector(); + CanalProtobufDeserializationSchema canalProtobufDeserializationSchema = + new CanalProtobufDeserializationSchema(TYPES, SCHEMAS, TABLES, METADATA, 0, null, false); + canalProtobufDeserializationSchema.deserialize(PROTOBUF_DATA, collector); + Assert.assertEquals(PROTOBUF_DECODE_RESULT0, collector.getRows().get(0)); + Assert.assertEquals(PROTOBUF_DECODE_RESULT1, collector.getRows().get(1)); + } + + public static class ListCollector implements Collector { + + private final List rows = new ArrayList<>(); + + @Override + public void collect(RowData record) { + rows.add(record); + } + + @Override + public void close() {} + + public List getRows() { + return rows; + } + } +} diff --git a/flink/flink-1.14/pom.xml b/flink/flink-1.14/pom.xml index 523d6924..2e53e8e5 100644 --- a/flink/flink-1.14/pom.xml +++ b/flink/flink-1.14/pom.xml @@ -16,6 +16,7 @@ 1.14.0 + 1.1.6 @@ -43,6 +44,26 @@ commons-beanutils ${dep.apache.commons.version} + + + com.alibaba.otter + canal.protocol + ${dep.canal.protocol.version} + + + fastjson + com.alibaba + + + logback-classic + ch.qos.logback + + + logback-core + ch.qos.logback + + + org.apache.flink @@ -150,6 +171,14 @@ oshi io.tidb.bigdata.telemetry.shade.oshi + + com.google.protobuf + io.tidb.bigdata.shade.com.google.protobuf + + + com.alibaba.otter.canal + io.tidb.bigdata.shade.com.alibaba.otter.canal + ${project.artifactId}-${project.version} diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java index a444f8bb..641a23b1 100644 --- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java @@ -150,8 +150,13 @@ private static ConfigOption optional(String key) { public static final String STREAMING_CODEC_JSON = "json"; public static final String STREAMING_CODEC_CRAFT = "craft"; public static final String STREAMING_CODEC_CANAL_JSON = "canal-json"; + public static final String STREAMING_CODEC_CANAL_PROTOBUF = "canal-protobuf"; public static final Set 
VALID_STREAMING_CODECS = - ImmutableSet.of(STREAMING_CODEC_CRAFT, STREAMING_CODEC_JSON, STREAMING_CODEC_CANAL_JSON); + ImmutableSet.of( + STREAMING_CODEC_CRAFT, + STREAMING_CODEC_JSON, + STREAMING_CODEC_CANAL_JSON, + STREAMING_CODEC_CANAL_PROTOBUF); // Options for catalog public static final ConfigOption IGNORE_PARSE_ERRORS = @@ -185,6 +190,12 @@ private static ConfigOption optional(String key) { public static final String METADATA_INCLUDED = "tidb.metadata.included"; public static final String METADATA_INCLUDED_ALL = "*"; + // During the streaming phase, set a timestamp to avoid consuming expired data, + // with the default being one minute subtracted from the snapshot time. + // Additionally, we can set this value to less than or equal to 0 to disable this feature. + public static final String STARTING_OFFSETS_TS = "tidb.streaming.source.starting.offsets.ts"; + public static final long MINUTE = 60 * 1000L; + /** * see {@link org.apache.flink.connector.jdbc.table.JdbcConnectorOptions} * diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java index 89d1406f..330c70bd 100644 --- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.table.data.RowData; import org.tikv.common.meta.TiTimestamp; @@ -55,6 +56,10 @@ public CDCSource canalJson() { return doBuild(builder.canalJson()); } + public CDCSource canalProtobuf() { + return doBuild(builder.canalProtobuf()); + } + private final CDCDeserializationSchemaBuilder builder; protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) { @@ -62,14 +67,23 @@ protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) { } public static KafkaCDCSourceBuilder kafka( - String database, String table, TiTimestamp ts, TiDBSchemaAdapter schema) { - return new KafkaCDCSourceBuilder( - new CDCDeserializationSchemaBuilder( - schema.getPhysicalRowDataType(), schema.getCDCMetadata()) - .startTs(ts.getVersion()) - .types(ROW_CHANGED_EVENT) - .schemas(ImmutableSet.of(database)) - .tables(ImmutableSet.of(table))); + String database, + String table, + TiTimestamp ts, + TiDBSchemaAdapter schema, + long startingOffsetTs) { + KafkaCDCSourceBuilder kafkaCDCSourceBuilder = + new KafkaCDCSourceBuilder( + new CDCDeserializationSchemaBuilder( + schema.getPhysicalRowDataType(), schema.getCDCMetadata()) + .startTs(ts.getVersion()) + .types(ROW_CHANGED_EVENT) + .schemas(ImmutableSet.of(database)) + .tables(ImmutableSet.of(table))); + if (startingOffsetTs > 0) { + kafkaCDCSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startingOffsetTs)); + } + return kafkaCDCSourceBuilder; } @SuppressWarnings("unchecked") diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java index 3e2fb628..9f06a165 100644 --- 
a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java @@ -85,7 +85,8 @@ public SnapshotSource( tiTableInfo, Arrays.asList(schema.getPhysicalFieldNamesWithoutMeta())); this.timestamp = getOptionalVersion() - .orElseGet(() -> getOptionalTimestamp().orElseGet(session::getSnapshotVersion)); + .orElseGet( + () -> getOptionalTimestamp().orElseGet(session::getApproximateSnapshotVersion)); TableHandleInternal tableHandleInternal = new TableHandleInternal(this.databaseName, tiTableInfo); this.splits = diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java index ae3c0bcf..a4279b91 100644 --- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java @@ -20,6 +20,7 @@ import static io.tidb.bigdata.flink.connector.TiDBOptions.IGNORE_PARSE_ERRORS; import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC; import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_CANAL_JSON; +import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_CANAL_PROTOBUF; import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_CRAFT; import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_JSON; import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_SOURCE; @@ -28,6 +29,7 @@ import static io.tidb.bigdata.flink.connector.TiDBOptions.VALID_STREAMING_CODECS; import static io.tidb.bigdata.flink.connector.TiDBOptions.VALID_STREAMING_SOURCES; +import io.tidb.bigdata.flink.connector.TiDBOptions; import io.tidb.bigdata.flink.connector.source.enumerator.TiDBSourceSplitEnumerator; import io.tidb.bigdata.tidb.ClientConfig; import io.tidb.bigdata.tidb.expression.Expression; @@ -35,6 +37,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.connector.base.source.hybrid.HybridSource; @@ -102,9 +105,17 @@ private TiDBSourceBuilder setProperties(Map properties) { return this; } + private long getStartingOffsetTs(TiTimestamp timestamp) { + return Optional.ofNullable(properties.get(TiDBOptions.STARTING_OFFSETS_TS)) + .filter(StringUtils::isNotEmpty) + .map(Long::parseLong) + .orElse(timestamp.getPhysical() - TiDBOptions.MINUTE); + } + private CDCSourceBuilder createCDCBuilder(TiTimestamp timestamp) { if (streamingSource.equals(STREAMING_SOURCE_KAFKA)) { - return CDCSourceBuilder.kafka(databaseName, tableName, timestamp, schema) + return CDCSourceBuilder.kafka( + databaseName, tableName, timestamp, schema, getStartingOffsetTs(timestamp)) .ignoreParseErrors(ignoreParseErrors) .setProperties(properties); } else { @@ -150,6 +161,8 @@ private TiDBSourceBuilder setProperties(Map properties) { return cdcBuilder.json(); case STREAMING_CODEC_CANAL_JSON: return cdcBuilder.canalJson(); + case STREAMING_CODEC_CANAL_PROTOBUF: + return cdcBuilder.canalProtobuf(); default: throw new IllegalArgumentException( "Invalid streaming codec: '" + streamingCodec + "'"); diff --git 
a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java new file mode 100644 index 00000000..76da58d7 --- /dev/null +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java @@ -0,0 +1,259 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.tidb.bigdata.flink.format.canal; + +import io.tidb.bigdata.flink.connector.source.TiDBMetadata; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.meta.TiTimestamp; + +public abstract class CanalDeserializationSchema implements DeserializationSchema { + + private static final Logger LOG = LoggerFactory.getLogger(CanalDeserializationSchema.class); + + private static final String INSERT = "INSERT"; + private static final String UPDATE = "UPDATE"; + private static final String DELETE = "DELETE"; + private static final String CREATE = "CREATE"; + private static final String QUERY = "QUERY"; + private static final String TIDB_WATERMARK = "TIDB_WATERMARK"; + + public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + protected final DataType physicalDataType; + protected final Set schemas; + protected final Set tables; + protected final CDCMetadata[] metadata; + protected final long startTs; + protected final TypeInformation producedTypeInfo; + protected final boolean ignoreParseErrors; + + protected final List fieldNames; + protected final List dataTypes; + + protected 
CanalDeserializationSchema( + DataType physicalDataType, + Set schemas, + Set tables, + CDCMetadata[] metadata, + long startTs, + TypeInformation producedTypeInfo, + boolean ignoreParseErrors) { + this.physicalDataType = physicalDataType; + this.schemas = schemas; + this.tables = tables; + this.metadata = metadata; + this.startTs = startTs; + Preconditions.checkArgument( + createTiTimestampFromVersion(startTs).getLogical() == 0, + "The logical ts must be 0 for canal format"); + this.producedTypeInfo = producedTypeInfo; + this.ignoreParseErrors = ignoreParseErrors; + RowType physicalRowType = ((RowType) physicalDataType.getLogicalType()); + this.fieldNames = new ArrayList<>(physicalRowType.getFieldNames()); + this.dataTypes = new ArrayList<>(physicalDataType.getChildren()); + } + + @Override + public final RowData deserialize(byte[] message) throws IOException { + throw new UnsupportedOperationException( + "Please invoke deserialize(byte[] message, Collector out)"); + } + + @Override + public void deserialize(byte[] message, Collector out) throws IOException { + List flatMessages; + try { + flatMessages = decodeToFlatMessage(message); + } catch (Exception e) { + if (ignoreParseErrors) { + LOG.warn("Cannot decode to flatMessage, skip it", e); + return; + } + throw new RuntimeException(e); + } + for (FlatMessage flatMessage : flatMessages) { + deserialize(flatMessage, out); + } + } + + @Override + public boolean isEndOfStream(RowData nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return producedTypeInfo; + } + + public abstract List decodeToFlatMessage(byte[] data) throws Exception; + + public void deserialize(FlatMessage flatMessage, Collector out) { + if (!schemas.contains(flatMessage.getDatabase())) { + return; + } + if (!tables.contains(flatMessage.getTable())) { + return; + } + TiTimestamp tiTimestamp = createTiTimestampFromTs(flatMessage.getEs()); + if (startTs > tiTimestamp.getVersion()) { + return; + } + List> data = flatMessage.getData(); + List> old = flatMessage.getOld(); + String type = flatMessage.getType(); + switch (type) { + case CREATE: + case QUERY: + case TIDB_WATERMARK: + break; + case INSERT: + data.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.INSERT)) + .forEach(out::collect); + break; + case DELETE: + data.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.DELETE)) + .forEach(out::collect); + break; + case UPDATE: + old.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.UPDATE_BEFORE)) + .forEach(out::collect); + data.stream() + .map(values -> toRowData(tiTimestamp, values, RowKind.UPDATE_AFTER)) + .forEach(out::collect); + break; + default: + if (!ignoreParseErrors) { + throw new IllegalArgumentException( + String.format( + "Unknown \"type\" value \"%s\". 
The Canal message is '%s'", type, flatMessage)); + } + } + } + + protected RowData toRowData(TiTimestamp timestamp, Map values, RowKind rowKind) { + // ignore case + values = + values.entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap(e -> e.getKey().toLowerCase(), Entry::getValue)); + GenericRowData rowData = new GenericRowData(rowKind, fieldNames.size() + metadata.length); + for (int i = 0; i < fieldNames.size(); i++) { + String name = fieldNames.get(i); + DataType dataType = dataTypes.get(i); + String value = values.get(name); + rowData.setField(i, convertValue(value, dataType)); + } + // concat metadata + for (int i = 0; i < metadata.length; i++) { + CDCMetadata cdcMetadata = metadata[i]; + TiDBMetadata tiDBMetadata = + cdcMetadata + .toTiDBMetadata() + .orElseThrow( + () -> new IllegalArgumentException("Unsupported metadata: " + cdcMetadata)); + if (cdcMetadata == CDCMetadata.SOURCE_EVENT) { + rowData.setField(fieldNames.size() + i, StringData.fromString(CDCMetadata.STREAMING)); + continue; + } + rowData.setField(fieldNames.size() + i, tiDBMetadata.extract(timestamp)); + } + return rowData; + } + + public static Object convertValue(String value, DataType dataType) { + if (value == null) { + return null; + } + LogicalType logicalType = dataType.getLogicalType(); + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return Boolean.parseBoolean(value); + case TINYINT: + return Byte.parseByte(value); + case SMALLINT: + return Short.parseShort(value); + case INTEGER: + return Integer.parseInt(value); + case BIGINT: + return Long.parseLong(value); + case VARCHAR: + return StringData.fromString(value); + case VARBINARY: + return value.getBytes(); + case FLOAT: + return Float.parseFloat(value); + case DOUBLE: + return Double.parseDouble(value); + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return DecimalData.fromBigDecimal( + new BigDecimal(value), decimalType.getPrecision(), decimalType.getScale()); + case DATE: + return (int) LocalDate.parse(value, DATE_FORMATTER).toEpochDay(); + case TIME_WITHOUT_TIME_ZONE: + return (int) (LocalTime.parse(value, TIME_FORMATTER).toNanoOfDay() / 1000 / 1000); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return TimestampData.fromTimestamp( + Timestamp.valueOf(LocalDateTime.parse(value, DATE_TIME_FORMATTER))); + default: + throw new IllegalStateException("Unsupported type root: " + logicalType.getTypeRoot()); + } + } + + public static TiTimestamp createTiTimestampFromVersion(long version) { + return new TiTimestamp(version >> 18, version & 0x3FFFF); + } + + public static TiTimestamp createTiTimestampFromTs(long ts) { + return new TiTimestamp(ts, 0); + } +} diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java new file mode 100644 index 00000000..093336d8 --- /dev/null +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.tidb.bigdata.flink.format.canal; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.util.List; +import java.util.Set; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +public class CanalJsonDeserializationSchema extends CanalDeserializationSchema { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public CanalJsonDeserializationSchema( + DataType physicalDataType, + Set schemas, + Set tables, + CDCMetadata[] metadata, + long startTs, + TypeInformation producedTypeInfo, + boolean ignoreParseErrors) { + super( + physicalDataType, schemas, tables, metadata, startTs, producedTypeInfo, ignoreParseErrors); + } + + @Override + public List decodeToFlatMessage(byte[] data) throws Exception { + return ImmutableList.of(MAPPER.readValue(data, FlatMessage.class)); + } +} diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java new file mode 100644 index 00000000..c3cfcfbe --- /dev/null +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java @@ -0,0 +1,77 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
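Note: CanalJsonDeserializationSchema above hands the whole payload to a stock Jackson ObjectMapper, so every top-level field of a TiCDC canal-json record must correspond to a FlatMessage property (the "_tidb" envelope is bound through @JsonProperty). A standalone sketch of that mapping, using a trimmed, hypothetical payload; it assumes the FlatMessage class from this patch is on the classpath:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.tidb.bigdata.flink.format.canal.FlatMessage;

public class CanalJsonDecodeSketch {
  public static void main(String[] args) throws Exception {
    // Trimmed canal-json record; real TiCDC output also carries pkNames,
    // sql, sqlType and mysqlType, all of which are FlatMessage properties.
    String payload =
        "{\"id\":0,\"database\":\"test\",\"table\":\"test_tidb_type\","
            + "\"isDdl\":false,\"type\":\"INSERT\",\"es\":1685085356868,"
            + "\"ts\":1685085357997,\"data\":[{\"c1\":\"1\"}],\"old\":null}";
    FlatMessage message = new ObjectMapper().readValue(payload.getBytes(), FlatMessage.class);
    // Prints: INSERT test.test_tidb_type
    System.out.println(message.getType() + " " + message.getDatabase() + "." + message.getTable());
  }
}
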
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import com.alibaba.otter.canal.protocol.CanalEntry.Entry; +import com.alibaba.otter.canal.protocol.CanalPacket.Ack; +import com.alibaba.otter.canal.protocol.CanalPacket.Compression; +import com.alibaba.otter.canal.protocol.CanalPacket.Messages; +import com.alibaba.otter.canal.protocol.CanalPacket.Packet; +import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.google.protobuf.ByteString; +import java.util.Iterator; + +public class CanalMessageDeserializer { + + public static Message deserializer(byte[] data) { + return deserializer(data, false); + } + + public static Message deserializer(byte[] data, boolean lazyParseEntry) { + try { + if (data == null) { + return null; + } else { + Packet p = Packet.parseFrom(data); + switch (p.getType()) { + case MESSAGES: + if (!p.getCompression().equals(Compression.NONE) + && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) { + throw new CanalClientException("compression is not supported in this connector"); + } + + Messages messages = Messages.parseFrom(p.getBody()); + Message result = new Message(messages.getBatchId()); + if (lazyParseEntry) { + result.setRawEntries(messages.getMessagesList()); + result.setRaw(true); + } else { + Iterator var5 = messages.getMessagesList().iterator(); + + while (var5.hasNext()) { + ByteString byteString = (ByteString) var5.next(); + result.addEntry(Entry.parseFrom(byteString)); + } + + result.setRaw(false); + } + + return result; + case ACK: + Ack ack = Ack.parseFrom(p.getBody()); + throw new CanalClientException( + "something goes wrong with reason: " + ack.getErrorMessage()); + default: + throw new CanalClientException("unexpected packet type: " + p.getType()); + } + } + } catch (Exception var7) { + throw new CanalClientException("deserializer failed", var7); + } + } +} diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java new file mode 100644 index 00000000..bc536453 --- /dev/null +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
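Note: CanalMessageDeserializer is lifted from the canal client, so it expects canal's full Packet framing rather than a bare list of entries. A round-trip sketch of the framing it accepts, built from an empty, hypothetical batch (the canal protobuf builder calls are assumptions based on the canal.protocol artifact added in the pom):

import com.alibaba.otter.canal.protocol.CanalPacket.Messages;
import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
import com.alibaba.otter.canal.protocol.Message;
import io.tidb.bigdata.flink.format.canal.CanalMessageDeserializer;

public class CanalPacketSketch {
  public static void main(String[] args) {
    // A MESSAGES packet whose body is a Messages batch; with no compression
    // set, the packet passes the NONE/COMPRESSIONCOMPATIBLEPROTO2 check above.
    Messages messages = Messages.newBuilder().setBatchId(1L).build();
    Packet packet =
        Packet.newBuilder().setType(PacketType.MESSAGES).setBody(messages.toByteString()).build();
    Message decoded = CanalMessageDeserializer.deserializer(packet.toByteArray());
    System.out.println("batchId = " + decoded.getId()); // batchId = 1
  }
}
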
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import com.alibaba.otter.canal.protocol.Message; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.util.List; +import java.util.Set; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +public class CanalProtobufDeserializationSchema extends CanalDeserializationSchema { + + public CanalProtobufDeserializationSchema( + DataType physicalDataType, + Set schemas, + Set tables, + CDCMetadata[] metadata, + long startTs, + TypeInformation producedTypeInfo, + boolean ignoreParseErrors) { + super( + physicalDataType, schemas, tables, metadata, startTs, producedTypeInfo, ignoreParseErrors); + } + + @Override + public List decodeToFlatMessage(byte[] data) throws Exception { + Message message = CanalMessageDeserializer.deserializer(data); + return MQMessageUtils.getFlatMessages(message); + } +} diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java new file mode 100644 index 00000000..822da53a --- /dev/null +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java @@ -0,0 +1,243 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
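Note: both canal decoders filter rows through the TSO helpers in CanalDeserializationSchema: a TiDB version packs a millisecond timestamp into the high bits and an 18-bit logical counter into the low bits. A worked example, standalone and not part of the patch, using the commit version that appears in the tests:

public class TsoSketch {
  public static void main(String[] args) {
    long version = 441735888086761472L; // commit version from the protobuf test data
    long physicalMillis = version >> 18; // 1685088684413, i.e. 2023-05-26T08:11:24.413Z
    long logical = version & 0x3FFFF; // 0 for this sample, as canal startTs requires
    // FlatMessage.es is already milliseconds, so createTiTimestampFromTs(es)
    // yields a version whose logical half is zero and can be compared with a
    // startTs that satisfies the checkArgument in the base class.
    System.out.println(physicalMillis + " " + logical);
    System.out.println(((physicalMillis << 18) | logical) == version); // true
  }
}
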
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class FlatMessage implements Serializable { + + private long id; + private String database; + private String table; + private List pkNames; + private Boolean isDdl; + private String type; + // binlog executeTime + private Long es; + // dml build timeStamp + private Long ts; + private String sql; + private Map sqlType; + private Map mysqlType; + private List> data; + private List> old; + + @JsonProperty("_tidb") + private TiDB tidb; + + public FlatMessage() {} + + public FlatMessage(long id) { + this.id = id; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public List getPkNames() { + return pkNames; + } + + public void addPkName(String pkName) { + if (this.pkNames == null) { + this.pkNames = new ArrayList<>(); + } + this.pkNames.add(pkName); + } + + public void setPkNames(List pkNames) { + this.pkNames = pkNames; + } + + public Boolean getIsDdl() { + return isDdl; + } + + public void setIsDdl(Boolean isDdl) { + this.isDdl = isDdl; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Long getTs() { + return ts; + } + + public void setTs(Long ts) { + this.ts = ts; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public Map getSqlType() { + return sqlType; + } + + public void setSqlType(Map sqlType) { + this.sqlType = sqlType; + } + + public Map getMysqlType() { + return mysqlType; + } + + public void setMysqlType(Map mysqlType) { + this.mysqlType = mysqlType; + } + + public List> getData() { + return data; + } + + public void setData(List> data) { + this.data = data; + } + + public List> getOld() { + return old; + } + + public void setOld(List> old) { + this.old = old; + } + + public Long getEs() { + return es; + } + + public void setEs(Long es) { + this.es = es; + } + + public Boolean getDdl() { + return isDdl; + } + + public void setDdl(Boolean ddl) { + isDdl = ddl; + } + + public TiDB getTidb() { + return tidb; + } + + public void setTidb(TiDB tidb) { + this.tidb = tidb; + } + + @Override + public String toString() { + return "FlatMessage{" + + "id=" + + id + + ", database='" + + database + + '\'' + + ", table='" + + table + + '\'' + + ", pkNames=" + + pkNames + + ", isDdl=" + + isDdl + + ", type='" + + type + + '\'' + + ", es=" + + es + + ", ts=" + + ts + + ", sql='" + + sql + + '\'' + + ", sqlType=" + + sqlType + + ", mysqlType=" + + mysqlType + + ", data=" + + data + + ", old=" + + old + + ", tidb=" + + tidb + + '}'; + } + + public static class TiDB { + + private Long commitTs; + private Long watermarkTs; + + public Long getCommitTs() { + return commitTs; + } + + public void setCommitTs(Long commitTs) { + this.commitTs = commitTs; + } + + public Long getWatermarkTs() { + return watermarkTs; + } + + public void setWatermarkTs(Long watermarkTs) { + this.watermarkTs = watermarkTs; + } + + @Override + public String toString() { + return "TiDB{" + "commitTs=" + commitTs + ", watermarkTs=" + watermarkTs + '}'; + } + } +} diff 
--git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java new file mode 100644 index 00000000..f5cbbda2 --- /dev/null +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java @@ -0,0 +1,206 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.tidb.bigdata.flink.format.canal; + +import com.alibaba.otter.canal.common.utils.ExecutorTemplate; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.CanalEntry.Entry; +import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; +import com.alibaba.otter.canal.protocol.Message; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; +import javax.annotation.Nullable; + +public class MQMessageUtils { + + public static List getFlatMessages(Message message) { + EntryRowData[] datas = MQMessageUtils.buildMessageData(message, null); + return MQMessageUtils.messageConverter(datas, message.getId()); + } + + public static EntryRowData[] buildMessageData( + Message message, @Nullable ThreadPoolExecutor executor) { + Optional template = Optional.ofNullable(executor).map(ExecutorTemplate::new); + if (message.isRaw()) { + List rawEntries = message.getRawEntries(); + final EntryRowData[] datas = new EntryRowData[rawEntries.size()]; + int i = 0; + for (ByteString byteString : rawEntries) { + final int index = i; + Runnable task = + () -> { + try { + Entry entry = Entry.parseFrom(byteString); + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + datas[index] = new EntryRowData(); + datas[index].entry = entry; + datas[index].rowChange = rowChange; + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }; + if (template.isPresent()) { + template.get().submit(task); + } else { + task.run(); + } + i++; + } + template.ifPresent(ExecutorTemplate::waitForResult); + return datas; + } else { + final EntryRowData[] datas = new EntryRowData[message.getEntries().size()]; + int i = 0; + for (Entry entry : message.getEntries()) { + final int index = i; + Runnable task = + () -> { + try { + RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); + datas[index] = new EntryRowData(); + datas[index].entry = entry; + datas[index].rowChange = rowChange; + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }; + if (template.isPresent()) { + template.get().submit(task); + } else { + task.run(); + } + i++; + } + template.ifPresent(ExecutorTemplate::waitForResult); + return datas; + } + } + + public static List messageConverter(EntryRowData[] datas, long id) 
{ + List flatMessages = new ArrayList<>(); + for (EntryRowData entryRowData : datas) { + Entry entry = entryRowData.entry; + RowChange rowChange = entryRowData.rowChange; + if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN + || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { + continue; + } + + // build flatMessage + CanalEntry.EventType eventType = rowChange.getEventType(); + FlatMessage flatMessage = new FlatMessage(id); + flatMessages.add(flatMessage); + flatMessage.setDatabase(entry.getHeader().getSchemaName()); + flatMessage.setTable(entry.getHeader().getTableName()); + flatMessage.setIsDdl(rowChange.getIsDdl()); + flatMessage.setType(eventType.toString()); + flatMessage.setEs(entry.getHeader().getExecuteTime()); + flatMessage.setTs(System.currentTimeMillis()); + flatMessage.setSql(rowChange.getSql()); + + if (!rowChange.getIsDdl()) { + Map sqlType = new LinkedHashMap<>(); + Map mysqlType = new LinkedHashMap<>(); + List> data = new ArrayList<>(); + List> old = new ArrayList<>(); + + Set updateSet = new HashSet<>(); + boolean hasInitPkNames = false; + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + if (eventType != CanalEntry.EventType.INSERT + && eventType != CanalEntry.EventType.UPDATE + && eventType != CanalEntry.EventType.DELETE) { + continue; + } + + Map row = new LinkedHashMap<>(); + List columns; + + if (eventType == CanalEntry.EventType.DELETE) { + columns = rowData.getBeforeColumnsList(); + } else { + columns = rowData.getAfterColumnsList(); + } + + for (CanalEntry.Column column : columns) { + if (!hasInitPkNames && column.getIsKey()) { + flatMessage.addPkName(column.getName()); + } + sqlType.put(column.getName(), column.getSqlType()); + mysqlType.put(column.getName(), column.getMysqlType()); + if (column.getIsNull()) { + row.put(column.getName(), null); + } else { + row.put(column.getName(), column.getValue()); + } + if (column.getUpdated()) { + updateSet.add(column.getName()); + } + } + + hasInitPkNames = true; + if (!row.isEmpty()) { + data.add(row); + } + + if (eventType == CanalEntry.EventType.UPDATE) { + Map rowOld = new LinkedHashMap<>(); + for (CanalEntry.Column column : rowData.getBeforeColumnsList()) { + if (updateSet.contains(column.getName())) { + if (column.getIsNull()) { + rowOld.put(column.getName(), null); + } else { + rowOld.put(column.getName(), column.getValue()); + } + } + } + if (!rowOld.isEmpty()) { + old.add(rowOld); + } + } + } + if (!sqlType.isEmpty()) { + flatMessage.setSqlType(sqlType); + } + if (!mysqlType.isEmpty()) { + flatMessage.setMysqlType(mysqlType); + } + if (!data.isEmpty()) { + flatMessage.setData(data); + } + if (!old.isEmpty()) { + flatMessage.setOld(old); + } + } + } + return flatMessages; + } + + public static class EntryRowData { + + public Entry entry; + public RowChange rowChange; + } +} diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java index a52838fc..eecf6058 100644 --- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java @@ -18,9 +18,11 @@ import com.google.common.base.Preconditions; import io.tidb.bigdata.cdc.Key; +import io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema; +import io.tidb.bigdata.flink.format.canal.CanalJsonDeserializationSchema; 
+import io.tidb.bigdata.flink.format.canal.CanalProtobufDeserializationSchema; import java.util.Set; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; @@ -87,15 +89,13 @@ public JsonDeserializationSchema json() { } // We use a new decoder, since canal does not follow TiCDC open protocol. - public TiDBCanalJsonDeserializationSchema canalJson() { - return new TiDBCanalJsonDeserializationSchema( - physicalDataType, - schemas, - tables, - metadata, - startTs, - typeInformation, - ignoreParseErrors, - TimestampFormat.SQL); + public CanalDeserializationSchema canalJson() { + return new CanalJsonDeserializationSchema( + physicalDataType, schemas, tables, metadata, startTs, typeInformation, ignoreParseErrors); + } + + public CanalDeserializationSchema canalProtobuf() { + return new CanalProtobufDeserializationSchema( + physicalDataType, schemas, tables, metadata, startTs, typeInformation, ignoreParseErrors); } } diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java index 62d27e90..c04b35e0 100644 --- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java +++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java @@ -47,6 +47,7 @@ import org.apache.flink.util.Collector; import org.tikv.common.meta.TiTimestamp; +@Deprecated public final class TiDBCanalJsonDeserializationSchema implements DeserializationSchema { private static final long serialVersionUID = 1L; diff --git a/flink/flink-1.14/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java b/flink/flink-1.14/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java new file mode 100644 index 00000000..aede34c3 --- /dev/null +++ b/flink/flink-1.14/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java @@ -0,0 +1,240 @@ +/* + * Copyright 2023 TiDB Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
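Note: with canalJson() and canalProtobuf() on the builder, codec selection collapses to one call. A wiring sketch with placeholder arguments (physicalType, meta, startVersion and protobuf are illustrative names, not patch code) that mirrors the STREAMING_CODEC switch in TiDBSourceBuilder; the canal decoders take no types filter, unlike the craft/json paths:

static CanalDeserializationSchema buildCanalSchema(
    DataType physicalType, CDCMetadata[] meta, long startVersion, boolean protobuf) {
  CDCDeserializationSchemaBuilder builder =
      new CDCDeserializationSchemaBuilder(physicalType, meta)
          .startTs(startVersion) // must carry a zero logical component
          .schemas(ImmutableSet.of("test"))
          .tables(ImmutableSet.of("test_tidb_type"));
  return protobuf ? builder.canalProtobuf() : builder.canalJson();
}
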
+ */ + +package io.tidb.bigdata.flink.format.canal; + +import static io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema.DATE_FORMATTER; +import static io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema.TIME_FORMATTER; + +import com.google.common.collect.ImmutableSet; +import io.tidb.bigdata.flink.format.cdc.CDCMetadata; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Set; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +public class CanalDeserializationSchemaTest { + + public static final DataType TYPES = + DataTypes.ROW( + DataTypes.FIELD("c1", DataTypes.TINYINT()), + DataTypes.FIELD("c2", DataTypes.SMALLINT()), + DataTypes.FIELD("c3", DataTypes.INT()), + DataTypes.FIELD("c4", DataTypes.INT()), + DataTypes.FIELD("c5", DataTypes.BIGINT()), + DataTypes.FIELD("c6", DataTypes.STRING()), + DataTypes.FIELD("c7", DataTypes.STRING()), + DataTypes.FIELD("c8", DataTypes.STRING()), + DataTypes.FIELD("c9", DataTypes.STRING()), + DataTypes.FIELD("c10", DataTypes.STRING()), + DataTypes.FIELD("c11", DataTypes.STRING()), + DataTypes.FIELD("c12", DataTypes.BYTES()), + DataTypes.FIELD("c13", DataTypes.BYTES()), + DataTypes.FIELD("c14", DataTypes.BYTES()), + DataTypes.FIELD("c15", DataTypes.BYTES()), + DataTypes.FIELD("c16", DataTypes.BYTES()), + DataTypes.FIELD("c17", DataTypes.BYTES()), + DataTypes.FIELD("c18", DataTypes.FLOAT()), + DataTypes.FIELD("c19", DataTypes.DOUBLE()), + DataTypes.FIELD("c20", DataTypes.DECIMAL(6, 3)), + DataTypes.FIELD("c21", DataTypes.DATE()), + DataTypes.FIELD("c22", DataTypes.TIME(0)), + DataTypes.FIELD("c23", DataTypes.TIMESTAMP(6)), + DataTypes.FIELD("c24", DataTypes.TIMESTAMP(6)), + DataTypes.FIELD("c25", DataTypes.INT()), + DataTypes.FIELD("c26", DataTypes.TINYINT()), + DataTypes.FIELD("c27", DataTypes.STRING()), + DataTypes.FIELD("c28", DataTypes.STRING()), + DataTypes.FIELD("c29", DataTypes.STRING())); + + public static final byte[] JSON_DATA = + 
"{\"id\":0,\"database\":\"test\",\"table\":\"test_tidb_type\",\"pkNames\":[\"c1\"],\"isDdl\":false,\"type\":\"INSERT\",\"es\":1685085356868,\"ts\":1685085357997,\"sql\":\"\",\"sqlType\":{\"c1\":-6,\"c10\":2005,\"c11\":2005,\"c12\":2004,\"c13\":2004,\"c14\":2004,\"c15\":2004,\"c16\":2004,\"c17\":2004,\"c18\":7,\"c19\":8,\"c2\":5,\"c20\":3,\"c21\":91,\"c22\":92,\"c23\":93,\"c24\":93,\"c25\":12,\"c26\":-6,\"c27\":12,\"c28\":4,\"c29\":-7,\"c3\":4,\"c4\":4,\"c5\":-5,\"c6\":1,\"c7\":12,\"c8\":2005,\"c9\":2005},\"mysqlType\":{\"c1\":\"tinyint\",\"c10\":\"text\",\"c11\":\"longtext\",\"c12\":\"binary\",\"c13\":\"varbinary\",\"c14\":\"tinyblob\",\"c15\":\"mediumblob\",\"c16\":\"blob\",\"c17\":\"longblob\",\"c18\":\"float\",\"c19\":\"double\",\"c2\":\"smallint\",\"c20\":\"decimal\",\"c21\":\"date\",\"c22\":\"time\",\"c23\":\"datetime\",\"c24\":\"timestamp\",\"c25\":\"year\",\"c26\":\"tinyint\",\"c27\":\"json\",\"c28\":\"enum\",\"c29\":\"set\",\"c3\":\"mediumint\",\"c4\":\"int\",\"c5\":\"bigint\",\"c6\":\"char\",\"c7\":\"varchar\",\"c8\":\"tinytext\",\"c9\":\"mediumtext\"},\"data\":[{\"c1\":\"1\",\"c10\":\"texttype\",\"c11\":\"longtexttype\",\"c12\":\"binarytype\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\",\"c13\":\"varbinarytype\",\"c14\":\"tinyblobtype\",\"c15\":\"mediumblobtype\",\"c16\":\"blobtype\",\"c17\":\"longblobtype\",\"c18\":\"1.2339999675750732\",\"c19\":\"2.456789\",\"c2\":\"2\",\"c20\":\"123.456\",\"c21\":\"2023-05-26\",\"c22\":\"15:15:56\",\"c23\":\"2023-05-26 15:15:56\",\"c24\":\"2023-05-26 15:15:56\",\"c25\":\"2020\",\"c26\":\"1\",\"c27\":\"{\\\"a\\\": 1, \\\"b\\\": 2}\",\"c28\":\"1\",\"c29\":\"1\",\"c3\":\"3\",\"c4\":\"4\",\"c5\":\"5\",\"c6\":\"chartype\",\"c7\":\"varchartype\",\"c8\":\"tinytexttype\",\"c9\":\"mediumtexttype\"}],\"old\":null}" + .getBytes(); + public static final RowData JSON_DECODE_RESULT = + GenericRowData.ofKind( + RowKind.INSERT, + (byte) 1, + (short) 2, + 3, + 4, + 5L, + StringData.fromString("chartype"), + StringData.fromString("varchartype"), + StringData.fromString("tinytexttype"), + StringData.fromString("mediumtexttype"), + StringData.fromString("texttype"), + StringData.fromString("longtexttype"), + ("binarytype" + new String(new byte[10])).getBytes(), + "varbinarytype".getBytes(), + "tinyblobtype".getBytes(), + "mediumblobtype".getBytes(), + "blobtype".getBytes(), + "longblobtype".getBytes(), + 1.234F, + 2.456789, + DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6, 3), + (int) LocalDate.parse("2023-05-26", DATE_FORMATTER).toEpochDay(), + (int) (LocalTime.parse("15:15:56", TIME_FORMATTER).toNanoOfDay() / 1000 / 1000), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 15, 15, 56)), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 15, 15, 56)), + 2020, + (byte) 1, + StringData.fromString("{\"a\": 1, \"b\": 2}"), + StringData.fromString("1"), + StringData.fromString("1"), + 441735015790804992L, + TimestampData.fromEpochMillis(441735015790804992L >> 18), + StringData.fromString(CDCMetadata.STREAMING)); + + public static final byte[] PROTOBUF_DATA = + Base64.getDecoder() + .decode( + 
"EAEYByr8DRKZCAo6CAEqBVVURi04MP2S87iFMTgCQgR0ZXN0Sg50ZXN0X3RpZGJfdHlwZVgBYg4KCXJvd3NDb3VudBIBMRACGtgHEAFQAGLRBxIhEPr//////////wEaAmMxIAEoATAAQgExUgd0aW55aW50EhcQBRoCYzIoATAAQgEyUghzbWFsbGludBIYEAQaAmMzKAEwAEIBM1IJbWVkaXVtaW50EhIQBBoCYzQoATAAQgE0UgNpbnQSHhD7//////////8BGgJjNSgBMABCATVSBmJpZ2ludBIaEAEaAmM2KAEwAEIIY2hhcnR5cGVSBGNoYXISIBAMGgJjNygBMABCC3ZhcmNoYXJ0eXBlUgd2YXJjaGFyEiMQ1Q8aAmM4KAEwAEIMdGlueXRleHR0eXBlUgh0aW55dGV4dBInENUPGgJjOSgBMABCDm1lZGl1bXRleHR0eXBlUgptZWRpdW10ZXh0EhwQ1Q8aA2MxMCgBMABCCHRleHR0eXBlUgR0ZXh0EiQQ1Q8aA2MxMSgBMABCDGxvbmd0ZXh0dHlwZVIIbG9uZ3RleHQSKhDUDxoDYzEyKAEwAEIUYmluYXJ5dHlwZQAAAAAAAAAAAABSBmJpbmFyeRImENQPGgNjMTMoATAAQg12YXJiaW5hcnl0eXBlUgl2YXJiaW5hcnkSJBDUDxoDYzE0KAEwAEIMdGlueWJsb2J0eXBlUgh0aW55YmxvYhIoENQPGgNjMTUoATAAQg5tZWRpdW1ibG9idHlwZVIKbWVkaXVtYmxvYhIcENQPGgNjMTYoATAAQghibG9idHlwZVIEYmxvYhIkENQPGgNjMTcoATAAQgxsb25nYmxvYnR5cGVSCGxvbmdibG9iEiYQBxoDYzE4KAEwAEISMS4yMzM5OTk5Njc1NzUwNzMyUgVmbG9hdBIdEAgaA2MxOSgBMABCCDIuNDU2Nzg5UgZkb3VibGUSHRADGgNjMjAoATAAQgcxMjMuNDU2UgdkZWNpbWFsEh0QWxoDYzIxKAEwAEIKMjAyMy0wNS0yNlIEZGF0ZRIbEFwaA2MyMigBMABCCDE2OjExOjI0UgR0aW1lEioQXRoDYzIzKAEwAEITMjAyMy0wNS0yNiAxNjoxMToyNFIIZGF0ZXRpbWUSKxBdGgNjMjQoATAAQhMyMDIzLTA1LTI2IDE2OjExOjI0Ugl0aW1lc3RhbXASFxAMGgNjMjUoATAAQgQyMDIwUgR5ZWFyEiAQ+v//////////ARoDYzI2KAEwAEIBMVIHdGlueWludBIjEAwaA2MyNygBMABCEHsiYSI6IDEsICJiIjogMn1SBGpzb24SFBAEGgNjMjgoATAAQgExUgRlbnVtEhwQ+f//////////ARoDYzI5KAEwAEIBMVIDc2V0Et0FCjoIASoFVVRGLTgw/ZLzuIUxOAJCBHRlc3RKDnRlc3RfdGlkYl90eXBlWAFiDgoJcm93c0NvdW50EgExEAIanAUQAVAAYpUFEiEQ+v//////////ARoCYzEgASgBMABCATJSB3RpbnlpbnQSFBAFGgJjMigBMAFSCHNtYWxsaW50EhUQBBoCYzMoATABUgltZWRpdW1pbnQSDxAEGgJjNCgBMAFSA2ludBIbEPv//////////wEaAmM1KAEwAVIGYmlnaW50EhAQARoCYzYoATABUgRjaGFyEhMQDBoCYzcoATABUgd2YXJjaGFyEhUQ1Q8aAmM4KAEwAVIIdGlueXRleHQSFxDVDxoCYzkoATABUgptZWRpdW10ZXh0EhIQ1Q8aA2MxMCgBMAFSBHRleHQSFhDVDxoDYzExKAEwAVIIbG9uZ3RleHQSFBDUDxoDYzEyKAEwAVIGYmluYXJ5EhcQ1A8aA2MxMygBMAFSCXZhcmJpbmFyeRIWENQPGgNjMTQoATABUgh0aW55YmxvYhIYENQPGgNjMTUoATABUgptZWRpdW1ibG9iEhIQ1A8aA2MxNigBMAFSBGJsb2ISFhDUDxoDYzE3KAEwAVIIbG9uZ2Jsb2ISEhAHGgNjMTgoATABUgVmbG9hdBITEAgaA2MxOSgBMAFSBmRvdWJsZRIUEAMaA2MyMCgBMAFSB2RlY2ltYWwSERBbGgNjMjEoATABUgRkYXRlEhEQXBoDYzIyKAEwAVIEdGltZRIVEF0aA2MyMygBMAFSCGRhdGV0aW1lEhYQXRoDYzI0KAEwAVIJdGltZXN0YW1wEhEQDBoDYzI1KAEwAVIEeWVhchIdEPr//////////wEaA2MyNigBMAFSB3RpbnlpbnQSERAMGgNjMjcoATABUgRqc29uEhEQBBoDYzI4KAEwAVIEZW51bRIZEPn//////////wEaA2MyOSgBMAFSA3NldA=="); + + public static final RowData PROTOBUF_DECODE_RESULT0 = + GenericRowData.ofKind( + RowKind.INSERT, + (byte) 1, + (short) 2, + 3, + 4, + 5L, + StringData.fromString("chartype"), + StringData.fromString("varchartype"), + StringData.fromString("tinytexttype"), + StringData.fromString("mediumtexttype"), + StringData.fromString("texttype"), + StringData.fromString("longtexttype"), + ("binarytype" + new String(new byte[10])).getBytes(), + "varbinarytype".getBytes(), + "tinyblobtype".getBytes(), + "mediumblobtype".getBytes(), + "blobtype".getBytes(), + "longblobtype".getBytes(), + 1.234F, + 2.456789, + DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6, 3), + (int) LocalDate.parse("2023-05-26", DATE_FORMATTER).toEpochDay(), + (int) (LocalTime.parse("16:11:24", TIME_FORMATTER).toNanoOfDay() / 1000 / 1000), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 16, 11, 24)), + TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 16, 11, 24)), + 2020, + (byte) 1, + StringData.fromString("{\"a\": 1, \"b\": 2}"), + StringData.fromString("1"), + StringData.fromString("1"), + 441735888086761472L, + 
TimestampData.fromEpochMillis(441735888086761472L >> 18), + StringData.fromString(CDCMetadata.STREAMING)); + + public static final RowData PROTOBUF_DECODE_RESULT1 = + GenericRowData.ofKind( + RowKind.INSERT, + (byte) 2, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 441735888086761472L, + TimestampData.fromEpochMillis(441735888086761472L >> 18), + StringData.fromString(CDCMetadata.STREAMING)); + + public static final Set SCHEMAS = ImmutableSet.of("test"); + + public static final Set TABLES = ImmutableSet.of("test_tidb_type"); + + public static final CDCMetadata[] METADATA = + new CDCMetadata[] { + CDCMetadata.COMMIT_VERSION, CDCMetadata.COMMIT_TIMESTAMP, CDCMetadata.SOURCE_EVENT + }; + + @Test + public void testJsonDecode() throws IOException { + ListCollector collector = new ListCollector(); + CanalJsonDeserializationSchema canalJsonDeserializationSchema = + new CanalJsonDeserializationSchema(TYPES, SCHEMAS, TABLES, METADATA, 0, null, false); + canalJsonDeserializationSchema.deserialize(JSON_DATA, collector); + RowData row = collector.getRows().get(0); + Assert.assertEquals(row, JSON_DECODE_RESULT); + } + + @Test + public void testProtobufDecode() throws IOException { + ListCollector collector = new ListCollector(); + CanalProtobufDeserializationSchema canalProtobufDeserializationSchema = + new CanalProtobufDeserializationSchema(TYPES, SCHEMAS, TABLES, METADATA, 0, null, false); + canalProtobufDeserializationSchema.deserialize(PROTOBUF_DATA, collector); + Assert.assertEquals(PROTOBUF_DECODE_RESULT0, collector.getRows().get(0)); + Assert.assertEquals(PROTOBUF_DECODE_RESULT1, collector.getRows().get(1)); + } + + public static class ListCollector implements Collector { + + private final List rows = new ArrayList<>(); + + @Override + public void collect(RowData record) { + rows.add(record); + } + + @Override + public void close() {} + + public List getRows() { + return rows; + } + } +} diff --git a/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java b/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java index 430f3821..cc0cc32f 100644 --- a/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java +++ b/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java @@ -492,6 +492,17 @@ public TiTimestamp getSnapshotVersion() { return session.getTimestamp(); } + /** + * Most of the time, if the user does not explicitly specify a snapshot time, we don't need to + * fetch the latest snapshot. A "relatively recent" snapshot is sufficient, which is crucial for + * merging CDC data. + * + * @return Current version with zero logical time. + */ + public TiTimestamp getApproximateSnapshotVersion() { + return new TiTimestamp(session.getTimestamp().getPhysical(), 0); + } + public TiSession getTiSession() { return session; }
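
Note: end to end, the streaming additions in this patch reduce to three connector properties plus the snapshot change in ClientSession. A configuration fragment covering only the keys this patch touches; the DSN, Kafka address and every other mandatory connector property are omitted:

Map<String, String> properties = new HashMap<>();
properties.put("tidb.streaming.source", "kafka");
properties.put("tidb.streaming.codec", "canal-protobuf");
// New guard: start Kafka consumption from a concrete timestamp. Defaults to
// the snapshot's physical time minus one minute; any value <= 0 disables
// OffsetsInitializer.timestamp and keeps the configured start position.
properties.put(
    "tidb.streaming.source.starting.offsets.ts",
    String.valueOf(System.currentTimeMillis() - 60 * 1000L));

Because SnapshotSource now defaults to getApproximateSnapshotVersion(), the batch phase pins a version whose logical half is zero, which is exactly what the canal decoders' startTs precondition requires.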