diff --git a/docs/content.zh/docs/connectors/doris.md b/docs/content.zh/docs/connectors/doris.md index 61b88d0dfc..186bdd1ab1 100644 --- a/docs/content.zh/docs/connectors/doris.md +++ b/docs/content.zh/docs/connectors/doris.md @@ -26,9 +26,9 @@ under the License. # Doris Connector -This article introduces of Doris Connector +本文介绍了Pipeline Doris Connector的使用。 -## Example +## 示例 ```yaml @@ -49,145 +49,145 @@ pipeline: ``` -## Connector Options +## 连接器配置项
OptionRequiredDefaultTypeDescription
typerequired(none)StringSpecify the Sink to use, here is 'doris'.
nameoptional(none)String Name of PipeLine
fenodesrequired(none)StringHttp address of Doris cluster FE, such as 127.0.0.1:8030
benodesoptional(none)StringHttp address of Doris cluster BE, such as 127.0.0.1:8040
jdbc-urloptional(none)StringJDBC address of Doris cluster, for example: jdbc:mysql://127.0.0.1:9030/db
usernamerequired(none)StringUsername of Doris cluster
passwordoptional(none)StringPassword for Doris cluster
auto-redirectoptionalfalseString Whether to write through FE redirection and directly connect to BE to write
sink.enable.batch-modeoptionaltrueBoolean Whether to use the batch method to write to Doris
sink.flush.queue-sizeoptional2Integer Queue size for batch writing -
sink.buffer-flush.max-rowsoptional50000IntegerMaximum number of Flush records in a single batch
sink.buffer-flush.max-bytesoptional10485760(10MB)IntegerMaximum number of bytes flushed in a single batch
sink.buffer-flush.intervaloptional10sStringFlush interval duration. If this time is exceeded, the data will be flushed asynchronously
OptionRequiredDefaultTypeDescription
typerequired(none)String指定要使用的Sink, 这里是 'doris'.
nameoptional(none)String PipeLine的名称
sink.properties.optional(none)String Parameters of StreamLoad. - For example: sink.properties.strict_mode: true. - See more about StreamLoad Properties properties
fenodesrequired(none)StringDoris集群FE的Http地址, 比如 127.0.0.1:8030
table.create.properties.*optional(none)StringCreate the Properties configuration of the table. - For example: table.create.properties.replication_num: 1. - See more about Doris Table Properties properties
benodesoptional(none)StringDoris集群BE的Http地址, 比如 127.0.0.1:8040
jdbc-urloptional(none)StringDoris集群的JDBC地址,比如:jdbc:mysql://127.0.0.1:9030/db
usernamerequired(none)StringDoris集群的用户名
passwordoptional(none)StringDoris集群的密码
auto-redirectoptionalfalseString 是否通过FE重定向写入,直连BE写入
sink.enable.batch-modeoptionaltrueBoolean 是否使用攒批方式写入Doris
sink.flush.queue-sizeoptional2Integer 攒批写入的队列大小 +
sink.buffer-flush.max-rowsoptional50000Integer单个批次最大Flush的记录数
sink.buffer-flush.max-bytesoptional10485760(10MB)Integer单个批次最大Flush的字节数
sink.buffer-flush.intervaloptional10sStringFlush的间隔时长,超过这个时间,将异步Flush数据
sink.properties.optional(none)StringStreamLoad 的参数。 + 例如:sink.properties.strict_mode: true。 + 查看更多关于 StreamLoad 的 Properties 属性
table.create.properties.*optional(none)String创建表的 Properties 配置。 + 例如:table.create.properties.replication_num: 1。 + 查看更多关于 Doris Table 的 Properties 属性
-## Data Type Mapping +## 数据类型映射
- - - + + + @@ -251,12 +251,12 @@ pipeline: - + - +
Flink CDC TypeDoris TypeNoteCDC typeDoris typeNOTE
CHAR(n) CHAR(n*3)In Doris, strings are stored in UTF-8 encoding, so English characters occupy 1 byte and Chinese characters occupy 3 bytes. The length here is multiplied by 3. The maximum length of CHAR is 255. Once exceeded, it will automatically be converted to VARCHAR type.在Doris中,字符串是以UTF-8编码存储的,所以英文字符占1个字节,中文字符占3个字节。这里的长度统一乘3,CHAR最大的长度是255,超过后会自动转为VARCHAR类型
VARCHAR(n) VARCHAR(n*3)Same as above. The length here is multiplied by 3. The maximum length of VARCHAR is 65533. Once exceeded, it will automatically be converted to STRING type.同上,这里的长度统一乘3,VARCHAR最大的长度是65533,超过后会自动转为STRING类型
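结合上文「连接器配置项」中的 `sink.properties.*` 与 `table.create.properties.*` 两类配置,下面给出一个带注释的 Doris sink 配置示意(仅为示意,地址、账号及属性取值均为示例,需按实际集群调整):

```yaml
sink:
  type: doris                     # 指定使用 Doris 连接器
  name: Doris Sink                # 可选,Pipeline 中该 Sink 的名称
  fenodes: 127.0.0.1:8030         # Doris 集群 FE 的 Http 地址
  username: root
  password: ""
  # 透传给 Stream Load 的参数,对应上表中的 sink.properties.*
  sink.properties.strict_mode: true
  # 自动建表时使用的表属性,对应上表中的 table.create.properties.*
  table.create.properties.replication_num: 1
```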
diff --git a/docs/content.zh/docs/connectors/mysql.md b/docs/content.zh/docs/connectors/mysql.md index 455e22edfe..21dcce7676 100644 --- a/docs/content.zh/docs/connectors/mysql.md +++ b/docs/content.zh/docs/connectors/mysql.md @@ -26,13 +26,12 @@ under the License. # MySQL Connector -MySQL connector allows reading snapshot data and incremental data from MySQL database and provides end-to-end full-database data synchronization capabilities. -This document describes how to setup the MySQL connector. +MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。 -## Example +## 示例 -An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows: +从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下: ```yaml source: @@ -57,7 +56,7 @@ pipeline: parallelism: 4 ``` -## Connector Options +## 连接器配置项
@@ -76,195 +75,187 @@ pipeline: - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - - - - + + + + + - - - - - + + + + + - - - - - + + + + + - - - - - + + + + + - - - - - + + + + + - + - +
required (none) StringIP address or hostname of the MySQL database server. MySQL 数据库服务器的 IP 地址或主机名。
port optional 3306 IntegerInteger port number of the MySQL database server.MySQL 数据库服务器的整数端口号。
username required (none) StringName of the MySQL database to use when connecting to the MySQL database server.连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。
password required (none) StringPassword to use when connecting to the MySQL database server.连接 MySQL 数据库服务器时使用的密码。
tables required (none) StringTable name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions.
- It is important to note that the dot (.) is treated as a delimiter for database and table names. - If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.
- eg. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*
需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。
+ 需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
+ 例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*
schema-change.enabled optional true BooleanWhether to send schema change events, so that downstream sinks can respond to schema changes and achieve table structure synchronization.是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。
server-id optional (none) String A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', - the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled. - Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster - as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, - though we recommend setting an explicit value. 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', + 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的每个 slave 节点的 id 都必须是唯一的,所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 +
scan.incremental.snapshot.chunk.size optional 8096 IntegerThe chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。
scan.snapshot.fetch.size optional 1024 IntegerThe maximum fetch size for per poll when read table snapshot.读取表快照时每次读取数据的最大条数。
scan.startup.mode optional initial StringOptional startup mode for MySQL CDC consumer, valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp". - Please see Startup Reading Position section for more detailed information. MySQL CDC 消费者可选的启动模式, + 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 + 请查阅 启动模式 章节了解更多详细信息。
scan.startup.specific-offset.file optional (none) StringOptional binlog file name used in case of "specific-offset" startup mode在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。
scan.startup.specific-offset.pos optional (none) LongOptional binlog file position used in case of "specific-offset" startup mode在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。
scan.startup.specific-offset.gtid-set optional (none) StringOptional GTID set used in case of "specific-offset" startup mode在 "specific-offset" 启动模式下,启动位点的 GTID 集合。
scan.startup.specific-offset.skip-events optional (none) LongOptional number of events to skip after the specific starting offset在指定的启动位点后需要跳过的事件数量。
scan.startup.specific-offset.skip-rows optional (none) LongOptional number of rows to skip after the specific starting offset在指定的启动位点后需要跳过的数据行数量。
connect.timeoutoptional30sDurationThe maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.connect.timeoutoptional30sDuration连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。
connect.max-retriesoptional3IntegerThe max retry times that the connector should retry to build MySQL database server connection.connect.max-retriesoptional3Integer连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。
connection.pool.sizeoptional20IntegerThe connection pool size.connection.pool.sizeoptional20Integer连接池大小。
jdbc.properties.*optional20StringOption to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'.jdbc.properties.*optional20String传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'.
heartbeat.intervaloptional30sDurationThe interval of sending heartbeat event for tracing the latest available binlog offsets.heartbeat.intervaloptional30sDuration用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。
debezium.* optional (none) StringPass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server. - For example: 'debezium.snapshot.mode' = 'never'. - See more about the Debezium's MySQL Connector properties将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 + 例如: 'debezium.snapshot.mode' = 'never'. + 查看更多关于 Debezium 的 MySQL 连接器属性
scan.incremental.close-idle-reader.enabled optional false BooleanWhether to close idle readers at the end of the snapshot phase.
- The flink version is required to be greater than or equal to 1.14 when 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' is set to true.
- If the flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true, - so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true' -
是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
+ 若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。
-## Startup Reading Position - -The config option `scan.startup.mode` specifies the startup mode for MySQL CDC consumer. The valid enumerations are: +## 启动模式 -- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog. -- `earliest-offset`: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset. -- `latest-offset`: Never to perform snapshot on the monitored database tables upon first startup, just read from - the end of the binlog which means only have the changes since the connector was started. -- `specific-offset`: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be - specified with binlog filename and position, or a GTID set if GTID is enabled on server. -- `timestamp`: Skip snapshot phase and start reading binlog events from a specific timestamp. +配置选项`scan.startup.mode`指定 MySQL CDC 消费者的启动模式。有效的枚举值包括: +- `initial` (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。 +- `earliest-offset`:跳过快照阶段,从可读取的最早 binlog 位点开始读取。 +- `latest-offset`:首次启动时,从不对受监视的数据库表执行快照,连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。 +- `specific-offset`:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在服务器上启用了 GTID 时通过 GTID 集合指定。 +- `timestamp`:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。 -## Data Type Mapping +## 数据类型映射
- - - + + + @@ -367,8 +358,7 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c where 38 < p <= 65
- + - + - + - +
MySQL typeFlink CDC typeNoteMySQL typeCDC typeNOTE
STRINGThe precision for DECIMAL data type is up to 65 in MySQL, but the precision for DECIMAL is limited to 38 in Flink. - So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss.在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。
@@ -456,7 +446,7 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c LONGBLOB
BYTESCurrently, for BLOB data type in MySQL, only the blob whose length isn't greater than 2,147,483,647(2 ** 31 - 1) is supported. 目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。
@@ -470,14 +460,14 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c JSON STRINGThe JSON data type will be converted into STRING with JSON format in Flink. JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。
SET -Not supported yet. 暂不支持
@@ -494,34 +484,34 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c STRING - The spatial data types in MySQL will be converted into STRING with a fixed Json format. - Please see MySQL Spatial Data Types Mapping section for more detailed information. + MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 + 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。
-### MySQL Spatial Data Types Mapping -The spatial data types except for `GEOMETRYCOLLECTION` in MySQL will be converted into Json String with a fixed format like:
+### 空间数据类型映射 +MySQL中除`GEOMETRYCOLLECTION`之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
```json {"srid": 0 , "type": "xxx", "coordinates": [0, 0]} ``` -The field `srid` identifies the SRS in which the geometry is defined, SRID 0 is the default for new geometry values if no SRID is specified. -As only MySQL 8+ support to specific SRID when define spatial data type, the field `srid` will always be 0 in MySQL with a lower version. +字段`srid`标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。 +由于只有 MySQL 8 及以上版本才支持在定义空间数据类型时指定 SRID,因此在较低版本的 MySQL 中,字段`srid`将始终为 0。 -The field `type` identifies the spatial data type, such as `POINT`/`LINESTRING`/`POLYGON`. +字段`type`标识空间数据类型,例如`POINT`/`LINESTRING`/`POLYGON`。 -The field `coordinates` represents the `coordinates` of the spatial data. +字段`coordinates`表示空间数据的`坐标`。 -For `GEOMETRYCOLLECTION`, it will be converted into Json String with a fixed format like:
+对于`GEOMETRYCOLLECTION`,它将转换为 Json 字符串,格式固定,如:
```json {"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]} ``` -The field `geometries` is an array contains all spatial data. +字段`geometries`是一个包含所有空间数据的数组。 -The example for different spatial data types mapping is as follows: +不同空间数据类型映射的示例如下:
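例如,按上述固定格式,一个 `POINT(1 1)` 类型的值大致会被转换为 `{"srid": 0, "type": "Point", "coordinates": [1, 1]}`(此处仅为按上述格式推演的示意,实际输出以连接器为准)。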
diff --git a/docs/content.zh/docs/connectors/overview.md b/docs/content.zh/docs/connectors/overview.md index 60740568c9..3743d72bc1 100644 --- a/docs/content.zh/docs/connectors/overview.md +++ b/docs/content.zh/docs/connectors/overview.md @@ -26,10 +26,7 @@ under the License. # Connectors -Flink CDC provides several source and sink connectors to interact with external -systems. You can use these connectors out-of-box, by adding released JARs to -your Flink CDC environment, and specifying the connector in your YAML pipeline -definition. +Flink CDC 提供了多个Source和Sink连接器来与外部系统交互。您可以直接使用这些连接器,只需将 JAR 文件添加到您的 Flink CDC 环境中,并在您的 YAML Pipeline定义中指定所需的连接器。 ## Supported Connectors @@ -41,16 +38,9 @@ definition. ## Develop Your Own Connector -If provided connectors cannot fulfill your requirement, you can always develop -your own connector to get your external system involved in Flink CDC pipelines. -Check out [Flink CDC APIs]({{< ref "docs/developer-guide/understand-flink-cdc-api" >}}) -to learn how to develop your own connectors. +如果现有的连接器无法满足您的需求,您可以自行开发自己的连接器,以将您的外部系统集成到 Flink CDC 数据管道中。查阅 [Flink CDC APIs]({{< ref "docs/developer-guide/understand-flink-cdc-api" >}}) 了解如何开发您自己的连接器。 ## Legacy Flink CDC Sources - -Flink CDC sources introduces before 3.0 are still available as normal Flink -connector sources. You can find more details in the -[overview page]({{< ref "docs/connectors/legacy-flink-cdc-sources/overview" >}}) -of legacy Flink CDC sources. +在 3.0 版本之前引入的 Flink CDC Source 仍然可以作为普通的 Flink 连接器使用。您可以在 [Legacy CDC Sources/概览]({{< ref "docs/connectors/legacy-flink-cdc-sources/overview" >}}) 中找到更多详细信息。 {{< top >}} diff --git a/docs/content.zh/docs/connectors/starrocks.md b/docs/content.zh/docs/connectors/starrocks.md index 84a4a99d1f..6a72a25ff5 100644 --- a/docs/content.zh/docs/connectors/starrocks.md +++ b/docs/content.zh/docs/connectors/starrocks.md @@ -26,16 +26,16 @@ under the License. # StarRocks Connector -StarRocks connector can be used as the *Data Sink* of the pipeline, and write data to [StarRocks](https://github.com/StarRocks/starrocks). This document describes how to set up the StarRocks connector. +StarRocks Pipeline 连接器可以用作 Pipeline 的 *Data Sink*,将数据写入[StarRocks](https://github.com/StarRocks/starrocks)。 本文档介绍如何设置 StarRocks Pipeline 连接器。 -## What can the connector do? -* Create table automatically if not exist -* Schema change synchronization -* Data synchronization +## 连接器的功能 +* 自动建表 +* 表结构变更同步 +* 数据实时同步 -## Example +## 示例 -The pipeline for reading data from MySQL and sink to StarRocks can be defined as follows: +从 MySQL 读取数据同步到 StarRocks 的 Pipeline 可以定义如下: ```yaml source: @@ -61,7 +61,7 @@ pipeline: parallelism: 2 ``` -## Connector Options +## 连接器配置项
@@ -80,177 +80,168 @@ pipeline: - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - +
required (none) StringSpecify what connector to use, here should be 'starrocks'.指定要使用的连接器, 这里需要设置成 'starrocks'.
name optional (none) StringThe name of the sink.Sink 的名称.
jdbc-url required (none) StringThe address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: `jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3`.用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:`jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2`。
load-url required (none) StringThe address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: `fe_host1:fe_http_port1;fe_host2:fe_http_port2`.用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:`fe_host1:fe_http_port1;fe_host2:fe_http_port2`。
username required (none) StringUser name to use when connecting to the StarRocks database.StarRocks 集群的用户名。
password required (none) StringPassword to use when connecting to the StarRocks database.StarRocks 集群的用户密码。
sink.label-prefix optional (none) StringThe label prefix used by Stream Load.指定 Stream Load 使用的 label 前缀。
sink.connect.timeout-ms optional 30000 StringThe timeout for establishing HTTP connection. Valid values: 100 to 60000.与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。
sink.wait-for-continue.timeout-ms optional 30000 String Timeout in millisecond to wait for 100-continue response from FE http server. - Valid values: 3000 to 600000. 等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 600000]。
sink.buffer-flush.max-bytes optional 157286400 Long The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. - The value ranges from 64 MB to 10 GB. This buffer is shared by all tables in the sink. If the buffer - is full, the connector will choose one or more tables to flush. 内存中缓冲的数据量大小,缓冲区由所有导入的表共享,达到阈值后将选择一个或多个表的数据写入到 StarRocks。 + 取值范围:[64MB, 10GB]。
sink.buffer-flush.interval-ms optional 300000 LongThe interval at which data is flushed for each table. The unit is in millisecond.每个表缓冲数据发送的间隔,用于控制数据写入 StarRocks 的延迟。单位是毫秒,取值范围:[1000, 3600000]。
sink.scan-frequency.ms optional 50 LongScan frequency in milliseconds to check whether the buffered data for a table should be flushed - because of reaching the flush interval.连接器会定期检查每个表是否到达发送间隔,该配置控制检查频率,单位为毫秒。
sink.io.thread-count optional 2 IntegerNumber of threads used for concurrent stream loads among different tables.用来执行 Stream Load 的线程数,不同表之间的导入可以并发执行。
sink.at-least-once.use-transaction-stream-load optional true BooleanWhether to use transaction stream load for at-least-once when it's available.at-least-once 下是否使用 transaction stream load。
sink.properties.* optional (none) StringThe parameters that control Stream Load behavior. For example, the parameter `sink.properties.timeout` - specifies the timeout of Stream Load. For a list of supported parameters and their descriptions, - see - STREAM LOAD.Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 `sink.properties.timeout` 用来控制导入的超时时间。 + 全部参数和解释请参考 + STREAM LOAD
table.create.num-buckets optional (none) IntegerNumber of buckets when creating a StarRocks table automatically. For StarRocks 2.5 or later, it's not required - to set the option because StarRocks can - - determine the number of buckets automatically. For StarRocks prior to 2.5, you must set this option. 自动创建 StarRocks 表时使用的桶数。对于 StarRocks 2.5 及之后的版本可以不设置,StarRocks 将会 + + 自动设置分桶数量;对于 StarRocks 2.5 之前的版本必须设置。
table.create.properties.* optional (none) StringProperties used for creating a StarRocks table. For example: 'table.create.properties.fast_schema_evolution' = 'true' - will enable fast schema evolution if you are using StarRocks 3.2 or later. For more information, - see how to create a primary key table.自动创建 StarRocks 表时使用的属性。比如: 如果使用 StarRocks 3.2 及之后的版本,'table.create.properties.fast_schema_evolution' = 'true' + 将会打开 fast schema evolution 功能。 更多信息请参考 + 主键模型
table.schema-change.timeout optional 30min DurationTimeout for a schema change on StarRocks side, and must be an integral multiple of - seconds. StarRocks will cancel the schema change after timeout which will - cause the sink failure. StarRocks 侧执行 schema change 的超时时间,必须是秒的整数倍。超时后 StarRocks 将会取消 schema change,从而导致作业失败。
+## 使用说明 -## Usage Notes +* 只支持主键表,因此源表必须有主键 -* Only support StarRocks primary key table, so the source table must have primary keys. +* 暂不支持 exactly-once,连接器 通过 at-least-once 和主键表实现幂等写 -* Not support exactly-once. The connector uses at-least-once + primary key table for idempotent writing. +* 对于自动建表 + * 分桶键和主键相同 + * 没有分区键 + * 分桶数由 `table.create.num-buckets` 控制。如果使用的 StarRocks 2.5 及之后的版本可以不设置,StarRocks 能够 + + 自动设置分桶数量。对于 StarRocks 2.5 之前的版本必须设置,否则无法自动创建表。 -* For creating table automatically - * the distribution keys are the same as the primary keys - * there is no partition key - * the number of buckets is controlled by `table.create.num-buckets`. If you are using StarRocks 2.5 or later, - it's not required to set the option because StarRocks can [determine the number of buckets automatically](https://docs.starrocks.io/docs/table_design/Data_distribution/#determine-the-number-of-buckets), - otherwise you must set the option. +* 对于表结构变更同步 + * 只支持增删列 + * 新增列只能添加到最后一列 + * 如果使用 StarRocks 3.2 及之后版本,并且通过连接器来自动建表, 可以通过配置 `table.create.properties.fast_schema_evolution` 为 `true` + 来加速 StarRocks 执行变更。 -* For schema change synchronization - * only supports add/drop columns - * the new column will always be added to the last position - * if your StarRocks version is 3.2 or later, and using the connector to create table automatically, - you can set `table.create.properties.fast_schema_evolution` to `true` to speed up the schema change. +* 对于数据同步,pipeline 连接器使用 [StarRocks Sink 连接器](https://github.com/StarRocks/starrocks-connector-for-apache-flink) + 将数据写入 StarRocks,具体可以参考 [Sink 文档](https://github.com/StarRocks/starrocks-connector-for-apache-flink/blob/main/docs/content/connector-sink.md)。 -* For data synchronization, the pipeline connector uses [StarRocks Sink Connector](https://github.com/StarRocks/starrocks-connector-for-apache-flink) - to write data to StarRocks. You can see [sink documentation](https://github.com/StarRocks/starrocks-connector-for-apache-flink/blob/main/docs/content/connector-sink.md) - for how it works. - -## Data Type Mapping +## 数据类型映射
- + - + @@ -312,22 +303,20 @@ pipeline: - + - + - +
Flink CDC typeCDC type StarRocks typeNoteNOTE
CHAR(n) where n <= 85 CHAR(n * 3)CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese - character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks - CHAR is 255, map CDC CHAR to StarRocks CHAR only when the CDC length is no larger than 85.CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks + 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以只有当 CDC 中长度不超过85时,才将 CDC CHAR 映射到 StarRocks CHAR。
CHAR(n) where n > 85 VARCHAR(n * 3)CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese - character is equal to three bytes, so the length for StarRocks is n * 3. Because the max length of StarRocks - CHAR is 255, map CDC CHAR to StarRocks VARCHAR if the CDC length is larger than 85.CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks + 中为 n * 3。由于 StarRocks CHAR 类型的最大长度为255,所以当 CDC 中长度超过85时,才将 CDC CHAR 映射到 StarRocks VARCHAR。
VARCHAR(n) VARCHAR(n * 3)CDC defines the length by characters, and StarRocks defines it by bytes. According to UTF-8, one Chinese - character is equal to three bytes, so the length for StarRocks is n * 3.CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 中的长度对应到 StarRocks + 中为 n * 3。
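结合上文的配置项与使用说明,下面给出一个带注释的 StarRocks sink 配置示意(仅为示意,地址、账号及属性取值均为示例,需按实际集群调整):

```yaml
sink:
  type: starrocks                          # 指定使用 StarRocks 连接器
  name: StarRocks Sink                     # 可选,Sink 的名称
  jdbc-url: jdbc:mysql://127.0.0.1:9030    # FE 节点的 MySQL 服务器地址
  load-url: 127.0.0.1:8030                 # FE 节点的 HTTP 服务器地址
  username: root
  password: ""
  # StarRocks 2.5 之前的版本自动建表时必须指定分桶数(示例值)
  table.create.num-buckets: 1
  # StarRocks 3.2 及之后版本可开启 fast schema evolution 加速表结构变更
  table.create.properties.fast_schema_evolution: true
```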