diff --git a/src/backend/commons/common-k8s/src/main/java/com/tencent/bk/job/common/k8s/availability/JobApplicationAvailabilityBean.java b/src/backend/commons/common-k8s/src/main/java/com/tencent/bk/job/common/k8s/availability/JobApplicationAvailabilityBean.java index b4e680bbe3..c73e0fd216 100644 --- a/src/backend/commons/common-k8s/src/main/java/com/tencent/bk/job/common/k8s/availability/JobApplicationAvailabilityBean.java +++ b/src/backend/commons/common-k8s/src/main/java/com/tencent/bk/job/common/k8s/availability/JobApplicationAvailabilityBean.java @@ -12,8 +12,8 @@ public class JobApplicationAvailabilityBean extends ApplicationAvailabilityBean public void onApplicationEvent(AvailabilityChangeEvent event) { super.onApplicationEvent(event); if (ReadinessState.REFUSING_TRAFFIC == event.getState()) { - // SpringCloud负载均衡缓存默认为35s,等待调用方缓存刷新后再真正关闭Spring容器 - int waitSeconds = 40; + // SpringCloud负载均衡缓存设置为20s,等待调用方缓存刷新后再真正关闭Spring容器 + int waitSeconds = 30; while (waitSeconds > 0) { ThreadUtils.sleep(1000); log.info("wait for GracefulShutdown, {}s left", waitSeconds--); diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java index 4587ac73eb..cb6d818a22 100644 --- a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/config/FilterConfig.java @@ -25,25 +25,50 @@ package com.tencent.bk.job.common.web.config; import com.tencent.bk.job.common.web.filter.RepeatableReadWriteServletRequestResponseFilter; +import com.tencent.bk.job.common.web.filter.WebRepeatableReadServletRequestFilter; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FilterConfig { + + /** + * 给/esb/api/*, /service/* 用的过滤器,包装request和response + * + */ @Bean public FilterRegistrationBean repeatableRSRRFilterRegister() { FilterRegistrationBean registration = new FilterRegistrationBean<>(); registration.setFilter(repeatableRRRFilter()); - registration.addUrlPatterns("/esb/api/*", "/service/*", "/web/*"); + registration.addUrlPatterns("/esb/api/*", "/service/*"); registration.setName("repeatableReadRequestResponseFilter"); registration.setOrder(0); return registration; } + /** + * 给/web/* 用的过滤器,仅包装request + * + */ + @Bean + public FilterRegistrationBean webRepeatableRRFilterRegister() { + FilterRegistrationBean registration = new FilterRegistrationBean<>(); + registration.setFilter(webRepeatableReadRequestFilter()); + registration.addUrlPatterns("/web/*"); + registration.setName("webRepeatableReadRequestFilter"); + registration.setOrder(1); + return registration; + } + @Bean(name = "repeatableReadRequestResponseFilter") public RepeatableReadWriteServletRequestResponseFilter repeatableRRRFilter() { return new RepeatableReadWriteServletRequestResponseFilter(); } + + @Bean(name = "webRepeatableReadRequestFilter") + public WebRepeatableReadServletRequestFilter webRepeatableReadRequestFilter() { + return new WebRepeatableReadServletRequestFilter(); + } } diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/filter/RepeatableReadWriteServletRequestResponseFilter.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/filter/RepeatableReadWriteServletRequestResponseFilter.java index cce93c8017..445dbd9707 100644 --- a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/filter/RepeatableReadWriteServletRequestResponseFilter.java +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/filter/RepeatableReadWriteServletRequestResponseFilter.java @@ -24,10 +24,10 @@ package com.tencent.bk.job.common.web.filter; +import com.tencent.bk.job.common.web.utils.ServletUtil; import com.tencent.bk.job.common.web.model.RepeatableReadHttpServletResponse; import com.tencent.bk.job.common.web.model.RepeatableReadWriteHttpServletRequest; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import javax.servlet.Filter; import javax.servlet.FilterChain; @@ -49,35 +49,15 @@ public void init(FilterConfig filterConfig) { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { - ServletRequest servletRequest = request; - ServletResponse servletResponse = response; - if (isJsonRequest(request)) { - // 仅处理 ContentType: application/json 请求 - servletRequest = new RepeatableReadWriteHttpServletRequest((HttpServletRequest) request); - } - if (isJsonResponse(response)) { - // 仅处理 ContentType: application/json 响应 - servletResponse = new RepeatableReadHttpServletResponse((HttpServletResponse) response); + if (!ServletUtil.isJsonRequest(request)) { + chain.doFilter(request, response); + return; } + ServletRequest servletRequest = new RepeatableReadWriteHttpServletRequest((HttpServletRequest) request); + ServletResponse servletResponse = new RepeatableReadHttpServletResponse((HttpServletResponse) response); chain.doFilter(servletRequest, servletResponse); } - private boolean isJsonRequest(ServletRequest request) { - return isJsonContentType(request.getContentType()); - } - - private boolean isJsonResponse(ServletResponse response) { - return isJsonContentType(response.getContentType()); - } - - private boolean isJsonContentType(String contentType) { - if (StringUtils.isBlank(contentType)) { - return false; - } - contentType = contentType.trim().toLowerCase(); - return contentType.startsWith("application/json"); - } - @Override public void destroy() { diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/filter/WebRepeatableReadServletRequestFilter.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/filter/WebRepeatableReadServletRequestFilter.java new file mode 100644 index 0000000000..0824a597c2 --- /dev/null +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/filter/WebRepeatableReadServletRequestFilter.java @@ -0,0 +1,68 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.web.filter; + +import com.tencent.bk.job.common.web.utils.ServletUtil; +import com.tencent.bk.job.common.web.model.RepeatableReadWriteHttpServletRequest; +import lombok.extern.slf4j.Slf4j; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; + +@Slf4j +public class WebRepeatableReadServletRequestFilter implements Filter { + @Override + public void init(FilterConfig filterConfig) throws ServletException { + // do nothing + } + + /** + * 仅包装ServletRequest,给/web使用 + * + */ + @Override + public void doFilter(ServletRequest request, ServletResponse response, + FilterChain chain) throws IOException, ServletException { + ServletRequest servletRequest = request; + + if (ServletUtil.isJsonRequest(request)) { + // 仅处理 ContentType: application/json 请求 + servletRequest = new RepeatableReadWriteHttpServletRequest((HttpServletRequest) request); + } + + chain.doFilter(servletRequest, response); + } + + @Override + public void destroy() { + // do nothing + } +} diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/utils/ServletUtil.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/utils/ServletUtil.java new file mode 100644 index 0000000000..7044b33df1 --- /dev/null +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/utils/ServletUtil.java @@ -0,0 +1,53 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.web.utils; + +import org.apache.commons.lang3.StringUtils; + +import javax.servlet.ServletRequest; + + +/** + * 处理servlet请求和响应的工具类 + */ +public class ServletUtil { + + /** + * 判断 servlet 请求的 Content-Type 是否是 application/json + * @param request 请求 + * @return 请求的 Content-Type 是否是 application/json + */ + public static boolean isJsonRequest(ServletRequest request) { + return isJsonContentType(request.getContentType()); + } + + private static boolean isJsonContentType(String contentType) { + if (StringUtils.isBlank(contentType)) { + return false; + } + contentType = contentType.trim().toLowerCase(); + return contentType.startsWith("application/json"); + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java index a7013460a5..a71ad0f346 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java @@ -72,7 +72,7 @@ PageData listPageCronJobsWithoutVarsByCondition(CronJobInfoDTO c * @param cronJobIdList 定时任务 IDs * @return 定时任务信息 */ - List getCronJobByIds(List cronJobIdList); + List listCronJobByIds(List cronJobIdList); /** * 根据定时任务 ID 查询定时任务信息 diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java index 2325d7994d..b7fdf0d9a7 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java @@ -232,7 +232,7 @@ public CronJobInfoDTO getCronJobById(long cronJobId) { } @Override - public List getCronJobByIds(List cronJobIdList) { + public List listCronJobByIds(List cronJobIdList) { List conditions = new ArrayList<>(); conditions.add(TABLE.ID.in(cronJobIdList.stream().map(ULong::valueOf).collect(Collectors.toList()))); conditions.add(TABLE.IS_DELETED.equal(UByte.valueOf(0))); diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java index bc218ad1f8..05995fcb2e 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java @@ -28,6 +28,7 @@ import com.tencent.bk.job.crontab.listener.event.CrontabEvent; import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; import com.tencent.bk.job.crontab.service.CronJobService; +import com.tencent.bk.job.crontab.service.QuartzService; import lombok.extern.slf4j.Slf4j; import org.slf4j.helpers.MessageFormatter; import org.springframework.beans.factory.annotation.Autowired; @@ -41,10 +42,12 @@ public class CrontabEventListener { private final CronJobService cronJobService; + private final QuartzService quartzService; @Autowired - public CrontabEventListener(CronJobService cronJobService) { + public CrontabEventListener(CronJobService cronJobService, QuartzService quartzService) { this.cronJobService = cronJobService; + this.quartzService = quartzService; } @@ -86,7 +89,7 @@ private void refreshCronJobInQuartz(CronJobInfoDTO cronJobInfoDTO) { } if (cronJobInfoDTO.getEnable()) { // 开启定时任务 - boolean result = cronJobService.addJobToQuartz(cronJobInfoDTO.getAppId(), cronJobInfoDTO.getId()); + boolean result = cronJobService.checkAndAddJobToQuartz(cronJobInfoDTO.getAppId(), cronJobInfoDTO.getId()); log.info( "add cronJob({},{}) to quartz, result={}", cronJobInfoDTO.getAppId(), @@ -95,7 +98,7 @@ private void refreshCronJobInQuartz(CronJobInfoDTO cronJobInfoDTO) { ); } else { // 关闭定时任务 - boolean result = cronJobService.deleteJobFromQuartz(cronJobInfoDTO.getAppId(), cronJobInfoDTO.getId()); + boolean result = quartzService.deleteJobFromQuartz(cronJobInfoDTO.getAppId(), cronJobInfoDTO.getId()); log.info( "delete cronJob({},{}) from quartz, result={}", cronJobInfoDTO.getAppId(), @@ -107,7 +110,7 @@ private void refreshCronJobInQuartz(CronJobInfoDTO cronJobInfoDTO) { private void deleteCronJobFromQuartz(long appId, long cronJobId) { // 删除定时任务 - boolean result = cronJobService.deleteJobFromQuartz(appId, cronJobId); + boolean result = quartzService.deleteJobFromQuartz(appId, cronJobId); log.info( "delete cronJob({},{}) from quartz, result={}", appId, diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/AddJobToQuartzResult.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/AddJobToQuartzResult.java new file mode 100644 index 0000000000..4d3fb4c6f2 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/AddJobToQuartzResult.java @@ -0,0 +1,97 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.model.dto; + +import lombok.Data; + +/** + * 添加任务到Quartz的结果 + */ +@Data +public class AddJobToQuartzResult { + /** + * 定时任务基本信息 + */ + private CronJobBasicInfoDTO cronJobBasicInfo; + /** + * 是否成功 + */ + private boolean success; + /** + * 提示信息 + */ + private String message; + /** + * 异常信息 + */ + private Exception exception; + + /** + * 构造失败结果 + * + * @param cronJobBasicInfo 定时任务基本信息 + * @param message 提示信息 + * @return 失败结果 + */ + public static AddJobToQuartzResult failResult(CronJobBasicInfoDTO cronJobBasicInfo, String message) { + AddJobToQuartzResult result = new AddJobToQuartzResult(); + result.setCronJobBasicInfo(cronJobBasicInfo); + result.setSuccess(false); + result.setMessage(message); + return result; + } + + /** + * 构造失败结果 + * + * @param cronJobBasicInfo 定时任务基本信息 + * @param message 提示信息 + * @param exception 异常信息 + * @return 失败结果 + */ + public static AddJobToQuartzResult failResult(CronJobBasicInfoDTO cronJobBasicInfo, + String message, + Exception exception) { + AddJobToQuartzResult result = new AddJobToQuartzResult(); + result.setCronJobBasicInfo(cronJobBasicInfo); + result.setSuccess(false); + result.setMessage(message); + result.setException(exception); + return result; + } + + /** + * 构造成功结果 + * + * @param cronJobBasicInfo 定时任务基本信息 + * @return 成功结果 + */ + public static AddJobToQuartzResult successResult(CronJobBasicInfoDTO cronJobBasicInfo) { + AddJobToQuartzResult result = new AddJobToQuartzResult(); + result.setCronJobBasicInfo(cronJobBasicInfo); + result.setSuccess(true); + return result; + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/BatchAddResult.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/BatchAddResult.java new file mode 100644 index 0000000000..fe51d28d2d --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/BatchAddResult.java @@ -0,0 +1,141 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.model.dto; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 定时任务批量添加到Quartz的结果 + */ +@Slf4j +@Data +public class BatchAddResult { + /** + * 添加结果Map,key为定时任务ID,value为添加结果 + */ + private Map resultMap; + /** + * 添加成功的任务数量 + */ + private int successNum = 0; + /** + * 添加失败的任务数量 + */ + private int failNum = 0; + + /** + * 添加一个结果数据至批量结果 + * + * @param result 单个结果数据 + */ + public void addResult(AddJobToQuartzResult result) { + if (result == null) { + return; + } + if (resultMap == null) { + resultMap = new HashMap<>(); + } + CronJobBasicInfoDTO cronJobBasicInfo = result.getCronJobBasicInfo(); + if (cronJobBasicInfo == null) { + log.info("cronJobBasicInfo is null, ignore"); + return; + } + resultMap.put(cronJobBasicInfo.getId(), result); + if (result.isSuccess()) { + successNum++; + } else { + failNum++; + } + } + + /** + * 合并批量结果 + * + * @param batchAddResult 待合并的批量结果 + */ + public void merge(BatchAddResult batchAddResult) { + if (batchAddResult == null) { + return; + } + if (CollectionUtils.isEmpty(batchAddResult.resultMap)) { + return; + } + if (resultMap == null) { + resultMap = new HashMap<>(batchAddResult.resultMap); + } else { + resultMap.putAll(batchAddResult.resultMap); + } + successNum += batchAddResult.successNum; + failNum += batchAddResult.failNum; + } + + /** + * 获取批量添加失败的添加结果列表 + * + * @return 批量添加失败的添加结果列表 + */ + public List getFailedResultList() { + if (failNum == 0) { + return Collections.emptyList(); + } + List failedResultList = new ArrayList<>(); + for (Map.Entry entry : resultMap.entrySet()) { + if (!entry.getValue().isSuccess()) { + failedResultList.add(entry.getValue()); + } + } + return failedResultList; + } + + /** + * 获取批量添加的总数 + * + * @return 批量添加总数 + */ + public int getTotalNum() { + return successNum + failNum; + } + + /** + * 获取批量添加失败的比率 + * + * @return 批量添加失败的比率 + */ + public float getFailRate() { + int totalNum = successNum + failNum; + if (totalNum == 0) { + return 0; + } + return 1.0f * failNum / totalNum; + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobInfoDTO.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobInfoDTO.java index d96eaa8199..1cb7203885 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobInfoDTO.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobInfoDTO.java @@ -589,4 +589,12 @@ public ServiceCronJobDTO toServiceCronJobDTO() { cronJob.setLastModifyTime(lastModifyTime); return cronJob; } + + public CronJobBasicInfoDTO toBasicInfoDTO() { + CronJobBasicInfoDTO cronJob = new CronJobBasicInfoDTO(); + cronJob.setId(id); + cronJob.setAppId(appId); + cronJob.setName(name); + return cronJob; + } } diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/BatchCronJobService.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/BatchCronJobService.java new file mode 100644 index 0000000000..ae20689df7 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/BatchCronJobService.java @@ -0,0 +1,56 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service; + +import com.tencent.bk.job.common.model.User; +import com.tencent.bk.job.crontab.model.BatchUpdateCronJobReq; +import com.tencent.bk.job.crontab.model.dto.BatchAddResult; +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; +import com.tencent.bk.job.crontab.model.dto.NeedScheduleCronInfo; + +import java.util.List; + +public interface BatchCronJobService { + + /** + * 批量添加定时任务到Quartz + * + * @param cronJobBasicInfoList 定时任务列表 + * @return 批量添加结果 + */ + BatchAddResult batchAddJobToQuartz(List cronJobBasicInfoList); + + /** + * 批量更新定时任务 + * + * @param user 用户 + * @param appId Job业务ID + * @param batchUpdateCronJobReq 批量更新请求 + * @return 更新结果数据 + */ + NeedScheduleCronInfo batchUpdateCronJob(User user, + Long appId, + BatchUpdateCronJobReq batchUpdateCronJobReq); +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java index 131f1e6988..9b5693cce0 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java @@ -253,9 +253,7 @@ PageData listPageCronJobInfosWithoutVars(CronJobInfoDTO cronJobC Integer countCronJob(Long appId, Boolean active, Boolean cron); - boolean addJobToQuartz(long appId, long cronJobId) throws ServiceException; - - boolean deleteJobFromQuartz(long appId, long cronJobId); + boolean checkAndAddJobToQuartz(long appId, long cronJobId) throws ServiceException; List listEnabledCronBasicInfoForUpdate(int start, int limit); diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/QuartzService.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/QuartzService.java new file mode 100644 index 0000000000..4b318037ec --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/QuartzService.java @@ -0,0 +1,45 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service; + +import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; + +public interface QuartzService { + /** + * 尝试将定时任务添加到Quartz中,若失败则删除 + * + * @param cronJobInfo 定时任务信息 + */ + void tryToAddJobToQuartz(CronJobInfoDTO cronJobInfo); + + /** + * 从Quartz中删除定时任务 + * + * @param appId Job业务ID + * @param cronJobId 定时任务ID + * @return 是否删除成功 + */ + boolean deleteJobFromQuartz(long appId, long cronJobId); +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/BatchCronJobService.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/BatchCronJobServiceImpl.java similarity index 58% rename from src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/BatchCronJobService.java rename to src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/BatchCronJobServiceImpl.java index 4cb6dc85e3..ba98587f77 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/BatchCronJobService.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/BatchCronJobServiceImpl.java @@ -33,18 +33,25 @@ import com.tencent.bk.job.crontab.exception.TaskExecuteAuthFailedException; import com.tencent.bk.job.crontab.model.BatchUpdateCronJobReq; import com.tencent.bk.job.crontab.model.CronJobCreateUpdateReq; +import com.tencent.bk.job.crontab.model.dto.AddJobToQuartzResult; +import com.tencent.bk.job.crontab.model.dto.BatchAddResult; +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobVariableDTO; import com.tencent.bk.job.crontab.model.dto.NeedScheduleCronInfo; +import com.tencent.bk.job.crontab.service.BatchCronJobService; import com.tencent.bk.job.crontab.service.ExecuteTaskService; +import com.tencent.bk.job.crontab.service.QuartzService; import com.tencent.bk.job.execute.model.inner.ServiceTaskVariable; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.slf4j.helpers.MessageFormatter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -52,19 +59,114 @@ */ @Slf4j @Service -public class BatchCronJobService { +public class BatchCronJobServiceImpl implements BatchCronJobService { private final CronJobDAO cronJobDAO; private final CronAuthService cronAuthService; private final ExecuteTaskService executeTaskService; + private final QuartzService quartzService; @Autowired - public BatchCronJobService(CronJobDAO cronJobDAO, - CronAuthService cronAuthService, - ExecuteTaskService executeTaskService) { + public BatchCronJobServiceImpl(CronJobDAO cronJobDAO, + CronAuthService cronAuthService, + ExecuteTaskService executeTaskService, QuartzService quartzService) { this.cronJobDAO = cronJobDAO; this.cronAuthService = cronAuthService; this.executeTaskService = executeTaskService; + this.quartzService = quartzService; + } + + /** + * 批量添加定时任务到Quartz + * + * @param cronJobBasicInfoList 定时任务列表 + * @return 批量添加结果 + */ + @Override + public BatchAddResult batchAddJobToQuartz(List cronJobBasicInfoList) { + List cronJobIdList = cronJobBasicInfoList.stream() + .map(CronJobBasicInfoDTO::getId) + .distinct() + .collect(Collectors.toList()); + // 1.批量获取定时任务信息 + List cronJobList = cronJobDAO.listCronJobByIds(cronJobIdList); + Map cronJobMap = cronJobList.stream() + .collect( + Collectors.toMap( + CronJobInfoDTO::getId, + cronJobInfoDTO -> cronJobInfoDTO + ) + ); + BatchAddResult finalBatchResult = new BatchAddResult(); + BatchAddResult notFoundBatchResult = recordNotFoundCronJobs( + cronJobBasicInfoList, + cronJobMap + ); + finalBatchResult.merge(notFoundBatchResult); + // 2.过滤出开启的定时任务进行触发 + BatchAddResult addToQuartzBatchResult = addEnabledCronJobToQuartz(cronJobList); + finalBatchResult.merge(addToQuartzBatchResult); + return finalBatchResult; + } + + /** + * 批量添加开启的定时任务到Quartz + * + * @param cronJobList 定时任务列表 + * @return 批量添加结果 + */ + private BatchAddResult addEnabledCronJobToQuartz(List cronJobList) { + BatchAddResult batchAddResult = new BatchAddResult(); + for (CronJobInfoDTO cronJobInfoDTO : cronJobList) { + if (!cronJobInfoDTO.getEnable()) { + batchAddResult.addResult(AddJobToQuartzResult.failResult( + cronJobInfoDTO.toBasicInfoDTO(), + String.format("CronJob(id=%s) is not enabled, ignore", cronJobInfoDTO.getId()) + )); + continue; + } + try { + quartzService.tryToAddJobToQuartz(cronJobInfoDTO); + batchAddResult.addResult(AddJobToQuartzResult.successResult(cronJobInfoDTO.toBasicInfoDTO())); + } catch (Exception e) { + String message = MessageFormatter.format( + "Add cronJob(id={}) to quartz failed", + cronJobInfoDTO.getId() + ).getMessage(); + batchAddResult.addResult( + AddJobToQuartzResult.failResult( + cronJobInfoDTO.toBasicInfoDTO(), + message, + e + ) + ); + } + } + return batchAddResult; + } + + /** + * 记录DB中不存在的定时任务 + * + * @param cronJobBasicInfoList 需要添加到Quartz的定时任务基础信息列表 + * @param cronJobMap DB中存在的定时任务信息 + * @return 批量添加失败的定时任务信息 + */ + private BatchAddResult recordNotFoundCronJobs(List cronJobBasicInfoList, + Map cronJobMap) { + BatchAddResult batchAddResult = new BatchAddResult(); + for (CronJobBasicInfoDTO cronJobBasicInfoDTO : cronJobBasicInfoList) { + Long cronJobId = cronJobBasicInfoDTO.getId(); + if (!cronJobMap.containsKey(cronJobId)) { + batchAddResult.addResult( + AddJobToQuartzResult.failResult( + cronJobBasicInfoDTO, + "Cannot find cronJob in DB by id: " + cronJobId + ) + ); + } + } + return batchAddResult; } /** @@ -89,9 +191,9 @@ public NeedScheduleCronInfo batchUpdateCronJob(User user, List needAddCronIdList = new ArrayList<>(); List needDeleteCronIdList = new ArrayList<>(); - cronJobReqList.forEach(cronJobReq -> { - updateCronJob(user, appId, cronJobReq, needAddCronIdList, needDeleteCronIdList); - }); + cronJobReqList.forEach(cronJobReq -> + updateCronJob(user, appId, cronJobReq, needAddCronIdList, needDeleteCronIdList) + ); return new NeedScheduleCronInfo(needAddCronIdList, needDeleteCronIdList); } diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java index 23d60d9253..20e10c2ab1 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java @@ -25,7 +25,10 @@ package com.tencent.bk.job.crontab.service.impl; import com.tencent.bk.job.common.mysql.JobTransactional; +import com.tencent.bk.job.crontab.model.dto.AddJobToQuartzResult; +import com.tencent.bk.job.crontab.model.dto.BatchAddResult; import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; +import com.tencent.bk.job.crontab.service.BatchCronJobService; import com.tencent.bk.job.crontab.service.CronJobBatchLoadService; import com.tencent.bk.job.crontab.service.CronJobService; import lombok.extern.slf4j.Slf4j; @@ -41,60 +44,74 @@ public class CronJobBatchLoadServiceImpl implements CronJobBatchLoadService { private final CronJobService cronJobService; + private final BatchCronJobService batchCronJobService; @Autowired - public CronJobBatchLoadServiceImpl(CronJobService cronJobService) { + public CronJobBatchLoadServiceImpl(CronJobService cronJobService, BatchCronJobService batchCronJobService) { this.cronJobService = cronJobService; + this.batchCronJobService = batchCronJobService; } @Override @JobTransactional(transactionManager = "jobCrontabTransactionManager", timeout = 30) public CronLoadResult batchLoadCronToQuartz(int start, int limit) throws InterruptedException { - int successNum = 0; - int failedNum = 0; - List failedCronList = new ArrayList<>(); + checkInterrupt(); List cronJobBasicInfoList = cronJobService.listEnabledCronBasicInfoForUpdate(start, limit); - for (CronJobBasicInfoDTO cronJobBasicInfoDTO : cronJobBasicInfoList) { - checkInterrupt(); - boolean result = false; - try { - result = cronJobService.addJobToQuartz( - cronJobBasicInfoDTO.getAppId(), - cronJobBasicInfoDTO.getId() - ); - if (result) { - successNum += 1; - } else { - failedNum += 1; - failedCronList.add(cronJobBasicInfoDTO); - } - } catch (Exception e) { - failedNum += 1; - failedCronList.add(cronJobBasicInfoDTO); - String message = MessageFormatter.format( - "Fail to addJobToQuartz, cronJob={}", - cronJobBasicInfoDTO - ).getMessage(); - log.warn(message, e); - } - if (log.isDebugEnabled()) { - log.debug( - "load cronJob({},{},{}), result={}", - cronJobBasicInfoDTO.getAppId(), - cronJobBasicInfoDTO.getId(), - cronJobBasicInfoDTO.getName(), - result - ); - } - } + BatchAddResult batchAddResult = batchCronJobService.batchAddJobToQuartz(cronJobBasicInfoList); + List failedCronList = extractFailedCronList(batchAddResult); CronLoadResult cronLoadResult = new CronLoadResult(); cronLoadResult.setFetchNum(cronJobBasicInfoList.size()); - cronLoadResult.setSuccessNum(successNum); - cronLoadResult.setFailedNum(failedNum); + cronLoadResult.setSuccessNum(batchAddResult.getSuccessNum()); + cronLoadResult.setFailedNum(batchAddResult.getFailNum()); cronLoadResult.setFailedCronList(failedCronList); return cronLoadResult; } + /** + * 从批量添加定时任务结果数据中提取失败的定时任务信息 + * + * @param batchAddResult 批量添加定时任务结果 + * @return 失败的定时任务信息 + */ + private List extractFailedCronList(BatchAddResult batchAddResult) { + if (batchAddResult == null || batchAddResult.getTotalNum() == 0) { + return new ArrayList<>(); + } + List failedCronList = new ArrayList<>(); + int successNum = batchAddResult.getSuccessNum(); + int failedNum = batchAddResult.getFailNum(); + if (failedNum > 0) { + String message = MessageFormatter.format( + "batchAddJobToQuartz result: {} failed, {} success", + failedNum, + successNum + ).getMessage(); + if (batchAddResult.getFailRate() > 0.5) { + log.error(message); + } else { + log.warn(message); + } + List failedResultList = batchAddResult.getFailedResultList(); + for (AddJobToQuartzResult addJobToQuartzResult : failedResultList) { + CronJobBasicInfoDTO cronJobBasicInfo = addJobToQuartzResult.getCronJobBasicInfo(); + failedCronList.add(cronJobBasicInfo); + message = MessageFormatter.arrayFormat( + "Fail to load cronJob({},{},{}), reason={}", + new Object[]{ + cronJobBasicInfo.getAppId(), + cronJobBasicInfo.getId(), + cronJobBasicInfo.getName(), + addJobToQuartzResult.getMessage() + } + ).getMessage(); + log.warn(message, addJobToQuartzResult.getException()); + } + } else { + log.info("batchAddJobToQuartz result: All success, num={}", successNum); + } + return failedCronList; + } + private void checkInterrupt() throws InterruptedException { if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("batchLoadCronToQuartz thread is interrupted, exit"); diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java index e0cf7b5b12..036087e036 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java @@ -29,6 +29,7 @@ import com.tencent.bk.job.crontab.service.CronJobLoadingService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.quartz.Scheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -40,11 +41,13 @@ public class CronJobLoadingServiceImpl implements CronJobLoadingService { private final CronJobBatchLoadService cronJobBatchLoadService; + private final Scheduler scheduler; private volatile boolean loadingCronToQuartz = false; @Autowired - public CronJobLoadingServiceImpl(CronJobBatchLoadService cronJobBatchLoadService) { + public CronJobLoadingServiceImpl(CronJobBatchLoadService cronJobBatchLoadService, Scheduler scheduler) { this.cronJobBatchLoadService = cronJobBatchLoadService; + this.scheduler = scheduler; } @Override @@ -56,6 +59,7 @@ public void loadAllCronJob() { return; } loadingCronToQuartz = true; + waitUtilQuartzStarted(); loadAllCronJobToQuartz(); } catch (InterruptedException e) { log.info("loadAllCronJob interrupted, application may be closing"); @@ -67,6 +71,14 @@ public void loadAllCronJob() { } } + private void waitUtilQuartzStarted() throws Exception { + while (!scheduler.isStarted()) { + log.info("Quartz Scheduler is not started, sleep 1s and retry"); + Thread.sleep(1000); + } + log.info("Quartz Scheduler is started now"); + } + private void loadAllCronJobToQuartz() throws InterruptedException { int start = 0; int limit = 100; diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java index 06483e6d72..3c23753f36 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java @@ -63,9 +63,11 @@ import com.tencent.bk.job.crontab.model.inner.ServiceInnerCronJobInfoDTO; import com.tencent.bk.job.crontab.model.inner.request.ServiceAddInnerCronJobRequestDTO; import com.tencent.bk.job.crontab.mq.CrontabMQEventDispatcher; +import com.tencent.bk.job.crontab.service.BatchCronJobService; import com.tencent.bk.job.crontab.service.CronJobService; import com.tencent.bk.job.crontab.service.ExecuteTaskService; import com.tencent.bk.job.crontab.service.HostService; +import com.tencent.bk.job.crontab.service.QuartzService; import com.tencent.bk.job.crontab.service.TaskPlanService; import com.tencent.bk.job.crontab.timer.AbstractQuartzTaskHandler; import com.tencent.bk.job.crontab.timer.QuartzJob; @@ -73,8 +75,6 @@ import com.tencent.bk.job.crontab.timer.QuartzTrigger; import com.tencent.bk.job.crontab.timer.QuartzTriggerBuilder; import com.tencent.bk.job.crontab.timer.executor.InnerJobExecutor; -import com.tencent.bk.job.crontab.timer.executor.NotifyJobExecutor; -import com.tencent.bk.job.crontab.timer.executor.SimpleJobExecutor; import com.tencent.bk.job.execute.model.inner.ServiceTaskVariable; import com.tencent.bk.job.manage.model.inner.ServiceTaskPlanDTO; import lombok.extern.slf4j.Slf4j; @@ -110,6 +110,7 @@ public class CronJobServiceImpl implements CronJobService { private final CronJobDAO cronJobDAO; private final AbstractQuartzTaskHandler quartzTaskHandler; + private final QuartzService quartzService; private final TaskPlanService taskPlanService; private final CronAuthService cronAuthService; private final ExecuteTaskService executeTaskService; @@ -120,14 +121,16 @@ public class CronJobServiceImpl implements CronJobService { @Autowired public CronJobServiceImpl(CronJobDAO cronJobDAO, AbstractQuartzTaskHandler quartzTaskHandler, + QuartzService quartzService, TaskPlanService taskPlanService, CronAuthService cronAuthService, ExecuteTaskService executeTaskService, HostService hostService, CrontabMQEventDispatcher crontabMQEventDispatcher, - BatchCronJobService batchCronJobService) { + BatchCronJobServiceImpl batchCronJobService) { this.cronJobDAO = cronJobDAO; this.quartzTaskHandler = quartzTaskHandler; + this.quartzService = quartzService; this.taskPlanService = taskPlanService; this.cronAuthService = cronAuthService; this.executeTaskService = executeTaskService; @@ -136,18 +139,6 @@ public CronJobServiceImpl(CronJobDAO cronJobDAO, this.batchCronJobService = batchCronJobService; } - private static String getJobName(long appId, long cronJobId) { - return "job_" + cronJobId; - } - - private static String getJobGroup(long appId, long cronJobId) { - return "bk_app_" + appId; - } - - private static String getNotifyJobName(long appId, long cronJobId) { - return getJobName(appId, cronJobId) + "_notify"; - } - @Override public PageData listPageCronJobInfosWithoutVars(CronJobInfoDTO cronJobCondition, BaseSearchCondition baseSearchCondition) { @@ -161,7 +152,7 @@ public CronJobInfoDTO getCronJobInfoById(Long cronJobId) { @Override public Map getCronJobInfoMapByIds(List cronJobIdList) { - List cronJobInfoDTOList = cronJobDAO.getCronJobByIds(cronJobIdList); + List cronJobInfoDTOList = cronJobDAO.listCronJobByIds(cronJobIdList); Map map = new HashMap<>(); for (CronJobInfoDTO cronJobInfoDTO : cronJobInfoDTOList) { map.put(cronJobInfoDTO.getId(), cronJobInfoDTO); @@ -728,134 +719,20 @@ private boolean informAllToDeleteJobFromQuartz(long appId, long cronJobId) { } @Override - public boolean addJobToQuartz(long appId, long cronJobId) throws ServiceException { + public boolean checkAndAddJobToQuartz(long appId, long cronJobId) throws ServiceException { if (appId <= 0 || cronJobId <= 0) { return false; } - try { - CronJobInfoDTO cronJobInfo = getCronJobInfoById(appId, cronJobId); - if (StringUtils.isBlank(cronJobInfo.getCronExpression()) - && cronJobInfo.getExecuteTime() < DateUtils.currentTimeSeconds()) { - throw new FailedPreconditionException(ErrorCode.CRON_JOB_TIME_PASSED); - } - checkCronRelatedPlan(cronJobInfo.getAppId(), cronJobInfo.getTaskPlanId()); - QuartzTrigger trigger = null; - if (StringUtils.isNotBlank(cronJobInfo.getCronExpression())) { - QuartzTriggerBuilder cronTriggerBuilder = - QuartzTriggerBuilder.newTrigger().ofType(QuartzTrigger.TriggerType.CRON) - .withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .withCronExpression(cronJobInfo.getCronExpression()) - .withMisfireInstruction(CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING); - if (cronJobInfo.getEndTime() > 0) { - if (cronJobInfo.getEndTime() < DateUtils.currentTimeSeconds()) { - throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); - } else { - cronTriggerBuilder = - cronTriggerBuilder.endAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getEndTime()))); - } - } - trigger = cronTriggerBuilder.build(); - } else if (cronJobInfo.getExecuteTime() > DateUtils.currentTimeSeconds()) { - trigger = QuartzTriggerBuilder.newTrigger().ofType(QuartzTrigger.TriggerType.SIMPLE) - .withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .startAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getExecuteTime()))).withRepeatCount(0) - .withIntervalInHours(1) - .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) - .build(); - } - if (trigger == null) { - throw new InvalidParamException(ErrorCode.ILLEGAL_PARAM); - } - - QuartzJob job = - QuartzJobBuilder.newJob().withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .forJob(SimpleJobExecutor.class) - .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) - .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) - .withTrigger(trigger) - .build(); - - try { - quartzTaskHandler - .deleteJob(JobKey.jobKey(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - quartzTaskHandler.addJob(job); - } catch (SchedulerException e) { - log.error("Error while add job to quartz!", e); - throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); - } - - if (cronJobInfo.getNotifyOffset() > 0) { - long notifyTime = 0L; - if (StringUtils.isNotBlank(cronJobInfo.getCronExpression())) { - if (cronJobInfo.getEndTime() > 0) { - notifyTime = cronJobInfo.getEndTime() - cronJobInfo.getNotifyOffset(); - } - } else { - notifyTime = cronJobInfo.getExecuteTime() - cronJobInfo.getNotifyOffset(); - } - if (notifyTime < DateUtils.currentTimeSeconds()) { - throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); - } - - QuartzTrigger notifyTrigger = QuartzTriggerBuilder.newTrigger() - .ofType(QuartzTrigger.TriggerType.SIMPLE) - .withIdentity(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .startAt(Date.from(Instant.ofEpochSecond(notifyTime))).withRepeatCount(0).withIntervalInHours(1) - .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) - .build(); - - QuartzJob notifyJob = QuartzJobBuilder.newJob() - .withIdentity(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .forJob(NotifyJobExecutor.class) - .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) - .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) - .withTrigger(notifyTrigger) - .build(); - - try { - quartzTaskHandler.deleteJob( - JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - quartzTaskHandler.addJob(notifyJob); - } catch (SchedulerException e) { - log.error("Error while add job to quartz!", e); - throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); - } - } else { - try { - quartzTaskHandler.deleteJob( - JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - } catch (SchedulerException e) { - log.error("Error while add job to quartz!", e); - throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); - } - } - return true; - } catch (ServiceException e) { - deleteJobFromQuartz(appId, cronJobId); - log.debug("Error while schedule job", e); - throw e; - } catch (Exception e) { - deleteJobFromQuartz(appId, cronJobId); - log.error("Unknown exception while process cron status change!", e); - throw new InternalException(ErrorCode.UPDATE_CRON_JOB_FAILED); + CronJobInfoDTO cronJobInfo = getCronJobInfoById(appId, cronJobId); + if (StringUtils.isBlank(cronJobInfo.getCronExpression()) + && cronJobInfo.getExecuteTime() < DateUtils.currentTimeSeconds()) { + throw new FailedPreconditionException(ErrorCode.CRON_JOB_TIME_PASSED); } + checkCronRelatedPlan(cronJobInfo.getAppId(), cronJobInfo.getTaskPlanId()); + quartzService.tryToAddJobToQuartz(cronJobInfo); + return true; } - @Override - public boolean deleteJobFromQuartz(long appId, long cronJobId) { - if (appId <= 0 || cronJobId <= 0) { - return false; - } - try { - quartzTaskHandler.deleteJob(JobKey.jobKey(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - quartzTaskHandler - .deleteJob(JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - return true; - } catch (SchedulerException e) { - log.error("Error while delete job!", e); - } - return false; - } @Override public List listEnabledCronBasicInfoForUpdate(int start, int limit) { @@ -868,28 +745,29 @@ public boolean disableCronJobByAppId(Long appId) { cronJobInfoDTO.setAppId(appId); cronJobInfoDTO.setEnable(true); List cronJobIdList = cronJobDAO.listCronJobIds(cronJobInfoDTO); + if (CollectionUtils.isEmpty(cronJobIdList)) { + return true; + } List failedCronJobIds = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(cronJobIdList)) { - log.info("cron job will be disabled, appId:{}, cronJobIds:{}", appId, cronJobIdList); - for (Long cronJobId : cronJobIdList) { - try { - // TODO:tenant 需要修改实现,不能只传入系统用户 ID -// Boolean disableResult = changeCronJobEnableStatus(JobConstants.DEFAULT_SYSTEM_USER_ADMIN, appId, -// cronJobId, false); - Boolean disableResult = changeCronJobEnableStatus(null, appId, - cronJobId, false); - log.debug("disable cron job, result:{}, appId:{}, cronId:{}", disableResult, appId, cronJobId); - if (!disableResult) { - failedCronJobIds.add(cronJobId); - } - } catch (Exception e) { - log.error("Failed to disable cron job with appId:{} and cronId:{}", appId, cronJobId, e); + log.info("cron job will be disabled, appId:{}, cronJobIds:{}", appId, cronJobIdList); + for (Long cronJobId : cronJobIdList) { + try { + // TODO:tenant 需要修改实现,不能只传入系统用户 ID +// Boolean disableResult = changeCronJobEnableStatus(JobConstants.DEFAULT_SYSTEM_USER_ADMIN, appId, +// cronJobId, false); + Boolean disableResult = changeCronJobEnableStatus(null, appId, + cronJobId, false); + log.debug("disable cron job, result:{}, appId:{}, cronId:{}", disableResult, appId, cronJobId); + if (!disableResult) { failedCronJobIds.add(cronJobId); } + } catch (Exception e) { + log.error("Failed to disable cron job with appId:{} and cronId:{}", appId, cronJobId, e); + failedCronJobIds.add(cronJobId); } - if (!failedCronJobIds.isEmpty()) { - log.warn("Failed to disable cron jobs for appId:{} with cronJobIds:{}", appId, failedCronJobIds); - } + } + if (!failedCronJobIds.isEmpty()) { + log.warn("Failed to disable cron jobs for appId:{} with cronJobIds:{}", appId, failedCronJobIds); } return failedCronJobIds.isEmpty(); } diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/JobCronNameUtil.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/JobCronNameUtil.java new file mode 100644 index 0000000000..f51e277eb6 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/JobCronNameUtil.java @@ -0,0 +1,60 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service.impl; + +/** + * Job定时任务名称工具类 + */ +public class JobCronNameUtil { + + /** + * 获取Quartz Job名称 + * + * @param cronJobId 定时任务ID + * @return Quartz Job名称 + */ + public static String getJobName(long cronJobId) { + return "job_" + cronJobId; + } + + /** + * 获取Quartz Job分组 + * + * @param appId Job业务ID + * @return Quartz Job分组 + */ + public static String getJobGroup(long appId) { + return "bk_app_" + appId; + } + + /*** + * 获取通知Job名称 + * @param cronJobId 定时任务ID + * @return 通知Job名称 + */ + public static String getNotifyJobName(long cronJobId) { + return getJobName(cronJobId) + "_notify"; + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/QuartzServiceImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/QuartzServiceImpl.java new file mode 100644 index 0000000000..afae412653 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/QuartzServiceImpl.java @@ -0,0 +1,222 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service.impl; + +import com.tencent.bk.job.common.constant.ErrorCode; +import com.tencent.bk.job.common.exception.FailedPreconditionException; +import com.tencent.bk.job.common.exception.InternalException; +import com.tencent.bk.job.common.exception.ServiceException; +import com.tencent.bk.job.common.util.TimeUtil; +import com.tencent.bk.job.common.util.date.DateUtils; +import com.tencent.bk.job.crontab.constant.CronConstants; +import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; +import com.tencent.bk.job.crontab.service.QuartzService; +import com.tencent.bk.job.crontab.timer.AbstractQuartzTaskHandler; +import com.tencent.bk.job.crontab.timer.QuartzJob; +import com.tencent.bk.job.crontab.timer.QuartzJobBuilder; +import com.tencent.bk.job.crontab.timer.QuartzTrigger; +import com.tencent.bk.job.crontab.timer.QuartzTriggerBuilder; +import com.tencent.bk.job.crontab.timer.executor.NotifyJobExecutor; +import com.tencent.bk.job.crontab.timer.executor.SimpleJobExecutor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.quartz.CronTrigger; +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.quartz.SimpleTrigger; +import org.slf4j.helpers.MessageFormatter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.sql.Date; +import java.time.Instant; + +@Slf4j +@Service +public class QuartzServiceImpl implements QuartzService { + + private final AbstractQuartzTaskHandler quartzTaskHandler; + + @Autowired + public QuartzServiceImpl(AbstractQuartzTaskHandler quartzTaskHandler) { + this.quartzTaskHandler = quartzTaskHandler; + } + + @Override + public void tryToAddJobToQuartz(CronJobInfoDTO cronJobInfo) { + try { + addJobToQuartz(cronJobInfo); + } catch (ServiceException e) { + deleteJobFromQuartz(cronJobInfo.getAppId(), cronJobInfo.getId()); + throw e; + } catch (Exception e) { + deleteJobFromQuartz(cronJobInfo.getAppId(), cronJobInfo.getId()); + throw new InternalException(e, ErrorCode.INTERNAL_ERROR); + } + } + + /** + * 从Quartz引擎删除定时任务(含相关的通知任务) + * + * @param appId Job业务ID + * @param cronJobId 定时任务ID + * @return 是否删除成功 + */ + @Override + public boolean deleteJobFromQuartz(long appId, long cronJobId) { + if (appId <= 0 || cronJobId <= 0) { + return false; + } + String jobName = JobCronNameUtil.getJobName(cronJobId); + String jobGroup = JobCronNameUtil.getJobGroup(appId); + String notifyJobName = JobCronNameUtil.getNotifyJobName(cronJobId); + try { + quartzTaskHandler.deleteJob(JobKey.jobKey(jobName, jobGroup)); + quartzTaskHandler.deleteJob(JobKey.jobKey(notifyJobName, jobGroup)); + return true; + } catch (SchedulerException e) { + log.error("Error while delete job!", e); + } + return false; + } + + /** + * 将定时任务添加至Quartz引擎(含相关的前置通知任务) + * + * @param cronJobInfo 定时任务信息 + * @throws SchedulerException Quartz调度异常 + */ + private void addJobToQuartz(CronJobInfoDTO cronJobInfo) throws SchedulerException { + Long appId = cronJobInfo.getAppId(); + Long cronJobId = cronJobInfo.getId(); + String jobName = JobCronNameUtil.getJobName(cronJobId); + String jobGroup = JobCronNameUtil.getJobGroup(appId); + QuartzTrigger trigger = buildTrigger(cronJobInfo); + + QuartzJob job = QuartzJobBuilder.newJob() + .withIdentity(jobName, jobGroup) + .forJob(SimpleJobExecutor.class) + .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) + .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) + .withTrigger(trigger) + .build(); + + quartzTaskHandler.deleteJob(JobKey.jobKey(jobName, jobGroup)); + quartzTaskHandler.addJob(job); + + addNotifyJobIfNeed(cronJobInfo); + } + + /** + * 构建定时任务Quartz触发器 + * + * @param cronJobInfo 定时任务信息 + * @return 定时任务Quartz触发器 + */ + private QuartzTrigger buildTrigger(CronJobInfoDTO cronJobInfo) { + QuartzTrigger trigger; + String jobName = JobCronNameUtil.getJobName(cronJobInfo.getId()); + String jobGroup = JobCronNameUtil.getJobGroup(cronJobInfo.getAppId()); + if (StringUtils.isNotBlank(cronJobInfo.getCronExpression())) { + // 根据cron表达式执行的定时任务 + QuartzTriggerBuilder cronTriggerBuilder = QuartzTriggerBuilder.newTrigger() + .ofType(QuartzTrigger.TriggerType.CRON) + .withIdentity(jobName, jobGroup) + .withCronExpression(cronJobInfo.getCronExpression()) + .withMisfireInstruction(CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING); + if (cronJobInfo.getEndTime() > 0) { + if (cronJobInfo.getEndTime() < DateUtils.currentTimeSeconds()) { + throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); + } else { + cronTriggerBuilder = cronTriggerBuilder. + endAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getEndTime()))); + } + } + trigger = cronTriggerBuilder.build(); + } else if (cronJobInfo.getExecuteTime() > DateUtils.currentTimeSeconds()) { + // 只执行一次的定时任务 + trigger = QuartzTriggerBuilder.newTrigger().ofType(QuartzTrigger.TriggerType.SIMPLE) + .withIdentity(jobName, jobGroup) + .startAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getExecuteTime()))).withRepeatCount(0) + .withIntervalInHours(1) + .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) + .build(); + } else { + // 只执行一次但是执行时间已经过期的定时任务 + String message = MessageFormatter.format( + "Cron job executionTime({}) already passed", + TimeUtil.formatTime(cronJobInfo.getExecuteTime()) + ).getMessage(); + throw new FailedPreconditionException(message, ErrorCode.CRON_JOB_TIME_PASSED); + } + return trigger; + } + + /** + * 按需添加通知任务至Quartz引擎 + * + * @param cronJobInfo 定时任务信息 + * @throws SchedulerException Quartz调度异常 + */ + private void addNotifyJobIfNeed(CronJobInfoDTO cronJobInfo) throws SchedulerException { + Long appId = cronJobInfo.getAppId(); + Long cronJobId = cronJobInfo.getId(); + String notifyJobName = JobCronNameUtil.getNotifyJobName(cronJobId); + String jobGroup = JobCronNameUtil.getJobGroup(appId); + if (cronJobInfo.getNotifyOffset() > 0) { + long notifyTime = 0L; + if (StringUtils.isNotBlank(cronJobInfo.getCronExpression())) { + if (cronJobInfo.getEndTime() > 0) { + notifyTime = cronJobInfo.getEndTime() - cronJobInfo.getNotifyOffset(); + } + } else { + notifyTime = cronJobInfo.getExecuteTime() - cronJobInfo.getNotifyOffset(); + } + if (notifyTime < DateUtils.currentTimeSeconds()) { + throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); + } + QuartzTrigger notifyTrigger = QuartzTriggerBuilder.newTrigger() + .ofType(QuartzTrigger.TriggerType.SIMPLE) + .withIdentity(notifyJobName, jobGroup) + .startAt(Date.from(Instant.ofEpochSecond(notifyTime))).withRepeatCount(0).withIntervalInHours(1) + .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) + .build(); + + QuartzJob notifyJob = QuartzJobBuilder.newJob() + .withIdentity(notifyJobName, jobGroup) + .forJob(NotifyJobExecutor.class) + .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) + .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) + .withTrigger(notifyTrigger) + .build(); + + quartzTaskHandler.deleteJob(JobKey.jobKey(notifyJobName, jobGroup)); + quartzTaskHandler.addJob(notifyJob); + } else { + quartzTaskHandler.deleteJob(JobKey.jobKey(notifyJobName, jobGroup)); + } + } + +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/handler/DefaultQuartzTaskHandler.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/handler/DefaultQuartzTaskHandler.java index 890c888418..5cf65a44d1 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/handler/DefaultQuartzTaskHandler.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/handler/DefaultQuartzTaskHandler.java @@ -67,8 +67,8 @@ public void addJob(QuartzJob quartzJob) throws SchedulerException { Set triggers = createTriggers(quartzJob); - if (scheduler.isShutdown()) { - log.info("scheduler is shutdown, ignore add job {}!", quartzJob.getKey().getName()); + if (!scheduler.isStarted()) { + log.info("scheduler is not started, ignore add job {}!", quartzJob.getKey().getName()); return; } @@ -96,8 +96,8 @@ public void deleteJob(JobKey jobKey) throws SchedulerException { Assert.notNull(jobKey, "jobKey cannot be empty!"); Assert.notNull(jobKey.getName(), "jobKey name cannot be empty!"); - if (scheduler.isShutdown()) { - log.info("scheduler is shutdown, ignore delete job {}!", jobKey.getName()); + if (!scheduler.isStarted()) { + log.info("scheduler is not started, ignore delete job {}!", jobKey.getName()); return; } @@ -108,9 +108,9 @@ public void deleteJob(JobKey jobKey) throws SchedulerException { public void deleteJob(List jobKeys) throws SchedulerException { Assert.notNull(jobKeys, "jobKeys cannot be empty!"); - if (scheduler.isShutdown()) { + if (!scheduler.isStarted()) { log.info( - "scheduler is shutdown, ignore delete {} job keys: {}", + "scheduler is not started, ignore delete {} job keys: {}", jobKeys.size(), jobKeys.stream().map(JobKey::getName).collect(Collectors.toList()) ); @@ -125,8 +125,8 @@ public void deleteJob(List jobKeys) throws SchedulerException { */ @Override public void pauseAll() throws SchedulerException { - if (scheduler.isShutdown()) { - log.info("scheduler is shutdown, ignore pauseAll!"); + if (!scheduler.isStarted()) { + log.info("scheduler is not started, ignore pauseAll!"); return; } this.scheduler.pauseAll(); diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java index 4db9d05817..f261bd07f0 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java @@ -53,7 +53,6 @@ public static Condition build(Long taskInstanceId, ToggleEvaluateContext toggleEvaluateContext; JobExecuteContext jobExecuteContext = JobExecuteContextThreadLocalRepo.get(); if (jobExecuteContext == null) { - log.info("TaskInstanceIdDynamicCondition : EmptyJobExecuteContext!"); // JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition // (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id) toggleEvaluateContext = ToggleEvaluateContext.EMPTY; @@ -63,7 +62,6 @@ public static Condition build(Long taskInstanceId, toggleEvaluateContext = ToggleEvaluateContext.builder() .addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE, resourceScope); } else { - log.info("TaskInstanceIdDynamicCondition : EmptyResourceScope!"); toggleEvaluateContext = ToggleEvaluateContext.EMPTY; } } @@ -76,13 +74,9 @@ public static Condition build(Long taskInstanceId, // 为了不影响兼容性,忽略错误 return DSL.trueCondition(); } else { - // 为了便于观察和排查,暂时设定为 INFO 级别,等后续正式交付再改成 DEBUG - log.info("TaskInstanceIdDynamicCondition: UseTaskInstanceIdCondition"); return taskInstanceIdConditionBuilder.apply(taskInstanceId); } } else { - // 为了便于观察和排查,暂时设定为 INFO 级别,等后续正式交付再改成 DEBUG - log.info("TaskInstanceIdDynamicCondition: IgnoreTaskInstanceIdCondition"); return DSL.trueCondition(); } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java index 08b8f9c8bb..38024edb30 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java @@ -773,8 +773,15 @@ private void checkExecuteObjectExist(TaskInstanceDTO taskInstance, // 处理主机执行对象 if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotExistHosts())) { if (shouldIgnoreInvalidHost(taskInstance)) { - // 忽略主机不存在错误,并标识执行对象的 invalid 属性为 true - markExecuteObjectInvalid(stepInstanceList, taskInstanceExecuteObjects.getNotExistHosts()); + if (taskInstanceExecuteObjects.getNotExistHosts().stream().anyMatch(host -> host.getHostId() == null)) { + // 由于历史原因,部分定时任务使用了管控区域ID:Ipv4 作为主机 ID,并且这部分主机已经不存在于 cmdb,所以无法 + // 正确获取到对应的 hostId,会导致后续报错;所以这里直接对外抛出错误,不再继续兼容处理 + invalidExecuteObjects.addAll(taskInstanceExecuteObjects.getNotExistHosts().stream() + .map(this::printHostIdOrIp).collect(Collectors.toList())); + } else { + // 忽略主机不存在错误,并标识执行对象的 invalid 属性为 true + markExecuteObjectInvalid(stepInstanceList, taskInstanceExecuteObjects.getNotExistHosts()); + } } else { invalidExecuteObjects.addAll(taskInstanceExecuteObjects.getNotExistHosts().stream() .map(this::printHostIdOrIp).collect(Collectors.toList())); diff --git a/support-files/kubernetes/charts/bk-job/templates/job-analysis/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-analysis/configmap.yaml index 48767cc5ac..52d4c43e1e 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-analysis/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-analysis/configmap.yaml @@ -17,6 +17,9 @@ data: async: requestTimeout: 180s cloud: + loadbalancer: + cache: + ttl: 20s stream: function: definition: handleAIChatOperationEvent diff --git a/support-files/kubernetes/charts/bk-job/templates/job-backup/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-backup/configmap.yaml index 3ec5de1c6c..3d06691bc6 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-backup/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-backup/configmap.yaml @@ -14,6 +14,9 @@ data: application.yaml: |- spring: cloud: + loadbalancer: + cache: + ttl: 20s stream: defaultBinder: jobCommon binders: diff --git a/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml index 9e1cb16169..737d54edc6 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml @@ -14,6 +14,9 @@ data: application.yaml: |- spring: cloud: + loadbalancer: + cache: + ttl: 20s stream: defaultBinder: jobCommon binders: @@ -101,6 +104,9 @@ data: class: org.quartz.simpl.RAMJobStore misfireThreshold: 60000 plugin: + shutdownhook: + class: org.quartz.plugins.management.ShutdownHookPlugin + cleanShutdown: true triggHistory: class: org.quartz.plugins.history.LoggingJobHistoryPlugin scheduler: diff --git a/support-files/kubernetes/charts/bk-job/templates/job-execute/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-execute/configmap.yaml index f76474e32b..fad2462ec8 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-execute/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-execute/configmap.yaml @@ -14,6 +14,9 @@ data: application.yaml: |- spring: cloud: + loadbalancer: + cache: + ttl: 20s stream: defaultBinder: jobCommon binders: diff --git a/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml index f3a0c9142c..9bc3a46218 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml @@ -14,6 +14,9 @@ data: application.yaml: |- spring: cloud: + loadbalancer: + cache: + ttl: 20s stream: defaultBinder: jobCommon binders: diff --git a/support-files/kubernetes/charts/bk-job/templates/job-gateway/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-gateway/configmap.yaml index 13652bd34c..3fba9ff5b6 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-gateway/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-gateway/configmap.yaml @@ -14,6 +14,9 @@ data: application.yaml: |- spring: cloud: + loadbalancer: + cache: + ttl: 20s stream: defaultBinder: jobCommon binders: