diff --git a/README.md b/README.md index 2cb080de07..0ab8559429 100644 --- a/README.md +++ b/README.md @@ -37,50 +37,48 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:[DataX数据源参考指南](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels) -| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | -| ------------------ | ------------------------------- | :--------: | :--------: | :----------------------------------------------------------: | -| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | -| | Oracle | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | +| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | +|--------------|---------------------------|:---------:|:---------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:| +| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | +| | Oracle | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | | | OceanBase | √ | √ | [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) | -| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | -| | PostgreSQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | -| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | Kingbase | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | 通用RDBMS(支持所有关系型数据库) | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) | -| 阿里云数仓数据存储 | ODPS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) | -| | ADB | | √ | [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) | -| | ADS | | √ | [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | -| | OSS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | -| | OCS | | √ | [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | -| | Hologres | | 
√ | [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) | -| | AnalyticDB For PostgreSQL | | √ | 写 | -| 阿里云中间件 | datahub | √ | √ | 读 、写 | -| | SLS | √ | √ | 读 、写 | -| 图数据库 | 阿里云 GDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) | -| | Neo4j | | √ | 写 | -| NoSQL数据存储 | OTS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | -| | Hbase0.94 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | -| | Hbase1.1 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | -| | Phoenix4.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) | -| | Phoenix5.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) | -| | MongoDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) | -| | Cassandra | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) | -| 数仓数据存储 | StarRocks | √ | √ | 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) | -| | ApacheDoris | | √ | [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) | -| | ClickHouse | | √ | 写 | -| | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | -| | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | selectdb | | √ | [写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | -| 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | -| | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | -| | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | Elasticsearch | | √ | [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | -| 时间序列数据库 | OpenTSDB | √ | | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | -| | TSDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 
、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | -| | TDengine | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | - - +| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | +| | PostgreSQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | +| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | +| | Kingbase | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | +| | 通用RDBMS(支持所有关系型数据库) | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) | +| 阿里云数仓数据存储 | ODPS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) | +| | ADB | | √ | [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) | +| | ADS | | √ | [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | +| | OSS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | +| | OCS | | √ | [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | +| | Hologres | | √ | [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) | +| | AnalyticDB For PostgreSQL | | √ | 写 | +| 阿里云中间件 | datahub | √ | √ | 读 、写 | +| | SLS | √ | √ | 读 、写 | +| 图数据库 | 阿里云 GDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) | +| | Neo4j | | √ | [写](https://github.com/alibaba/DataX/blob/master/neo4jwriter/doc/neo4jwriter.md) | +| NoSQL数据存储 | OTS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | +| | Hbase0.94 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | +| | Hbase1.1 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | +| | Phoenix4.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) | +| | Phoenix5.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) | +| | MongoDB | √ | √ | 
[读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) | +| | Cassandra | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) | +| 数仓数据存储 | StarRocks | √ | √ | 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) | +| | ApacheDoris | | √ | [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) | +| | ClickHouse | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/clickhousereader/doc/clickhousereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/clickhousewriter/doc/clickhousewriter.md) | +| | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | +| | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | selectdb | | √ | [写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | +| 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | +| | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | +| | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | Elasticsearch | | √ | [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | +| 时间序列数据库 | OpenTSDB | √ | | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | +| | TSDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | +| | TDengine | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | # 阿里云DataWorks数据集成 @@ -97,11 +95,11 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N - 新增比如:DB2、Kafka、Hologres、MetaQ、SAPHANA、达梦等等,持续扩充中 - 离线同步支持的数据源:https://help.aliyun.com/document_detail/137670.html - 具备同步解决方案: - - 解决方案系统:https://help.aliyun.com/document_detail/171765.html - - 一键全增量:https://help.aliyun.com/document_detail/175676.html - - 整库迁移:https://help.aliyun.com/document_detail/137809.html - - 批量上云:https://help.aliyun.com/document_detail/146671.html - - 更新更多能力请访问:https://help.aliyun.com/document_detail/137663.html + - 解决方案系统:https://help.aliyun.com/document_detail/171765.html + - 一键全增量:https://help.aliyun.com/document_detail/175676.html + - 整库迁移:https://help.aliyun.com/document_detail/137809.html + - 批量上云:https://help.aliyun.com/document_detail/146671.html + - 更新更多能力请访问:https://help.aliyun.com/document_detail/137663.html # 我要开发新的插件 @@ -111,6 +109,11 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N # 重要版本更新说明 DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容会介绍介绍如下。 +- 
[datax_v202306](https://github.com/alibaba/DataX/releases/tag/datax_v202306) + - 精简代码 + - 新增插件(neo4jwriter、clickhousewriter) + - 优化插件、修复问题(oceanbase、hdfs、databend、txtfile) + - [datax_v202303](https://github.com/alibaba/DataX/releases/tag/datax_v202303) - 精简代码 @@ -122,10 +125,10 @@ DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull re - 涉及通道能力更新(OceanBase、Tdengine、Doris等) - [datax_v202209](https://github.com/alibaba/DataX/releases/tag/datax_v202209) - - 涉及通道能力更新(MaxCompute、Datahub、SLS等)、安全漏洞更新、通用打包更新等 + - 涉及通道能力更新(MaxCompute、Datahub、SLS等)、安全漏洞更新、通用打包更新等 - [datax_v202205](https://github.com/alibaba/DataX/releases/tag/datax_v202205) - - 涉及通道能力更新(MaxCompute、Hologres、OSS、Tdengine等)、安全漏洞更新、通用打包更新等 + - 涉及通道能力更新(MaxCompute、Hologres、OSS、Tdengine等)、安全漏洞更新、通用打包更新等 # 项目成员 diff --git a/clickhousereader/doc/clickhousereader.md b/clickhousereader/doc/clickhousereader.md new file mode 100644 index 0000000000..bf3cd203f5 --- /dev/null +++ b/clickhousereader/doc/clickhousereader.md @@ -0,0 +1,344 @@ + +# ClickhouseReader 插件文档 + + +___ + + +## 1 快速介绍 + +ClickhouseReader插件实现了从Clickhouse读取数据。在底层实现上,ClickhouseReader通过JDBC连接远程Clickhouse数据库,并执行相应的sql语句将数据从Clickhouse库中SELECT出来。 + +## 2 实现原理 + +简而言之,ClickhouseReader通过JDBC连接器连接到远程的Clickhouse数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程Clickhouse数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 + +对于用户配置Table、Column、Where的信息,ClickhouseReader将其拼接为SQL语句发送到Clickhouse数据库;对于用户配置querySql信息,Clickhouse直接将其发送到Clickhouse数据库。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从Clickhouse数据库同步抽取数据到本地的作业: + +``` +{ + "job": { + "setting": { + "speed": { + //设置传输速度 byte/s 尽量逼近这个速度但是不高于它. + // channel 表示通道数量,byte表示通道速度,如果单通道速度1MB,配置byte为1048576表示一个channel + "byte": 1048576 + }, + //出错限制 + "errorLimit": { + //先选择record + "record": 0, + //百分比 1表示100% + "percentage": 0.02 + } + }, + "content": [ + { + "reader": { + "name": "clickhousereader", + "parameter": { + // 数据库连接用户名 + "username": "root", + // 数据库连接密码 + "password": "root", + "column": [ + "id","name" + ], + "connection": [ + { + "table": [ + "table" + ], + "jdbcUrl": [ + "jdbc:clickhouse://[HOST_NAME]:PORT/[DATABASE_NAME]" + ] + } + ] + } + }, + "writer": { + //writer类型 + "name": "streamwriter", + // 是否打印内容 + "parameter": { + "print": true + } + } + } + ] + } +} + +``` + +* 配置一个自定义SQL的数据库同步任务到本地内容的作业: + +``` +{ + "job": { + "setting": { + "speed": { + "channel": 5 + } + }, + "content": [ + { + "reader": { + "name": "clickhousereader", + "parameter": { + "username": "root", + "password": "root", + "where": "", + "connection": [ + { + "querySql": [ + "select db_id,on_line_flag from db_info where db_id < 10" + ], + "jdbcUrl": [ + "jdbc:clickhouse://1.1.1.1:8123/default" + ] + } + ] + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "visible": false, + "encoding": "UTF-8" + } + } + } + ] + } +} +``` + + +### 3.2 参数说明 + +* **jdbcUrl** + + * 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,ClickhouseReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,ClickhouseReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。 + + jdbcUrl按照Clickhouse官方规范,并可以填写连接附件控制信息。具体请参看[Clickhouse官方文档](https://clickhouse.com/docs/en/engines/table-engines/integrations/jdbc)。 + + * 必选:是
+ + * 默认值:无
+ +* **username** + + * 描述:数据源的用户名
+ + * 必选:是
+ + * 默认值:无
+ +* **password** + + * 描述:数据源指定用户名的密码
+ + * 必选:是
+ + * 默认值:无
+ +* **table** + + * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,ClickhouseReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
+ + * 必选:是
+ + * 默认值:无
+ +* **column** + + * 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用\*代表默认使用所有列配置,例如['\*']。 + + 支持列裁剪,即列可以挑选部分列进行导出。 + + 支持列换序,即列可以不按照表schema信息进行导出。 + + 支持常量配置,用户需要按照JSON格式: + ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] + id为普通列名,\`table\`为包含保留在的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。 + + Column必须显示填写,不允许为空! + + * 必选:是
+ + * 默认值:无
+ +* **splitPk** + + * 描述:ClickhouseReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。 + + 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 + + 目前splitPk仅支持整形数据切分,`不支持浮点、日期等其他类型`。如果用户指定其他非支持类型,ClickhouseReader将报错! + + splitPk如果不填写,将视作用户不对单表进行切分,ClickhouseReader使用单通道同步全量数据。 + + * 必选:否
+ + * 默认值:无
+ +* **where** + + * 描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
+ + where条件可以有效地进行业务增量同步。 + + * 必选:否
+ + * 默认值:无
+ +* **querySql** + + * 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id
+ + `当用户配置querySql时,ClickhouseReader直接忽略table、column、where条件的配置`。 + + * 必选:否
+ + * 默认值:无
+ +* **fetchSize** + + * 描述:该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了DataX和服务器端的网络交互次数,能够较大的提升数据抽取性能。
+ + `注意,该值过大(>2048)可能造成DataX进程OOM。`。 + + * 必选:否
+ + * 默认值:1024
+ +* **session** + + * 描述:控制写入数据的时间格式,时区等的配置,如果表中有时间字段,配置该值以明确告知写入 clickhouse 的时间格式。通常配置的参数为:NLS_DATE_FORMAT,NLS_TIME_FORMAT。其配置的值为 json 格式,例如: +``` +"session": [ + "alter session set NLS_DATE_FORMAT='yyyy-mm-dd hh24:mi:ss'", + "alter session set NLS_TIMESTAMP_FORMAT='yyyy-mm-dd hh24:mi:ss'", + "alter session set NLS_TIMESTAMP_TZ_FORMAT='yyyy-mm-dd hh24:mi:ss'", + "alter session set TIME_ZONE='US/Pacific'" + ] +``` + `(注意"是 " 的转义字符串)`。 + + * 必选:否
+ + * 默认值:无
+ + +### 3.3 类型转换 + +目前ClickhouseReader支持大部分Clickhouse类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。 + +下面列出ClickhouseReader针对Clickhouse类型转换列表: + + +| DataX 内部类型| Clickhouse 数据类型 | +| -------- |--------------------------------------------------------------------------------------------| +| Long | UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 | +| Double | Float32, Float64, Decimal | +| String | String, FixedString | +| Date | DATE, Date32, DateTime, DateTime64 | +| Boolean | Boolean | +| Bytes | BLOB,BFILE,RAW,LONG RAW | + + + +请注意: + +* `除上述罗列字段类型外,其他类型均不支持`。 + + +## 4 性能报告 + +### 4.1 环境准备 + +#### 4.1.1 数据特征 + +为了模拟线上真实数据,我们设计两个Clickhouse数据表,分别为: + +#### 4.1.2 机器参数 + +* 执行DataX的机器参数为: + +* Clickhouse数据库机器参数为: + +### 4.2 测试报告 + +#### 4.2.1 表1测试报告 + + +| 并发任务数| DataX速度(Rec/s)|DataX流量|网卡流量|DataX运行负载|DB运行负载| +|--------| --------|--------|--------|--------|--------| +|1| DataX 统计速度(Rec/s)|DataX统计流量|网卡流量|DataX运行负载|DB运行负载| + +## 5 约束限制 + +### 5.1 主备同步数据恢复问题 + +主备同步问题指Clickhouse使用主从灾备,备库从主库不间断通过binlog恢复数据。由于主备数据同步存在一定的时间差,特别在于某些特定情况,例如网络延迟等问题,导致备库同步恢复的数据与主库有较大差别,导致从备库同步的数据不是一份当前时间的完整镜像。 + +针对这个问题,我们提供了preSql功能,该功能待补充。 + +### 5.2 一致性约束 + +Clickhouse在数据存储划分中属于RDBMS系统,对外可以提供强一致性数据查询接口。例如当一次同步任务启动运行过程中,当该库存在其他数据写入方写入数据时,ClickhouseReader完全不会获取到写入更新数据,这是由于数据库本身的快照特性决定的。关于数据库快照特性,请参看[MVCC Wikipedia](https://en.wikipedia.org/wiki/Multiversion_concurrency_control) + +上述是在ClickhouseReader单线程模型下数据同步一致性的特性,由于ClickhouseReader可以根据用户配置信息使用了并发数据抽取,因此不能严格保证数据一致性:当ClickhouseReader根据splitPk进行数据切分后,会先后启动多个并发任务完成数据同步。由于多个并发任务相互之间不属于同一个读事务,同时多个并发任务存在时间间隔。因此这份数据并不是`完整的`、`一致的`数据快照信息。 + +针对多线程的一致性快照需求,在技术上目前无法实现,只能从工程角度解决,工程化的方式存在取舍,我们提供几个解决思路给用户,用户可以自行选择: + +1. 使用单线程同步,即不再进行数据切片。缺点是速度比较慢,但是能够很好保证一致性。 + +2. 关闭其他数据写入方,保证当前数据为静态数据,例如,锁表、关闭备库同步等等。缺点是可能影响在线业务。 + +### 5.3 数据库编码问题 + + +ClickhouseReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此ClickhouseReader不需用户指定编码,可以自动获取编码并转码。 + +对于Clickhouse底层写入编码和其设定的编码不一致的混乱情况,ClickhouseReader对此无法识别,对此也无法提供解决方案,对于这类情况,`导出有可能为乱码`。 + +### 5.4 增量数据同步 + +ClickhouseReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT...WHERE...进行增量数据抽取,方式有多种: + +* 数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对于这类应用,ClickhouseReader只需要WHERE条件跟上一同步阶段时间戳即可。 +* 对于新增流水型数据,ClickhouseReader可以WHERE条件后跟上一阶段最大自增ID即可。 + +对于业务上无字段区分新增、修改数据情况,ClickhouseReader也无法进行增量数据同步,只能同步全量数据。 + +### 5.5 Sql安全性 + +ClickhouseReader提供querySql语句交给用户自己实现SELECT抽取语句,ClickhouseReader本身对querySql不做任何安全性校验。这块交由DataX用户方自己保证。 + +## 6 FAQ + +*** + +**Q: ClickhouseReader同步报错,报错信息为XXX** + + A: 网络或者权限问题,请使用Clickhouse命令行测试 + + +如果上述命令也报错,那可以证实是环境问题,请联系你的DBA。 + + +**Q: ClickhouseReader抽取速度很慢怎么办?** + + A: 影响抽取时间的原因大概有如下几个:(来自专业 DBA 卫绾) + 1. 由于SQL的plan异常,导致的抽取时间长; 在抽取时,尽可能使用全表扫描代替索引扫描; + 2. 合理sql的并发度,减少抽取时间; + 3. 
抽取SQL要简单,尽量不用replace等函数,这类函数非常消耗CPU,会严重影响抽取速度;
diff --git a/clickhousereader/pom.xml b/clickhousereader/pom.xml
new file mode 100644
index 0000000000..4b095796db
--- /dev/null
+++ b/clickhousereader/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>datax-all</artifactId>
+        <groupId>com.alibaba.datax</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>clickhousereader</artifactId>
+    <name>clickhousereader</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.2.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-core</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>plugin-rdbms-util</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/clickhousereader/src/main/assembly/package.xml b/clickhousereader/src/main/assembly/package.xml
new file mode 100644
index 0000000000..2053ff754b
--- /dev/null
+++ b/clickhousereader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id>datax</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/clickhousereader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>clickhousereader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/clickhousereader</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/clickhousereader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java b/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java
new file mode 100644
index 0000000000..bf3cad12c4
--- /dev/null
+++ b/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java
@@ -0,0 +1,87 @@
+package com.alibaba.datax.plugin.reader.clickhousereader;
+
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.util.MessageSource;
+import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.fastjson2.JSON;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClickhouseReader extends Reader {
+
+    private static final DataBaseType DATABASE_TYPE = DataBaseType.ClickHouse;
+    private static final Logger LOG = LoggerFactory.getLogger(ClickhouseReader.class);
+
+    public static class Job extends Reader.Job {
+        private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(ClickhouseReader.class);
+
+        private Configuration jobConfig = null;
+        private CommonRdbmsReader.Job commonRdbmsReaderMaster;
+
+        @Override
+        public void init() {
+            this.jobConfig = super.getPluginJobConf();
+            this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE);
+            this.commonRdbmsReaderMaster.init(this.jobConfig);
+        }
+
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            return 
this.commonRdbmsReaderMaster.split(this.jobConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsReaderMaster.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderMaster.destroy(this.jobConfig); + } + } + + public static class Task extends Reader.Task { + + private Configuration jobConfig; + private CommonRdbmsReader.Task commonRdbmsReaderSlave; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId()); + this.commonRdbmsReaderSlave.init(this.jobConfig); + } + + @Override + public void startRead(RecordSender recordSender) { + int fetchSize = this.jobConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, 1000); + + this.commonRdbmsReaderSlave.startRead(this.jobConfig, recordSender, super.getTaskPluginCollector(), fetchSize); + } + + @Override + public void post() { + this.commonRdbmsReaderSlave.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderSlave.destroy(this.jobConfig); + } + } +} diff --git a/clickhousereader/src/main/resources/plugin.json b/clickhousereader/src/main/resources/plugin.json new file mode 100644 index 0000000000..5d608f6c66 --- /dev/null +++ b/clickhousereader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "clickhousereader", + "class": "com.alibaba.datax.plugin.reader.clickhousereader.ClickhouseReader", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql.", + "developer": "alibaba" +} \ No newline at end of file diff --git a/clickhousereader/src/main/resources/plugin_job_template.json b/clickhousereader/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000000..1814e51063 --- /dev/null +++ b/clickhousereader/src/main/resources/plugin_job_template.json @@ -0,0 +1,16 @@ +{ + "name": "clickhousereader", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:clickhouse://:[/]", + "table": ["table1", "table2"] + } + ], + "preSql": [], + "postSql": [] + } +} \ No newline at end of file diff --git a/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java b/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java new file mode 100644 index 0000000000..a409402045 --- /dev/null +++ b/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java @@ -0,0 +1,74 @@ +package com.alibaba.datax.plugin.reader.clickhousereader; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.dataxservice.face.eventcenter.EventLogStore; +import com.alibaba.datax.dataxservice.face.eventcenter.RuntimeContext; +import com.alibaba.datax.test.simulator.BasicReaderPluginTest; +import com.alibaba.datax.test.simulator.junit.extend.log.LoggedRunner; +import com.alibaba.datax.test.simulator.junit.extend.log.TestLogger; +import com.alibaba.fastjson.JSON; + +import org.apache.commons.lang3.ArrayUtils; +import org.junit.Assert; +import 
org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + + +@RunWith(LoggedRunner.class) +@Ignore +public class ClickhouseReaderTest extends BasicReaderPluginTest { + @TestLogger(log = "测试basic1.json. 配置常量.") + @Test + public void testBasic1() { + RuntimeContext.setGlobalJobId(-1); + EventLogStore.init(); + List noteRecordForTest = new ArrayList(); + + List subjobs = super.doReaderTest("basic1.json", 1, noteRecordForTest); + + Assert.assertEquals(1, subjobs.size()); + Assert.assertEquals(1, noteRecordForTest.size()); + + Assert.assertEquals("[8,16,32,64,-8,-16,-32,-64,\"3.2\",\"6.4\",1,\"str_col\",\"abc\"," + "\"417ddc5d-e556-4d27-95dd-a34d84e46a50\",1580745600000,1580752800000,\"hello\",\"[1,2,3]\"," + "\"[\\\"abc\\\",\\\"cde\\\"]\",\"(8,'uint8_type')\",null,\"[1,2]\",\"[\\\"x\\\",\\\"y\\\"]\",\"127.0.0.1\",\"::\",\"23.345\"]", JSON.toJSONString(listData(noteRecordForTest.get(0)))); + } + + @Override + protected OutputStream buildDataOutput(String optionalOutputName) { + File f = new File(optionalOutputName + "-output.txt"); + try { + return new FileOutputStream(f); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public String getTestPluginName() { + return "clickhousereader"; + } + + private Object[] listData(Record record) { + if (null == record) { + return ArrayUtils.EMPTY_OBJECT_ARRAY; + } + Object[] arr = new Object[record.getColumnNumber()]; + for (int i = 0; i < arr.length; i++) { + Column col = record.getColumn(i); + if (null != col) { + arr[i] = col.getRawData(); + } + } + return arr; + } +} diff --git a/clickhousereader/src/test/resources/basic1.json b/clickhousereader/src/test/resources/basic1.json new file mode 100755 index 0000000000..c45a45e7f6 --- /dev/null +++ b/clickhousereader/src/test/resources/basic1.json @@ -0,0 +1,57 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 5 + } + }, + "content": [ + { + "reader": { + "name": "clickhousereader", + "parameter": { + "username": "XXXX", + "password": "XXXX", + "column": [ + "uint8_col", + "uint16_col", + "uint32_col", + "uint64_col", + "int8_col", + "int16_col", + "int32_col", + "int64_col", + "float32_col", + "float64_col", + "bool_col", + "str_col", + "fixedstr_col", + "uuid_col", + "date_col", + "datetime_col", + "enum_col", + "ary_uint8_col", + "ary_str_col", + "tuple_col", + "nullable_col", + "nested_col.nested_id", + "nested_col.nested_str", + "ipv4_col", + "ipv6_col", + "decimal_col" + ], + "connection": [ + { + "table": [ + "all_type_tbl" + ], + "jdbcUrl":["jdbc:clickhouse://XXXX:8123/default"] + } + ] + } + }, + "writer": {} + } + ] + } +} \ No newline at end of file diff --git a/clickhousereader/src/test/resources/basic1.sql b/clickhousereader/src/test/resources/basic1.sql new file mode 100644 index 0000000000..f937b889ff --- /dev/null +++ b/clickhousereader/src/test/resources/basic1.sql @@ -0,0 +1,34 @@ +CREATE TABLE IF NOT EXISTS default.all_type_tbl +( +`uint8_col` UInt8, +`uint16_col` UInt16, +uint32_col UInt32, +uint64_col UInt64, +int8_col Int8, +int16_col Int16, +int32_col Int32, +int64_col Int64, +float32_col Float32, +float64_col Float64, +bool_col UInt8, +str_col String, +fixedstr_col FixedString(3), +uuid_col UUID, +date_col Date, +datetime_col DateTime, +enum_col Enum('hello' = 1, 'world' = 2), +ary_uint8_col Array(UInt8), +ary_str_col Array(String), +tuple_col Tuple(UInt8, String), +nullable_col Nullable(UInt8), +nested_col Nested + ( + nested_id UInt32, + nested_str String + ), +ipv4_col IPv4, +ipv6_col IPv6, +decimal_col 
Decimal(5,3) +) +ENGINE = MergeTree() +ORDER BY (uint8_col); \ No newline at end of file diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java index 06d5310879..d7b8f2edfb 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -1,6 +1,7 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; import com.alibaba.datax.common.element.*; +import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion; import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java index 2929658ae1..ad165d9975 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/PartitionSplitUtil.java @@ -3,6 +3,7 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.reader.Constant; import com.alibaba.datax.plugin.rdbms.reader.Key; +import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ObReaderKey; diff --git a/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar b/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar deleted file mode 100644 index 38162912f8..0000000000 Binary files a/oceanbasev10reader/src/main/libs/oceanbase-client-1.1.10.jar and /dev/null differ diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java index 3bcc101944..06292db512 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java @@ -86,6 +86,7 @@ public void prepare() { if (tableNumber == 1) { this.commonJob.prepare(this.originalConfig); final String version = fetchServerVersion(originalConfig); + ObWriterUtils.setObVersion(version); originalConfig.set(Config.OB_VERSION, version); } @@ -187,8 +188,9 @@ public void destroy() { } private String fetchServerVersion(Configuration config) { - final String fetchVersionSql = "show variables like 'version'"; - return DbUtils.fetchSingleValueWithRetry(config, fetchVersionSql); + final String fetchVersionSql = "show variables like 'version_comment'"; + String versionComment = DbUtils.fetchSingleValueWithRetry(config, fetchVersionSql); + return versionComment.split(" ")[1]; } private void checkCompatibleMode(Configuration configure) { diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java 
b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java index e590fe6b87..ec26e788f2 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java @@ -3,18 +3,17 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; import com.alibaba.datax.plugin.rdbms.writer.Constant; import com.alibaba.datax.plugin.rdbms.writer.Key; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DbUtils { @@ -25,7 +24,7 @@ public static String fetchSingleValueWithRetry(Configuration config, String quer final String password = config.getString(Key.PASSWORD); String jdbcUrl = config.getString(Key.JDBC_URL); - if(jdbcUrl == null) { + if (jdbcUrl == null) { List conns = config.getList(Constant.CONN_MARK, Object.class); Configuration connConf = Configuration.from(conns.get(0).toString()); jdbcUrl = connConf.getString(Key.JDBC_URL); @@ -34,9 +33,9 @@ public static String fetchSingleValueWithRetry(Configuration config, String quer Connection conn = null; PreparedStatement stmt = null; ResultSet result = null; - boolean need_retry = false; String value = null; int retry = 0; + int failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT); do { try { if (retry > 0) { @@ -58,13 +57,12 @@ public static String fetchSingleValueWithRetry(Configuration config, String quer LOG.info("value for query [{}] is [{}]", query, value); break; } catch (SQLException e) { - need_retry = true; ++retry; LOG.warn("fetch value with {} error {}", query, e); } finally { DBUtil.closeDBResources(result, stmt, conn); } - } while (need_retry); + } while (retry < failTryCount); return value; } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java index edc4b23657..037e4ce519 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -1,5 +1,6 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.util; +import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion; import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task; import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; @@ -18,8 +19,11 @@ public class ObWriterUtils { private static final String ORACLE_KEYWORDS = 
"ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH"; private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?"; + private static final String CHECK_MEMSTORE_4_0 = "select 1 from %s.gv$ob_memstore t where t.MEMSTORE_USED>t.MEMSTORE_LIMIT * ?"; + private static Set databaseKeywords; private static String compatibleMode = null; + private static String obVersion = null; protected static final Logger LOG = LoggerFactory.getLogger(Task.class); private static Set keywordsFromString2HashSet(final String keywords) { return new HashSet(Arrays.asList(keywords.split(","))); @@ -61,7 +65,7 @@ public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) if (isOracleMode()) { sysDbName = "sys"; } - ps = conn.prepareStatement(String.format(CHECK_MEMSTORE, sysDbName)); + ps = conn.prepareStatement(String.format(getMemStoreSql(), sysDbName)); ps.setDouble(1, memstoreThreshold); rs = ps.executeQuery(); // 只要有满足条件的,则表示当前租户 有个机器的memstore即将满 @@ -81,6 +85,14 @@ public static boolean isOracleMode(){ return (compatibleMode.equals(Config.OB_COMPATIBLE_MODE_ORACLE)); } + private static String getMemStoreSql() { + if (ObVersion.valueOf(obVersion).compareTo(ObVersion.V4000) >= 0) { + return CHECK_MEMSTORE_4_0; + } else { + return CHECK_MEMSTORE; + } + } + public static String getCompatibleMode() { return compatibleMode; } @@ -89,6 +101,10 @@ public static void setCompatibleMode(String mode) { compatibleMode = mode; } + public static void setObVersion(String version) { + obVersion = version; + } + private static String buildDeleteSql (String tableName, List columns) { StringBuilder builder = new StringBuilder("DELETE FROM "); builder.append(tableName).append(" WHERE "); diff --git a/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar b/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar deleted file mode 100644 index 38162912f8..0000000000 Binary files a/oceanbasev10writer/src/main/libs/oceanbase-client-1.1.10.jar and /dev/null differ diff --git a/package.xml b/package.xml old mode 100755 new mode 100644 index 0654a39186..41cd9c5515 --- a/package.xml +++ b/package.xml @@ -145,6 +145,13 @@ datax + + clickhousereader/target/datax/ + + **/*.* + + datax + hdfsreader/target/datax/ diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java similarity index 97% rename from oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java rename to plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java index 2fc414ce2c..0eb34feb0d 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObVersion.java +++ 
b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java @@ -1,4 +1,4 @@ -package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; +package com.alibaba.datax.plugin.rdbms.reader.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java old mode 100755 new mode 100644 index 10cfe79571..844b6cfd5f --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/SingleTableSplitUtil.java @@ -7,6 +7,7 @@ import com.alibaba.datax.plugin.rdbms.util.*; import com.alibaba.fastjson2.JSON; +import java.text.MessageFormat; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -20,6 +21,7 @@ import java.sql.Types; import java.util.ArrayList; import java.util.List; +import static org.apache.commons.lang3.StringUtils.EMPTY; public class SingleTableSplitUtil { private static final Logger LOG = LoggerFactory @@ -277,7 +279,24 @@ private static String genPKRangeSQL(Configuration configuration) { String splitPK = configuration.getString(Key.SPLIT_PK).trim(); String table = configuration.getString(Key.TABLE).trim(); String where = configuration.getString(Key.WHERE, null); - return genPKSql(splitPK,table,where); + String obMode = configuration.getString("obCompatibilityMode"); + // OceanBase对SELECT MIN(%s),MAX(%s) FROM %s这条sql没有做查询改写,会进行全表扫描,在数据量的时候查询耗时很大甚至超时; + // 所以对于OceanBase数据库,查询模板需要改写为分别查询最大值和最小值。这样可以提升查询数量级的性能。 + if (DATABASE_TYPE == DataBaseType.OceanBase && StringUtils.isNotEmpty(obMode)) { + boolean isOracleMode = "ORACLE".equalsIgnoreCase(obMode); + + String minMaxTemplate = isOracleMode ? "select v2.id as min_a, v1.id as max_a from (" + + "select * from (select %s as id from %s {0} order by id desc) where rownum =1 ) v1," + + "(select * from (select %s as id from %s order by id asc) where rownum =1 ) v2;" : + "select v2.id as min_a, v1.id as max_a from (select %s as id from %s {0} order by id desc limit 1) v1," + + "(select %s as id from %s order by id asc limit 1) v2;"; + + String pkRangeSQL = String.format(minMaxTemplate, splitPK, table, splitPK, table); + String whereString = StringUtils.isNotBlank(where) ? String.format("WHERE (%s AND %s IS NOT NULL)", where, splitPK) : EMPTY; + pkRangeSQL = MessageFormat.format(pkRangeSQL, whereString); + return pkRangeSQL; + } + return genPKSql(splitPK, table, where); } public static String genPKSql(String splitPK, String table, String where){ diff --git a/pom.xml b/pom.xml index bbb128be43..eafde57b2f 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ ftpreader txtfilereader streamreader + clickhousereader mongodbreader tdenginereader diff --git a/userGuid.md b/userGuid.md index 876bae9903..2bd23876df 100644 --- a/userGuid.md +++ b/userGuid.md @@ -17,7 +17,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 * 工具部署 - * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz) + * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202306/datax.tar.gz) 下载后解压至本地某个目录,进入bin目录,即可运行同步作业:
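
 例如(仅为示意,请将路径替换为实际的解压目录和作业配置文件):

 ```shell
 $ cd {YOUR_DATAX_HOME}/bin
 $ python datax.py {YOUR_JOB.json}
 ```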