From 0fd73ce1915f06e9f06460d559adcf3889fc5ca3 Mon Sep 17 00:00:00 2001 From: tinker Date: Thu, 23 Jan 2025 17:53:10 +0800 Subject: [PATCH 1/2] revert commit and catch exception in alterScheduleTask --- .../odc/service/schedule/ScheduleService.java | 112 +++++++++--------- .../schedule/flowtask/AlterScheduleTask.java | 12 +- 2 files changed, 64 insertions(+), 60 deletions(-) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index 84db4be8d2..0d22ee6c65 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -433,70 +433,64 @@ private void validateTriggerConfig(TriggerConfig triggerConfig) { @Transactional(rollbackFor = Exception.class) public void executeChangeSchedule(ScheduleChangeParams req) { - try { - Schedule targetSchedule = nullSafeGetModelById(req.getScheduleId()); - // start to change schedule - switch (req.getOperationType()) { - case CREATE: - case RESUME: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED); - break; - } - case UPDATE: { - ScheduleEntity entity = nullSafeGetById(req.getScheduleId()); - entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters())); - entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig())); - entity.setDescription(req.getUpdateScheduleReq().getDescription()); - entity.setStatus(ScheduleStatus.ENABLED); - PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq"); - if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) { - DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq() - .getParameters(); - parameters.getRateLimit().setOrderId(req.getScheduleId()); - dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); - } - if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) { - DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq() - .getParameters(); - parameters.getRateLimit().setOrderId(req.getScheduleId()); - dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); - } - targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity)); - break; - } - case PAUSE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE); - break; - } - case TERMINATE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED); - break; + Schedule targetSchedule = nullSafeGetModelById(req.getScheduleId()); + // start to change schedule + switch (req.getOperationType()) { + case CREATE: + case RESUME: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED); + break; + } + case UPDATE: { + ScheduleEntity entity = nullSafeGetById(req.getScheduleId()); + entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters())); + entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig())); + entity.setDescription(req.getUpdateScheduleReq().getDescription()); + entity.setStatus(ScheduleStatus.ENABLED); + PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq"); + if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) { + DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq() + .getParameters(); + parameters.getRateLimit().setOrderId(req.getScheduleId()); + dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); } - case DELETE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED); - break; + if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) { + DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq() + .getParameters(); + parameters.getRateLimit().setOrderId(req.getScheduleId()); + dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); } - default: - throw new UnsupportedException(); + targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity)); + break; } - - // start change quartzJob - ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam(); - quartzJobReq.setOperationType(req.getOperationType()); - quartzJobReq.setJobName(targetSchedule.getId().toString()); - quartzJobReq.setJobGroup(targetSchedule.getType().name()); - quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig()); - quartzJobService.changeQuartzJob(quartzJobReq); - scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.SUCCESS); - log.info("Change schedule success,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(), - req.getOperationType(), req.getScheduleChangeLogId()); - } catch (Exception e) { - log.warn("Change schedule failed,scheduleId={},operationType={},changelogId={}", req.getScheduleId(), - req.getOperationType(), req.getScheduleChangeLogId(), e); - scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.FAILED); - throw e; + case PAUSE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE); + break; + } + case TERMINATE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED); + break; + } + case DELETE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED); + break; + } + default: + throw new UnsupportedException(); } + // start change quartzJob + ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam(); + quartzJobReq.setOperationType(req.getOperationType()); + quartzJobReq.setJobName(targetSchedule.getId().toString()); + quartzJobReq.setJobGroup(targetSchedule.getType().name()); + quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig()); + quartzJobService.changeQuartzJob(quartzJobReq); + + scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.SUCCESS); + log.info("Change schedule success,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(), + req.getOperationType(), req.getScheduleChangeLogId()); + } public ScheduleEntity create(ScheduleEntity scheduleConfig) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java index d04dfcef63..392395f18c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java @@ -23,7 +23,9 @@ import com.oceanbase.odc.service.flow.task.BaseODCFlowTaskDelegate; import com.oceanbase.odc.service.flow.util.FlowTaskUtil; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; +import com.oceanbase.odc.service.schedule.ScheduleChangeLogService; import com.oceanbase.odc.service.schedule.ScheduleService; +import com.oceanbase.odc.service.schedule.model.ScheduleChangeStatus; import com.oceanbase.odc.service.task.TaskService; import lombok.extern.slf4j.Slf4j; @@ -39,6 +41,8 @@ public class AlterScheduleTask extends BaseODCFlowTaskDelegate Date: Thu, 23 Jan 2025 18:26:41 +0800 Subject: [PATCH 2/2] rsp comments --- .../odc/service/schedule/ScheduleService.java | 125 ++++++++++-------- .../schedule/flowtask/AlterScheduleTask.java | 12 +- 2 files changed, 71 insertions(+), 66 deletions(-) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index 0d22ee6c65..96de819b19 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -51,6 +51,7 @@ import org.springframework.integration.jdbc.lock.JdbcLockRegistry; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; import com.alibaba.fastjson.JSONObject; @@ -226,6 +227,8 @@ public class ScheduleService { @Autowired private ScheduleDescriptionGenerator descriptionGenerator; + @Autowired + private TransactionTemplate txTemplate; private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE; @@ -431,65 +434,77 @@ private void validateTriggerConfig(TriggerConfig triggerConfig) { } } - @Transactional(rollbackFor = Exception.class) public void executeChangeSchedule(ScheduleChangeParams req) { - Schedule targetSchedule = nullSafeGetModelById(req.getScheduleId()); - // start to change schedule - switch (req.getOperationType()) { - case CREATE: - case RESUME: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED); - break; - } - case UPDATE: { - ScheduleEntity entity = nullSafeGetById(req.getScheduleId()); - entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters())); - entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig())); - entity.setDescription(req.getUpdateScheduleReq().getDescription()); - entity.setStatus(ScheduleStatus.ENABLED); - PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq"); - if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) { - DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq() - .getParameters(); - parameters.getRateLimit().setOrderId(req.getScheduleId()); - dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); - } - if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) { - DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq() - .getParameters(); - parameters.getRateLimit().setOrderId(req.getScheduleId()); - dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); + // start change quartzJob + boolean isSuccess = Boolean.TRUE.equals(txTemplate.execute(status -> { + Schedule targetSchedule = nullSafeGetModelById(req.getScheduleId()); + try { + // start to change schedule + switch (req.getOperationType()) { + case CREATE: + case RESUME: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED); + break; + } + case UPDATE: { + ScheduleEntity entity = nullSafeGetById(req.getScheduleId()); + entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters())); + entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig())); + entity.setDescription(req.getUpdateScheduleReq().getDescription()); + entity.setStatus(ScheduleStatus.ENABLED); + PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq"); + if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) { + DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq() + .getParameters(); + parameters.getRateLimit().setOrderId(req.getScheduleId()); + dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); + } + if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) { + DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq() + .getParameters(); + parameters.getRateLimit().setOrderId(req.getScheduleId()); + dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); + } + targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity)); + break; + } + case PAUSE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE); + break; + } + case TERMINATE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED); + break; + } + case DELETE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED); + break; + } + default: + throw new UnsupportedException(); } - targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity)); - break; - } - case PAUSE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE); - break; - } - case TERMINATE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED); - break; - } - case DELETE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED); - break; + + // start change quartzJob + ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam(); + quartzJobReq.setOperationType(req.getOperationType()); + quartzJobReq.setJobName(targetSchedule.getId().toString()); + quartzJobReq.setJobGroup(targetSchedule.getType().name()); + quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig()); + quartzJobService.changeQuartzJob(quartzJobReq); + return true; + } catch (Exception e) { + log.warn("Change schedule failed,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(), + req.getOperationType(), req.getScheduleChangeLogId(), e); + status.setRollbackOnly(); + return false; } - default: - throw new UnsupportedException(); - } + })); - // start change quartzJob - ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam(); - quartzJobReq.setOperationType(req.getOperationType()); - quartzJobReq.setJobName(targetSchedule.getId().toString()); - quartzJobReq.setJobGroup(targetSchedule.getType().name()); - quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig()); - quartzJobService.changeQuartzJob(quartzJobReq); - - scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.SUCCESS); - log.info("Change schedule success,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(), - req.getOperationType(), req.getScheduleChangeLogId()); + scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), + isSuccess ? ScheduleChangeStatus.SUCCESS : ScheduleChangeStatus.FAILED); + log.info("Change schedule completed,scheduleId={},operationType={},changelogId={},status={}", + req.getScheduleId(), + req.getOperationType(), req.getScheduleChangeLogId(), isSuccess ? "SUCCESS" : "FAILED"); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java index 392395f18c..d04dfcef63 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/AlterScheduleTask.java @@ -23,9 +23,7 @@ import com.oceanbase.odc.service.flow.task.BaseODCFlowTaskDelegate; import com.oceanbase.odc.service.flow.util.FlowTaskUtil; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; -import com.oceanbase.odc.service.schedule.ScheduleChangeLogService; import com.oceanbase.odc.service.schedule.ScheduleService; -import com.oceanbase.odc.service.schedule.model.ScheduleChangeStatus; import com.oceanbase.odc.service.task.TaskService; import lombok.extern.slf4j.Slf4j; @@ -41,8 +39,6 @@ public class AlterScheduleTask extends BaseODCFlowTaskDelegate