From d2614904abebe3634dc78b079f719c846b50271e Mon Sep 17 00:00:00 2001 From: hilite Date: Thu, 10 Aug 2023 13:51:36 +0800 Subject: [PATCH] [master_0804_fix_issue_1103][scheduler][fix issue 1103] (#1108) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #1103 解决RDB SQL 执行中 进程重启 任务状态卡死 不结束问题 Co-authored-by: haier Former-commit-id: 61a17475c2e4e1ddbfcfc3ca282a6a8717e79567 --- .../scheduler/jobdealer/JobStatusDealer.java | 15 ++++++++++++--- .../taier/scheduler/jobdealer/JobStopDealer.java | 12 ++++++------ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStatusDealer.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStatusDealer.java index 4a2580e52d..98fb4bccfe 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStatusDealer.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStatusDealer.java @@ -21,6 +21,9 @@ import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.dtstack.taier.common.BlockCallerPolicy; +import com.dtstack.taier.common.enums.EJobCacheStage; +import com.dtstack.taier.common.enums.EJobClientType; +import com.dtstack.taier.common.enums.EScheduleJobType; import com.dtstack.taier.common.enums.EScheduleType; import com.dtstack.taier.common.env.EnvironmentContext; import com.dtstack.taier.common.util.LogCountUtil; @@ -189,7 +192,13 @@ private void dealJob(String jobId) throws Exception { JobIdentifier jobIdentifier = new JobIdentifier(engineTaskId, appId, jobId, scheduleJob.getTenantId(), taskType, deployMode.getType(), null, MapUtils.isEmpty(pluginInfo) ? null : JSONObject.toJSONString(pluginInfo), paramAction.getComponentVersion(), paramAction.getQueueName()); - TaskStatus taskStatus = workerOperator.getJobStatus(jobIdentifier); + TaskStatus taskStatus; + EJobClientType jobClientType = EJobClientType.getJobClientTypeByTask(taskType); + if(EJobClientType.DATASOURCE_PLUGIN == jobClientType){ + taskStatus = TaskStatus.FAILED; + } else { + taskStatus = workerOperator.getJobStatus(jobIdentifier); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("------ jobId:{} dealJob status:{}", jobId, taskStatus); @@ -262,7 +271,7 @@ private void updateJobStatusWithPredicate(ScheduleJob scheduleJob, String jobId, } } } - + private TaskStatus checkNotFoundStatus(TaskStatus taskStatus, String jobId) { JobStatusFrequency statusPair = updateJobStatusFrequency(jobId, taskStatus.getStatus()); //如果状态为NotFound,则对频次进行判断 @@ -344,4 +353,4 @@ public void start() { LOGGER.info("{} thread start ...", jobResource + this.getClass().getSimpleName()); } -} \ No newline at end of file +} diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java index d57e79fd72..e36dda2936 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/jobdealer/JobStopDealer.java @@ -97,7 +97,7 @@ public class JobStopDealer implements ApplicationListener stoppedJob) { try { if (!checkExpired(stoppedJob.getJob())) { - ScheduleJobCache jobCache = ScheduleJobCacheService.getByJobId(stoppedJob.getJob().jobId); + ScheduleJobCache jobCache = scheduleJobCacheService.getByJobId(stoppedJob.getJob().jobId); StoppedStatus stoppedStatus = this.stopJob(stoppedJob.getJob()); switch (stoppedStatus) { case STOPPED: @@ -313,7 +313,7 @@ public void close() { } private StoppedStatus stopJob(JobElement jobElement) throws Exception { - ScheduleJobCache jobCache = ScheduleJobCacheService.getByJobId(jobElement.jobId); + ScheduleJobCache jobCache = scheduleJobCacheService.getByJobId(jobElement.jobId); ScheduleJob scheduleJob = scheduleJobService.lambdaQuery() .eq(ScheduleJob::getJobId, jobElement.jobId) .eq(ScheduleJob::getIsDeleted, Deleted.NORMAL.getStatus()) @@ -377,7 +377,7 @@ private StoppedStatus stopJob(JobElement jobElement) throws Exception { } private boolean checkExpired(JobElement jobElement) { - ScheduleJobCache jobCache = ScheduleJobCacheService.getByJobId(jobElement.jobId); + ScheduleJobCache jobCache = scheduleJobCacheService.getByJobId(jobElement.jobId); ScheduleJobOperatorRecord scheduleJobOperatorRecord = scheduleJobOperatorRecordService.getById(jobElement.stopJobId); if (jobCache != null && scheduleJobOperatorRecord != null && scheduleJobOperatorRecord.getGmtCreate() != null) { @@ -389,7 +389,7 @@ private boolean checkExpired(JobElement jobElement) { private void removeMemStatusAndJobCache(String jobId) { shardCache.removeIfPresent(jobId); - ScheduleJobCacheService.deleteByJobId(jobId); + scheduleJobCacheService.deleteByJobId(jobId); //修改任务状态 scheduleJobService.updateStatusAndLogInfoById(jobId, TaskStatus.CANCELED.getStatus(), ""); LOGGER.info("jobId:{} delete jobCache and update job status:{}, job set finished.", jobId, TaskStatus.CANCELED.getStatus()); @@ -424,7 +424,7 @@ public void run() { break; } List jobIds = jobStopRecords.stream().map(ScheduleJobOperatorRecord::getJobId).collect(Collectors.toList()); - List jobCaches = ScheduleJobCacheService.getByJobIds(jobIds); + List jobCaches = scheduleJobCacheService.getByJobIds(jobIds); //为了下面兼容异常状态的任务停止 Map jobCacheMap = new HashMap<>(jobCaches.size());