Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc-connector][oceanbase] support debezium deserializer #2874

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 42 additions & 70 deletions docs/content/connectors/oceanbase-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>2.4.2</version>
<version>2.4.8</version>
</dependency>
```

Expand All @@ -31,9 +31,9 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。

下载[flink-sql-connector-oceanbase-cdc-3.0-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.5-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.5-SNAPSHOT.jar) 到 `<FLINK_HOME>/lib/` 目录下。

**注意:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc) 当前已发布的所有版本都可以在 Maven 中央仓库获取
**注意:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,当前已发布的所有版本都可以在 [Maven 中央仓库](https://central.sonatype.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc/versions)获取

对于 JDBC 驱动,上述的 cdc jar 文件中已经包含了我们推荐的 MySQL 驱动版本 5.1.47。由于开源许可证的原因,我们不能在上述 cdc jar 文件中包含 OceanBase 的官方 JDBC 驱动,如果您需要使用它,可以从[这里](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.2/oceanbase-client-2.4.2.jar)下载,然后放到 `<FLINK_HOME>/lib/` 目录下,同时需要将配置项 `jdbc.driver` 设为 `com.oceanbase.jdbc.Driver`。
对于 JDBC 驱动,上述的 cdc jar 文件中已经包含了我们推荐的 MySQL 驱动版本 5.1.47。由于开源许可证的原因,我们不能在上述 cdc jar 文件中包含 OceanBase 的官方 JDBC 驱动,如果您需要使用它,可以从[这里](https://central.sonatype.com/artifact/com.oceanbase/oceanbase-client/versions)下载,然后放到 `<FLINK_HOME>/lib/` 目录下,同时需要将配置项 `jdbc.driver` 设为 `com.oceanbase.jdbc.Driver`。

### 配置 OceanBase 数据库和 oblogproxy 服务

Expand All @@ -47,7 +47,7 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。
mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
```

3. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。
3. 为你想要监控的非 sys 租户创建一个用户,这个用户用来读取快照数据和变化事件数据。
4. OceanBase 社区版用户需要获取`rootserver-list`,可以使用以下命令获取:

```bash
Expand All @@ -59,7 +59,7 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。
mysql> show parameters like 'obconfig_url';
```

5. 按照 [文档](https://github.com/oceanbase/oblogproxy#getting-started) 配置 oblogproxy。
5. 按照 [文档](https://www.oceanbase.com/docs/oblogproxy-doc) 配置 oblogproxy (CDC 模式)

## 创建 OceanBase CDC 表

Expand Down Expand Up @@ -128,15 +128,15 @@ Flink SQL> CREATE TABLE orders (
);
```

您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 [Flink CDC 官网文档](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/oceanbase-tutorial-zh.html)。
您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 [Flink CDC 官网文档](../快速上手/oceanbase-tutorial-zh.md)。

## OceanBase CDC 连接器选项

OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表所示。

*注意*:连接器支持两种方式来指定需要监听的表,两种方式同时使用时会监听两种方式匹配的所有表
1. 使用 `database-name` 和 `table-name` 匹配正则表达式中的数据库和表名。 由于`obcdc`(以前的`liboblog`)现在只支持`fnmatch`匹配,我们不能直接使用正则过滤 changelog 事件,所以通过两个选项去匹配去指定监听表只能在`initial`启动模式下使用
2. 使用 `table-list` 去匹配数据库名和表名的准确列表
*注意*:连接器支持两种方式来指定需要监听的表,最终的监听范围会取两者的并集
1. 使用 `database-name` 和 `table-name` 通过正则表达式匹配正数据库名和表名
2. 使用 `table-list` 精确匹配数据库名和表名

<div class="highlight">
<table class="colwidths-auto docutils">
Expand Down Expand Up @@ -198,14 +198,14 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>否</td>
<td style="word-wrap: break-word;">无</td>
<td>String</td>
<td>待监控 OceanBase 数据库的数据库名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。</td>
<td>待监控 OceanBase 数据库的数据库名,应该是正则表达式。</td>
</tr>
<tr>
<td>table-name</td>
<td>否</td>
<td style="word-wrap: break-word;">无</td>
<td>String</td>
<td>待监控 OceanBase 数据库的表名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。</td>
<td>待监控 OceanBase 数据库的表名,应该是正则表达式。</td>
</tr>
<tr>
<td>table-list</td>
Expand All @@ -216,14 +216,14 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
</tr>
<tr>
<td>hostname</td>
<td></td>
<td></td>
<td style="word-wrap: break-word;">无</td>
<td>String</td>
<td>OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。</td>
</tr>
<tr>
<td>port</td>
<td></td>
<td></td>
<td style="word-wrap: break-word;">无</td>
<td>Integer</td>
<td>
Expand All @@ -244,8 +244,7 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>String</td>
<td>
数据库服务器中的会话时区,用户控制 OceanBase 的时间类型如何转换为 STRING。<br>
合法的值可以是格式为"±hh:mm"的 UTC 时区偏移量,<br>
如果 mysql 数据库中的时区信息表已创建,合法的值则可以是创建的时区。
可以是时区或格式为"±hh:mm"的 UTC 时区偏移量。<br>
</td>
</tr>
<tr>
Expand Down Expand Up @@ -274,7 +273,7 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>否</td>
<td style="word-wrap: break-word;">无</td>
<td>String</td>
<td>OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,<br>多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。</td>
<td>OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。</td>
</tr>
<tr>
<td>config-url</td>
Expand Down Expand Up @@ -318,6 +317,19 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>String</td>
<td>传递参数到<code>libobcdc</code>,如 'obcdc.properties.sort_trans_participants' = '1'。更多参数信息见 <a href="https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000221094">obcdc 配置项说明</a>。</td>
</tr>
<tr>
<td>debezium.*</td>
<td>否</td>
<td style="word-wrap: break-word;">无</td>
<td>String</td>
<td>
传入 Debezium 属性,目前支持以下属性:<br>
<li><code>debezium.decimal.handling.mode</code></li>
<li><code>debezium.time.precision.mode</code></li>
<li><code>debezium.binary.handling.mode</code></li>
详情可参照 debezium 的 <a href="https://debezium.io/documentation/reference/nightly/connectors/mysql.html">mysql</a> 和 <a href="https://debezium.io/documentation/reference/nightly/connectors/oracle.html">oracle</a> 连接器的文档。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down Expand Up @@ -413,49 +425,18 @@ OceanBase CDC 连接器使用 [oblogclient](https://github.com/oceanbase/oblogcl
OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以按照如下创建一个 SourceFunction:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.time.Duration;

public class OceanBaseSourceExample {
public static void main(String[] args) throws Exception {
ResolvedSchema resolvedSchema =
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT().notNull()),
Column.physical("name", DataTypes.STRING().notNull())),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));

RowType physicalDataType =
(RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType();
TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(physicalDataType);
String serverTimeZone = "+00:00";

OceanBaseDeserializationSchema<RowData> deserializer =
RowDataOceanBaseDeserializationSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setResultTypeInfo(resultTypeInfo)
.setServerTimeZone(ZoneId.of(serverTimeZone))
.build();

SourceFunction<RowData> oceanBaseSource =
SourceFunction<String> oceanBaseSource =
OceanBaseSource.<RowData>builder()
.rsList("127.0.0.1:2882:2881")
.startupMode(StartupMode.INITIAL)
Expand All @@ -470,8 +451,8 @@ public class OceanBaseSourceExample {
.jdbcDriver("com.mysql.jdbc.Driver")
.logProxyHost("127.0.0.1")
.logProxyPort(2983)
.serverTimeZone(serverTimeZone)
.deserializer(deserializer)
.serverTimeZone("+00:00")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -573,9 +554,8 @@ public class OceanBaseSourceExample {
where 38 < p <=65 </td>
<td>STRING</td>
<td>
DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。<br>
但在 Flink 中,DECIMAL 的最高精度为 38。因此,<br>
如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。
DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。但在 Flink 中,DECIMAL 的最高精度为 38。
因此,如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。
</td>
</tr>
<tr>
Expand All @@ -589,13 +569,11 @@ public class OceanBaseSourceExample {
<td></td>
</tr>
<tr>
<td>DATETIME [(p)]</td>
<td>TIMESTAMP [(p)]</td>
<td></td>
</tr>
<tr>
<td>
DATETIME [(p)]<br>
TIMESTAMP [(p)]
</td>
<td>TIMESTAMP [(p)]</td>
<td>TIMESTAMP_LTZ [(p)]</td>
<td></td>
</tr>
<tr>
Expand Down Expand Up @@ -657,8 +635,7 @@ public class OceanBaseSourceExample {
<td>SET</td>
<td>ARRAY&lt;STRING&gt;</td>
<td>
因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,<br>
所以映射到 Flink 时是一个字符串数组
因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,所以映射到 Flink 时是一个字符串数组。
</td>
</tr>
<tr>
Expand Down Expand Up @@ -746,20 +723,15 @@ public class OceanBaseSourceExample {
<td>
CHAR(n)<br>
NCHAR(n)<br>
VARCHAR(n)<br>
VARCHAR2(n)<br>
NVARCHAR2(n)<br>
CLOB<br>
CLOB
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
RAW<br>
BLOB<br>
ROWID
</td>
<td>BLOB</td>
<td>BYTES</td>
<td></td>
</tr>
Expand Down
Loading
Loading