Skip to content

Commit

Permalink
[Improvement-17001] Once workflow is not exist, delete scheduler task
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Feb 8, 2025
1 parent 806f051 commit ca90f2c
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.repository;

import org.apache.dolphinscheduler.dao.entity.Schedule;

public interface ScheduleDao extends IDao<Schedule> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.repository.impl;

import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ScheduleDao;

import lombok.NonNull;

import org.springframework.stereotype.Repository;

@Repository
public class ScheduleDaoImpl extends BaseDao<Schedule, ScheduleMapper> implements ScheduleDao {

public ScheduleDaoImpl(@NonNull ScheduleMapper scheduleMapper) {
super(scheduleMapper);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.repository.ScheduleDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerRequest;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.util.Date;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -44,7 +46,10 @@
public class ProcessScheduleTask extends QuartzJobBean {

@Autowired
private ProcessService processService;
private ScheduleDao scheduleDao;

@Autowired
private WorkflowDefinitionDao workflowDefinitionDao;

@Autowired
private IWorkflowControlClient workflowInstanceController;
Expand All @@ -53,34 +58,48 @@ public class ProcessScheduleTask extends QuartzJobBean {
@Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
protected void executeInternal(JobExecutionContext context) {
QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap());
int projectId = quartzJobData.getProjectId();
int scheduleId = quartzJobData.getScheduleId();
final QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap());
final int projectId = quartzJobData.getProjectId();
final int scheduleId = quartzJobData.getScheduleId();

Date scheduledFireTime = context.getScheduledFireTime();
final Date scheduledFireTime = context.getScheduledFireTime();
final Date fireTime = context.getFireTime();

Date fireTime = context.getFireTime();
log.info("Scheduler: {} fired expect fire time is {}, actual fire time is {}",
scheduleId,
scheduledFireTime,
fireTime);

log.info("scheduled fire time :{}, fire time :{}, scheduleId :{}", scheduledFireTime, fireTime, scheduleId);

// query schedule
Schedule schedule = processService.querySchedule(scheduleId);
// If the schedule does not exist or offline, then delete the corn job
final Schedule schedule = scheduleDao.queryById(scheduleId);
if (schedule == null || ReleaseState.OFFLINE == schedule.getReleaseState()) {
log.warn("Scheduler: {} does not exist in db,will delete job in quartz", scheduleId);
deleteJob(context, projectId, scheduleId);
return;
}

final Optional<WorkflowDefinition> workflowDefinitionOptional =
workflowDefinitionDao.queryByCode(schedule.getWorkflowDefinitionCode());
if (!workflowDefinitionOptional.isPresent()) {
log.warn(
"process schedule does not exist in db or process schedule offline,delete schedule job in quartz, projectId:{}, scheduleId:{}",
projectId, scheduleId);
"Scheduler: {} bind workflow: {} does not exist in db,will delete the schedule and delete schedule job in quartz",
scheduleId,
schedule.getWorkflowDefinitionCode());
scheduleDao.deleteById(scheduleId);
deleteJob(context, projectId, scheduleId);
return;
}

WorkflowDefinition workflowDefinition =
processService.findWorkflowDefinitionByCode(schedule.getWorkflowDefinitionCode());
// release state : online/offline
ReleaseState releaseState = workflowDefinition.getReleaseState();
if (releaseState == ReleaseState.OFFLINE) {
final WorkflowDefinition workflowDefinition = workflowDefinitionOptional.get();
if (workflowDefinition.getReleaseState() == ReleaseState.OFFLINE) {
log.warn(
"process definition does not exist in db or offline,need not to create command, projectId:{}, processDefinitionId:{}",
projectId, workflowDefinition.getId());
"Scheduler: {} bind workflow: {} state is OFFLINE,will update the schedule status to OFFLINE and delete schedule job in quartz",
scheduleId,
schedule.getWorkflowDefinitionCode());
schedule.setReleaseState(ReleaseState.OFFLINE);
schedule.setUpdateTime(new Date());
scheduleDao.updateById(schedule);
deleteJob(context, projectId, scheduleId);
return;
}

Expand All @@ -106,14 +125,14 @@ protected void executeInternal(JobExecutionContext context) {

private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {
final Scheduler scheduler = context.getScheduler();
JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey();
final JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey();
try {
if (scheduler.checkExists(jobKey)) {
log.info("Try to delete job: {}, projectId: {}, schedulerId", projectId, scheduleId);
log.info("Try to delete job: {}, projectId: {}, schedulerId: {}", jobKey, projectId, scheduleId);
scheduler.deleteJob(jobKey);
}
} catch (Exception e) {
log.error("Failed to delete job: {}", jobKey);
log.error("Failed to delete job: {}", jobKey, e);
}
}
}

0 comments on commit ca90f2c

Please sign in to comment.