diff --git a/README.md b/README.md index 2c5e95f440..01bbc3ea45 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 # Quick Start -##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz) +##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202303/datax.tar.gz) ##### 请点击:[Quick Start](https://github.com/alibaba/DataX/blob/master/userGuid.md) @@ -70,6 +70,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N | | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | | | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | | | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | selectdb | | √ | [写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | | 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | | | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | | | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | @@ -108,6 +109,12 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容会介绍介绍如下。 +- [datax_v202303](https://github.com/alibaba/DataX/releases/tag/datax_v202303) + - 精简代码 + - 新增插件(adbmysqlwriter、databendwriter、selectdbwriter) + - 优化插件、修复问题(sqlserver、hdfs、cassandra、kudu、oss) + - fastjson 升级到 fastjson2 + - [datax_v202210](https://github.com/alibaba/DataX/releases/tag/datax_v202210) - 涉及通道能力更新(OceanBase、Tdengine、Doris等) diff --git a/adswriter/doc/adswriter.md b/adswriter/doc/adswriter.md index 4a0fd9619f..c02f80187a 100644 --- a/adswriter/doc/adswriter.md +++ b/adswriter/doc/adswriter.md @@ -110,7 +110,6 @@ DataX 将数据直连ADS接口,利用ADS暴露的INSERT接口直写到ADS。 "account": "xxx@aliyun.com", "odpsServer": "xxx", "tunnelServer": "xxx", - "accountType": "aliyun", "project": "transfer_project" }, "writeMode": "load", diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java index bff4b7b900..3d28a8339e 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java @@ -12,7 +12,6 @@ public class TransferProjectConf { public final static String KEY_ACCOUNT = "odps.account"; public final static String KEY_ODPS_SERVER = "odps.odpsServer"; public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer"; - public final static String KEY_ACCOUNT_TYPE = "odps.accountType"; public final static String KEY_PROJECT = "odps.project"; private String accessId; @@ -20,7 +19,6 @@ public class TransferProjectConf { private String account; private String odpsServer; private String odpsTunnel; - private String accountType; private String project; public static TransferProjectConf create(Configuration adsWriterConf) { @@ -30,7 +28,6 @@ public static TransferProjectConf create(Configuration adsWriterConf) { res.account = adsWriterConf.getString(KEY_ACCOUNT); res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER); res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL); - res.accountType = adsWriterConf.getString(KEY_ACCOUNT_TYPE, "aliyun"); res.project = adsWriterConf.getString(KEY_PROJECT); return res; } @@ -55,9 +52,6 @@ public String getOdpsTunnel() { return odpsTunnel; } - public String getAccountType() { - return accountType; - } public String getProject() { return project; diff --git a/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java b/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java index ea9aa42110..cf0457bca1 100644 --- a/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java +++ b/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java @@ -31,7 +31,6 @@ public class PerfTrace { private int taskGroupId; private int channelNumber; - private int priority; private int batchSize = 500; private volatile boolean perfReportEnable = true; @@ -54,12 +53,12 @@ public class PerfTrace { * @param taskGroupId * @return */ - public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) { + public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) { if (instance == null) { synchronized (lock) { if (instance == null) { - instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable); + instance = new PerfTrace(isJob, jobId, taskGroupId, enable); } } } @@ -76,22 +75,21 @@ public static PerfTrace getInstance() { LOG.error("PerfTrace instance not be init! must have some error! "); synchronized (lock) { if (instance == null) { - instance = new PerfTrace(false, -1111, -1111, 0, false); + instance = new PerfTrace(false, -1111, -1111, false); } } } return instance; } - private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) { + private PerfTrace(boolean isJob, long jobId, int taskGroupId, boolean enable) { try { this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId); this.enable = enable; this.isJob = isJob; this.taskGroupId = taskGroupId; this.instId = jobId; - this.priority = priority; - LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority)); + LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s", this.perfTraceId, this.enable)); } catch (Exception e) { // do nothing @@ -398,7 +396,6 @@ public synchronized JobStatisticsDto2 getReports(String mode) { jdo.setWindowEnd(this.windowEnd); jdo.setJobStartTime(jobStartTime); jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime()); - jdo.setJobPriority(this.priority); jdo.setChannelNum(this.channelNumber); jdo.setCluster(this.cluster); jdo.setJobDomain(this.jobDomain); @@ -609,7 +606,6 @@ class JobStatisticsDto2 { private Date jobStartTime; private Date jobEndTime; private Long jobRunTimeMs; - private Integer jobPriority; private Integer channelNum; private String cluster; private String jobDomain; @@ -680,10 +676,6 @@ public Long getJobRunTimeMs() { return jobRunTimeMs; } - public Integer getJobPriority() { - return jobPriority; - } - public Integer getChannelNum() { return channelNum; } @@ -816,10 +808,6 @@ public void setJobRunTimeMs(Long jobRunTimeMs) { this.jobRunTimeMs = jobRunTimeMs; } - public void setJobPriority(Integer jobPriority) { - this.jobPriority = jobPriority; - } - public void setChannelNum(Integer channelNum) { this.channelNum = channelNum; } diff --git a/common/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java b/common/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java deleted file mode 100644 index 8bab301e6f..0000000000 --- a/common/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.alibaba.datax.common.util; - -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.datax.common.exception.DataXException; - -public class IdAndKeyRollingUtil { - private static Logger LOGGER = LoggerFactory.getLogger(IdAndKeyRollingUtil.class); - public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID"; - public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY"; - - public final static String ACCESS_ID = "accessId"; - public final static String ACCESS_KEY = "accessKey"; - - public static String parseAkFromSkynetAccessKey() { - Map envProp = System.getenv(); - String skynetAccessID = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID); - String skynetAccessKey = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSKEY); - String accessKey = null; - // follow 原有的判断条件 - // 环境变量中,如果存在SKYNET_ACCESSID/SKYNET_ACCESSKEy(只要有其中一个变量,则认为一定是两个都存在的! - // if (StringUtils.isNotBlank(skynetAccessID) || - // StringUtils.isNotBlank(skynetAccessKey)) { - // 检查严格,只有加密串不为空的时候才进去,不过 之前能跑的加密串都不应该为空 - if (StringUtils.isNotBlank(skynetAccessKey)) { - LOGGER.info("Try to get accessId/accessKey from environment SKYNET_ACCESSKEY."); - accessKey = DESCipher.decrypt(skynetAccessKey); - if (StringUtils.isBlank(accessKey)) { - // 环境变量里面有,但是解析不到 - throw DataXException.asDataXException(String.format( - "Failed to get the [accessId]/[accessKey] from the environment variable. The [accessId]=[%s]", - skynetAccessID)); - } - } - if (StringUtils.isNotBlank(accessKey)) { - LOGGER.info("Get accessId/accessKey from environment variables SKYNET_ACCESSKEY successfully."); - } - return accessKey; - } - - public static String getAccessIdAndKeyFromEnv(Configuration originalConfig) { - String accessId = null; - Map envProp = System.getenv(); - accessId = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID); - String accessKey = null; - if (StringUtils.isBlank(accessKey)) { - // 老的没有出异常,只是获取不到ak - accessKey = IdAndKeyRollingUtil.parseAkFromSkynetAccessKey(); - } - - if (StringUtils.isNotBlank(accessKey)) { - // 确认使用这个的都是 accessId、accessKey的命名习惯 - originalConfig.set(IdAndKeyRollingUtil.ACCESS_ID, accessId); - originalConfig.set(IdAndKeyRollingUtil.ACCESS_KEY, accessKey); - } - return accessKey; - } -} diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java index 3834253285..4ba9fc18ec 100755 --- a/core/src/main/java/com/alibaba/datax/core/Engine.java +++ b/core/src/main/java/com/alibaba/datax/core/Engine.java @@ -79,16 +79,9 @@ public void start(Configuration allConf) { perfReportEnable = false; } - int priority = 0; - try { - priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY")); - }catch (NumberFormatException e){ - LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY")); - } - Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO); //初始化PerfTrace - PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable); + PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable); perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber); container.start(); diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java index bff3509fbd..df87284240 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java @@ -134,7 +134,7 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu break; case BOOLEAN: synchronized (lock) { - row.addBoolean(name, Boolean.getBoolean(rawData)); + row.addBoolean(name, Boolean.parseBoolean(rawData)); } break; case STRING: diff --git a/mongodbreader/doc/mongodbreader.md b/mongodbreader/doc/mongodbreader.md index 99d25731b4..297e598cd6 100644 --- a/mongodbreader/doc/mongodbreader.md +++ b/mongodbreader/doc/mongodbreader.md @@ -114,8 +114,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J "accessKey": "********************", "truncate": true, "odpsServer": "xxx/api", - "tunnelServer": "xxx", - "accountType": "aliyun" + "tunnelServer": "xxx" } } } diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java index dee2ef5c27..cf34762ddf 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java @@ -14,20 +14,9 @@ public class Constant { public static final String PARTITION_SPLIT_MODE = "partition"; - public static final String DEFAULT_ACCOUNT_TYPE = "aliyun"; - - public static final String TAOBAO_ACCOUNT_TYPE = "taobao"; - // 常量字段用COLUMN_CONSTANT_FLAG 首尾包住即可 public final static String COLUMN_CONSTANT_FLAG = "'"; - /** - * 以下是获取accesskey id 需要用到的常量值 - */ - public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID"; - - public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY"; - public static final String PARTITION_COLUMNS = "partitionColumns"; public static final String PARSED_COLUMNS = "parsedColumns"; diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java index 2cee65d17f..6f8c7d92df 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java @@ -24,9 +24,6 @@ public class Key { // 当值为:partition 则只切分到分区;当值为:record,则当按照分区切分后达不到adviceNum时,继续按照record切分 public final static String SPLIT_MODE = "splitMode"; - // 账号类型,默认为aliyun,也可能为taobao等其他类型 - public final static String ACCOUNT_TYPE = "accountType"; - public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject"; public final static String IS_COMPRESS = "isCompress"; diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java index 206b013580..615cee502a 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java @@ -42,12 +42,6 @@ public void init() { this.originalConfig = super.getPluginJobConf(); this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false); - //如果用户没有配置accessId/accessKey,尝试从环境变量获取 - String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, Constant.DEFAULT_ACCOUNT_TYPE); - if (Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(accountType)) { - this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig); - } - //检查必要的参数配置 OdpsUtil.checkNecessaryConfig(this.originalConfig); //重试次数的配置检查 diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/IdAndKeyUtil.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/IdAndKeyUtil.java deleted file mode 100644 index 05722b59f9..0000000000 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/IdAndKeyUtil.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * (C) 2010-2022 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.datax.plugin.reader.odpsreader.util; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.common.util.IdAndKeyRollingUtil; -import com.alibaba.datax.common.util.MessageSource; -import com.alibaba.datax.plugin.reader.odpsreader.Key; -import com.alibaba.datax.plugin.reader.odpsreader.OdpsReaderErrorCode; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class IdAndKeyUtil { - private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class); - private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class); - - public static Configuration parseAccessIdAndKey(Configuration originalConfig) { - String accessId = originalConfig.getString(Key.ACCESS_ID); - String accessKey = originalConfig.getString(Key.ACCESS_KEY); - - // 只要 accessId,accessKey 二者配置了一个,就理解为是用户本意是要直接手动配置其 accessid/accessKey - if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) { - LOG.info("Try to get accessId/accessKey from your config."); - //通过如下语句,进行检查是否确实配置了 - accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsReaderErrorCode.REQUIRED_VALUE); - accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsReaderErrorCode.REQUIRED_VALUE); - //检查完毕,返回即可 - return originalConfig; - } else { - Map envProp = System.getenv(); - return getAccessIdAndKeyFromEnv(originalConfig, envProp); - } - } - - private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig, - Map envProp) { - // 如果获取到ak,在getAccessIdAndKeyFromEnv中已经设置到originalConfig了 - String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig); - if (StringUtils.isBlank(accessKey)) { - // 无处获取(既没有配置在作业中,也没用在环境变量中) - throw DataXException.asDataXException(OdpsReaderErrorCode.GET_ID_KEY_FAIL, - MESSAGE_SOURCE.message("idandkeyutil.2")); - } - return originalConfig; - } -} diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java index f2ad8e0f97..0ff34a81ba 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java @@ -76,19 +76,12 @@ public static Odps initOdps(Configuration originalConfig) { defaultProject = packageAuthorizedProject; } - String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, - Constant.DEFAULT_ACCOUNT_TYPE); Account account = null; - if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) { - if (StringUtils.isNotBlank(securityToken)) { - account = new StsAccount(accessId, accessKey, securityToken); - } else { - account = new AliyunAccount(accessId, accessKey); - } + if (StringUtils.isNotBlank(securityToken)) { + account = new StsAccount(accessId, accessKey, securityToken); } else { - throw DataXException.asDataXException(OdpsReaderErrorCode.ACCOUNT_TYPE_ERROR, - MESSAGE_SOURCE.message("odpsutil.3", accountType)); + account = new AliyunAccount(accessId, accessKey); } Odps odps = new Odps(account); diff --git a/odpswriter/doc/odpswriter.md b/odpswriter/doc/odpswriter.md index d81672b02c..845dd1d3d0 100644 --- a/odpswriter/doc/odpswriter.md +++ b/odpswriter/doc/odpswriter.md @@ -71,8 +71,7 @@ ODPSWriter插件用于实现往ODPS插入或者更新数据,主要提供给etl "accessKey": "xxxx", "truncate": true, "odpsServer": "http://sxxx/api", - "tunnelServer": "http://xxx", - "accountType": "aliyun" + "tunnelServer": "http://xxx" } } } diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java index f4d9734b9c..efedfea9d1 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java @@ -2,13 +2,6 @@ public class Constant { - public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID"; - - public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY"; - - public static final String DEFAULT_ACCOUNT_TYPE = "aliyun"; - - public static final String TAOBAO_ACCOUNT_TYPE = "taobao"; public static final String COLUMN_POSITION = "columnPosition"; diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java index 7ee11128ff..8dff8a4cd3 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java @@ -30,8 +30,6 @@ public final class Key { //boolean 类型,default:false public final static String EMPTY_AS_NULL = "emptyAsNull"; - public final static String ACCOUNT_TYPE = "accountType"; - public final static String IS_COMPRESS = "isCompress"; // preSql diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java index 06476463e9..9b7276fa51 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java @@ -62,7 +62,6 @@ public static class Job extends Writer.Job { private String tableName; private String tunnelServer; private String partition; - private String accountType; private boolean truncate; private String uploadId; private TableTunnel.UploadSession masterUpload; @@ -104,8 +103,6 @@ public void init() { this.tableName = this.originalConfig.getString(Key.TABLE); this.tunnelServer = this.originalConfig.getString(Key.TUNNEL_SERVER, null); - this.dealAK(); - // init odps config this.odps = OdpsUtil.initOdpsProject(this.originalConfig); @@ -153,31 +150,6 @@ public void init() { } } - private void dealAK() { - this.accountType = this.originalConfig.getString(Key.ACCOUNT_TYPE, - Constant.DEFAULT_ACCOUNT_TYPE); - - if (!Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType) && - !Constant.TAOBAO_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType)) { - throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR, - MESSAGE_SOURCE.message("odpswriter.1", accountType)); - } - this.originalConfig.set(Key.ACCOUNT_TYPE, this.accountType); - - //检查accessId,accessKey配置 - if (Constant.DEFAULT_ACCOUNT_TYPE - .equalsIgnoreCase(this.accountType)) { - this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig); - String accessId = this.originalConfig.getString(Key.ACCESS_ID); - String accessKey = this.originalConfig.getString(Key.ACCESS_KEY); - if (IS_DEBUG) { - LOG.debug("accessId:[{}], accessKey:[{}] .", accessId, - accessKey); - } - LOG.info("accessId:[{}] .", accessId); - } - } - private void dealDynamicPartition() { /* * 如果显示配置了 supportDynamicPartition,则以配置为准 @@ -241,20 +213,6 @@ private void dealDynamicPartition() { @Override public void prepare() { - String accessId = null; - String accessKey = null; - if (Constant.DEFAULT_ACCOUNT_TYPE - .equalsIgnoreCase(this.accountType)) { - this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig); - accessId = this.originalConfig.getString(Key.ACCESS_ID); - accessKey = this.originalConfig.getString(Key.ACCESS_KEY); - if (IS_DEBUG) { - LOG.debug("accessId:[{}], accessKey:[{}] .", accessId, - accessKey); - } - LOG.info("accessId:[{}] .", accessId); - } - // init odps config this.odps = OdpsUtil.initOdpsProject(this.originalConfig); diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/IdAndKeyUtil.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/IdAndKeyUtil.java deleted file mode 100755 index 98c9afdd95..0000000000 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/IdAndKeyUtil.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * (C) 2010-2022 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.datax.plugin.writer.odpswriter.util; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.common.util.IdAndKeyRollingUtil; -import com.alibaba.datax.common.util.MessageSource; -import com.alibaba.datax.plugin.writer.odpswriter.Key; -import com.alibaba.datax.plugin.writer.odpswriter.OdpsWriterErrorCode; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class IdAndKeyUtil { - private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class); - private static final MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class); - - public static Configuration parseAccessIdAndKey(Configuration originalConfig) { - String accessId = originalConfig.getString(Key.ACCESS_ID); - String accessKey = originalConfig.getString(Key.ACCESS_KEY); - - // 只要 accessId,accessKey 二者配置了一个,就理解为是用户本意是要直接手动配置其 accessid/accessKey - if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) { - LOG.info("Try to get accessId/accessKey from your config."); - //通过如下语句,进行检查是否确实配置了 - accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsWriterErrorCode.REQUIRED_VALUE); - accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsWriterErrorCode.REQUIRED_VALUE); - //检查完毕,返回即可 - return originalConfig; - } else { - Map envProp = System.getenv(); - return getAccessIdAndKeyFromEnv(originalConfig, envProp); - } - } - - private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig, - Map envProp) { - // 如果获取到ak,在getAccessIdAndKeyFromEnv中已经设置到originalConfig了 - String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig); - if (StringUtils.isBlank(accessKey)) { - // 无处获取(既没有配置在作业中,也没用在环境变量中) - throw DataXException.asDataXException(OdpsWriterErrorCode.GET_ID_KEY_FAIL, - MESSAGE_SOURCE.message("idandkeyutil.2")); - } - return originalConfig; - } -} diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java index a663da85b1..a3a372af80 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java @@ -79,7 +79,6 @@ public static String formatPartition(String partitionString, Boolean printLog) { public static Odps initOdpsProject(Configuration originalConfig) { - String accountType = originalConfig.getString(Key.ACCOUNT_TYPE); String accessId = originalConfig.getString(Key.ACCESS_ID); String accessKey = originalConfig.getString(Key.ACCESS_KEY); @@ -88,15 +87,10 @@ public static Odps initOdpsProject(Configuration originalConfig) { String securityToken = originalConfig.getString(Key.SECURITY_TOKEN); Account account; - if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) { - if (StringUtils.isNotBlank(securityToken)) { - account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken); - } else { - account = new AliyunAccount(accessId, accessKey); - } + if (StringUtils.isNotBlank(securityToken)) { + account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken); } else { - throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR, - MESSAGE_SOURCE.message("odpsutil.4", accountType)); + account = new AliyunAccount(accessId, accessKey); } Odps odps = new Odps(account); diff --git a/pom.xml b/pom.xml index bd1c7f3a43..957c60ee43 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 3.3.2 1.10 1.2 - 2.0.19 + 2.0.23 16.0.1 3.7.2.1-SNAPSHOT diff --git a/starrockswriter/pom.xml b/starrockswriter/pom.xml index 67e3b91956..73a5142240 100755 --- a/starrockswriter/pom.xml +++ b/starrockswriter/pom.xml @@ -64,7 +64,6 @@ com.alibaba.fastjson2 fastjson2 - 2.0.23 mysql @@ -98,10 +97,6 @@ true - - com.alibaba.fastjson - com.starrocks.shade.com.alibaba.fastjson - org.apache.http com.starrocks.shade.org.apache.http @@ -118,7 +113,6 @@ commons-logging:* org.apache.httpcomponents:httpclient org.apache.httpcomponents:httpcore - com.alibaba:fastjson