Support native canal format (#265)
* Support native canal format
humengyu2012 authored May 31, 2023
1 parent 0f0972a commit 72b54ec
Showing 34 changed files with 2,469 additions and 74 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/flink-stream-test.yml
@@ -37,7 +37,9 @@ jobs:
kafka/bin/kafka-server-start.sh kafka/config/server.properties &
- name: deploy ticdc
run: /home/runner/.tiup/bin/tiup cdc server --pd=http://localhost:2379 --log-file=/tmp/ticdc/ticdc.log --addr=0.0.0.0:8301 --advertise-addr=127.0.0.1:8301 --data-dir=/tmp/log/ticdc &
run: |
/home/runner/.tiup/bin/tiup cdc server --pd=http://localhost:2379 --log-file=/tmp/ticdc/ticdc.log --addr=0.0.0.0:8301 --advertise-addr=127.0.0.1:8301 --data-dir=/tmp/log/ticdc &
sleep 10
- name: create cdc changefeed
run: |
4 changes: 3 additions & 1 deletion README.md
@@ -26,7 +26,9 @@ mvn com.coveo:fmt-maven-plugin:format

**Compiling TiBigData requires git and downloading source code directly is not supported.**

[Flink-TiDB-Connector](./flink/README.md)
[Flink-TiDB-Connector(Batch)](./flink/README.md)

[Flink-TiDB-Connector(Unified Batch & Streaming)](./flink/README_unified_batch_streaming.md)

[PrestoSQL-TiDB-Connector](./prestosql/README.md)

4 changes: 3 additions & 1 deletion README_zh_CN.md
@@ -26,7 +26,9 @@ mvn com.coveo:fmt-maven-plugin:format

**Compiling TiBigData requires git; building from directly downloaded source code is not yet supported.**

[TiDB and Flink Integration](./flink/README_zh_CN.md)
[TiDB and Flink Integration (Batch)](./flink/README_zh_CN.md)

[TiDB and Flink Integration (Unified Batch & Streaming)](./flink/README_unified_batch_streaming_zh_CN.md)

[TiDB and PrestoSQL Integration ***- Deprecated***](./prestosql/README_zh_CN.md)

27 changes: 14 additions & 13 deletions flink/README_unified_batch_streaming.md
@@ -166,25 +166,26 @@ Keypoints

In addition to supporting the configuration in [TiDB Batch Mode](./README.md), the streaming mode adds the following configuration:

| Configuration | Default Value | Description |
|:---------------------------------------|:--------------|:--------------------------------------------------------------------------------------------------------------------------------------------------|
| tidb.source.semantic | at-least-once | TiDB batch stage consumption semantics, which takes effect when the read data fails, optionally at-least-once and exactly-once. |
| tidb.streaming.source | - | The data source(messaging system) where TiDB's change logs are stored, currently only supports Kafka and Pulsar will be supported later. |
| tidb.streaming.codec | craft | TiDB's change log encoding method, currently supports json(called default in the lower version of tidb), craft, canal-json. See [Codec](#8-Codec) |
| tidb.streaming.kafka.bootstrap.servers | - | Kafka server address |
| tidb.streaming.kafka.topic | - | Kafka topic |
| tidb.streaming.kafka.group.id | - | Kafka group id |
| tidb.streaming.ignore-parse-errors | false | Whether to ignore exceptions in case of decoding failure |
| tidb.metadata.included | - | TiDB Metadata, see [TiDB Metadata](#9-TiDB-Metadata) |
| tikv.sink.delete_enable | false | Whether enable delete in streaming, this config only works in `tidb.sink.impl=TIKV` |
| Configuration | Default Value | Description |
|:---------------------------------------|:--------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| tidb.source.semantic | at-least-once | TiDB batch stage consumption semantics, which takes effect when the read data fails, optionally at-least-once and exactly-once. |
| tidb.streaming.source | - | The data source(messaging system) where TiDB's change logs are stored, currently only supports Kafka and Pulsar will be supported later. |
| tidb.streaming.codec | craft | TiDB's change log encoding method, currently supports json(called default in the lower version of tidb), craft, canal-json, canal-protobuf. See [Codec](#8-Codec) |
| tidb.streaming.kafka.bootstrap.servers | - | Kafka server address |
| tidb.streaming.kafka.topic | - | Kafka topic |
| tidb.streaming.kafka.group.id | - | Kafka group id |
| tidb.streaming.ignore-parse-errors | false | Whether to ignore exceptions in case of decoding failure |
| tidb.metadata.included | - | TiDB Metadata, see [TiDB Metadata](#9-TiDB-Metadata) |
| tikv.sink.delete_enable | false | Whether enable delete in streaming, this config only works in `tidb.sink.impl=TIKV` |
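
To make the options above concrete, here is a minimal sketch, in Java, of assembling the streaming configuration. Only the option keys come from the table; the broker address, topic, group id, and codec value are placeholder assumptions, and handing the map to the TiDB catalog/table factory follows the batch-mode README rather than being shown here.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamingOptionsSketch {
  public static void main(String[] args) {
    // Placeholder values; only the keys are taken from the configuration table above.
    Map<String, String> properties = new HashMap<>();
    properties.put("tidb.source.semantic", "at-least-once");
    properties.put("tidb.streaming.source", "kafka");
    properties.put("tidb.streaming.codec", "canal-protobuf");
    properties.put("tidb.streaming.kafka.bootstrap.servers", "localhost:9092");
    properties.put("tidb.streaming.kafka.topic", "test_cdc");
    properties.put("tidb.streaming.kafka.group.id", "test_group");
    properties.put("tidb.streaming.ignore-parse-errors", "false");
    properties.put("tidb.metadata.included", "*");
    // The map would then be passed to the TiDB catalog/table factory as described
    // in the batch-mode README; that registration step is omitted here.
    System.out.println(properties);
  }
}
```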

## 8 Codec

TiBigData supports several TiCDC encoding types, namely json(called default in the lower version of tidb), craft, and canal-json.
TiBigData supports several TiCDC encoding types, namely json (called default in lower versions of TiDB), craft, canal-json, and canal-protobuf.

1. json is the default implementation of TiCDC and is highly readable;
2. craft sacrifices readability, is fully binary encoded, has higher compression, and requires a high version of TiDB(5.x). Currently, craft is still incubating, but it is working properly;
3. canal-json is compatible with canal and must be used with the TiDB extension field enabled to read `commitTs`. Lower versions of TiDB do not have `commitTs`, so it cannot be used.
3. canal-json is compatible with the canal JSON format;
4. canal-protobuf is compatible with the canal protobuf format.
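
Since canal-protobuf events are encoded with the canal protobuf protocol (the `canal.protocol` dependency added by this commit), the sketch below illustrates decoding one entry with the canal classes. It assumes each Kafka record value holds a single serialized `CanalEntry.Entry`; TiCDC may frame or wrap entries differently, so treat this purely as an illustration of the canal API rather than the connector's decoder.

```java
import com.alibaba.otter.canal.protocol.CanalEntry;

public class CanalProtobufDecodeSketch {
  // Parse one canal protobuf entry and print the changed rows.
  // Assumption: the byte[] contains exactly one serialized CanalEntry.Entry.
  public static void decode(byte[] value) throws Exception {
    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(value);
    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
      return; // skip transaction begin/end markers
    }
    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    String table = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
      for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
        System.out.printf(
            "%s %s: %s=%s%n", rowChange.getEventType(), table, column.getName(), column.getValue());
      }
    }
  }
}
```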

## 9 TiDB Metadata

27 changes: 14 additions & 13 deletions flink/README_unified_batch_streaming_zh_CN.md
@@ -168,25 +168,26 @@ DELETE FROM `test`.`source_table` WHERE id = 1 or id = 2;

In addition to the configuration supported in [TiDB Batch Mode](./README_zh_CN.md), the streaming mode adds the following configuration:

| Configuration                           | Default Value | Description |
|:----------------------------------------|:--------------|:------------|
| tidb.source.semantic                    | at-least-once | Consumption semantics for the TiDB batch stage; only takes effect when reading data fails. Options: at-least-once and exactly-once. |
| tidb.streaming.source                   | -             | The data source (messaging system) where TiDB's change logs are stored; currently only Kafka is supported, Pulsar will be supported later. |
| tidb.streaming.codec                    | craft         | Encoding of TiDB's change logs; currently supports the three formats json (called default in lower versions of TiDB), craft, and canal-json. See [Codec](#8-Codec) for details |
| tidb.streaming.kafka.bootstrap.servers  | -             | Kafka server address |
| tidb.streaming.kafka.topic              | -             | Kafka topic |
| tidb.streaming.kafka.group.id           | -             | Kafka group id |
| tidb.streaming.ignore-parse-errors      | false         | Whether to ignore exceptions when decoding fails |
| tidb.metadata.included                  | -             | TiDB metadata columns; see [TiDB Metadata](#9-TiDB-Metadata) for details |
| tidb.sink.delete_enable                 | false         | Whether to enable delete in streaming mode; this config only takes effect when `tidb.sink.impl=TIKV` |
| Configuration                           | Default Value | Description |
|:----------------------------------------|:--------------|:------------|
| tidb.source.semantic                    | at-least-once | Consumption semantics for the TiDB batch stage; only takes effect when reading data fails. Options: at-least-once and exactly-once. |
| tidb.streaming.source                   | -             | The data source (messaging system) where TiDB's change logs are stored; currently only Kafka is supported, Pulsar will be supported later. |
| tidb.streaming.codec                    | craft         | Encoding of TiDB's change logs; currently supports the four formats json (called default in lower versions of TiDB), craft, canal-json, and canal-protobuf. See [Codec](#8-Codec) for details |
| tidb.streaming.kafka.bootstrap.servers  | -             | Kafka server address |
| tidb.streaming.kafka.topic              | -             | Kafka topic |
| tidb.streaming.kafka.group.id           | -             | Kafka group id |
| tidb.streaming.ignore-parse-errors      | false         | Whether to ignore exceptions when decoding fails |
| tidb.metadata.included                  | -             | TiDB metadata columns; see [TiDB Metadata](#9-TiDB-Metadata) for details |
| tidb.sink.delete_enable                 | false         | Whether to enable delete in streaming mode; this config only takes effect when `tidb.sink.impl=TIKV` |

## 8 Codec

TiBigData supports multiple TiCDC encoding types, namely json (called default in lower versions of TiDB), craft, and canal-json.
TiBigData supports multiple TiCDC encoding types, namely json (called default in lower versions of TiDB), craft, canal-json, and canal-protobuf.

1. json is the default implementation of TiCDC and is highly readable;
2. craft sacrifices readability for a fully binary encoding with a higher compression ratio; it requires a higher TiDB version (5.x) and is still incubating, but it already works properly;
3. canal-json is compatible with canal; the TiDB extension field must be enabled to read commitTs, and lower versions of TiDB lack this field, so it cannot be used with them.
3. canal-json is compatible with the canal JSON format;
4. canal-protobuf is compatible with the canal protobuf format.

## 9 TiDB Metadata

29 changes: 29 additions & 0 deletions flink/flink-1.13/pom.xml
@@ -16,6 +16,7 @@

<properties>
<dep.flink.version>1.13.0</dep.flink.version>
<dep.canal.protocol.version>1.1.6</dep.canal.protocol.version>
</properties>

<dependencies>
@@ -43,6 +44,26 @@
<artifactId>commons-beanutils</artifactId>
<version>${dep.apache.commons.version}</version>
</dependency>
<!-- canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${dep.canal.protocol.version}</version>
<exclusions>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
<exclusion>
<artifactId>logback-core</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- flink -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -132,6 +153,14 @@
<pattern>oshi</pattern>
<shadedPattern>io.tidb.bigdata.telemetry.shade.oshi</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>io.tidb.bigdata.shade.com.google.protobuf</shadedPattern>
</relocation>
<relocation>
<pattern>com.alibaba.otter.canal</pattern>
<shadedPattern>io.tidb.bigdata.shade.com.alibaba.otter.canal</shadedPattern>
</relocation>
</relocations>
<finalName>${project.artifactId}-${project.version}</finalName>
<transformers>

@@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableSet;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.table.data.RowData;
@@ -55,21 +56,34 @@ public CDCSource<SplitT, EnumChkT> canalJson() {
return doBuild(builder.canalJson());
}

public CDCSource<SplitT, EnumChkT> canalProtobuf() {
return doBuild(builder.canalProtobuf());
}

private final CDCDeserializationSchemaBuilder builder;

protected CDCSourceBuilder(CDCDeserializationSchemaBuilder builder) {
this.builder = builder;
}

public static KafkaCDCSourceBuilder kafka(
String database, String table, TiTimestamp ts, TiDBSchemaAdapter schema) {
return new KafkaCDCSourceBuilder(
new CDCDeserializationSchemaBuilder(
schema.getPhysicalRowDataType(), schema.getCDCMetadata())
.startTs(ts.getVersion())
.types(ROW_CHANGED_EVENT)
.schemas(ImmutableSet.of(database))
.tables(ImmutableSet.of(table)));
String database,
String table,
TiTimestamp ts,
TiDBSchemaAdapter schema,
long startingOffsetTs) {
KafkaCDCSourceBuilder kafkaCDCSourceBuilder =
new KafkaCDCSourceBuilder(
new CDCDeserializationSchemaBuilder(
schema.getPhysicalRowDataType(), schema.getCDCMetadata())
.startTs(ts.getVersion())
.types(ROW_CHANGED_EVENT)
.schemas(ImmutableSet.of(database))
.tables(ImmutableSet.of(table)));
if (startingOffsetTs > 0) {
kafkaCDCSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startingOffsetTs));
}
return kafkaCDCSourceBuilder;
}

@SuppressWarnings("unchecked")

@@ -85,7 +85,8 @@ public SnapshotSource(
tiTableInfo, Arrays.asList(schema.getPhysicalFieldNamesWithoutMeta()));
this.timestamp =
getOptionalVersion()
.orElseGet(() -> getOptionalTimestamp().orElseGet(session::getSnapshotVersion));
.orElseGet(
() -> getOptionalTimestamp().orElseGet(session::getApproximateSnapshotVersion));
TableHandleInternal tableHandleInternal =
new TableHandleInternal(this.databaseName, tiTableInfo);
this.splits =

@@ -83,8 +83,13 @@ private static ConfigOption<String> optional(String key) {
public static final String STREAMING_CODEC_JSON = "json";
public static final String STREAMING_CODEC_CRAFT = "craft";
public static final String STREAMING_CODEC_CANAL_JSON = "canal-json";
public static final String STREAMING_CODEC_CANAL_PROTOBUF = "canal-protobuf";
public static final Set<String> VALID_STREAMING_CODECS =
ImmutableSet.of(STREAMING_CODEC_CRAFT, STREAMING_CODEC_JSON, STREAMING_CODEC_CANAL_JSON);
ImmutableSet.of(
STREAMING_CODEC_CRAFT,
STREAMING_CODEC_JSON,
STREAMING_CODEC_CANAL_JSON,
STREAMING_CODEC_CANAL_PROTOBUF);

// Options for catalog
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
@@ -118,6 +123,12 @@ private static ConfigOption<String> optional(String key) {
public static final String METADATA_INCLUDED = "tidb.metadata.included";
public static final String METADATA_INCLUDED_ALL = "*";

// During the streaming phase, set a timestamp to avoid consuming expired data,
// with the default being one minute subtracted from the snapshot time.
// Additionally, we can set this value to less than or equal to 0 to disable this feature.
public static final String STARTING_OFFSETS_TS = "tidb.streaming.source.starting.offsets.ts";
public static final long MINUTE = 60 * 1000L;

public static Set<ConfigOption<?>> requiredOptions() {
return withMoreRequiredOptions();
}
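
The comment above says the default for `tidb.streaming.source.starting.offsets.ts` is the snapshot time minus one minute, and that values less than or equal to 0 disable the feature. The sketch below shows one plausible way to derive that default and apply it to a Kafka source builder; the `TiTimestamp#getPhysical()` call and the overall wiring are assumptions made for illustration, not the connector's actual code path.

```java
import java.util.Map;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.tikv.common.meta.TiTimestamp;

final class StartingOffsetsSketch {
  static final String STARTING_OFFSETS_TS = "tidb.streaming.source.starting.offsets.ts";
  static final long MINUTE = 60 * 1000L;

  // Sketch only: derive the starting Kafka offset as described in the comment above.
  // Treating TiTimestamp#getPhysical() as the snapshot wall-clock millis is an assumption.
  static <T> void applyStartingOffsets(
      Map<String, String> properties, TiTimestamp snapshot, KafkaSourceBuilder<T> builder) {
    long startingOffsetTs =
        properties.containsKey(STARTING_OFFSETS_TS)
            ? Long.parseLong(properties.get(STARTING_OFFSETS_TS))
            : snapshot.getPhysical() - MINUTE; // default: snapshot time minus one minute
    if (startingOffsetTs > 0) { // values <= 0 disable the feature
      builder.setStartingOffsets(OffsetsInitializer.timestamp(startingOffsetTs));
    }
  }
}
```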
