Skip to content

Commit

Permalink
datax 202302
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaobo committed Mar 21, 2023
1 parent f01836a commit f79f742
Show file tree
Hide file tree
Showing 20 changed files with 16 additions and 321 deletions.
1 change: 0 additions & 1 deletion adswriter/doc/adswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ DataX 将数据直连ADS接口,利用ADS暴露的INSERT接口直写到ADS。
"account": "[email protected]",
"odpsServer": "xxx",
"tunnelServer": "xxx",
"accountType": "aliyun",
"project": "transfer_project"
},
"writeMode": "load",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ public class TransferProjectConf {
public final static String KEY_ACCOUNT = "odps.account";
public final static String KEY_ODPS_SERVER = "odps.odpsServer";
public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer";
public final static String KEY_ACCOUNT_TYPE = "odps.accountType";
public final static String KEY_PROJECT = "odps.project";

private String accessId;
private String accessKey;
private String account;
private String odpsServer;
private String odpsTunnel;
private String accountType;
private String project;

public static TransferProjectConf create(Configuration adsWriterConf) {
Expand All @@ -30,7 +28,6 @@ public static TransferProjectConf create(Configuration adsWriterConf) {
res.account = adsWriterConf.getString(KEY_ACCOUNT);
res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER);
res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL);
res.accountType = adsWriterConf.getString(KEY_ACCOUNT_TYPE, "aliyun");
res.project = adsWriterConf.getString(KEY_PROJECT);
return res;
}
Expand All @@ -55,9 +52,6 @@ public String getOdpsTunnel() {
return odpsTunnel;
}

public String getAccountType() {
return accountType;
}

public String getProject() {
return project;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class PerfTrace {
private int taskGroupId;
private int channelNumber;

private int priority;
private int batchSize = 500;
private volatile boolean perfReportEnable = true;

Expand All @@ -54,12 +53,12 @@ public class PerfTrace {
* @param taskGroupId
* @return
*/
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) {

if (instance == null) {
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable);
instance = new PerfTrace(isJob, jobId, taskGroupId, enable);
}
}
}
Expand All @@ -76,22 +75,21 @@ public static PerfTrace getInstance() {
LOG.error("PerfTrace instance not be init! must have some error! ");
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(false, -1111, -1111, 0, false);
instance = new PerfTrace(false, -1111, -1111, false);
}
}
}
return instance;
}

private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
private PerfTrace(boolean isJob, long jobId, int taskGroupId, boolean enable) {
try {
this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId);
this.enable = enable;
this.isJob = isJob;
this.taskGroupId = taskGroupId;
this.instId = jobId;
this.priority = priority;
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s", this.perfTraceId, this.enable));

} catch (Exception e) {
// do nothing
Expand Down Expand Up @@ -398,7 +396,6 @@ public synchronized JobStatisticsDto2 getReports(String mode) {
jdo.setWindowEnd(this.windowEnd);
jdo.setJobStartTime(jobStartTime);
jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime());
jdo.setJobPriority(this.priority);
jdo.setChannelNum(this.channelNumber);
jdo.setCluster(this.cluster);
jdo.setJobDomain(this.jobDomain);
Expand Down Expand Up @@ -609,7 +606,6 @@ class JobStatisticsDto2 {
private Date jobStartTime;
private Date jobEndTime;
private Long jobRunTimeMs;
private Integer jobPriority;
private Integer channelNum;
private String cluster;
private String jobDomain;
Expand Down Expand Up @@ -680,10 +676,6 @@ public Long getJobRunTimeMs() {
return jobRunTimeMs;
}

public Integer getJobPriority() {
return jobPriority;
}

public Integer getChannelNum() {
return channelNum;
}
Expand Down Expand Up @@ -816,10 +808,6 @@ public void setJobRunTimeMs(Long jobRunTimeMs) {
this.jobRunTimeMs = jobRunTimeMs;
}

public void setJobPriority(Integer jobPriority) {
this.jobPriority = jobPriority;
}

public void setChannelNum(Integer channelNum) {
this.channelNum = channelNum;
}
Expand Down

This file was deleted.

9 changes: 1 addition & 8 deletions core/src/main/java/com/alibaba/datax/core/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,9 @@ public void start(Configuration allConf) {
perfReportEnable = false;
}

int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
}

Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
container.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
break;
case BOOLEAN:
synchronized (lock) {
row.addBoolean(name, Boolean.getBoolean(rawData));
row.addBoolean(name, Boolean.parseBoolean(rawData));
}
break;
case STRING:
Expand Down
3 changes: 1 addition & 2 deletions mongodbreader/doc/mongodbreader.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J
"accessKey": "********************",
"truncate": true,
"odpsServer": "xxx/api",
"tunnelServer": "xxx",
"accountType": "aliyun"
"tunnelServer": "xxx"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,9 @@ public class Constant {

public static final String PARTITION_SPLIT_MODE = "partition";

public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";

public static final String TAOBAO_ACCOUNT_TYPE = "taobao";

// 常量字段用COLUMN_CONSTANT_FLAG 首尾包住即可
public final static String COLUMN_CONSTANT_FLAG = "'";

/**
* 以下是获取accesskey id 需要用到的常量值
*/
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";

public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";

public static final String PARTITION_COLUMNS = "partitionColumns";

public static final String PARSED_COLUMNS = "parsedColumns";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ public class Key {
// 当值为:partition 则只切分到分区;当值为:record,则当按照分区切分后达不到adviceNum时,继续按照record切分
public final static String SPLIT_MODE = "splitMode";

// 账号类型,默认为aliyun,也可能为taobao等其他类型
public final static String ACCOUNT_TYPE = "accountType";

public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject";

public final static String IS_COMPRESS = "isCompress";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ public void init() {
this.originalConfig = super.getPluginJobConf();
this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false);

//如果用户没有配置accessId/accessKey,尝试从环境变量获取
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, Constant.DEFAULT_ACCOUNT_TYPE);
if (Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
}

//检查必要的参数配置
OdpsUtil.checkNecessaryConfig(this.originalConfig);
//重试次数的配置检查
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,12 @@ public static Odps initOdps(Configuration originalConfig) {
defaultProject = packageAuthorizedProject;
}

String accountType = originalConfig.getString(Key.ACCOUNT_TYPE,
Constant.DEFAULT_ACCOUNT_TYPE);

Account account = null;
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
if (StringUtils.isNotBlank(securityToken)) {
account = new StsAccount(accessId, accessKey, securityToken);
} else {
account = new AliyunAccount(accessId, accessKey);
}
if (StringUtils.isNotBlank(securityToken)) {
account = new StsAccount(accessId, accessKey, securityToken);
} else {
throw DataXException.asDataXException(OdpsReaderErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpsutil.3", accountType));
account = new AliyunAccount(accessId, accessKey);
}

Odps odps = new Odps(account);
Expand Down
3 changes: 1 addition & 2 deletions odpswriter/doc/odpswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ ODPSWriter插件用于实现往ODPS插入或者更新数据,主要提供给etl
"accessKey": "xxxx",
"truncate": true,
"odpsServer": "http://sxxx/api",
"tunnelServer": "http://xxx",
"accountType": "aliyun"
"tunnelServer": "http://xxx"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@


public class Constant {
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";

public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";

public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";

public static final String TAOBAO_ACCOUNT_TYPE = "taobao";

public static final String COLUMN_POSITION = "columnPosition";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public final class Key {
//boolean 类型,default:false
public final static String EMPTY_AS_NULL = "emptyAsNull";

public final static String ACCOUNT_TYPE = "accountType";

public final static String IS_COMPRESS = "isCompress";

// preSql
Expand Down
Loading

0 comments on commit f79f742

Please sign in to comment.