Skip to content

Commit

Permalink
[cdc-connector][oceanbase] support debezium deserializer
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Dec 28, 2023
1 parent 7ab1e87 commit e99a5bc
Show file tree
Hide file tree
Showing 26 changed files with 1,763 additions and 1,518 deletions.
112 changes: 42 additions & 70 deletions docs/content/connectors/oceanbase-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>2.4.2</version>
<version>2.4.7.1</version>
</dependency>
```

Expand All @@ -31,9 +31,9 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。

下载[flink-sql-connector-oceanbase-cdc-2.5-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.5-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.5-SNAPSHOT.jar)`<FLINK_HOME>/lib/` 目录下。

**注意:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc) 当前已发布的所有版本都可以在 Maven 中央仓库获取
**注意:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,当前已发布的所有版本都可以在 [Maven 中央仓库](https://central.sonatype.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc/versions)获取

对于 JDBC 驱动,上述的 cdc jar 文件中已经包含了我们推荐的 MySQL 驱动版本 5.1.47。由于开源许可证的原因,我们不能在上述 cdc jar 文件中包含 OceanBase 的官方 JDBC 驱动,如果您需要使用它,可以从[这里](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.2/oceanbase-client-2.4.2.jar)下载,然后放到 `<FLINK_HOME>/lib/` 目录下,同时需要将配置项 `jdbc.driver` 设为 `com.oceanbase.jdbc.Driver`
对于 JDBC 驱动,上述的 cdc jar 文件中已经包含了我们推荐的 MySQL 驱动版本 5.1.47。由于开源许可证的原因,我们不能在上述 cdc jar 文件中包含 OceanBase 的官方 JDBC 驱动,如果您需要使用它,可以从[这里](https://central.sonatype.com/artifact/com.oceanbase/oceanbase-client/versions)下载,然后放到 `<FLINK_HOME>/lib/` 目录下,同时需要将配置项 `jdbc.driver` 设为 `com.oceanbase.jdbc.Driver`

### 配置 OceanBase 数据库和 oblogproxy 服务

Expand All @@ -47,7 +47,7 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。
mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
```

3. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。
3. 为你想要监控的非 sys 租户创建一个用户,这个用户用来读取快照数据和变化事件数据。
4. OceanBase 社区版用户需要获取`rootserver-list`,可以使用以下命令获取:

```bash
Expand All @@ -59,7 +59,7 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。
mysql> show parameters like 'obconfig_url';
```

5. 按照 [文档](https://github.com/oceanbase/oblogproxy#getting-started) 配置 oblogproxy。
5. 按照 [文档](https://www.oceanbase.com/docs/oblogproxy-doc) 配置 oblogproxy。

## 创建 OceanBase CDC 表

Expand Down Expand Up @@ -128,15 +128,15 @@ Flink SQL> CREATE TABLE orders (
);
```

您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 [Flink CDC 官网文档](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/oceanbase-tutorial-zh.html)。
您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 [Flink CDC 官网文档](../快速上手/oceanbase-tutorial-zh.md)。

## OceanBase CDC 连接器选项

OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表所示。

*注意*:连接器支持两种方式来指定需要监听的表,两种方式同时使用时会监听两种方式匹配的所有表
1. 使用 `database-name``table-name` 匹配正则表达式中的数据库和表名。 由于`obcdc`(以前的`liboblog`)现在只支持`fnmatch`匹配,我们不能直接使用正则过滤 changelog 事件,所以通过两个选项去匹配去指定监听表只能在`initial`启动模式下使用
2. 使用 `table-list` 去匹配数据库名和表名的准确列表
*注意*:连接器支持两种方式来指定需要监听的表,最终的监听范围会取两者的并集
1. 使用 `database-name``table-name` 通过正则表达式匹配正数据库名和表名
2. 使用 `table-list` 精确匹配数据库名和表名

<div class="highlight">
<table class="colwidths-auto docutils">
Expand Down Expand Up @@ -198,14 +198,14 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td></td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>待监控 OceanBase 数据库的数据库名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用</td>
<td>待监控 OceanBase 数据库的数据库名,应该是正则表达式。</td>
</tr>
<tr>
<td>table-name</td>
<td></td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>待监控 OceanBase 数据库的表名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用</td>
<td>待监控 OceanBase 数据库的表名,应该是正则表达式。</td>
</tr>
<tr>
<td>table-list</td>
Expand All @@ -216,14 +216,14 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
</tr>
<tr>
<td>hostname</td>
<td></td>
<td></td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。</td>
</tr>
<tr>
<td>port</td>
<td></td>
<td></td>
<td style="word-wrap: break-word;"></td>
<td>Integer</td>
<td>
Expand All @@ -244,8 +244,7 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>String</td>
<td>
数据库服务器中的会话时区,用户控制 OceanBase 的时间类型如何转换为 STRING。<br>
合法的值可以是格式为"±hh:mm"的 UTC 时区偏移量,<br>
如果 mysql 数据库中的时区信息表已创建,合法的值则可以是创建的时区。
可以是时区或格式为"±hh:mm"的 UTC 时区偏移量。<br>
</td>
</tr>
<tr>
Expand Down Expand Up @@ -274,7 +273,7 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td></td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`<br>多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。</td>
<td>OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。</td>
</tr>
<tr>
<td>config-url</td>
Expand Down Expand Up @@ -318,6 +317,19 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>String</td>
<td>传递参数到<code>libobcdc</code>,如 'obcdc.properties.sort_trans_participants' = '1'。更多参数信息见 <a href="https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000221094">obcdc 配置项说明</a></td>
</tr>
<tr>
<td>debezium.*</td>
<td></td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>
传入 Debezium 属性,目前支持以下属性:<br>
<li><code>debezium.decimal.handling.mode</code></li>
<li><code>debezium.time.precision.mode</code></li>
<li><code>debezium.binary.handling.mode</code></li>
详情可参照 debezium 的 <a href="https://debezium.io/documentation/reference/nightly/connectors/mysql.html">mysql</a><a href="https://debezium.io/documentation/reference/nightly/connectors/oracle.html">oracle</a> 连接器的文档。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down Expand Up @@ -413,49 +425,18 @@ OceanBase CDC 连接器使用 [oblogclient](https://github.com/oceanbase/oblogcl
OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以按照如下创建一个 SourceFunction:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.time.Duration;
public class OceanBaseSourceExample {
public static void main(String[] args) throws Exception {
ResolvedSchema resolvedSchema =
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT().notNull()),
Column.physical("name", DataTypes.STRING().notNull())),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
RowType physicalDataType =
(RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType();
TypeInformation<RowData> resultTypeInfo = InternalTypeInfo.of(physicalDataType);
String serverTimeZone = "+00:00";
OceanBaseDeserializationSchema<RowData> deserializer =
RowDataOceanBaseDeserializationSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setResultTypeInfo(resultTypeInfo)
.setServerTimeZone(ZoneId.of(serverTimeZone))
.build();
SourceFunction<RowData> oceanBaseSource =
SourceFunction<String> oceanBaseSource =
OceanBaseSource.<RowData>builder()
.rsList("127.0.0.1:2882:2881")
.startupMode(StartupMode.INITIAL)
Expand All @@ -470,8 +451,8 @@ public class OceanBaseSourceExample {
.jdbcDriver("com.mysql.jdbc.Driver")
.logProxyHost("127.0.0.1")
.logProxyPort(2983)
.serverTimeZone(serverTimeZone)
.deserializer(deserializer)
.serverTimeZone("+00:00")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -573,9 +554,8 @@ public class OceanBaseSourceExample {
where 38 < p <=65 </td>
<td>STRING</td>
<td>
DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。<br>
但在 Flink 中,DECIMAL 的最高精度为 38。因此,<br>
如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。
DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。但在 Flink 中,DECIMAL 的最高精度为 38。
因此,如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。
</td>
</tr>
<tr>
Expand All @@ -589,13 +569,11 @@ public class OceanBaseSourceExample {
<td></td>
</tr>
<tr>
<td>DATETIME [(p)]</td>
<td>TIMESTAMP [(p)]</td>
<td></td>
</tr>
<tr>
<td>
DATETIME [(p)]<br>
TIMESTAMP [(p)]
</td>
<td>TIMESTAMP [(p)]</td>
<td>TIMESTAMP_LTZ [(p)]</td>
<td></td>
</tr>
<tr>
Expand Down Expand Up @@ -657,8 +635,7 @@ public class OceanBaseSourceExample {
<td>SET</td>
<td>ARRAY&lt;STRING&gt;</td>
<td>
因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,<br>
所以映射到 Flink 时是一个字符串数组
因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,所以映射到 Flink 时是一个字符串数组。
</td>
</tr>
<tr>
Expand Down Expand Up @@ -746,20 +723,15 @@ public class OceanBaseSourceExample {
<td>
CHAR(n)<br>
NCHAR(n)<br>
VARCHAR(n)<br>
VARCHAR2(n)<br>
NVARCHAR2(n)<br>
CLOB<br>
CLOB
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
RAW<br>
BLOB<br>
ROWID
</td>
<td>BLOB</td>
<td>BYTES</td>
<td></td>
</tr>
Expand Down
Loading

0 comments on commit e99a5bc

Please sign in to comment.