Skip to content

Commit

Permalink
[master_0804_fix_issue_1103][scheduler][fix issue 1103] (#1108)
Browse files Browse the repository at this point in the history
#1103 解决 RDB SQL 执行过程中进程重启后任务状态卡死、无法结束的问题

Co-authored-by: haier <[email protected]>
Former-commit-id: 61a17475c2e4e1ddbfcfc3ca282a6a8717e79567
  • Loading branch information
hilitee and haier authored Aug 10, 2023
1 parent aaae5b5 commit d261490
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.dtstack.taier.common.BlockCallerPolicy;
import com.dtstack.taier.common.enums.EJobCacheStage;
import com.dtstack.taier.common.enums.EJobClientType;
import com.dtstack.taier.common.enums.EScheduleJobType;
import com.dtstack.taier.common.enums.EScheduleType;
import com.dtstack.taier.common.env.EnvironmentContext;
import com.dtstack.taier.common.util.LogCountUtil;
Expand Down Expand Up @@ -189,7 +192,13 @@ private void dealJob(String jobId) throws Exception {
JobIdentifier jobIdentifier = new JobIdentifier(engineTaskId, appId, jobId, scheduleJob.getTenantId(), taskType, deployMode.getType(),
null, MapUtils.isEmpty(pluginInfo) ? null : JSONObject.toJSONString(pluginInfo), paramAction.getComponentVersion(), paramAction.getQueueName());

TaskStatus taskStatus = workerOperator.getJobStatus(jobIdentifier);
TaskStatus taskStatus;
EJobClientType jobClientType = EJobClientType.getJobClientTypeByTask(taskType);
if(EJobClientType.DATASOURCE_PLUGIN == jobClientType){
taskStatus = TaskStatus.FAILED;
} else {
taskStatus = workerOperator.getJobStatus(jobIdentifier);
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("------ jobId:{} dealJob status:{}", jobId, taskStatus);
Expand Down Expand Up @@ -262,7 +271,7 @@ private void updateJobStatusWithPredicate(ScheduleJob scheduleJob, String jobId,
}
}
}

private TaskStatus checkNotFoundStatus(TaskStatus taskStatus, String jobId) {
JobStatusFrequency statusPair = updateJobStatusFrequency(jobId, taskStatus.getStatus());
//如果状态为NotFound,则对频次进行判断
Expand Down Expand Up @@ -344,4 +353,4 @@ public void start() {
LOGGER.info("{} thread start ...", jobResource + this.getClass().getSimpleName());

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class JobStopDealer implements ApplicationListener<ApplicationStartedEven
private EnvironmentContext environmentContext;

@Autowired
private ScheduleJobCacheService ScheduleJobCacheService;
private ScheduleJobCacheService scheduleJobCacheService;

@Autowired
private ScheduleJobOperatorRecordService scheduleJobOperatorRecordService;
Expand Down Expand Up @@ -248,7 +248,7 @@ public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent)
private void asyncDealStopJob(StoppedJob<JobElement> stoppedJob) {
try {
if (!checkExpired(stoppedJob.getJob())) {
ScheduleJobCache jobCache = ScheduleJobCacheService.getByJobId(stoppedJob.getJob().jobId);
ScheduleJobCache jobCache = scheduleJobCacheService.getByJobId(stoppedJob.getJob().jobId);
StoppedStatus stoppedStatus = this.stopJob(stoppedJob.getJob());
switch (stoppedStatus) {
case STOPPED:
Expand Down Expand Up @@ -313,7 +313,7 @@ public void close() {
}

private StoppedStatus stopJob(JobElement jobElement) throws Exception {
ScheduleJobCache jobCache = ScheduleJobCacheService.getByJobId(jobElement.jobId);
ScheduleJobCache jobCache = scheduleJobCacheService.getByJobId(jobElement.jobId);
ScheduleJob scheduleJob = scheduleJobService.lambdaQuery()
.eq(ScheduleJob::getJobId, jobElement.jobId)
.eq(ScheduleJob::getIsDeleted, Deleted.NORMAL.getStatus())
Expand Down Expand Up @@ -377,7 +377,7 @@ private StoppedStatus stopJob(JobElement jobElement) throws Exception {
}

private boolean checkExpired(JobElement jobElement) {
ScheduleJobCache jobCache = ScheduleJobCacheService.getByJobId(jobElement.jobId);
ScheduleJobCache jobCache = scheduleJobCacheService.getByJobId(jobElement.jobId);
ScheduleJobOperatorRecord scheduleJobOperatorRecord = scheduleJobOperatorRecordService.getById(jobElement.stopJobId);

if (jobCache != null && scheduleJobOperatorRecord != null && scheduleJobOperatorRecord.getGmtCreate() != null) {
Expand All @@ -389,7 +389,7 @@ private boolean checkExpired(JobElement jobElement) {

private void removeMemStatusAndJobCache(String jobId) {
shardCache.removeIfPresent(jobId);
ScheduleJobCacheService.deleteByJobId(jobId);
scheduleJobCacheService.deleteByJobId(jobId);
//修改任务状态
scheduleJobService.updateStatusAndLogInfoById(jobId, TaskStatus.CANCELED.getStatus(), "");
LOGGER.info("jobId:{} delete jobCache and update job status:{}, job set finished.", jobId, TaskStatus.CANCELED.getStatus());
Expand Down Expand Up @@ -424,7 +424,7 @@ public void run() {
break;
}
List<String> jobIds = jobStopRecords.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toList());
List<ScheduleJobCache> jobCaches = ScheduleJobCacheService.getByJobIds(jobIds);
List<ScheduleJobCache> jobCaches = scheduleJobCacheService.getByJobIds(jobIds);

//为了下面兼容异常状态的任务停止
Map<String, ScheduleJobCache> jobCacheMap = new HashMap<>(jobCaches.size());
Expand Down

0 comments on commit d261490

Please sign in to comment.