diff --git a/core/src/main/job/job.json b/core/src/main/job/job.json
index cc35387778..ad5d4a85c5 100755
--- a/core/src/main/job/job.json
+++ b/core/src/main/job/job.json
@@ -2,11 +2,10 @@
"job": {
"setting": {
"speed": {
- "channel":1
+ "channel": 2
},
"errorLimit": {
- "record": 0,
- "percentage": 0.02
+ "record": 0
}
},
"content": [
@@ -14,17 +13,17 @@
"reader": {
"name": "streamreader",
"parameter": {
- "column" : [
+ "column": [
{
"value": "DataX",
"type": "string"
},
{
- "value": 19890604,
+ "value": 1724154616370,
"type": "long"
},
{
- "value": "1989-06-04 00:00:00",
+ "value": "2024-01-01 00:00:00",
"type": "date"
},
{
@@ -32,11 +31,11 @@
"type": "bool"
},
{
- "value": "test",
+ "value": "TestRawData",
"type": "bytes"
}
],
- "sliceRecordCount": 100000
+ "sliceRecordCount": 100
}
},
"writer": {
@@ -49,4 +48,4 @@
}
]
}
-}
+}
\ No newline at end of file
diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md
index 58a688b8dc..2070113b4d 100644
--- a/doriswriter/doc/doriswriter.md
+++ b/doriswriter/doc/doriswriter.md
@@ -36,8 +36,6 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
"name": "doriswriter",
"parameter": {
"loadUrl": ["172.16.0.13:8030"],
- "loadProps": {
- },
"column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
"username": "root",
"password": "xxxxxx",
@@ -178,4 +176,4 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
}
```
-更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
\ No newline at end of file
+更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
diff --git a/elasticsearchwriter/doc/elasticsearchwriter.md b/elasticsearchwriter/doc/elasticsearchwriter.md
index 9a22f13c22..3a3315edc3 100644
--- a/elasticsearchwriter/doc/elasticsearchwriter.md
+++ b/elasticsearchwriter/doc/elasticsearchwriter.md
@@ -167,79 +167,4 @@
* dynamic
* 描述: 不使用datax的mappings,使用es自己的自动mappings
* 必选: 否
- * 默认值: false
-
-
-
-## 4 性能报告
-
-### 4.1 环境准备
-
-* 总数据量 1kw条数据, 每条0.1kb
-* 1个shard, 0个replica
-* 不加id,这样默认是append_only模式,不检查版本,插入速度会有20%左右的提升
-
-#### 4.1.1 输入数据类型(streamreader)
-
-```
-{"value": "1.1.1.1", "type": "string"},
-{"value": 19890604.0, "type": "double"},
-{"value": 19890604, "type": "long"},
-{"value": 19890604, "type": "long"},
-{"value": "hello world", "type": "string"},
-{"value": "hello world", "type": "string"},
-{"value": "41.12,-71.34", "type": "string"},
-{"value": "2017-05-25", "type": "string"},
-```
-
-#### 4.1.2 输出数据类型(eswriter)
-
-```
-{ "name": "col_ip","type": "ip" },
-{ "name": "col_double","type": "double" },
-{ "name": "col_long","type": "long" },
-{ "name": "col_integer","type": "integer" },
-{ "name": "col_keyword", "type": "keyword" },
-{ "name": "col_text", "type": "text"},
-{ "name": "col_geo_point", "type": "geo_point" },
-{ "name": "col_date", "type": "date"}
-```
-
-#### 4.1.2 机器参数
-
-1. cpu: 32 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
-2. mem: 128G
-3. net: 千兆双网卡
-
-#### 4.1.3 DataX jvm 参数
-
--Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
-
-### 4.2 测试报告
-
-| 通道数| 批量提交行数| DataX速度(Rec/s)|DataX流量(MB/s)|
-|--------|--------| --------|--------|
-| 4| 256| 11013| 0.828|
-| 4| 1024| 19417| 1.43|
-| 4| 4096| 23923| 1.76|
-| 4| 8172| 24449| 1.80|
-| 8| 256| 21459| 1.58|
-| 8| 1024| 37037| 2.72|
-| 8| 4096| 45454| 3.34|
-| 8| 8172| 45871| 3.37|
-| 16| 1024| 67567| 4.96|
-| 16| 4096| 78125| 5.74|
-| 16| 8172| 77519| 5.69|
-| 32| 1024| 94339| 6.93|
-| 32| 4096| 96153| 7.06|
-| 64| 1024| 91743| 6.74|
-
-### 4.3 测试总结
-
-* 最好的结果是32通道,每次传4096,如果单条数据很大, 请适当减少批量数,防止oom
-* 当然这个很容易水平扩展,而且es也是分布式的,多设置几个shard也可以水平扩展
-
-## 5 约束限制
-
-* 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
-* 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)
\ No newline at end of file
+ * 默认值: false
\ No newline at end of file
diff --git a/obhbasereader/doc/obhbasereader.md b/obhbasereader/doc/obhbasereader.md
new file mode 100644
index 0000000000..675f6ce795
--- /dev/null
+++ b/obhbasereader/doc/obhbasereader.md
@@ -0,0 +1,178 @@
+OceanBase的table api为应用提供了ObHBase的访问接口,因此,OceanBase的table api的reader与HBase Reader的结构和配置方法类似。
+obhbasereader插件支持sql和hbase api两种读取方式,两种方式存在如下区别:
+
+1. sql方式可以按照分区或者K值进行数据切片,而hbase api方式的数据切片需要用户手动设置。
+2. sql方式会将从obhbase读取的kqtv形式的数据转换为单一横行,而hbase api则不做行列转换,直接以kqtv形式将数据传递给下游。
+3. sql方式需要配置column属性,hbase api则不需要配置,数据均为固定的kqtv四列。
+4. sql方式仅支持获取获得最新或者最旧版本的数据,而hbase api支持获得多版本数据。
+#### 脚本配置
+```json
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 3,
+ "byte": 104857600
+ },
+ "errorLimit": {
+ "record": 10
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "obhbasereader",
+ "parameter": {
+ "username": "username",
+ "password": "password",
+ "encoding": "utf8",
+ "column": [
+ {
+ "name": "f1:column1_1",
+ "type": "string"
+ },
+ {
+ "name": "f1:column2_2",
+ "type": "string"
+ },
+          {
+            "name": "f1:column3_3",
+            "type": "string"
+          },
+          {
+            "name": "f1:column4_4",
+            "type": "string"
+          }
+ ],
+ "range": [
+ {
+ "startRowkey": "aaa",
+ "endRowkey": "ccc",
+ "isBinaryRowkey": false
+ },
+ {
+ "startRowkey": "eee",
+ "endRowkey": "zzz",
+ "isBinaryRowkey": false
+ }
+ ],
+ "mode": "normal",
+ "readByPartition": "true",
+ "scanCacheSize": "",
+ "readerHint": "",
+ "readBatchSize": "1000",
+ "connection": [
+ {
+ "table": [
+ "htable1",
+ "htable2"
+ ],
+ "jdbcUrl": [
+ "||_dsc_ob10_dsc_||集群:租户||_dsc_ob10_dsc_||jdbc:mysql://ip:port/dbName1"
+ ],
+ "username": "username",
+ "password": "password"
+ },
+ {
+ "table": [
+ "htable1",
+ "htable2"
+ ],
+ "jdbcUrl": [
+ "jdbc:mysql://ip:port/database"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "txtfilewriter",
+ "parameter": {
+ "path": "/Users/xujing/datax/txtfile",
+ "charset": "UTF-8",
+ "fieldDelimiter": ",",
+ "fileName": "hbase",
+ "nullFormat": "null",
+ "writeMode": "truncate"
+ }
+ }
+ }
+ ]
+ }
+}
+```
+##### 参数解释
+
+- **connection**
+ - 描述:配置分库分表的jdbcUrl和分表名。如果一个分库中有多个分表可以用逗号隔开,也可以写成表名[起始序号-截止序号]
+ - 必须:是
+ - 默认值:无
+- **jdbcUrl**
+ - 描述:连接ob使用的jdbc url,支持如下两种格式:
+ - jdbc:mysql://obproxyIp:obproxyPort/db
+ - 此格式下username需要写成三段式格式
+ - ||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/db
+ - 此格式下username仅填写用户名本身,无需三段式写法
+
+ - 必选:是
+ - 默认值:无
+- **table**
+ - 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,obhbasereader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
+ - 必选:是
+ - 默认值:无
+- **readByPartition**
+ - 描述:使用sql方式读取时,配置**仅**按照分区进行切片。
+ - 必须:否
+ - 默认值:false
+- **partitionName**
+ - 描述:使用sql方式读取时,标识仅读取指定分区名的数据,用户需要保证配置的分区名在表结构中真实存在(要求严格大小写)。
+ - 必须:否
+ - 默认值:无
+- **readBatchSize**
+ - 描述:使用sql方式读取时,分页大小。
+ - 必须:否
+ - 默认值:10w
+- **fetchSize**
+ - 描述:使用sql方式读取时,控制每次读取数据时从结果集中获取的数据行数。
+ - 必须:否
+ - 默认值:-2147483648
+- **scanCacheSize**
+ - 描述:使用hbase api读取时,每次rpc从服务器端读取的行数
+ - 必须:否
+ - 默认值:256
+- **readerHint**
+ - 描述:obhbasereader使用sql方式读取时使用的hint
+ - 必须:否
+ - 默认值:/*+READ_CONSISTENCY(weak),QUERY_TIMEOUT(86400000000)*/
+- **column**
+ - 描述:使用sql方式读取数据时,所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。
+ - 支持列裁剪,即列可以挑选部分列进行导出。
+  - 支持列换序,即列可以不按照表schema信息进行导出。
+  - 同时支持通配符*。
+  - 在使用之前需仔细核对列信息。
+
+  - 必选:sql方式读取时必选
+ - 默认值:无
+- **range**
+  - 描述:指定obhbasereader读取的rowkey范围
+ - 必须:否
+ - 默认值:无
+- **username**
+ - 描述:访问OceanBase的用户名
+ - 必选:是
+ - 默认值:无
+- **mode**
+  - 描述:读取obhbase的模式,当前支持normal模式,即仅读取一个版本的数据。
+ - 必选:是
+ - 默认值:normal
+- **version**
+ - 描述:读取obhbase的版本,当前支持oldest、latest模式,分别表示读取最旧和最新的数据。
+ - 必须:是
+ - 默认值:oldest
+
+一些注意点:
+注:如果配置了**partitionName**,则无需再配置readByPartition,即便配置了也会忽略readByPartition选项,而是仅会读取指定分区的数据。
+注:如果配置了**readByPartition**,任务将仅按照分区切分任务,而不会再按照K值进行切分。如果是非分区表,则整张表会被当作一个任务而不会再切分。
+
+
+
diff --git a/obhbasereader/pom.xml b/obhbasereader/pom.xml
new file mode 100755
index 0000000000..62afc3444b
--- /dev/null
+++ b/obhbasereader/pom.xml
@@ -0,0 +1,151 @@
+
+ 4.0.0
+
+ com.alibaba.datax
+ datax-all
+ 0.0.1-SNAPSHOT
+
+
+ obhbasereader
+ com.alibaba.datax
+ obhbasereader
+ 0.0.1-SNAPSHOT
+
+
+
+ com.alibaba.datax
+ datax-core
+ ${datax-project-version}
+ provided
+
+
+ com.alibaba.datax
+ oceanbasev10reader
+ 0.0.1-SNAPSHOT
+
+
+ guava
+ com.google.guava
+
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.3.2
+
+
+ log4j
+ log4j
+
+
+
+
+ commons-collections
+ commons-collections
+ 3.2.1
+
+
+
+
+
+
+
+
+
+ com.oceanbase
+ obkv-hbase-client
+ 0.1.4.2
+
+
+ guava
+ com.google.guava
+
+
+
+
+
+ com.google.guava
+ guava
+ ${guava-version}
+
+
+ com.alibaba.toolkit.common
+ toolkit-common-logging
+ 1.14
+
+
+ org.json
+ json
+ 20160810
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+ org.powermock
+ powermock-module-junit4
+ 1.4.10
+ test
+
+
+ org.powermock
+ powermock-api-mockito
+ 1.4.10
+ test
+
+
+ org.mockito
+ mockito-core
+ 1.8.5
+ test
+
+
+
+
+
+
+
+ src/main/java
+
+ **/*.properties
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
+
diff --git a/obhbasereader/src/main/assembly/package.xml b/obhbasereader/src/main/assembly/package.xml
new file mode 100755
index 0000000000..43da622d5c
--- /dev/null
+++ b/obhbasereader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/reader/obhbasereader
+
+
+ target/
+
+ obhbasereader-0.0.1-SNAPSHOT.jar
+
+ plugin/reader/obhbasereader
+
+
+
+
+
+ false
+ plugin/reader/obhbasereader/libs
+ runtime
+
+
+
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/Constant.java b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/Constant.java
new file mode 100755
index 0000000000..40dd32d282
--- /dev/null
+++ b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/Constant.java
@@ -0,0 +1,34 @@
+package com.alibaba.datax.plugin.reader.obhbasereader;
+
+import ch.qos.logback.classic.Level;
+
+public final class Constant {
+ public static final String ROWKEY_FLAG = "rowkey";
+ public static final int DEFAULT_SCAN_CACHE = 256;
+ public static final int DEFAULT_FETCH_SIZE = Integer.MIN_VALUE;
+ public static final int DEFAULT_READ_BATCH_SIZE = 100000;
+ // timeout:24 * 3600 = 86400s
+ public static final String OB_READ_HINT = "/*+READ_CONSISTENCY(weak),QUERY_TIMEOUT(86400000000)*/";
+ public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ public static final String DEFAULT_ENCODING = "UTF-8";
+ public static final String DEFAULT_TIMEZONE = "UTC";
+ public static final boolean DEFAULT_USE_SQLREADER = true;
+ public static final boolean DEFAULT_USE_ODPMODE = true;
+ public static final String OB_TABLE_CLIENT_PROPERTY = "logging.path.com.alipay.oceanbase-table-client";
+ public static final String OB_TABLE_HBASE_PROPERTY = "logging.path.com.alipay.oceanbase-table-hbase";
+ public static final String OB_TABLE_CLIENT_LOG_LEVEL = "logging.level.oceanbase-table-client";
+ public static final String OB_TABLE_HBASE_LOG_LEVEL = "logging.level.oceanbase-table-hbase";
+ public static final String OB_COM_ALIPAY_TABLE_CLIENT_LOG_LEVEL = "logging.level.com.alipay.oceanbase-table-client";
+ public static final String OB_COM_ALIPAY_TABLE_HBASE_LOG_LEVEL = "logging.level.com.alipay.oceanbase-table-hbase";
+ public static final String OB_HBASE_LOG_PATH = System.getProperty("datax.home") + "/log/";
+ public static final String DEFAULT_OB_TABLE_CLIENT_LOG_LEVEL = Level.OFF.toString();
+ public static final String DEFAULT_OB_TABLE_HBASE_LOG_LEVEL = Level.OFF.toString();
+ public static final String OBMYSQL_KEYWORDS =
+ "CUME_DIST,DENSE_RANK,EMPTY,FIRST_VALUE,GROUPING,GROUPS,INTERSECT,JSON_TABLE,LAG,LAST_VALUE,LATERAL,LEAD,NTH_VALUE,NTILE,OF,OVER,PERCENT_RANK,RANK,RECURSIVE,ROW_NUMBER,SYSTEM,WINDOW,ACCESSIBLE,ACCOUNT,ACTION,ADD,AFTER,AGAINST,AGGREGATE,ALGORITHM,ALL,ALTER,ALWAYS,ANALYSE,AND,ANY,AS,ASC,ASCII,ASENSITIVE,AT,AUTO_INCREMENT,AUTOEXTEND_SIZE,AVG,AVG_ROW_LENGTH,BACKUP,BEFORE,BEGIN,BETWEEN,BIGINT,BINARY,BINLOG,BIT,BLOB,BLOCK,BOOL,BOOLEAN,BOTH,BTREE,BY,BYTE,CACHE,CALL,CASCADE,CASCADED,CASE,CATALOG_NAME,CHAIN,CHANGE,CHANGED,CHANNEL,CHAR,CHARACTER,CHARSET,CHECK,CHECKSUM,CIPHER,CLASS_ORIGIN,CLIENT,CLOSE,COALESCE,CODE,COLLATE,COLLATION,COLUMN,COLUMN_FORMAT,COLUMN_NAME,COLUMNS,COMMENT,COMMIT,COMMITTED,COMPACT,COMPLETION,COMPRESSED,COMPRESSION,CONCURRENT,CONDITION,CONNECTION,CONSISTENT,CONSTRAINT,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONTAINS,CONTEXT,CONTINUE,CONVERT,CPU,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,"
+ + "CURSOR_NAME,DATA,DATABASE,DATABASES,DATAFILE,DATE,DATETIME,DAY,DAY_HOUR,DAY_MICROSECOND,DAY_MINUTE,DAY_SECOND,DEALLOCATE,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_AUTH,DEFINER,DELAY_KEY_WRITE,DELAYED,DELETE,DES_KEY_FILE,DESC,DESCRIBE,DETERMINISTIC,DIAGNOSTICS,DIRECTORY,DISABLE,DISCARD,DISK,DISTINCT,DISTINCTROW,DIV,DO,DOUBLE,DROP,DUAL,DUMPFILE,DUPLICATE,DYNAMIC,EACH,ELSE,ELSEIF,ENABLE,ENCLOSED,ENCRYPTION,END,ENDS,ENGINE,ENGINES,ENUM,ERROR,ERRORS,ESCAPE,ESCAPED,EVENT,EVENTS,EVERY,EXCHANGE,EXECUTE,EXISTS,EXIT,EXPANSION,EXPIRE,EXPLAIN,EXPORT,EXTENDED,EXTENT_SIZE,FAST,FAULTS,FETCH,FIELDS,FILE,FILE_BLOCK_SIZE,FILTER,FIRST,FIXED,FLOAT,FLOAT4,FLOAT8,FLUSH,FOLLOWS,FOR,FORCE,FOREIGN,FORMAT,FOUND,FROM,FULL,FULLTEXT,FUNCTION,GENERAL,GENERATED,GEOMETRY,GEOMETRYCOLLECTION,GET,GET_FORMAT,GLOBAL,GRANT,GRANTS,GROUP,GROUP_REPLICATION,HANDLER,HASH,HAVING,HELP,HIGH_PRIORITY,HOST,HOSTS,HOUR,HOUR_MICROSECOND,HOUR_MINUTE,HOUR_SECOND,IDENTIFIED,IF,IGNORE,IGNORE_SERVER_IDS,IMPORT,IN,INDEX,"
+ + "INDEXES," + "INFILE,INITIAL_SIZE,INNER,INOUT,INSENSITIVE,INSERT,INSERT_METHOD,INSTALL,INSTANCE,INT,INT1,INT2,INT3,INT4,INT8,INTEGER,INTERVAL,INTO,INVOKE,INVOKER,IO,IO_AFTER_GTIDS,IO_BEFORE_GTIDS,IO_THREAD,IPC,IS,ISOLATION,ISSUER,ITERATE,JOIN,JSON,KEY,KEY_BLOCK_SIZE,KEYS,KILL,LANGUAGE,LAST,LEADING,LEAVE,LEAVES,LEFT,LESS,LEVEL,LIKE,LIMIT,LINEAR,LINES,LINESTRING,LIST,LOAD,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCK,LOCKS,LOGFILE,LOGS,LONG,LONGBLOB,LONGTEXT,LOOP,LOW_PRIORITY,MASTER,MASTER_AUTO_POSITION,MASTER_BIND,MASTER_CONNECT_RETRY,MASTER_DELAY,MASTER_HEARTBEAT_PERIOD,MASTER_HOST,MASTER_LOG_FILE,MASTER_LOG_POS,MASTER_PASSWORD,MASTER_PORT,MASTER_RETRY_COUNT,MASTER_SERVER_ID,MASTER_SSL,MASTER_SSL_CA,MASTER_SSL_CAPATH,MASTER_SSL_CERT,MASTER_SSL_CIPHER,MASTER_SSL_CRL,MASTER_SSL_CRLPATH,MASTER_SSL_KEY,MASTER_SSL_VERIFY_SERVER_CERT,MASTER_TLS_VERSION,MASTER_USER,MATCH,MAX_CONNECTIONS_PER_HOUR,MAX_QUERIES_PER_HOUR,MAX_ROWS,MAX_SIZE,MAX_STATEMENT_TIME,MAX_UPDATES_PER_HOUR,"
+ + "MAX_USER_CONNECTIONS,"
+ + "MAXVALUE,MEDIUM,MEDIUMBLOB,MEDIUMINT,MEDIUMTEXT,MEMORY,MERGE,MESSAGE_TEXT,MICROSECOND,MIDDLEINT,MIGRATE,MIN_ROWS,MINUTE,MINUTE_MICROSECOND,MINUTE_SECOND,MOD,MODE,MODIFIES,MODIFY,MONTH,MULTILINESTRING,MULTIPOINT,MULTIPOLYGON,MUTEX,MYSQL_ERRNO,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NDB,NDBCLUSTER,NEVER,NEW,NEXT,NO,NO_WAIT,NO_WRITE_TO_BINLOG,NODEGROUP,NONBLOCKING,NONE,NOT,NUMBER,NUMERIC,NVARCHAR,OFFSET,OLD_PASSWORD,ON,ONE,ONLY,OPEN,OPTIMIZE,OPTIMIZER_COSTS,OPTION,OPTIONALLY,OPTIONS,OR,ORDER,OUT,OUTER,OUTFILE,OWNER,PACK_KEYS,PAGE,PARSE_GCOL_EXPR,PARSER,PARTIAL,PARTITION,PARTITIONING,PARTITIONS,PASSWORD,PHASE,PLUGIN,PLUGIN_DIR,PLUGINS,POINT,POLYGON,PORT,PRECEDES,PRECISION,PREPARE,PRESERVE,PREV,PRIMARY,PRIVILEGES,PROCEDURE,PROCESSLIST,PROFILE,PROFILES,PROXY,PURGE,QUARTER,QUERY,QUICK,RANGE,READ,READ_ONLY,READ_WRITE,READS,REAL,REBUILD,RECOVER,REDO_BUFFER_SIZE,REDOFILE,REDUNDANT,REFERENCES,REGEXP,RELAY,RELAY_LOG_FILE,RELAY_LOG_POS,RELAY_THREAD,RELAYLOG,RELEASE,RELOAD,REMOVE,"
+ + "RENAME,REORGANIZE,REPAIR,REPEAT,REPEATABLE,REPLACE,REPLICATE_DO_DB,REPLICATE_DO_TABLE,REPLICATE_IGNORE_DB,REPLICATE_IGNORE_TABLE,REPLICATE_REWRITE_DB,REPLICATE_WILD_DO_TABLE,REPLICATE_WILD_IGNORE_TABLE,REPLICATION,REQUIRE,RESET,RESIGNAL,RESTORE,RESTRICT,RESUME,RETURN,RETURNED_SQLSTATE,RETURNS,REVERSE,REVOKE,RIGHT,RLIKE,ROLLBACK,ROLLUP,ROTATE,ROUTINE,ROW,ROW_COUNT,ROW_FORMAT,ROWS,RTREE,SAVEPOINT,SCHEDULE,SCHEMA,SCHEMA_NAME,SCHEMAS,SECOND,SECOND_MICROSECOND,SECURITY,SELECT,SENSITIVE,SEPARATOR,SERIAL,SERIALIZABLE,SERVER,SESSION,SET,SHARE,SHOW,SHUTDOWN,SIGNAL,SIGNED,SIMPLE,SLAVE,SLOW,SMALLINT,SNAPSHOT,SOCKET,SOME,SONAME,SOUNDS,SOURCE,SPATIAL,SPECIFIC,SQL,SQL_AFTER_GTIDS,SQL_AFTER_MTS_GAPS,SQL_BEFORE_GTIDS,SQL_BIG_RESULT,SQL_BUFFER_RESULT,SQL_CACHE,SQL_CALC_FOUND_ROWS,SQL_NO_CACHE,SQL_SMALL_RESULT,SQL_THREAD,SQL_TSI_DAY,SQL_TSI_HOUR,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQLEXCEPTION,SQLSTATE,SQLWARNING,SSL,STACKED,"
+ + "START," + "STARTING,STARTS,STATS_AUTO_RECALC,STATS_PERSISTENT,STATS_SAMPLE_PAGES,STATUS,STOP,STORAGE,STORED,STRAIGHT_JOIN,STRING,SUBCLASS_ORIGIN,SUBJECT,SUBPARTITION,SUBPARTITIONS,SUPER,SUSPEND,SWAPS,SWITCHES,TABLE,TABLE_CHECKSUM,TABLE_NAME,TABLES,TABLESPACE,TEMPORARY,TEMPTABLE,TERMINATED,TEXT,THAN,THEN,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TINYBLOB,TINYINT,TINYTEXT,TO,TRAILING,TRANSACTION,TRIGGER,TRIGGERS,TRUNCATE,TYPE,TYPES,UNCOMMITTED,UNDEFINED,UNDO,UNDO_BUFFER_SIZE,UNDOFILE,UNICODE,UNINSTALL,UNION,UNIQUE,UNKNOWN,UNLOCK,UNSIGNED,UNTIL,UPDATE,UPGRADE,USAGE,USE,USE_FRM,USER,USER_RESOURCES,USING,UTC_DATE,UTC_TIME,UTC_TIMESTAMP,VALIDATION,VALUE,VALUES,VARBINARY,VARCHAR,VARCHARACTER,VARIABLES,VARYING,VIEW,VIRTUAL,WAIT,WARNINGS,WEEK,WEIGHT_STRING,WHEN,WHERE,WHILE,WITH,WITHOUT,WORK,WRAPPER,WRITE,X509,XA,XID,XML,XOR,YEAR,YEAR_MONTH,ZEROFILL,FALSE,TRUE";
+}
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HTableManager.java b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HTableManager.java
new file mode 100755
index 0000000000..c36114fbb8
--- /dev/null
+++ b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HTableManager.java
@@ -0,0 +1,19 @@
+package com.alibaba.datax.plugin.reader.obhbasereader;
+
+import com.alipay.oceanbase.hbase.OHTable;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+public final class HTableManager {
+
+ public static OHTable createHTable(Configuration config, String tableName) throws IOException {
+ return new OHTable(config, tableName);
+ }
+
+ public static void closeHTable(OHTable hTable) throws IOException {
+ if (hTable != null) {
+ hTable.close();
+ }
+ }
+}
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HbaseColumnCell.java b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HbaseColumnCell.java
new file mode 100755
index 0000000000..1f794ae0aa
--- /dev/null
+++ b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HbaseColumnCell.java
@@ -0,0 +1,124 @@
+package com.alibaba.datax.plugin.reader.obhbasereader;
+
+import com.alibaba.datax.common.base.BaseObject;
+import com.alibaba.datax.plugin.reader.obhbasereader.enums.ColumnType;
+import com.alibaba.datax.plugin.reader.obhbasereader.util.ObHbaseReaderUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * 描述 hbasereader 插件中,column 配置中的一个单元项实体
+ */
+public class HbaseColumnCell extends BaseObject {
+ private ColumnType columnType;
+
+ // columnName 格式为:列族:列名
+ private String columnName;
+
+ private byte[] cf;
+ private byte[] qualifier;
+
+ //对于常量类型,其常量值放到 columnValue 里
+ private String columnValue;
+
+ //当配置了 columnValue 时,isConstant=true(这个成员变量是用于方便使用本类的地方判断是否是常量类型字段)
+ private boolean isConstant;
+
+ // 只在类型是时间类型时,才会设置该值,无默认值。形式如:yyyy-MM-dd HH:mm:ss
+ private String dateformat;
+
+ private HbaseColumnCell(Builder builder) {
+ this.columnType = builder.columnType;
+
+ //columnName 和 columnValue 必须有一个为 null
+ Validate.isTrue(builder.columnName == null || builder.columnValue == null, "In obhbasereader, column cannot configure both column name and column value. Choose one of them.");
+
+ //columnName 和 columnValue 不能都为 null
+ Validate.isTrue(builder.columnName != null || builder.columnValue != null, "In obhbasereader, column cannot configure both column name and column value. Choose one of them.");
+
+ if (builder.columnName != null) {
+ this.isConstant = false;
+ this.columnName = builder.columnName;
+
+ // 如果 columnName 不是 rowkey,则必须配置为:列族:列名 格式
+ if (!ObHbaseReaderUtil.isRowkeyColumn(this.columnName)) {
+
+ String promptInfo = "In obhbasereader, the column configuration format of column should be: 'family:column'. The column you configured is wrong:" + this.columnName;
+ String[] cfAndQualifier = this.columnName.split(":");
+ Validate.isTrue(cfAndQualifier.length == 2 && StringUtils.isNotBlank(cfAndQualifier[0]) && StringUtils.isNotBlank(cfAndQualifier[1]), promptInfo);
+
+ this.cf = Bytes.toBytes(cfAndQualifier[0].trim());
+ this.qualifier = Bytes.toBytes(cfAndQualifier[1].trim());
+ }
+ } else {
+ this.isConstant = true;
+ this.columnValue = builder.columnValue;
+ }
+
+ if (builder.dateformat != null) {
+ this.dateformat = builder.dateformat;
+ }
+ }
+
+ public ColumnType getColumnType() {
+ return columnType;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public byte[] getCf() {
+ return cf;
+ }
+
+ public byte[] getQualifier() {
+ return qualifier;
+ }
+
+ public String getDateformat() {
+ return dateformat;
+ }
+
+ public String getColumnValue() {
+ return columnValue;
+ }
+
+ public boolean isConstant() {
+ return isConstant;
+ }
+
+ // 内部 builder 类
+ public static class Builder {
+ private ColumnType columnType;
+ private String columnName;
+ private String columnValue;
+
+ private String dateformat;
+
+ public Builder(ColumnType columnType) {
+ this.columnType = columnType;
+ }
+
+ public Builder columnName(String columnName) {
+ this.columnName = columnName;
+ return this;
+ }
+
+ public Builder columnValue(String columnValue) {
+ this.columnValue = columnValue;
+ return this;
+ }
+
+ public Builder dateformat(String dateformat) {
+ this.dateformat = dateformat;
+ return this;
+ }
+
+ public HbaseColumnCell build() {
+ return new HbaseColumnCell(this);
+ }
+ }
+}
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HbaseReaderErrorCode.java b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HbaseReaderErrorCode.java
new file mode 100755
index 0000000000..551b19b630
--- /dev/null
+++ b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/HbaseReaderErrorCode.java
@@ -0,0 +1,36 @@
+package com.alibaba.datax.plugin.reader.obhbasereader;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum HbaseReaderErrorCode implements ErrorCode {
+ REQUIRED_VALUE("ObHbaseReader-00", "Missing required parameters."),
+ ILLEGAL_VALUE("ObHbaseReader-01", "Illegal configuration."),
+ PREPAR_READ_ERROR("ObHbaseReader-02", "Preparing to read ObHBase error."),
+ SPLIT_ERROR("ObHbaseReader-03", "Splitting ObHBase table error."),
+ INIT_TABLE_ERROR("ObHbaseReader-04", "Initializing ObHBase extraction table error"),
+ PARSE_COLUMN_ERROR("ObHbaseReader-05", "Parse column failed."),
+ READ_ERROR("ObHbaseReader-06", "Read ObHBase error.");
+
+ private final String code;
+ private final String description;
+
+ private HbaseReaderErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
+ }
+}
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/Key.java b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/Key.java
new file mode 100755
index 0000000000..6415efd098
--- /dev/null
+++ b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/Key.java
@@ -0,0 +1,103 @@
+package com.alibaba.datax.plugin.reader.obhbasereader;
+
+public final class Key {
+
+ public final static String HBASE_CONFIG = "hbaseConfig";
+
+ /**
+ * mode 可以取 normal 或者 multiVersionFixedColumn 或者 multiVersionDynamicColumn 三个值,无默认值。
+ *
+ * normal 配合 column(Map 结构的)使用
+ *
+ * multiVersionFixedColumn 配合 maxVersion,tetradType, column(List 结构的)使用
+ *
+ * multiVersionDynamicColumn 配合 maxVersion,tetradType, columnFamily(List 结构的)使用
+ */
+ public final static String MODE = "mode";
+
+ /**
+ * 配合 mode = multiVersion 时使用,指明需要读取的版本个数。无默认值
+ * -1 表示去读全部版本
+ * 不能为0,1
+ * >1 表示最多读取对应个数的版本数(不能超过 Integer 的最大值)
+ */
+ public final static String MAX_VERSION = "maxVersion";
+
+ /**
+ * 多版本情况下,必须配置 四元组的类型(rowkey,column,timestamp,value)
+ */
+ public final static String TETRAD_TYPE = "tetradType";
+
+ /**
+ * 默认为 utf8
+ */
+ public final static String ENCODING = "encoding";
+
+ public final static String TABLE = "table";
+
+ public final static String USERNAME = "username";
+
+ public final static String OB_SYS_USERNAME = "obSysUser";
+
+ public final static String CONFIG_URL = "obConfigUrl";
+
+ public final static String ODP_HOST = "odpHost";
+
+ public final static String ODP_PORT = "odpPort";
+
+ public final static String DB_NAME = "dbName";
+
+ public final static String PASSWORD = "password";
+
+ public final static String OB_SYS_PASSWORD = "obSysPassword";
+
+ public final static String COLUMN_FAMILY = "columnFamily";
+
+ public final static String COLUMN = "column";
+
+ public final static String START_ROWKEY = "startRowkey";
+
+ public final static String END_ROWKEY = "endRowkey";
+
+ public final static String IS_BINARY_ROWKEY = "isBinaryRowkey";
+
+ public final static String SCAN_CACHE = "scanCache";
+
+ public final static String RS_URL = "rsUrl";
+
+ public final static String MAX_ACTIVE_CONNECTION = "maxActiveConnection";
+
+ public final static int DEFAULT_MAX_ACTIVE_CONNECTION = 2000;
+
+ public final static String TIMEOUT = "timeout";
+
+ public final static long DEFAULT_TIMEOUT = 30;
+
+ public final static String PARTITION_NAME = "partitionName";
+
+ public final static String JDBC_URL = "jdbcUrl";
+
+ public final static String TIMEZONE = "timezone";
+
+ public final static String FETCH_SIZE = "fetchSize";
+
+ public final static String READ_BATCH_SIZE = "readBatchSize";
+
+ public final static String SESSION = "session";
+
+ public final static String READER_HINT = "readerHint";
+
+ public final static String QUERY_SQL = "querySql";
+
+ public final static String SAMPLE_PERCENTAGE = "samplePercentage";
+ // 是否使用独立密码
+ public final static String USE_SPECIAL_SECRET = "useSpecialSecret";
+
+ public final static String USE_SQL_READER = "useSqlReader";
+
+ public final static String USE_ODP_MODE = "useOdpMode";
+
+ public final static String RANGE = "range";
+
+ public final static String READ_BY_PARTITION = "readByPartition";
+}
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings.properties b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_en_US.properties b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_en_US.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_ja_JP.properties b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_ja_JP.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_zh_CN.properties b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_zh_CN.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_zh_HK.properties b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_zh_HK.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_zh_TW.properties b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/LocalStrings_zh_TW.properties
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/ObHbaseReader.java b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/ObHbaseReader.java
new file mode 100755
index 0000000000..15472d6eaf
--- /dev/null
+++ b/obhbasereader/src/main/java/com/alibaba/datax/plugin/reader/obhbasereader/ObHbaseReader.java
@@ -0,0 +1,445 @@
+package com.alibaba.datax.plugin.reader.obhbasereader;
+
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.DEFAULT_OB_TABLE_CLIENT_LOG_LEVEL;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.DEFAULT_OB_TABLE_HBASE_LOG_LEVEL;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.DEFAULT_USE_ODPMODE;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.OB_COM_ALIPAY_TABLE_CLIENT_LOG_LEVEL;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.OB_COM_ALIPAY_TABLE_HBASE_LOG_LEVEL;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.OB_HBASE_LOG_PATH;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.OB_TABLE_CLIENT_LOG_LEVEL;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.OB_TABLE_CLIENT_PROPERTY;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.OB_TABLE_HBASE_LOG_LEVEL;
+import static com.alibaba.datax.plugin.reader.obhbasereader.Constant.OB_TABLE_HBASE_PROPERTY;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.reader.Constant;
+import com.alibaba.datax.plugin.rdbms.reader.util.ObVersion;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.util.TableExpandUtil;
+import com.alibaba.datax.plugin.reader.obhbasereader.enums.ModeType;
+import com.alibaba.datax.plugin.reader.obhbasereader.ext.ServerConnectInfo;
+import com.alibaba.datax.plugin.reader.obhbasereader.task.AbstractHbaseTask;
+import com.alibaba.datax.plugin.reader.obhbasereader.task.SQLNormalModeReader;
+import com.alibaba.datax.plugin.reader.obhbasereader.task.ScanMultiVersionReader;
+import com.alibaba.datax.plugin.reader.obhbasereader.task.ScanNormalModeReader;
+import com.alibaba.datax.plugin.reader.obhbasereader.util.HbaseSplitUtil;
+import com.alibaba.datax.plugin.reader.obhbasereader.util.ObHbaseReaderUtil;
+import com.alibaba.datax.plugin.reader.obhbasereader.util.SqlReaderSplitUtil;
+import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
+
+import com.google.common.base.Preconditions;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * ObHbaseReader 支持分库分表
+ * 仅支持ob3.x及以上版本
+ */
+public class ObHbaseReader extends Reader {
+
+ public static class Job extends Reader.Job {
+ static private final String ACCESS_DENIED_ERROR = "Access denied for user";
+ private static Logger LOG = LoggerFactory.getLogger(ObHbaseReader.class);
+ private Configuration originalConfig;
+
+ @Override
+ public void init() {
+ if (System.getProperty(OB_TABLE_CLIENT_PROPERTY) == null) {
+ LOG.info(OB_TABLE_CLIENT_PROPERTY + " not set");
+ System.setProperty(OB_TABLE_CLIENT_PROPERTY, OB_HBASE_LOG_PATH);
+ }
+ if (System.getProperty(OB_TABLE_HBASE_PROPERTY) == null) {
+ LOG.info(OB_TABLE_HBASE_PROPERTY + " not set");
+ System.setProperty(OB_TABLE_HBASE_PROPERTY, OB_HBASE_LOG_PATH);
+ }
+ if (System.getProperty(OB_TABLE_CLIENT_LOG_LEVEL) == null) {
+ LOG.info(OB_TABLE_CLIENT_LOG_LEVEL + " not set");
+ System.setProperty(OB_TABLE_CLIENT_LOG_LEVEL, DEFAULT_OB_TABLE_CLIENT_LOG_LEVEL);
+ }
+ if (System.getProperty(OB_TABLE_HBASE_LOG_LEVEL) == null) {
+ LOG.info(OB_TABLE_HBASE_LOG_LEVEL + " not set");
+ System.setProperty(OB_TABLE_HBASE_LOG_LEVEL, DEFAULT_OB_TABLE_HBASE_LOG_LEVEL);
+ }
+ if (System.getProperty(OB_COM_ALIPAY_TABLE_CLIENT_LOG_LEVEL) == null) {
+ LOG.info(OB_COM_ALIPAY_TABLE_CLIENT_LOG_LEVEL + " not set");
+ System.setProperty(OB_COM_ALIPAY_TABLE_CLIENT_LOG_LEVEL, DEFAULT_OB_TABLE_CLIENT_LOG_LEVEL);
+ }
+ if (System.getProperty(OB_COM_ALIPAY_TABLE_HBASE_LOG_LEVEL) == null) {
+ LOG.info(OB_COM_ALIPAY_TABLE_HBASE_LOG_LEVEL + " not set");
+ System.setProperty(OB_COM_ALIPAY_TABLE_HBASE_LOG_LEVEL, DEFAULT_OB_TABLE_HBASE_LOG_LEVEL);
+ }
+
+ LOG.info("{} is set to {}, {} is set to {}",
+ OB_TABLE_CLIENT_PROPERTY, OB_HBASE_LOG_PATH, OB_TABLE_HBASE_PROPERTY, OB_HBASE_LOG_PATH);
+ this.originalConfig = super.getPluginJobConf();
+ ObHbaseReaderUtil.doPretreatment(originalConfig);
+ List