From 9d8fe30f6d8140415348941f218e18824bf0dc0b Mon Sep 17 00:00:00 2001 From: Administrator <1> Date: Wed, 16 Nov 2022 18:03:53 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20Phoenix4=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?where=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hbase11xsqlreader/HbaseSQLHelper.java | 22 +++++++---- .../HbaseSQLReaderConfig.java | 38 ++++++++++++++++--- .../hbase11xsqlreader/HbaseSQLReaderTask.java | 13 ++++--- .../plugin/reader/hbase11xsqlreader/Key.java | 13 +++++++ 4 files changed, 67 insertions(+), 19 deletions(-) diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java index 5309d1d99a..8c25fcc80e 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java @@ -26,9 +26,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; public class HbaseSQLHelper { @@ -50,11 +48,15 @@ public static org.apache.hadoop.conf.Configuration generatePhoenixConf(HbaseSQLR String zkUrl = readerConfig.getZkUrl(); PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class); - PhoenixConfigurationUtil.setInputTableName(conf, table); + + PhoenixConfigurationUtil.setInputTableName(conf, readerConfig.getSchema()+"."+table); if (!columns.isEmpty()) { PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()])); } + if(Objects.nonNull(readerConfig.getWhere())){ + PhoenixConfigurationUtil.setInputTableConditions(conf,readerConfig.getWhere()); + } PhoenixEmbeddedDriver.ConnectionInfo info = null; try { info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl); @@ -67,15 +69,19 @@ public static org.apache.hadoop.conf.Configuration generatePhoenixConf(HbaseSQLR conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort()); if (info.getRootNode() != null) conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode()); + conf.set(Key.NAME_SPACE_MAPPING_ENABLED,"true"); + conf.set(Key.SYSTEM_TABLES_TO_NAMESPACE,"true"); return conf; } - public static List getPColumnNames(String connectionString, String tableName) throws SQLException { - Connection con = - DriverManager.getConnection(connectionString); + public static List getPColumnNames(String connectionString, String tableName,String schema) throws SQLException { + Properties pro = new Properties(); + pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true); + pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true); + Connection con = DriverManager.getConnection(connectionString,pro); PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); - PTable table = metaDataClient.updateCache("", tableName).getTable(); + PTable table = metaDataClient.updateCache(schema, tableName).getTable(); List columnNames = new ArrayList(); for (PColumn pColumn : table.getColumns()) { if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME)) diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java index ab06f6e1a8..37060986f3 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java @@ -9,6 +9,7 @@ import java.sql.SQLException; import java.util.List; +import java.util.StringJoiner; public class HbaseSQLReaderConfig { private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class); @@ -27,6 +28,9 @@ public String getZkUrl() { private String tableName; private List columns; // 目的表的所有列的列名,包括主键和非主键,不包括时间列 + private String where;//条件 + + private String schema;// /** * @return 获取原始的datax配置 */ @@ -96,22 +100,27 @@ private static void parseClusterConfig(HbaseSQLReaderConfig cfg, Configuration d } String zkQuorum = zkCfg.getFirst(); String znode = zkCfg.getSecond(); + if (zkQuorum == null || zkQuorum.isEmpty()) { throw DataXException.asDataXException( HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空" ); } // 生成sql使用的连接字符串, 格式: jdbc:hbase:zk_quorum:2181:/znode_parent - cfg.connectionString = "jdbc:phoenix:" + zkQuorum; - cfg.zkUrl = zkQuorum + ":2181"; + StringBuilder connectionString=new StringBuilder("jdbc:phoenix:"); + connectionString.append(zkQuorum); + cfg.connectionString = connectionString.toString(); + StringBuilder zkUrl =new StringBuilder(zkQuorum); + cfg.zkUrl = zkUrl.append(":2181").toString(); if (!znode.isEmpty()) { - cfg.connectionString += cfg.connectionString + ":" + znode; - cfg.zkUrl += cfg.zkUrl + ":" + znode; + cfg.connectionString = connectionString.append(":").append(znode).toString(); + cfg.zkUrl=zkUrl.append(":").append(znode).toString(); } } private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) { // 解析并检查表名 cfg.tableName = dataxCfg.getString(Key.TABLE); + cfg.schema = dataxCfg.getString(Key.SCHEMA); if (cfg.tableName == null || cfg.tableName.isEmpty()) { throw DataXException.asDataXException( HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的tableName配置不能为空,请检查并修改配置." ); @@ -124,13 +133,14 @@ private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dat HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "您配置的tableName含有非法字符{0},请检查您的配置."); } else if (cfg.columns.isEmpty()) { try { - cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName); + cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName,cfg.schema); dataxCfg.set(Key.COLUMN, cfg.columns); } catch (SQLException e) { throw DataXException.asDataXException( HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e); } } + cfg.where=dataxCfg.getString(Key.WHERE); } @Override @@ -151,6 +161,8 @@ public String toString() { ret.append(","); } ret.setLength(ret.length() - 1); + ret.append("[where=]").append(getWhere()); + ret.append("[schema=]").append(getSchema()); ret.append("\n"); return ret.toString(); @@ -161,4 +173,20 @@ public String toString() { */ private HbaseSQLReaderConfig() { } + + public String getWhere() { + return where; + } + + public void setWhere(String where) { + this.where = where; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } } diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java index 1ca22c6fb9..461649d1e9 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java @@ -19,10 +19,8 @@ import java.io.IOException; import java.math.BigDecimal; import java.sql.*; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.sql.Date; +import java.util.*; /** * Created by admin on 1/3/18. @@ -42,11 +40,14 @@ public HbaseSQLReaderTask(Configuration config) { } private void getPColumns() throws SQLException { + Properties pro = new Properties(); + pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true); + pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true); Connection con = - DriverManager.getConnection(this.readerConfig.getConnectionString()); + DriverManager.getConnection(this.readerConfig.getConnectionString(),pro); PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); - PTable table = metaDataClient.updateCache("", this.readerConfig.getTableName()).getTable(); + PTable table = metaDataClient.updateCache(this.readerConfig.getSchema(), this.readerConfig.getTableName()).getTable(); List columnNames = this.readerConfig.getColumns(); for (PColumn pColumn : table.getColumns()) { if (columnNames.contains(pColumn.getName().getString())) { diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java index 7987d6c8b2..f8453add8f 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java @@ -24,5 +24,18 @@ public final class Key { * 【必选】列配置 */ public final static String COLUMN = "column"; + /** + * + */ + public static final String WHERE = "where"; + + /** + * 【可选】Phoenix表所属schema,默认为空 + */ + public static final String SCHEMA = "schema"; + + public static final String NAME_SPACE_MAPPING_ENABLED = "phoenix.schema.isNamespaceMappingEnabled"; + + public static final String SYSTEM_TABLES_TO_NAMESPACE = "phoenix.schema.mapSystemTablesToNamespace"; } From f047e927bc459a00c9896f65382235e8dea96238 Mon Sep 17 00:00:00 2001 From: Administrator <1> Date: Wed, 16 Nov 2022 18:11:56 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20Phoenix4=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?where=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hbase11xsqlreader/doc/hbase11xsqlreader.md | 42 +++++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/hbase11xsqlreader/doc/hbase11xsqlreader.md b/hbase11xsqlreader/doc/hbase11xsqlreader.md index 03261a1f06..9f70077f4e 100644 --- a/hbase11xsqlreader/doc/hbase11xsqlreader.md +++ b/hbase11xsqlreader/doc/hbase11xsqlreader.md @@ -60,12 +60,16 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实 //填写连接Phoenix的hbase集群zk地址 "hbaseConfig": { "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com" - }, + }, + //填写要读取的phoenix的命名空间 + "schema": "TAG", //填写要读取的phoenix的表名 "table": "US_POPULATION", //填写要读取的列名,不填读取所有列 "column": [ - ] + ], + //查询条件 + "where": "id=" } }, "writer": { @@ -92,11 +96,18 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实 * 必选:是
+ * 默认值:无
+* **schema** + + * 描述:编写Phoenix中的namespace,该值设置为'' + + * 必选:是
+ * 默认值:无
* **table** - * 描述:编写Phoenix中的表名,如果有namespace,该值设置为'namespace.tablename' + * 描述:编写Phoenix中的表名,该值设置为'tablename' * 必选:是
@@ -109,7 +120,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实 * 必选:是
* 默认值:无
+* **where** + + * 描述:填写需要从phoenix表中读取条件判断。 + + * 可选:是
+ * 默认值:无
### 3.3 类型转换 @@ -172,11 +189,14 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实 "hbaseConfig": { "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com" }, + "schema": "TAG", //填写要读取的phoenix的表名 "table": "US_POPULATION", //填写要读取的列名,不填读取所有列 "column": [ - ] + ], + //查询条件 + "where": "id=" } }, "writer": { @@ -204,7 +224,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实 * 必选:是
* 默认值:无
- +* **schema** + + * 描述:编写Phoenix中的namespace,该值设置为'' + + * 必选:是
+ + * 默认值:无
* **table** * 描述:编写Phoenix中的表名,如果有namespace,该值设置为'namespace.tablename' @@ -220,7 +246,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实 * 必选:是
* 默认值:无
+ * **where** + * 描述:填写需要从phoenix表中读取条件判断。 + + * 可选:是
+ + * 默认值:无
### 3.3 类型转换