From ef2f09061732f1a5ef84adcf4cdfb38e944126d5 Mon Sep 17 00:00:00 2001 From: guowl3 Date: Thu, 23 Jan 2025 18:37:06 +0800 Subject: [PATCH] fix(changelog): cannot rollback update changelog status (#4198) * revert commit and catch exception in alterScheduleTask * rsp comments --- .../odc/service/schedule/ScheduleService.java | 125 ++++++++++-------- 1 file changed, 67 insertions(+), 58 deletions(-) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index 84db4be8d2..96de819b19 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -51,6 +51,7 @@ import org.springframework.integration.jdbc.lock.JdbcLockRegistry; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; import com.alibaba.fastjson.JSONObject; @@ -226,6 +227,8 @@ public class ScheduleService { @Autowired private ScheduleDescriptionGenerator descriptionGenerator; + @Autowired + private TransactionTemplate txTemplate; private final ScheduleMapper scheduleMapper = ScheduleMapper.INSTANCE; @@ -431,71 +434,77 @@ private void validateTriggerConfig(TriggerConfig triggerConfig) { } } - @Transactional(rollbackFor = Exception.class) public void executeChangeSchedule(ScheduleChangeParams req) { - try { + // start change quartzJob + boolean isSuccess = Boolean.TRUE.equals(txTemplate.execute(status -> { Schedule targetSchedule = nullSafeGetModelById(req.getScheduleId()); - // start to change schedule - switch (req.getOperationType()) { - case CREATE: - case RESUME: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED); - break; - } - case UPDATE: { - ScheduleEntity entity = nullSafeGetById(req.getScheduleId()); - entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters())); - entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig())); - entity.setDescription(req.getUpdateScheduleReq().getDescription()); - entity.setStatus(ScheduleStatus.ENABLED); - PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq"); - if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) { - DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq() - .getParameters(); - parameters.getRateLimit().setOrderId(req.getScheduleId()); - dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); + try { + // start to change schedule + switch (req.getOperationType()) { + case CREATE: + case RESUME: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.ENABLED); + break; } - if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) { - DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq() - .getParameters(); - parameters.getRateLimit().setOrderId(req.getScheduleId()); - dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); + case UPDATE: { + ScheduleEntity entity = nullSafeGetById(req.getScheduleId()); + entity.setJobParametersJson(JsonUtils.toJson(req.getUpdateScheduleReq().getParameters())); + entity.setTriggerConfigJson(JsonUtils.toJson(req.getUpdateScheduleReq().getTriggerConfig())); + entity.setDescription(req.getUpdateScheduleReq().getDescription()); + entity.setStatus(ScheduleStatus.ENABLED); + PreConditions.notNull(req.getUpdateScheduleReq(), "req.updateScheduleReq"); + if (req.getUpdateScheduleReq().getParameters() instanceof DataArchiveParameters) { + DataArchiveParameters parameters = (DataArchiveParameters) req.getUpdateScheduleReq() + .getParameters(); + parameters.getRateLimit().setOrderId(req.getScheduleId()); + dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); + } + if (req.getUpdateScheduleReq().getParameters() instanceof DataDeleteParameters) { + DataDeleteParameters parameters = (DataDeleteParameters) req.getUpdateScheduleReq() + .getParameters(); + parameters.getRateLimit().setOrderId(req.getScheduleId()); + dlmLimiterService.updateByOrderId(req.getScheduleId(), parameters.getRateLimit()); + } + targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity)); + break; } - targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity)); - break; - } - case PAUSE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE); - break; - } - case TERMINATE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED); - break; - } - case DELETE: { - scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED); - break; + case PAUSE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.PAUSE); + break; + } + case TERMINATE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.TERMINATED); + break; + } + case DELETE: { + scheduleRepository.updateStatusById(targetSchedule.getId(), ScheduleStatus.DELETED); + break; + } + default: + throw new UnsupportedException(); } - default: - throw new UnsupportedException(); + + // start change quartzJob + ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam(); + quartzJobReq.setOperationType(req.getOperationType()); + quartzJobReq.setJobName(targetSchedule.getId().toString()); + quartzJobReq.setJobGroup(targetSchedule.getType().name()); + quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig()); + quartzJobService.changeQuartzJob(quartzJobReq); + return true; + } catch (Exception e) { + log.warn("Change schedule failed,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(), + req.getOperationType(), req.getScheduleChangeLogId(), e); + status.setRollbackOnly(); + return false; } + })); - // start change quartzJob - ChangeQuartJobParam quartzJobReq = new ChangeQuartJobParam(); - quartzJobReq.setOperationType(req.getOperationType()); - quartzJobReq.setJobName(targetSchedule.getId().toString()); - quartzJobReq.setJobGroup(targetSchedule.getType().name()); - quartzJobReq.setTriggerConfig(targetSchedule.getTriggerConfig()); - quartzJobService.changeQuartzJob(quartzJobReq); - scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.SUCCESS); - log.info("Change schedule success,scheduleId={},operationType={},changelogId={}", targetSchedule.getId(), - req.getOperationType(), req.getScheduleChangeLogId()); - } catch (Exception e) { - log.warn("Change schedule failed,scheduleId={},operationType={},changelogId={}", req.getScheduleId(), - req.getOperationType(), req.getScheduleChangeLogId(), e); - scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), ScheduleChangeStatus.FAILED); - throw e; - } + scheduleChangeLogService.updateStatusById(req.getScheduleChangeLogId(), + isSuccess ? ScheduleChangeStatus.SUCCESS : ScheduleChangeStatus.FAILED); + log.info("Change schedule completed,scheduleId={},operationType={},changelogId={},status={}", + req.getScheduleId(), + req.getOperationType(), req.getScheduleChangeLogId(), isSuccess ? "SUCCESS" : "FAILED"); }