diff --git a/.github/workflows/flink-stream-test.yml b/.github/workflows/flink-stream-test.yml
index f4fc4194..885e43ca 100644
--- a/.github/workflows/flink-stream-test.yml
+++ b/.github/workflows/flink-stream-test.yml
@@ -37,7 +37,9 @@ jobs:
kafka/bin/kafka-server-start.sh kafka/config/server.properties &
- name: deploy ticdc
- run: /home/runner/.tiup/bin/tiup cdc server --pd=http://localhost:2379 --log-file=/tmp/ticdc/ticdc.log --addr=0.0.0.0:8301 --advertise-addr=127.0.0.1:8301 --data-dir=/tmp/log/ticdc &
+ run: |
+ /home/runner/.tiup/bin/tiup cdc server --pd=http://localhost:2379 --log-file=/tmp/ticdc/ticdc.log --addr=0.0.0.0:8301 --advertise-addr=127.0.0.1:8301 --data-dir=/tmp/log/ticdc &
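+          # Wait for the TiCDC server to come up before the changefeed is created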
+ sleep 10
- name: create cdc changefeed
run: |
diff --git a/README.md b/README.md
index 3590846b..179bfcb1 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,9 @@ mvn com.coveo:fmt-maven-plugin:format
**Compiling TiBigData requires git and downloading source code directly is not supported.**
-[Flink-TiDB-Connector](./flink/README.md)
+[Flink-TiDB-Connector(Batch)](./flink/README.md)
+
+[Flink-TiDB-Connector(Unified Batch & Streaming)](./flink/README_unified_batch_streaming.md)
[PrestoSQL-TiDB-Connector](./prestosql/README.md)
diff --git a/README_zh_CN.md b/README_zh_CN.md
index 5632f074..6924982f 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -26,7 +26,9 @@ mvn com.coveo:fmt-maven-plugin:format
**编译 TiBigData 需要 git, 直接下载源码编译暂不支持。**
-[TiDB 与 Flink 集成](./flink/README_zh_CN.md)
+[TiDB 与 Flink 集成(批处理)](./flink/README_zh_CN.md)
+
+[TiDB 与 Flink 集成(流批一体)](./flink/README_unified_batch_streaming_zh_CN.md)
[TiDB 与 PrestoSQL 集成 ***- 已废弃***](./prestosql/README_zh_CN.md)
diff --git a/flink/README_unified_batch_streaming.md b/flink/README_unified_batch_streaming.md
index 3c67868f..cbc85ecf 100644
--- a/flink/README_unified_batch_streaming.md
+++ b/flink/README_unified_batch_streaming.md
@@ -166,25 +166,26 @@ Keypoints
In addition to supporting the configuration in [TiDB Batch Mode](./README.md), the streaming mode adds the following configuration:
-| Configuration | Default Value | Description |
-|:---------------------------------------|:--------------|:--------------------------------------------------------------------------------------------------------------------------------------------------|
-| tidb.source.semantic | at-least-once | TiDB batch stage consumption semantics, which takes effect when the read data fails, optionally at-least-once and exactly-once. |
-| tidb.streaming.source | - | The data source(messaging system) where TiDB's change logs are stored, currently only supports Kafka and Pulsar will be supported later. |
-| tidb.streaming.codec | craft | TiDB's change log encoding method, currently supports json(called default in the lower version of tidb), craft, canal-json. See [Codec](#8-Codec) |
-| tidb.streaming.kafka.bootstrap.servers | - | Kafka server address |
-| tidb.streaming.kafka.topic | - | Kafka topic |
-| tidb.streaming.kafka.group.id | - | Kafka group id |
-| tidb.streaming.ignore-parse-errors | false | Whether to ignore exceptions in case of decoding failure |
-| tidb.metadata.included | - | TiDB Metadata, see [TiDB Metadata](#9-TiDB-Metadata) |
-| tikv.sink.delete_enable | false | Whether enable delete in streaming, this config only works in `tidb.sink.impl=TIKV` |
+| Configuration                           | Default Value | Description                                                                                                                                                          |
+|:----------------------------------------|:--------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| tidb.source.semantic                    | at-least-once | Consumption semantics of the TiDB batch stage, which take effect when reading data fails. Valid values are at-least-once and exactly-once.                          |
+| tidb.streaming.source                   | -             | The data source (messaging system) where TiDB's change logs are stored. Currently only Kafka is supported; Pulsar will be supported later.                          |
+| tidb.streaming.codec                    | craft         | The encoding of TiDB's change logs. Currently supports json (called default in lower versions of TiDB), craft, canal-json and canal-protobuf. See [Codec](#8-Codec) |
+| tidb.streaming.kafka.bootstrap.servers  | -             | Kafka server address                                                                                                                                                 |
+| tidb.streaming.kafka.topic              | -             | Kafka topic                                                                                                                                                          |
+| tidb.streaming.kafka.group.id           | -             | Kafka group id                                                                                                                                                       |
+| tidb.streaming.ignore-parse-errors      | false         | Whether to ignore exceptions when decoding fails                                                                                                                     |
+| tidb.metadata.included                  | -             | TiDB metadata columns, see [TiDB Metadata](#9-TiDB-Metadata)                                                                                                         |
+| tikv.sink.delete_enable                 | false         | Whether to enable delete in streaming mode; this config only works when `tidb.sink.impl=TIKV`                                                                        |
## 8 Codec
-TiBigData supports several TiCDC encoding types, namely json(called default in the lower version of tidb), craft, and canal-json.
+TiBigData supports several TiCDC encoding types, namely json (called default in lower versions of TiDB), craft, canal-json and canal-protobuf.
1. json is the default implementation of TiCDC and is highly readable;
2. craft sacrifices readability, is fully binary encoded, has higher compression, and requires a high version of TiDB(5.x). Currently, craft is still incubating, but it is working properly;
-3. canal-json is compatible with canal and must be used with the TiDB extension field enabled to read `commitTs`. Lower versions of TiDB do not have `commitTs`, so it cannot be used.
+3. canal-json provides compatibility with the canal JSON format;
+4. canal-protobuf provides compatibility with the canal protobuf format. A minimal configuration example follows below.
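+
+The codec only changes how the change log is decoded; the remaining streaming options stay the same. Below is a minimal sketch (not a complete program from this repository) of registering a TiDB catalog whose streaming stage reads TiCDC canal-protobuf records from Kafka; the database URL, username, password, Kafka address, topic, group id and table name are placeholders for your own environment:
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class CanalProtobufExample {
+
+  public static void main(String[] args) {
+    TableEnvironment tableEnvironment =
+        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+    // Register the TiDB catalog; all property values below are placeholders.
+    tableEnvironment.executeSql(
+        "CREATE CATALOG `tidb` WITH (\n"
+            + " 'type' = 'tidb',\n"
+            + " 'tidb.database.url' = 'jdbc:mysql://localhost:4000/',\n"
+            + " 'tidb.username' = 'root',\n"
+            + " 'tidb.password' = '',\n"
+            + " 'tidb.streaming.source' = 'kafka',\n"
+            + " 'tidb.streaming.codec' = 'canal-protobuf',\n"
+            + " 'tidb.streaming.kafka.bootstrap.servers' = 'localhost:9092',\n"
+            + " 'tidb.streaming.kafka.topic' = 'test_cdc',\n"
+            + " 'tidb.streaming.kafka.group.id' = 'test_group'\n"
+            + ")");
+    // Unified batch & streaming read: a snapshot of the table first, then change logs
+    // decoded with the canal-protobuf codec.
+    tableEnvironment.executeSql("SELECT * FROM `tidb`.`test`.`source_table`").print();
+  }
+}
+```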
## 9 TiDB Metadata
diff --git a/flink/README_unified_batch_streaming_zh_CN.md b/flink/README_unified_batch_streaming_zh_CN.md
index 2728603b..56a0988f 100644
--- a/flink/README_unified_batch_streaming_zh_CN.md
+++ b/flink/README_unified_batch_streaming_zh_CN.md
@@ -168,25 +168,26 @@ DELETE FROM `test`.`source_table` WHERE id = 1 or id = 2;
除了支持 [TiDB 批模式](./README_zh_CN.md) 中的配置外,流模式新增了以下配置:
-| Configuration | Default Value | Description |
-|:---------------------------------------|:--------------|:-----------------------------------------------------------------------------------------------|
-| tidb.source.semantic | at-least-once | TiDB 批阶段的消费语义,读取数据失败时才会生效,可选 at-least-once 与 exactly-once。 |
-| tidb.streaming.source | - | TiDB 的变更日志存放的数据源(消息系统),当前只支持配置 Kafka,后续会支持 Pulsar。 |
-| tidb.streaming.codec | craft | TiDB 的变更日志选取的编码方式,当前支持 json(低版本 TiDB 叫 default),craft,canal-json 三种格式,详细信息参考 [Codec](#8-Codec) |
-| tidb.streaming.kafka.bootstrap.servers | - | Kafka server 地址 |
-| tidb.streaming.kafka.topic | - | Kafka topic |
-| tidb.streaming.kafka.group.id | - | Kafka group id |
-| tidb.streaming.ignore-parse-errors | false | 在解码失败时,是否忽略异常 |
-| tidb.metadata.included | - | TiDB 元数据列,详细信息参考 [TiDB Metadata](#9-TiDB-Metadata) |
-| tidb.sink.delete_enable | false | 是否在流模式中开启删除,这个配置只有当 `tidb.sink.impl=TIKV` 时才会生效 |
+| Configuration | Default Value | Description |
+|:---------------------------------------|:--------------|:--------------------------------------------------------------------------------------------------------------|
+| tidb.source.semantic | at-least-once | TiDB 批阶段的消费语义,读取数据失败时才会生效,可选 at-least-once 与 exactly-once。 |
+| tidb.streaming.source | - | TiDB 的变更日志存放的数据源(消息系统),当前只支持配置 Kafka,后续会支持 Pulsar。 |
+| tidb.streaming.codec | craft | TiDB 的变更日志选取的编码方式,当前支持 json(低版本 TiDB 叫 default),craft,canal-json,canal-protobuf 四种格式,详细信息参考 [Codec](#8-Codec) |
+| tidb.streaming.kafka.bootstrap.servers | - | Kafka server 地址 |
+| tidb.streaming.kafka.topic | - | Kafka topic |
+| tidb.streaming.kafka.group.id | - | Kafka group id |
+| tidb.streaming.ignore-parse-errors | false | 在解码失败时,是否忽略异常 |
+| tidb.metadata.included | - | TiDB 元数据列,详细信息参考 [TiDB Metadata](#9-TiDB-Metadata) |
+| tidb.sink.delete_enable | false | 是否在流模式中开启删除,这个配置只有当 `tidb.sink.impl=TIKV` 时才会生效 |
## 8 Codec
-TiBigData 支持多种 TiCDC 的编码类型,分别是 json(低版本 TiDB 叫 default),craft,canal-json.
+TiBigData 支持多种 TiCDC 的编码类型,分别是 json(低版本 TiDB 叫 default),craft,canal-json,canal-protobuf.
1. json 是 TiCDC 的默认实现,具有很强的可读性;
2. craft 牺牲了可读性,是完全二进制的编码方式,具有更高的压缩率,需要高版本 TiDB(5.x),当前还在孵化中,但是已经能够正常使用;
-3. canal-json 是对 canal 的兼容,使用时必须开启 TiDB 扩展字段以读取 commitTs,低版本的 TiDB 没有这个字段,所以不能使用。
+3. canal-json 是对 canal json 格式的兼容;
+4. canal-protobuf 是对 canal protobuf 格式的兼容。
## 9 TiDB Metadata
diff --git a/flink/flink-1.13/pom.xml b/flink/flink-1.13/pom.xml
index b22b10ba..73a0acbd 100644
--- a/flink/flink-1.13/pom.xml
+++ b/flink/flink-1.13/pom.xml
@@ -16,6 +16,7 @@
1.13.0
+    <dep.canal.protocol.version>1.1.6</dep.canal.protocol.version>
@@ -43,6 +44,26 @@
      <artifactId>commons-beanutils</artifactId>
      <version>${dep.apache.commons.version}</version>
    </dependency>
+    <dependency>
+      <groupId>com.alibaba.otter</groupId>
+      <artifactId>canal.protocol</artifactId>
+      <version>${dep.canal.protocol.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>fastjson</artifactId>
+          <groupId>com.alibaba</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>logback-classic</artifactId>
+          <groupId>ch.qos.logback</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>logback-core</artifactId>
+          <groupId>ch.qos.logback</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
      <groupId>org.apache.flink</groupId>
@@ -132,6 +153,14 @@
            <pattern>oshi</pattern>
            <shadedPattern>io.tidb.bigdata.telemetry.shade.oshi</shadedPattern>
          </relocation>
+          <relocation>
+            <pattern>com.google.protobuf</pattern>
+            <shadedPattern>io.tidb.bigdata.shade.com.google.protobuf</shadedPattern>
+          </relocation>
+          <relocation>
+            <pattern>com.alibaba.otter.canal</pattern>
+            <shadedPattern>io.tidb.bigdata.shade.com.alibaba.otter.canal</shadedPattern>
+          </relocation>
${project.artifactId}-${project.version}
diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java
index a42d85e1..7608d73b 100644
--- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java
+++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java
@@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableSet;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.table.data.RowData;
@@ -55,6 +56,10 @@ public CDCSource canalJson() {
return doBuild(builder.canalJson());
}
+ public CDCSource canalProtobuf() {
+ return doBuild(builder.canalProtobuf());
+ }
+
private final CDCDeserializationSchemaBuilder builder;
protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) {
@@ -62,14 +67,23 @@ protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) {
}
public static KafkaCDCSourceBuilder kafka(
- String database, String table, TiTimestamp ts, TiDBSchemaAdapter schema) {
- return new KafkaCDCSourceBuilder(
- new CDCDeserializationSchemaBuilder(
- schema.getPhysicalRowDataType(), schema.getCDCMetadata())
- .startTs(ts.getVersion())
- .types(ROW_CHANGED_EVENT)
- .schemas(ImmutableSet.of(database))
- .tables(ImmutableSet.of(table)));
+ String database,
+ String table,
+ TiTimestamp ts,
+ TiDBSchemaAdapter schema,
+ long startingOffsetTs) {
+ KafkaCDCSourceBuilder kafkaCDCSourceBuilder =
+ new KafkaCDCSourceBuilder(
+ new CDCDeserializationSchemaBuilder(
+ schema.getPhysicalRowDataType(), schema.getCDCMetadata())
+ .startTs(ts.getVersion())
+ .types(ROW_CHANGED_EVENT)
+ .schemas(ImmutableSet.of(database))
+ .tables(ImmutableSet.of(table)));
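+    // Only override the Kafka starting offsets for a positive timestamp; a non-positive value
+    // keeps the Kafka source's default starting offsets.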
+ if (startingOffsetTs > 0) {
+ kafkaCDCSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startingOffsetTs));
+ }
+ return kafkaCDCSourceBuilder;
}
@SuppressWarnings("unchecked")
diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java
index 13c0988e..fccf4ac8 100644
--- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java
+++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java
@@ -85,7 +85,8 @@ public SnapshotSource(
tiTableInfo, Arrays.asList(schema.getPhysicalFieldNamesWithoutMeta()));
this.timestamp =
getOptionalVersion()
- .orElseGet(() -> getOptionalTimestamp().orElseGet(session::getSnapshotVersion));
+ .orElseGet(
+ () -> getOptionalTimestamp().orElseGet(session::getApproximateSnapshotVersion));
TableHandleInternal tableHandleInternal =
new TableHandleInternal(this.databaseName, tiTableInfo);
this.splits =
diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java
index 316b9d7a..8a618731 100644
--- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java
+++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBOptions.java
@@ -83,8 +83,13 @@ private static ConfigOption optional(String key) {
public static final String STREAMING_CODEC_JSON = "json";
public static final String STREAMING_CODEC_CRAFT = "craft";
public static final String STREAMING_CODEC_CANAL_JSON = "canal-json";
+ public static final String STREAMING_CODEC_CANAL_PROTOBUF = "canal-protobuf";
  public static final Set<String> VALID_STREAMING_CODECS =
- ImmutableSet.of(STREAMING_CODEC_CRAFT, STREAMING_CODEC_JSON, STREAMING_CODEC_CANAL_JSON);
+ ImmutableSet.of(
+ STREAMING_CODEC_CRAFT,
+ STREAMING_CODEC_JSON,
+ STREAMING_CODEC_CANAL_JSON,
+ STREAMING_CODEC_CANAL_PROTOBUF);
// Options for catalog
  public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
@@ -118,6 +123,12 @@ private static ConfigOption optional(String key) {
public static final String METADATA_INCLUDED = "tidb.metadata.included";
public static final String METADATA_INCLUDED_ALL = "*";
+ // During the streaming phase, set a timestamp to avoid consuming expired data,
+ // with the default being one minute subtracted from the snapshot time.
+ // Additionally, we can set this value to less than or equal to 0 to disable this feature.
+ public static final String STARTING_OFFSETS_TS = "tidb.streaming.source.starting.offsets.ts";
+ public static final long MINUTE = 60 * 1000L;
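+  // For example, if the property is unset and the snapshot physical time is 1_696_150_000_000 ms,
+  // the Kafka source starts from offsets at 1_696_149_940_000 ms; a value <= 0 keeps the Kafka
+  // source's default starting offsets.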
+
  public static Set<ConfigOption<?>> requiredOptions() {
return withMoreRequiredOptions();
}
diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java
index 94193cdb..49ef06d8 100644
--- a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java
+++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java
@@ -20,6 +20,7 @@
import static io.tidb.bigdata.flink.connector.source.TiDBOptions.IGNORE_PARSE_ERRORS;
import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC;
import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_CANAL_JSON;
+import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_CANAL_PROTOBUF;
import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_CRAFT;
import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_CODEC_JSON;
import static io.tidb.bigdata.flink.connector.source.TiDBOptions.STREAMING_SOURCE;
@@ -35,6 +36,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
@@ -102,9 +104,17 @@ private TiDBSourceBuilder setProperties(Map properties) {
return this;
}
+ private long getStartingOffsetTs(TiTimestamp timestamp) {
+ return Optional.ofNullable(properties.get(TiDBOptions.STARTING_OFFSETS_TS))
+ .filter(StringUtils::isNotEmpty)
+ .map(Long::parseLong)
+ .orElse(timestamp.getPhysical() - TiDBOptions.MINUTE);
+ }
+
  private CDCSourceBuilder<?, ?> createCDCBuilder(TiTimestamp timestamp) {
if (streamingSource.equals(STREAMING_SOURCE_KAFKA)) {
- return CDCSourceBuilder.kafka(databaseName, tableName, timestamp, schema)
+ return CDCSourceBuilder.kafka(
+ databaseName, tableName, timestamp, schema, getStartingOffsetTs(timestamp))
.ignoreParseErrors(ignoreParseErrors)
.setProperties(properties);
} else {
@@ -150,6 +160,8 @@ private TiDBSourceBuilder setProperties(Map properties) {
return cdcBuilder.json();
case STREAMING_CODEC_CANAL_JSON:
return cdcBuilder.canalJson();
+ case STREAMING_CODEC_CANAL_PROTOBUF:
+ return cdcBuilder.canalProtobuf();
default:
throw new IllegalArgumentException(
"Invalid streaming codec: '" + streamingCodec + "'");
diff --git a/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java
new file mode 100644
index 00000000..5399ec77
--- /dev/null
+++ b/flink/flink-1.13/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java
@@ -0,0 +1,261 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import io.tidb.bigdata.flink.connector.source.TiDBMetadata;
+import io.tidb.bigdata.flink.format.cdc.CDCMetadata;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tikv.common.meta.TiTimestamp;
+
+public abstract class CanalDeserializationSchema implements DeserializationSchema<RowData> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CanalDeserializationSchema.class);
+
+ private static final String INSERT = "INSERT";
+ private static final String UPDATE = "UPDATE";
+ private static final String DELETE = "DELETE";
+ private static final String CREATE = "CREATE";
+ private static final String QUERY = "QUERY";
+ private static final String TIDB_WATERMARK = "TIDB_WATERMARK";
+
+ public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
+ public static final DateTimeFormatter DATE_TIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ protected final DataType physicalDataType;
+  protected final Set<String> schemas;
+  protected final Set<String> tables;
+  protected final CDCMetadata[] metadata;
+  protected final long startTs;
+  protected final TypeInformation<RowData> producedTypeInfo;
+  protected final boolean ignoreParseErrors;
+
+  protected final List<String> fieldNames;
+  protected final List<DataType> dataTypes;
+
+ protected CanalDeserializationSchema(
+ DataType physicalDataType,
+      Set<String> schemas,
+      Set<String> tables,
+      CDCMetadata[] metadata,
+      long startTs,
+      TypeInformation<RowData> producedTypeInfo,
+ boolean ignoreParseErrors) {
+ this.physicalDataType = physicalDataType;
+ this.schemas = schemas;
+ this.tables = tables;
+ this.metadata = metadata;
+ this.startTs = startTs;
+ Preconditions.checkArgument(
+ createTiTimestampFromVersion(startTs).getLogical() == 0,
+ "The logical ts must be 0 for canal format");
+ this.producedTypeInfo = producedTypeInfo;
+ this.ignoreParseErrors = ignoreParseErrors;
+ RowType physicalRowType = ((RowType) physicalDataType.getLogicalType());
+ this.fieldNames = new ArrayList<>(physicalRowType.getFieldNames());
+ this.dataTypes = new ArrayList<>(physicalDataType.getChildren());
+ }
+
+ @Override
+ public final RowData deserialize(byte[] message) throws IOException {
+ throw new UnsupportedOperationException(
+ "Please invoke deserialize(byte[] message, Collector out)");
+ }
+
+ @Override
+  public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+    List<FlatMessage> flatMessages;
+ try {
+ flatMessages = decodeToFlatMessage(message);
+ } catch (Exception e) {
+ if (ignoreParseErrors) {
+ LOG.warn("Can not decode to flatMessage", e);
+ return;
+ } else {
+ throw new IOException(e);
+ }
+ }
+ for (FlatMessage flatMessage : flatMessages) {
+ deserialize(flatMessage, out);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+  public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+  public abstract List<FlatMessage> decodeToFlatMessage(byte[] data) throws Exception;
+
+  public void deserialize(FlatMessage flatMessage, Collector<RowData> out) {
+ if (!schemas.contains(flatMessage.getDatabase())) {
+ return;
+ }
+ if (!tables.contains(flatMessage.getTable())) {
+ return;
+ }
+ TiTimestamp tiTimestamp = createTiTimestampFromTs(flatMessage.getEs());
+ if (startTs > tiTimestamp.getVersion()) {
+ return;
+ }
+    List<Map<String, String>> data = flatMessage.getData();
+    <dependency>
+      <groupId>com.alibaba.otter</groupId>
+      <artifactId>canal.protocol</artifactId>
+      <version>${dep.canal.protocol.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>fastjson</artifactId>
+          <groupId>com.alibaba</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>logback-classic</artifactId>
+          <groupId>ch.qos.logback</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>logback-core</artifactId>
+          <groupId>ch.qos.logback</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
      <groupId>org.apache.flink</groupId>
@@ -150,6 +171,14 @@
            <pattern>oshi</pattern>
            <shadedPattern>io.tidb.bigdata.telemetry.shade.oshi</shadedPattern>
          </relocation>
+          <relocation>
+            <pattern>com.google.protobuf</pattern>
+            <shadedPattern>io.tidb.bigdata.shade.com.google.protobuf</shadedPattern>
+          </relocation>
+          <relocation>
+            <pattern>com.alibaba.otter.canal</pattern>
+            <shadedPattern>io.tidb.bigdata.shade.com.alibaba.otter.canal</shadedPattern>
+          </relocation>
${project.artifactId}-${project.version}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java
index a444f8bb..641a23b1 100644
--- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/TiDBOptions.java
@@ -150,8 +150,13 @@ private static ConfigOption optional(String key) {
public static final String STREAMING_CODEC_JSON = "json";
public static final String STREAMING_CODEC_CRAFT = "craft";
public static final String STREAMING_CODEC_CANAL_JSON = "canal-json";
+ public static final String STREAMING_CODEC_CANAL_PROTOBUF = "canal-protobuf";
  public static final Set<String> VALID_STREAMING_CODECS =
- ImmutableSet.of(STREAMING_CODEC_CRAFT, STREAMING_CODEC_JSON, STREAMING_CODEC_CANAL_JSON);
+ ImmutableSet.of(
+ STREAMING_CODEC_CRAFT,
+ STREAMING_CODEC_JSON,
+ STREAMING_CODEC_CANAL_JSON,
+ STREAMING_CODEC_CANAL_PROTOBUF);
// Options for catalog
  public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
@@ -185,6 +190,12 @@ private static ConfigOption optional(String key) {
public static final String METADATA_INCLUDED = "tidb.metadata.included";
public static final String METADATA_INCLUDED_ALL = "*";
+ // During the streaming phase, set a timestamp to avoid consuming expired data,
+ // with the default being one minute subtracted from the snapshot time.
+ // Additionally, we can set this value to less than or equal to 0 to disable this feature.
+ public static final String STARTING_OFFSETS_TS = "tidb.streaming.source.starting.offsets.ts";
+ public static final long MINUTE = 60 * 1000L;
+
/**
* see {@link org.apache.flink.connector.jdbc.table.JdbcConnectorOptions}
*
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java
index 89d1406f..330c70bd 100644
--- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/CDCSourceBuilder.java
@@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.tikv.common.meta.TiTimestamp;
@@ -55,6 +56,10 @@ public CDCSource canalJson() {
return doBuild(builder.canalJson());
}
+ public CDCSource canalProtobuf() {
+ return doBuild(builder.canalProtobuf());
+ }
+
private final CDCDeserializationSchemaBuilder builder;
protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) {
@@ -62,14 +67,23 @@ protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) {
}
public static KafkaCDCSourceBuilder kafka(
- String database, String table, TiTimestamp ts, TiDBSchemaAdapter schema) {
- return new KafkaCDCSourceBuilder(
- new CDCDeserializationSchemaBuilder(
- schema.getPhysicalRowDataType(), schema.getCDCMetadata())
- .startTs(ts.getVersion())
- .types(ROW_CHANGED_EVENT)
- .schemas(ImmutableSet.of(database))
- .tables(ImmutableSet.of(table)));
+ String database,
+ String table,
+ TiTimestamp ts,
+ TiDBSchemaAdapter schema,
+ long startingOffsetTs) {
+ KafkaCDCSourceBuilder kafkaCDCSourceBuilder =
+ new KafkaCDCSourceBuilder(
+ new CDCDeserializationSchemaBuilder(
+ schema.getPhysicalRowDataType(), schema.getCDCMetadata())
+ .startTs(ts.getVersion())
+ .types(ROW_CHANGED_EVENT)
+ .schemas(ImmutableSet.of(database))
+ .tables(ImmutableSet.of(table)));
+ if (startingOffsetTs > 0) {
+ kafkaCDCSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startingOffsetTs));
+ }
+ return kafkaCDCSourceBuilder;
}
@SuppressWarnings("unchecked")
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java
index 3e2fb628..9f06a165 100644
--- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/SnapshotSource.java
@@ -85,7 +85,8 @@ public SnapshotSource(
tiTableInfo, Arrays.asList(schema.getPhysicalFieldNamesWithoutMeta()));
this.timestamp =
getOptionalVersion()
- .orElseGet(() -> getOptionalTimestamp().orElseGet(session::getSnapshotVersion));
+ .orElseGet(
+ () -> getOptionalTimestamp().orElseGet(session::getApproximateSnapshotVersion));
TableHandleInternal tableHandleInternal =
new TableHandleInternal(this.databaseName, tiTableInfo);
this.splits =
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java
index ae3c0bcf..a4279b91 100644
--- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/connector/source/TiDBSourceBuilder.java
@@ -20,6 +20,7 @@
import static io.tidb.bigdata.flink.connector.TiDBOptions.IGNORE_PARSE_ERRORS;
import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC;
import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_CANAL_JSON;
+import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_CANAL_PROTOBUF;
import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_CRAFT;
import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_CODEC_JSON;
import static io.tidb.bigdata.flink.connector.TiDBOptions.STREAMING_SOURCE;
@@ -28,6 +29,7 @@
import static io.tidb.bigdata.flink.connector.TiDBOptions.VALID_STREAMING_CODECS;
import static io.tidb.bigdata.flink.connector.TiDBOptions.VALID_STREAMING_SOURCES;
+import io.tidb.bigdata.flink.connector.TiDBOptions;
import io.tidb.bigdata.flink.connector.source.enumerator.TiDBSourceSplitEnumerator;
import io.tidb.bigdata.tidb.ClientConfig;
import io.tidb.bigdata.tidb.expression.Expression;
@@ -35,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
@@ -102,9 +105,17 @@ private TiDBSourceBuilder setProperties(Map properties) {
return this;
}
+ private long getStartingOffsetTs(TiTimestamp timestamp) {
+ return Optional.ofNullable(properties.get(TiDBOptions.STARTING_OFFSETS_TS))
+ .filter(StringUtils::isNotEmpty)
+ .map(Long::parseLong)
+ .orElse(timestamp.getPhysical() - TiDBOptions.MINUTE);
+ }
+
  private CDCSourceBuilder<?, ?> createCDCBuilder(TiTimestamp timestamp) {
if (streamingSource.equals(STREAMING_SOURCE_KAFKA)) {
- return CDCSourceBuilder.kafka(databaseName, tableName, timestamp, schema)
+ return CDCSourceBuilder.kafka(
+ databaseName, tableName, timestamp, schema, getStartingOffsetTs(timestamp))
.ignoreParseErrors(ignoreParseErrors)
.setProperties(properties);
} else {
@@ -150,6 +161,8 @@ private TiDBSourceBuilder setProperties(Map properties) {
return cdcBuilder.json();
case STREAMING_CODEC_CANAL_JSON:
return cdcBuilder.canalJson();
+ case STREAMING_CODEC_CANAL_PROTOBUF:
+ return cdcBuilder.canalProtobuf();
default:
throw new IllegalArgumentException(
"Invalid streaming codec: '" + streamingCodec + "'");
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java
new file mode 100644
index 00000000..76da58d7
--- /dev/null
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchema.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import io.tidb.bigdata.flink.connector.source.TiDBMetadata;
+import io.tidb.bigdata.flink.format.cdc.CDCMetadata;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tikv.common.meta.TiTimestamp;
+
+public abstract class CanalDeserializationSchema implements DeserializationSchema<RowData> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CanalDeserializationSchema.class);
+
+ private static final String INSERT = "INSERT";
+ private static final String UPDATE = "UPDATE";
+ private static final String DELETE = "DELETE";
+ private static final String CREATE = "CREATE";
+ private static final String QUERY = "QUERY";
+ private static final String TIDB_WATERMARK = "TIDB_WATERMARK";
+
+ public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
+ public static final DateTimeFormatter DATE_TIME_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ protected final DataType physicalDataType;
+  protected final Set<String> schemas;
+  protected final Set<String> tables;
+  protected final CDCMetadata[] metadata;
+  protected final long startTs;
+  protected final TypeInformation<RowData> producedTypeInfo;
+  protected final boolean ignoreParseErrors;
+
+  protected final List<String> fieldNames;
+  protected final List<DataType> dataTypes;
+
+ protected CanalDeserializationSchema(
+ DataType physicalDataType,
+      Set<String> schemas,
+      Set<String> tables,
+      CDCMetadata[] metadata,
+      long startTs,
+      TypeInformation<RowData> producedTypeInfo,
+ boolean ignoreParseErrors) {
+ this.physicalDataType = physicalDataType;
+ this.schemas = schemas;
+ this.tables = tables;
+ this.metadata = metadata;
+ this.startTs = startTs;
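+    // Canal messages only carry a physical timestamp (es), so a start version with a non-zero
+    // logical component could not be compared against them exactly.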
+ Preconditions.checkArgument(
+ createTiTimestampFromVersion(startTs).getLogical() == 0,
+ "The logical ts must be 0 for canal format");
+ this.producedTypeInfo = producedTypeInfo;
+ this.ignoreParseErrors = ignoreParseErrors;
+ RowType physicalRowType = ((RowType) physicalDataType.getLogicalType());
+ this.fieldNames = new ArrayList<>(physicalRowType.getFieldNames());
+ this.dataTypes = new ArrayList<>(physicalDataType.getChildren());
+ }
+
+ @Override
+ public final RowData deserialize(byte[] message) throws IOException {
+ throw new UnsupportedOperationException(
+ "Please invoke deserialize(byte[] message, Collector out)");
+ }
+
+ @Override
+  public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+    List<FlatMessage> flatMessages;
+ try {
+ flatMessages = decodeToFlatMessage(message);
+ } catch (Exception e) {
+      if (ignoreParseErrors) {
+        LOG.warn("Can not decode to flatMessage", e);
+        return;
+      }
+      throw new IOException(e);
+ }
+ for (FlatMessage flatMessage : flatMessages) {
+ deserialize(flatMessage, out);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+  public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+  public abstract List<FlatMessage> decodeToFlatMessage(byte[] data) throws Exception;
+
+  public void deserialize(FlatMessage flatMessage, Collector<RowData> out) {
+ if (!schemas.contains(flatMessage.getDatabase())) {
+ return;
+ }
+ if (!tables.contains(flatMessage.getTable())) {
+ return;
+ }
+ TiTimestamp tiTimestamp = createTiTimestampFromTs(flatMessage.getEs());
+ if (startTs > tiTimestamp.getVersion()) {
+ return;
+ }
+    List<Map<String, String>> data = flatMessage.getData();
+    List<Map<String, String>> old = flatMessage.getOld();
+ String type = flatMessage.getType();
+ switch (type) {
+ case CREATE:
+ case QUERY:
+ case TIDB_WATERMARK:
+ break;
+ case INSERT:
+ data.stream()
+ .map(values -> toRowData(tiTimestamp, values, RowKind.INSERT))
+ .forEach(out::collect);
+ break;
+ case DELETE:
+ data.stream()
+ .map(values -> toRowData(tiTimestamp, values, RowKind.DELETE))
+ .forEach(out::collect);
+ break;
+ case UPDATE:
+ old.stream()
+ .map(values -> toRowData(tiTimestamp, values, RowKind.UPDATE_BEFORE))
+ .forEach(out::collect);
+ data.stream()
+ .map(values -> toRowData(tiTimestamp, values, RowKind.UPDATE_AFTER))
+ .forEach(out::collect);
+ break;
+ default:
+ if (!ignoreParseErrors) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown \"type\" value \"%s\". The Canal message is '%s'", type, flatMessage));
+ }
+ }
+ }
+
+  protected RowData toRowData(TiTimestamp timestamp, Map<String, String> values, RowKind rowKind) {
+ // ignore case
+ values =
+ values.entrySet().stream()
+ .filter(e -> e.getValue() != null)
+ .collect(Collectors.toMap(e -> e.getKey().toLowerCase(), Entry::getValue));
+ GenericRowData rowData = new GenericRowData(rowKind, fieldNames.size() + metadata.length);
+ for (int i = 0; i < fieldNames.size(); i++) {
+ String name = fieldNames.get(i);
+ DataType dataType = dataTypes.get(i);
+ String value = values.get(name);
+ rowData.setField(i, convertValue(value, dataType));
+ }
+ // concat metadata
+ for (int i = 0; i < metadata.length; i++) {
+ CDCMetadata cdcMetadata = metadata[i];
+ TiDBMetadata tiDBMetadata =
+ cdcMetadata
+ .toTiDBMetadata()
+ .orElseThrow(
+ () -> new IllegalArgumentException("Unsupported metadata: " + cdcMetadata));
+ if (cdcMetadata == CDCMetadata.SOURCE_EVENT) {
+ rowData.setField(fieldNames.size() + i, StringData.fromString(CDCMetadata.STREAMING));
+ continue;
+ }
+ rowData.setField(fieldNames.size() + i, tiDBMetadata.extract(timestamp));
+ }
+ return rowData;
+ }
+
+ public static Object convertValue(String value, DataType dataType) {
+ if (value == null) {
+ return null;
+ }
+ LogicalType logicalType = dataType.getLogicalType();
+ switch (logicalType.getTypeRoot()) {
+ case BOOLEAN:
+ return Boolean.parseBoolean(value);
+ case TINYINT:
+ return Byte.parseByte(value);
+ case SMALLINT:
+ return Short.parseShort(value);
+ case INTEGER:
+ return Integer.parseInt(value);
+ case BIGINT:
+ return Long.parseLong(value);
+ case VARCHAR:
+ return StringData.fromString(value);
+ case VARBINARY:
+ return value.getBytes();
+ case FLOAT:
+ return Float.parseFloat(value);
+ case DOUBLE:
+ return Double.parseDouble(value);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ return DecimalData.fromBigDecimal(
+ new BigDecimal(value), decimalType.getPrecision(), decimalType.getScale());
+ case DATE:
+ return (int) LocalDate.parse(value, DATE_FORMATTER).toEpochDay();
+ case TIME_WITHOUT_TIME_ZONE:
+ return (int) (LocalTime.parse(value, TIME_FORMATTER).toNanoOfDay() / 1000 / 1000);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TimestampData.fromTimestamp(
+ Timestamp.valueOf(LocalDateTime.parse(value, DATE_TIME_FORMATTER)));
+ default:
+ throw new IllegalStateException("Unsupported type root: " + logicalType.getTypeRoot());
+ }
+ }
+
+ public static TiTimestamp createTiTimestampFromVersion(long version) {
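+    // A TiDB TSO version packs the physical time in milliseconds into the high bits and an
+    // 18-bit logical counter into the low bits.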
+ return new TiTimestamp(version >> 18, version & 0x3FFFF);
+ }
+
+ public static TiTimestamp createTiTimestampFromTs(long ts) {
+ return new TiTimestamp(ts, 0);
+ }
+}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java
new file mode 100644
index 00000000..093336d8
--- /dev/null
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalJsonDeserializationSchema.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import io.tidb.bigdata.flink.format.cdc.CDCMetadata;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+public class CanalJsonDeserializationSchema extends CanalDeserializationSchema {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ public CanalJsonDeserializationSchema(
+ DataType physicalDataType,
+      Set<String> schemas,
+      Set<String> tables,
+      CDCMetadata[] metadata,
+      long startTs,
+      TypeInformation<RowData> producedTypeInfo,
+ boolean ignoreParseErrors) {
+ super(
+ physicalDataType, schemas, tables, metadata, startTs, producedTypeInfo, ignoreParseErrors);
+ }
+
+ @Override
+  public List<FlatMessage> decodeToFlatMessage(byte[] data) throws Exception {
+ return ImmutableList.of(MAPPER.readValue(data, FlatMessage.class));
+ }
+}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java
new file mode 100644
index 00000000..c3cfcfbe
--- /dev/null
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalMessageDeserializer.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalPacket.Ack;
+import com.alibaba.otter.canal.protocol.CanalPacket.Compression;
+import com.alibaba.otter.canal.protocol.CanalPacket.Messages;
+import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.protobuf.ByteString;
+import java.util.Iterator;
+
+public class CanalMessageDeserializer {
+
+ public static Message deserializer(byte[] data) {
+ return deserializer(data, false);
+ }
+
+ public static Message deserializer(byte[] data, boolean lazyParseEntry) {
+ try {
+ if (data == null) {
+ return null;
+ } else {
+ Packet p = Packet.parseFrom(data);
+ switch (p.getType()) {
+ case MESSAGES:
+ if (!p.getCompression().equals(Compression.NONE)
+ && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
+ throw new CanalClientException("compression is not supported in this connector");
+ }
+
+ Messages messages = Messages.parseFrom(p.getBody());
+ Message result = new Message(messages.getBatchId());
+ if (lazyParseEntry) {
+ result.setRawEntries(messages.getMessagesList());
+ result.setRaw(true);
+ } else {
+ Iterator var5 = messages.getMessagesList().iterator();
+
+ while (var5.hasNext()) {
+ ByteString byteString = (ByteString) var5.next();
+ result.addEntry(Entry.parseFrom(byteString));
+ }
+
+ result.setRaw(false);
+ }
+
+ return result;
+ case ACK:
+ Ack ack = Ack.parseFrom(p.getBody());
+ throw new CanalClientException(
+ "something goes wrong with reason: " + ack.getErrorMessage());
+ default:
+ throw new CanalClientException("unexpected packet type: " + p.getType());
+ }
+ }
+ } catch (Exception var7) {
+ throw new CanalClientException("deserializer failed", var7);
+ }
+ }
+}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java
new file mode 100644
index 00000000..bc536453
--- /dev/null
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/CanalProtobufDeserializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import com.alibaba.otter.canal.protocol.Message;
+import io.tidb.bigdata.flink.format.cdc.CDCMetadata;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+public class CanalProtobufDeserializationSchema extends CanalDeserializationSchema {
+
+ public CanalProtobufDeserializationSchema(
+ DataType physicalDataType,
+      Set<String> schemas,
+      Set<String> tables,
+      CDCMetadata[] metadata,
+      long startTs,
+      TypeInformation<RowData> producedTypeInfo,
+ boolean ignoreParseErrors) {
+ super(
+ physicalDataType, schemas, tables, metadata, startTs, producedTypeInfo, ignoreParseErrors);
+ }
+
+ @Override
+  public List<FlatMessage> decodeToFlatMessage(byte[] data) throws Exception {
+ Message message = CanalMessageDeserializer.deserializer(data);
+ return MQMessageUtils.getFlatMessages(message);
+ }
+}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java
new file mode 100644
index 00000000..822da53a
--- /dev/null
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/FlatMessage.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FlatMessage implements Serializable {
+
+ private long id;
+ private String database;
+ private String table;
+  private List<String> pkNames;
+ private Boolean isDdl;
+ private String type;
+ // binlog executeTime
+ private Long es;
+ // dml build timeStamp
+ private Long ts;
+ private String sql;
+  private Map<String, Integer> sqlType;
+  private Map<String, String> mysqlType;
+  private List<Map<String, String>> data;
+  private List<Map<String, String>> old;
+
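+  // TiCDC extension field ("_tidb"), populated when the changefeed enables the TiDB extension;
+  // it carries commitTs for row changes and watermarkTs for watermark events.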
+ @JsonProperty("_tidb")
+ private TiDB tidb;
+
+ public FlatMessage() {}
+
+ public FlatMessage(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+  public List<String> getPkNames() {
+ return pkNames;
+ }
+
+ public void addPkName(String pkName) {
+ if (this.pkNames == null) {
+ this.pkNames = new ArrayList<>();
+ }
+ this.pkNames.add(pkName);
+ }
+
+  public void setPkNames(List<String> pkNames) {
+ this.pkNames = pkNames;
+ }
+
+ public Boolean getIsDdl() {
+ return isDdl;
+ }
+
+ public void setIsDdl(Boolean isDdl) {
+ this.isDdl = isDdl;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Long getTs() {
+ return ts;
+ }
+
+ public void setTs(Long ts) {
+ this.ts = ts;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
+
+  public Map<String, Integer> getSqlType() {
+    return sqlType;
+  }
+
+  public void setSqlType(Map<String, Integer> sqlType) {
+ this.sqlType = sqlType;
+ }
+
+  public Map<String, String> getMysqlType() {
+    return mysqlType;
+  }
+
+  public void setMysqlType(Map<String, String> mysqlType) {
+ this.mysqlType = mysqlType;
+ }
+
+  public List<Map<String, String>> getData() {
+    return data;
+  }
+
+  public void setData(List<Map<String, String>> data) {
+ this.data = data;
+ }
+
+  public List<Map<String, String>> getOld() {
+    return old;
+  }
+
+  public void setOld(List<Map<String, String>> old) {
+ this.old = old;
+ }
+
+ public Long getEs() {
+ return es;
+ }
+
+ public void setEs(Long es) {
+ this.es = es;
+ }
+
+ public Boolean getDdl() {
+ return isDdl;
+ }
+
+ public void setDdl(Boolean ddl) {
+ isDdl = ddl;
+ }
+
+ public TiDB getTidb() {
+ return tidb;
+ }
+
+ public void setTidb(TiDB tidb) {
+ this.tidb = tidb;
+ }
+
+ @Override
+ public String toString() {
+ return "FlatMessage{"
+ + "id="
+ + id
+ + ", database='"
+ + database
+ + '\''
+ + ", table='"
+ + table
+ + '\''
+ + ", pkNames="
+ + pkNames
+ + ", isDdl="
+ + isDdl
+ + ", type='"
+ + type
+ + '\''
+ + ", es="
+ + es
+ + ", ts="
+ + ts
+ + ", sql='"
+ + sql
+ + '\''
+ + ", sqlType="
+ + sqlType
+ + ", mysqlType="
+ + mysqlType
+ + ", data="
+ + data
+ + ", old="
+ + old
+ + ", tidb="
+ + tidb
+ + '}';
+ }
+
+ public static class TiDB {
+
+ private Long commitTs;
+ private Long watermarkTs;
+
+ public Long getCommitTs() {
+ return commitTs;
+ }
+
+ public void setCommitTs(Long commitTs) {
+ this.commitTs = commitTs;
+ }
+
+ public Long getWatermarkTs() {
+ return watermarkTs;
+ }
+
+ public void setWatermarkTs(Long watermarkTs) {
+ this.watermarkTs = watermarkTs;
+ }
+
+ @Override
+ public String toString() {
+ return "TiDB{" + "commitTs=" + commitTs + ", watermarkTs=" + watermarkTs + '}';
+ }
+ }
+}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java
new file mode 100644
index 00000000..f5cbbda2
--- /dev/null
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/canal/MQMessageUtils.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
+import javax.annotation.Nullable;
+
+public class MQMessageUtils {
+
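+  // Flattens a canal protobuf Message (entries of row changes) into per-row FlatMessages,
+  // following canal's MQ flat-message conversion.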
+  public static List<FlatMessage> getFlatMessages(Message message) {
+ EntryRowData[] datas = MQMessageUtils.buildMessageData(message, null);
+ return MQMessageUtils.messageConverter(datas, message.getId());
+ }
+
+ public static EntryRowData[] buildMessageData(
+ Message message, @Nullable ThreadPoolExecutor executor) {
+    Optional<ExecutorTemplate> template = Optional.ofNullable(executor).map(ExecutorTemplate::new);
+ if (message.isRaw()) {
+      List<ByteString> rawEntries = message.getRawEntries();
+ final EntryRowData[] datas = new EntryRowData[rawEntries.size()];
+ int i = 0;
+ for (ByteString byteString : rawEntries) {
+ final int index = i;
+ Runnable task =
+ () -> {
+ try {
+ Entry entry = Entry.parseFrom(byteString);
+ RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
+ datas[index] = new EntryRowData();
+ datas[index].entry = entry;
+ datas[index].rowChange = rowChange;
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ if (template.isPresent()) {
+ template.get().submit(task);
+ } else {
+ task.run();
+ }
+ i++;
+ }
+ template.ifPresent(ExecutorTemplate::waitForResult);
+ return datas;
+ } else {
+ final EntryRowData[] datas = new EntryRowData[message.getEntries().size()];
+ int i = 0;
+ for (Entry entry : message.getEntries()) {
+ final int index = i;
+ Runnable task =
+ () -> {
+ try {
+ RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
+ datas[index] = new EntryRowData();
+ datas[index].entry = entry;
+ datas[index].rowChange = rowChange;
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ if (template.isPresent()) {
+ template.get().submit(task);
+ } else {
+ task.run();
+ }
+ i++;
+ }
+ template.ifPresent(ExecutorTemplate::waitForResult);
+ return datas;
+ }
+ }
+
+  public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
+    List<FlatMessage> flatMessages = new ArrayList<>();
+ for (EntryRowData entryRowData : datas) {
+ Entry entry = entryRowData.entry;
+ RowChange rowChange = entryRowData.rowChange;
+ if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+ || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+ continue;
+ }
+
+ // build flatMessage
+ CanalEntry.EventType eventType = rowChange.getEventType();
+ FlatMessage flatMessage = new FlatMessage(id);
+ flatMessages.add(flatMessage);
+ flatMessage.setDatabase(entry.getHeader().getSchemaName());
+ flatMessage.setTable(entry.getHeader().getTableName());
+ flatMessage.setIsDdl(rowChange.getIsDdl());
+ flatMessage.setType(eventType.toString());
+ flatMessage.setEs(entry.getHeader().getExecuteTime());
+ flatMessage.setTs(System.currentTimeMillis());
+ flatMessage.setSql(rowChange.getSql());
+
+ if (!rowChange.getIsDdl()) {
+        Map<String, Integer> sqlType = new LinkedHashMap<>();
+        Map<String, String> mysqlType = new LinkedHashMap<>();
+        List<Map<String, String>> data = new ArrayList<>();
+        List<Map<String, String>> old = new ArrayList<>();
+
+        Set<String> updateSet = new HashSet<>();
+ boolean hasInitPkNames = false;
+ for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+ if (eventType != CanalEntry.EventType.INSERT
+ && eventType != CanalEntry.EventType.UPDATE
+ && eventType != CanalEntry.EventType.DELETE) {
+ continue;
+ }
+
+          Map<String, String> row = new LinkedHashMap<>();
+          List<CanalEntry.Column> columns;
+
+ if (eventType == CanalEntry.EventType.DELETE) {
+ columns = rowData.getBeforeColumnsList();
+ } else {
+ columns = rowData.getAfterColumnsList();
+ }
+
+ for (CanalEntry.Column column : columns) {
+ if (!hasInitPkNames && column.getIsKey()) {
+ flatMessage.addPkName(column.getName());
+ }
+ sqlType.put(column.getName(), column.getSqlType());
+ mysqlType.put(column.getName(), column.getMysqlType());
+ if (column.getIsNull()) {
+ row.put(column.getName(), null);
+ } else {
+ row.put(column.getName(), column.getValue());
+ }
+ if (column.getUpdated()) {
+ updateSet.add(column.getName());
+ }
+ }
+
+ hasInitPkNames = true;
+ if (!row.isEmpty()) {
+ data.add(row);
+ }
+
+ if (eventType == CanalEntry.EventType.UPDATE) {
+            Map<String, String> rowOld = new LinkedHashMap<>();
+ for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+ if (updateSet.contains(column.getName())) {
+ if (column.getIsNull()) {
+ rowOld.put(column.getName(), null);
+ } else {
+ rowOld.put(column.getName(), column.getValue());
+ }
+ }
+ }
+ if (!rowOld.isEmpty()) {
+ old.add(rowOld);
+ }
+ }
+ }
+ if (!sqlType.isEmpty()) {
+ flatMessage.setSqlType(sqlType);
+ }
+ if (!mysqlType.isEmpty()) {
+ flatMessage.setMysqlType(mysqlType);
+ }
+ if (!data.isEmpty()) {
+ flatMessage.setData(data);
+ }
+ if (!old.isEmpty()) {
+ flatMessage.setOld(old);
+ }
+ }
+ }
+ return flatMessages;
+ }
+
+ public static class EntryRowData {
+
+ public Entry entry;
+ public RowChange rowChange;
+ }
+}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java
index a52838fc..eecf6058 100644
--- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/CDCDeserializationSchemaBuilder.java
@@ -18,9 +18,11 @@
import com.google.common.base.Preconditions;
import io.tidb.bigdata.cdc.Key;
+import io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema;
+import io.tidb.bigdata.flink.format.canal.CanalJsonDeserializationSchema;
+import io.tidb.bigdata.flink.format.canal.CanalProtobufDeserializationSchema;
import java.util.Set;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -87,15 +89,13 @@ public JsonDeserializationSchema json() {
}
// We use a new decoder, since canal does not follow TiCDC open protocol.
- public TiDBCanalJsonDeserializationSchema canalJson() {
- return new TiDBCanalJsonDeserializationSchema(
- physicalDataType,
- schemas,
- tables,
- metadata,
- startTs,
- typeInformation,
- ignoreParseErrors,
- TimestampFormat.SQL);
+ public CanalDeserializationSchema canalJson() {
+ return new CanalJsonDeserializationSchema(
+ physicalDataType, schemas, tables, metadata, startTs, typeInformation, ignoreParseErrors);
+ }
+
+ public CanalDeserializationSchema canalProtobuf() {
+ return new CanalProtobufDeserializationSchema(
+ physicalDataType, schemas, tables, metadata, startTs, typeInformation, ignoreParseErrors);
}
}
diff --git a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java
index 62d27e90..c04b35e0 100644
--- a/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java
+++ b/flink/flink-1.14/src/main/java/io/tidb/bigdata/flink/format/cdc/TiDBCanalJsonDeserializationSchema.java
@@ -47,6 +47,7 @@
import org.apache.flink.util.Collector;
import org.tikv.common.meta.TiTimestamp;
+@Deprecated
public final class TiDBCanalJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
diff --git a/flink/flink-1.14/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java b/flink/flink-1.14/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java
new file mode 100644
index 00000000..aede34c3
--- /dev/null
+++ b/flink/flink-1.14/src/test/java/io/tidb/bigdata/flink/format/canal/CanalDeserializationSchemaTest.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2023 TiDB Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.tidb.bigdata.flink.format.canal;
+
+import static io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema.DATE_FORMATTER;
+import static io.tidb.bigdata.flink.format.canal.CanalDeserializationSchema.TIME_FORMATTER;
+
+import com.google.common.collect.ImmutableSet;
+import io.tidb.bigdata.flink.format.cdc.CDCMetadata;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CanalDeserializationSchemaTest {
+
+ public static final DataType TYPES =
+ DataTypes.ROW(
+ DataTypes.FIELD("c1", DataTypes.TINYINT()),
+ DataTypes.FIELD("c2", DataTypes.SMALLINT()),
+ DataTypes.FIELD("c3", DataTypes.INT()),
+ DataTypes.FIELD("c4", DataTypes.INT()),
+ DataTypes.FIELD("c5", DataTypes.BIGINT()),
+ DataTypes.FIELD("c6", DataTypes.STRING()),
+ DataTypes.FIELD("c7", DataTypes.STRING()),
+ DataTypes.FIELD("c8", DataTypes.STRING()),
+ DataTypes.FIELD("c9", DataTypes.STRING()),
+ DataTypes.FIELD("c10", DataTypes.STRING()),
+ DataTypes.FIELD("c11", DataTypes.STRING()),
+ DataTypes.FIELD("c12", DataTypes.BYTES()),
+ DataTypes.FIELD("c13", DataTypes.BYTES()),
+ DataTypes.FIELD("c14", DataTypes.BYTES()),
+ DataTypes.FIELD("c15", DataTypes.BYTES()),
+ DataTypes.FIELD("c16", DataTypes.BYTES()),
+ DataTypes.FIELD("c17", DataTypes.BYTES()),
+ DataTypes.FIELD("c18", DataTypes.FLOAT()),
+ DataTypes.FIELD("c19", DataTypes.DOUBLE()),
+ DataTypes.FIELD("c20", DataTypes.DECIMAL(6, 3)),
+ DataTypes.FIELD("c21", DataTypes.DATE()),
+ DataTypes.FIELD("c22", DataTypes.TIME(0)),
+ DataTypes.FIELD("c23", DataTypes.TIMESTAMP(6)),
+ DataTypes.FIELD("c24", DataTypes.TIMESTAMP(6)),
+ DataTypes.FIELD("c25", DataTypes.INT()),
+ DataTypes.FIELD("c26", DataTypes.TINYINT()),
+ DataTypes.FIELD("c27", DataTypes.STRING()),
+ DataTypes.FIELD("c28", DataTypes.STRING()),
+ DataTypes.FIELD("c29", DataTypes.STRING()));
+
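+  // A canal-json INSERT event on test.test_tidb_type covering the supported column types;
+  // it is expected to decode to JSON_DECODE_RESULT below.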
+ public static final byte[] JSON_DATA =
+ "{\"id\":0,\"database\":\"test\",\"table\":\"test_tidb_type\",\"pkNames\":[\"c1\"],\"isDdl\":false,\"type\":\"INSERT\",\"es\":1685085356868,\"ts\":1685085357997,\"sql\":\"\",\"sqlType\":{\"c1\":-6,\"c10\":2005,\"c11\":2005,\"c12\":2004,\"c13\":2004,\"c14\":2004,\"c15\":2004,\"c16\":2004,\"c17\":2004,\"c18\":7,\"c19\":8,\"c2\":5,\"c20\":3,\"c21\":91,\"c22\":92,\"c23\":93,\"c24\":93,\"c25\":12,\"c26\":-6,\"c27\":12,\"c28\":4,\"c29\":-7,\"c3\":4,\"c4\":4,\"c5\":-5,\"c6\":1,\"c7\":12,\"c8\":2005,\"c9\":2005},\"mysqlType\":{\"c1\":\"tinyint\",\"c10\":\"text\",\"c11\":\"longtext\",\"c12\":\"binary\",\"c13\":\"varbinary\",\"c14\":\"tinyblob\",\"c15\":\"mediumblob\",\"c16\":\"blob\",\"c17\":\"longblob\",\"c18\":\"float\",\"c19\":\"double\",\"c2\":\"smallint\",\"c20\":\"decimal\",\"c21\":\"date\",\"c22\":\"time\",\"c23\":\"datetime\",\"c24\":\"timestamp\",\"c25\":\"year\",\"c26\":\"tinyint\",\"c27\":\"json\",\"c28\":\"enum\",\"c29\":\"set\",\"c3\":\"mediumint\",\"c4\":\"int\",\"c5\":\"bigint\",\"c6\":\"char\",\"c7\":\"varchar\",\"c8\":\"tinytext\",\"c9\":\"mediumtext\"},\"data\":[{\"c1\":\"1\",\"c10\":\"texttype\",\"c11\":\"longtexttype\",\"c12\":\"binarytype\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\",\"c13\":\"varbinarytype\",\"c14\":\"tinyblobtype\",\"c15\":\"mediumblobtype\",\"c16\":\"blobtype\",\"c17\":\"longblobtype\",\"c18\":\"1.2339999675750732\",\"c19\":\"2.456789\",\"c2\":\"2\",\"c20\":\"123.456\",\"c21\":\"2023-05-26\",\"c22\":\"15:15:56\",\"c23\":\"2023-05-26 15:15:56\",\"c24\":\"2023-05-26 15:15:56\",\"c25\":\"2020\",\"c26\":\"1\",\"c27\":\"{\\\"a\\\": 1, \\\"b\\\": 2}\",\"c28\":\"1\",\"c29\":\"1\",\"c3\":\"3\",\"c4\":\"4\",\"c5\":\"5\",\"c6\":\"chartype\",\"c7\":\"varchartype\",\"c8\":\"tinytexttype\",\"c9\":\"mediumtexttype\"}],\"old\":null}"
+ .getBytes();
+ public static final RowData JSON_DECODE_RESULT =
+ GenericRowData.ofKind(
+ RowKind.INSERT,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4,
+ 5L,
+ StringData.fromString("chartype"),
+ StringData.fromString("varchartype"),
+ StringData.fromString("tinytexttype"),
+ StringData.fromString("mediumtexttype"),
+ StringData.fromString("texttype"),
+ StringData.fromString("longtexttype"),
+ ("binarytype" + new String(new byte[10])).getBytes(),
+ "varbinarytype".getBytes(),
+ "tinyblobtype".getBytes(),
+ "mediumblobtype".getBytes(),
+ "blobtype".getBytes(),
+ "longblobtype".getBytes(),
+ 1.234F,
+ 2.456789,
+ DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6, 3),
+ (int) LocalDate.parse("2023-05-26", DATE_FORMATTER).toEpochDay(),
+ (int) (LocalTime.parse("15:15:56", TIME_FORMATTER).toNanoOfDay() / 1000 / 1000),
+ TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 15, 15, 56)),
+ TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 15, 15, 56)),
+ 2020,
+ (byte) 1,
+ StringData.fromString("{\"a\": 1, \"b\": 2}"),
+ StringData.fromString("1"),
+ StringData.fromString("1"),
+ 441735015790804992L,
+ TimestampData.fromEpochMillis(441735015790804992L >> 18),
+ StringData.fromString(CDCMetadata.STREAMING));
+
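+  // The same table's change log encoded with the canal protobuf format (base64). The message
+  // carries two rows, decoded as PROTOBUF_DECODE_RESULT0 and PROTOBUF_DECODE_RESULT1.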
+ public static final byte[] PROTOBUF_DATA =
+ Base64.getDecoder()
+ .decode(
+ "EAEYByr8DRKZCAo6CAEqBVVURi04MP2S87iFMTgCQgR0ZXN0Sg50ZXN0X3RpZGJfdHlwZVgBYg4KCXJvd3NDb3VudBIBMRACGtgHEAFQAGLRBxIhEPr//////////wEaAmMxIAEoATAAQgExUgd0aW55aW50EhcQBRoCYzIoATAAQgEyUghzbWFsbGludBIYEAQaAmMzKAEwAEIBM1IJbWVkaXVtaW50EhIQBBoCYzQoATAAQgE0UgNpbnQSHhD7//////////8BGgJjNSgBMABCATVSBmJpZ2ludBIaEAEaAmM2KAEwAEIIY2hhcnR5cGVSBGNoYXISIBAMGgJjNygBMABCC3ZhcmNoYXJ0eXBlUgd2YXJjaGFyEiMQ1Q8aAmM4KAEwAEIMdGlueXRleHR0eXBlUgh0aW55dGV4dBInENUPGgJjOSgBMABCDm1lZGl1bXRleHR0eXBlUgptZWRpdW10ZXh0EhwQ1Q8aA2MxMCgBMABCCHRleHR0eXBlUgR0ZXh0EiQQ1Q8aA2MxMSgBMABCDGxvbmd0ZXh0dHlwZVIIbG9uZ3RleHQSKhDUDxoDYzEyKAEwAEIUYmluYXJ5dHlwZQAAAAAAAAAAAABSBmJpbmFyeRImENQPGgNjMTMoATAAQg12YXJiaW5hcnl0eXBlUgl2YXJiaW5hcnkSJBDUDxoDYzE0KAEwAEIMdGlueWJsb2J0eXBlUgh0aW55YmxvYhIoENQPGgNjMTUoATAAQg5tZWRpdW1ibG9idHlwZVIKbWVkaXVtYmxvYhIcENQPGgNjMTYoATAAQghibG9idHlwZVIEYmxvYhIkENQPGgNjMTcoATAAQgxsb25nYmxvYnR5cGVSCGxvbmdibG9iEiYQBxoDYzE4KAEwAEISMS4yMzM5OTk5Njc1NzUwNzMyUgVmbG9hdBIdEAgaA2MxOSgBMABCCDIuNDU2Nzg5UgZkb3VibGUSHRADGgNjMjAoATAAQgcxMjMuNDU2UgdkZWNpbWFsEh0QWxoDYzIxKAEwAEIKMjAyMy0wNS0yNlIEZGF0ZRIbEFwaA2MyMigBMABCCDE2OjExOjI0UgR0aW1lEioQXRoDYzIzKAEwAEITMjAyMy0wNS0yNiAxNjoxMToyNFIIZGF0ZXRpbWUSKxBdGgNjMjQoATAAQhMyMDIzLTA1LTI2IDE2OjExOjI0Ugl0aW1lc3RhbXASFxAMGgNjMjUoATAAQgQyMDIwUgR5ZWFyEiAQ+v//////////ARoDYzI2KAEwAEIBMVIHdGlueWludBIjEAwaA2MyNygBMABCEHsiYSI6IDEsICJiIjogMn1SBGpzb24SFBAEGgNjMjgoATAAQgExUgRlbnVtEhwQ+f//////////ARoDYzI5KAEwAEIBMVIDc2V0Et0FCjoIASoFVVRGLTgw/ZLzuIUxOAJCBHRlc3RKDnRlc3RfdGlkYl90eXBlWAFiDgoJcm93c0NvdW50EgExEAIanAUQAVAAYpUFEiEQ+v//////////ARoCYzEgASgBMABCATJSB3RpbnlpbnQSFBAFGgJjMigBMAFSCHNtYWxsaW50EhUQBBoCYzMoATABUgltZWRpdW1pbnQSDxAEGgJjNCgBMAFSA2ludBIbEPv//////////wEaAmM1KAEwAVIGYmlnaW50EhAQARoCYzYoATABUgRjaGFyEhMQDBoCYzcoATABUgd2YXJjaGFyEhUQ1Q8aAmM4KAEwAVIIdGlueXRleHQSFxDVDxoCYzkoATABUgptZWRpdW10ZXh0EhIQ1Q8aA2MxMCgBMAFSBHRleHQSFhDVDxoDYzExKAEwAVIIbG9uZ3RleHQSFBDUDxoDYzEyKAEwAVIGYmluYXJ5EhcQ1A8aA2MxMygBMAFSCXZhcmJpbmFyeRIWENQPGgNjMTQoATABUgh0aW55YmxvYhIYENQPGgNjMTUoATABUgptZWRpdW1ibG9iEhIQ1A8aA2MxNigBMAFSBGJsb2ISFhDUDxoDYzE3KAEwAVIIbG9uZ2Jsb2ISEhAHGgNjMTgoATABUgVmbG9hdBITEAgaA2MxOSgBMAFSBmRvdWJsZRIUEAMaA2MyMCgBMAFSB2RlY2ltYWwSERBbGgNjMjEoATABUgRkYXRlEhEQXBoDYzIyKAEwAVIEdGltZRIVEF0aA2MyMygBMAFSCGRhdGV0aW1lEhYQXRoDYzI0KAEwAVIJdGltZXN0YW1wEhEQDBoDYzI1KAEwAVIEeWVhchIdEPr//////////wEaA2MyNigBMAFSB3RpbnlpbnQSERAMGgNjMjcoATABUgRqc29uEhEQBBoDYzI4KAEwAVIEZW51bRIZEPn//////////wEaA2MyOSgBMAFSA3NldA==");
+
+ public static final RowData PROTOBUF_DECODE_RESULT0 =
+ GenericRowData.ofKind(
+ RowKind.INSERT,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4,
+ 5L,
+ StringData.fromString("chartype"),
+ StringData.fromString("varchartype"),
+ StringData.fromString("tinytexttype"),
+ StringData.fromString("mediumtexttype"),
+ StringData.fromString("texttype"),
+ StringData.fromString("longtexttype"),
+ ("binarytype" + new String(new byte[10])).getBytes(),
+ "varbinarytype".getBytes(),
+ "tinyblobtype".getBytes(),
+ "mediumblobtype".getBytes(),
+ "blobtype".getBytes(),
+ "longblobtype".getBytes(),
+ 1.234F,
+ 2.456789,
+ DecimalData.fromBigDecimal(new BigDecimal("123.456"), 6, 3),
+ (int) LocalDate.parse("2023-05-26", DATE_FORMATTER).toEpochDay(),
+ (int) (LocalTime.parse("16:11:24", TIME_FORMATTER).toNanoOfDay() / 1000 / 1000),
+ TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 16, 11, 24)),
+ TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 5, 26, 16, 11, 24)),
+ 2020,
+ (byte) 1,
+ StringData.fromString("{\"a\": 1, \"b\": 2}"),
+ StringData.fromString("1"),
+ StringData.fromString("1"),
+ 441735888086761472L,
+ TimestampData.fromEpochMillis(441735888086761472L >> 18),
+ StringData.fromString(CDCMetadata.STREAMING));
+
+ public static final RowData PROTOBUF_DECODE_RESULT1 =
+ GenericRowData.ofKind(
+ RowKind.INSERT,
+ (byte) 2,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 441735888086761472L,
+ TimestampData.fromEpochMillis(441735888086761472L >> 18),
+ StringData.fromString(CDCMetadata.STREAMING));
+
+  public static final Set<String> SCHEMAS = ImmutableSet.of("test");
+
+  public static final Set<String> TABLES = ImmutableSet.of("test_tidb_type");
+
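+  // Metadata columns requested by the tests; their values are appended after the physical
+  // columns in the expected rows above.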
+ public static final CDCMetadata[] METADATA =
+ new CDCMetadata[] {
+ CDCMetadata.COMMIT_VERSION, CDCMetadata.COMMIT_TIMESTAMP, CDCMetadata.SOURCE_EVENT
+ };
+
+ @Test
+ public void testJsonDecode() throws IOException {
+ ListCollector collector = new ListCollector();
+ CanalJsonDeserializationSchema canalJsonDeserializationSchema =
+ new CanalJsonDeserializationSchema(TYPES, SCHEMAS, TABLES, METADATA, 0, null, false);
+ canalJsonDeserializationSchema.deserialize(JSON_DATA, collector);
+ RowData row = collector.getRows().get(0);
+ Assert.assertEquals(row, JSON_DECODE_RESULT);
+ }
+
+ @Test
+ public void testProtobufDecode() throws IOException {
+ ListCollector collector = new ListCollector();
+ CanalProtobufDeserializationSchema canalProtobufDeserializationSchema =
+ new CanalProtobufDeserializationSchema(TYPES, SCHEMAS, TABLES, METADATA, 0, null, false);
+ canalProtobufDeserializationSchema.deserialize(PROTOBUF_DATA, collector);
+ Assert.assertEquals(PROTOBUF_DECODE_RESULT0, collector.getRows().get(0));
+ Assert.assertEquals(PROTOBUF_DECODE_RESULT1, collector.getRows().get(1));
+ }
+
+  public static class ListCollector implements Collector<RowData> {
+
+    private final List<RowData> rows = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ rows.add(record);
+ }
+
+ @Override
+ public void close() {}
+
+    public List<RowData> getRows() {
+ return rows;
+ }
+ }
+}
diff --git a/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java b/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java
index 430f3821..cc0cc32f 100644
--- a/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java
+++ b/tidb/src/main/java/io/tidb/bigdata/tidb/ClientSession.java
@@ -492,6 +492,17 @@ public TiTimestamp getSnapshotVersion() {
return session.getTimestamp();
}
+ /**
+ * Most of the time, if the user does not explicitly specify a snapshot time, we don't need to
+ * fetch the latest snapshot. A "relatively recent" snapshot is sufficient, which is crucial for
+ * merging CDC data.
+ *
+ * @return Current version with zero logical time.
+ */
+ public TiTimestamp getApproximateSnapshotVersion() {
+ return new TiTimestamp(session.getTimestamp().getPhysical(), 0);
+ }
+
public TiSession getTiSession() {
return session;
}