From 4789ca6c3f1217ff84d3da6e24789cc390f20c2c Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Fri, 7 Feb 2025 19:40:07 +0800 Subject: [PATCH] [Improvement-17001] Once workflow is not exist, delete scheduler task --- .../dao/repository/ScheduleDao.java | 24 +++++++ .../dao/repository/impl/ScheduleDaoImpl.java | 36 ++++++++++ .../scheduler/quartz/ProcessScheduleTask.java | 65 ++++++++++++------- 3 files changed, 102 insertions(+), 23 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ScheduleDao.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ScheduleDaoImpl.java diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ScheduleDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ScheduleDao.java new file mode 100644 index 000000000000..4750aa7ef6e3 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ScheduleDao.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.Schedule; + +public interface ScheduleDao extends IDao { + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ScheduleDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ScheduleDaoImpl.java new file mode 100644 index 000000000000..9b85841e3ecf --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ScheduleDaoImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.ScheduleDao; + +import lombok.NonNull; + +import org.springframework.stereotype.Repository; + +@Repository +public class ScheduleDaoImpl extends BaseDao implements ScheduleDao { + + public ScheduleDaoImpl(@NonNull ScheduleMapper scheduleMapper) { + super(scheduleMapper); + } + +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java index 09d69460927e..3523c0427975 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java @@ -22,12 +22,14 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.repository.ScheduleDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao; import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerRequest; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Date; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -44,7 +46,10 @@ public class ProcessScheduleTask extends QuartzJobBean { @Autowired - private ProcessService processService; + private ScheduleDao scheduleDao; + + @Autowired + private WorkflowDefinitionDao workflowDefinitionDao; @Autowired private IWorkflowControlClient workflowInstanceController; @@ -53,34 +58,48 @@ public class ProcessScheduleTask extends QuartzJobBean { @Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Override protected void executeInternal(JobExecutionContext context) { - QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap()); - int projectId = quartzJobData.getProjectId(); - int scheduleId = quartzJobData.getScheduleId(); + final QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap()); + final int projectId = quartzJobData.getProjectId(); + final int scheduleId = quartzJobData.getScheduleId(); - Date scheduledFireTime = context.getScheduledFireTime(); + final Date scheduledFireTime = context.getScheduledFireTime(); + final Date fireTime = context.getFireTime(); - Date fireTime = context.getFireTime(); + log.info("Scheduler: {} fired expect fire time is {}, actual fire time is {}", + scheduleId, + scheduledFireTime, + fireTime); - log.info("scheduled fire time :{}, fire time :{}, scheduleId :{}", scheduledFireTime, fireTime, scheduleId); - - // query schedule - Schedule schedule = processService.querySchedule(scheduleId); + // If the schedule does not exist or offline, then delete the corn job + final Schedule schedule = scheduleDao.queryById(scheduleId); if (schedule == null || ReleaseState.OFFLINE == schedule.getReleaseState()) { + log.warn("Scheduler: {} does not exist in db,will delete job in quartz", scheduleId); + deleteJob(context, projectId, scheduleId); + return; + } + + final Optional workflowDefinitionOptional = + workflowDefinitionDao.queryByCode(schedule.getWorkflowDefinitionCode()); + if (!workflowDefinitionOptional.isPresent()) { log.warn( - "process schedule does not exist in db or process schedule offline,delete schedule job in quartz, projectId:{}, scheduleId:{}", - projectId, scheduleId); + "Scheduler: {} bind workflow: {} does not exist in db,will delete the schedule and delete schedule job in quartz", + scheduleId, + schedule.getWorkflowDefinitionCode()); + scheduleDao.deleteById(scheduleId); deleteJob(context, projectId, scheduleId); return; } - WorkflowDefinition workflowDefinition = - processService.findWorkflowDefinitionByCode(schedule.getWorkflowDefinitionCode()); - // release state : online/offline - ReleaseState releaseState = workflowDefinition.getReleaseState(); - if (releaseState == ReleaseState.OFFLINE) { + final WorkflowDefinition workflowDefinition = workflowDefinitionOptional.get(); + if (workflowDefinition.getReleaseState() == ReleaseState.OFFLINE) { log.warn( - "process definition does not exist in db or offline,need not to create command, projectId:{}, processDefinitionId:{}", - projectId, workflowDefinition.getId()); + "Scheduler: {} bind workflow: {} state is OFFLINE,will update the schedule status to OFFLINE and delete schedule job in quartz", + scheduleId, + schedule.getWorkflowDefinitionCode()); + schedule.setReleaseState(ReleaseState.OFFLINE); + schedule.setUpdateTime(new Date()); + scheduleDao.updateById(schedule); + deleteJob(context, projectId, scheduleId); return; } @@ -106,14 +125,14 @@ protected void executeInternal(JobExecutionContext context) { private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) { final Scheduler scheduler = context.getScheduler(); - JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey(); + final JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey(); try { if (scheduler.checkExists(jobKey)) { - log.info("Try to delete job: {}, projectId: {}, schedulerId", projectId, scheduleId); + log.info("Try to delete job: {}, projectId: {}, schedulerId: {}", jobKey, projectId, scheduleId); scheduler.deleteJob(jobKey); } } catch (Exception e) { - log.error("Failed to delete job: {}", jobKey); + log.error("Failed to delete job: {}", jobKey, e); } } }