diff --git a/src/backend/commons/common-utils/src/main/java/com/tencent/bk/job/common/util/CollectionUtil.java b/src/backend/commons/common-utils/src/main/java/com/tencent/bk/job/common/util/CollectionUtil.java index 345519c457..86c430a522 100644 --- a/src/backend/commons/common-utils/src/main/java/com/tencent/bk/job/common/util/CollectionUtil.java +++ b/src/backend/commons/common-utils/src/main/java/com/tencent/bk/job/common/util/CollectionUtil.java @@ -116,4 +116,26 @@ public static Map convertToMap(List entityCollection, } return map; } + + public static ArrayList mergeToArrayList(List list1, List list2) { + ArrayList mergeList; + + boolean isList1NotEmpty = CollectionUtils.isNotEmpty(list1); + boolean isList2NotEmpty = CollectionUtils.isNotEmpty(list2); + + if (isList1NotEmpty && isList2NotEmpty) { + mergeList = new ArrayList<>(list1.size() + list2.size()); + mergeList.addAll(list1); + mergeList.addAll(list2); + return mergeList; + } else if (isList1NotEmpty) { + mergeList = new ArrayList<>(list1); + return mergeList; + } else if (isList2NotEmpty) { + mergeList = new ArrayList<>(list2); + return mergeList; + } else { + return new ArrayList<>(0); + } + } } diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/HostCompositeKey.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/HostCompositeKey.java new file mode 100644 index 0000000000..8bd667f1b9 --- /dev/null +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/HostCompositeKey.java @@ -0,0 +1,107 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.model; + +import com.tencent.bk.job.common.model.dto.HostDTO; +import lombok.Getter; +import lombok.Setter; + +import java.util.Objects; + +/** + * 主机复合 KEY,用于主机的多种表达方式 + */ +@Getter +@Setter +public class HostCompositeKey { + /** + * Key 类型 + */ + private final HostCompositeKeyType keyType; + + /** + * 主机唯一 key(目前支持 hostId/cloudIp 两种) + */ + private final String key; + + public HostCompositeKey(HostCompositeKeyType keyType, String key) { + this.keyType = keyType; + this.key = key; + } + + public static HostCompositeKey ofHost(HostDTO host) { + if (host.getHostId() != null) { + // 优先使用 hostId + return new HostCompositeKey(HostCompositeKeyType.HOST_ID, String.valueOf(host.getHostId())); + } else if (host.toCloudIp() != null) { + // 没有 hostId, 使用管控区域 ID + ipv4 + return new HostCompositeKey(HostCompositeKeyType.CLOUD_IP, host.toCloudIp()); + } else { + throw new IllegalArgumentException("Invalid host, both hostId or cloudIp are empty"); + } + } + + + @Getter + public enum HostCompositeKeyType { + /** + * HostId 作为 KEY + */ + HOST_ID(1), + /** + * 管控区域 ID+ ipv4 作为 KEY + */ + CLOUD_IP(2); + + private final int value; + + HostCompositeKeyType(int value) { + this.value = value; + } + + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + HostCompositeKey that = (HostCompositeKey) o; + if (this.getKeyType() != that.getKeyType()) { + return false; + } + + return keyType == ((HostCompositeKey) o).getKeyType() && key.equals(that.getKey()); + } + + @Override + public int hashCode() { + return Objects.hash(keyType, key); + } + + @Override + public String toString() { + return keyType.getValue() + ":" + key; + } +} diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java index de9a1718f5..5b6224119e 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.tencent.bk.job.common.annotation.PersistenceObject; +import com.tencent.bk.job.common.model.HostCompositeKey; import com.tencent.bk.job.common.model.openapi.v4.OpenApiHostDTO; import com.tencent.bk.job.common.model.vo.CloudAreaInfoVO; import com.tencent.bk.job.common.model.vo.HostInfoVO; @@ -269,12 +270,8 @@ public HostDTO clone() { * @return 主机KEY */ @JsonIgnore - public String getUniqueKey() { - if (hostId != null) { - return "HOST_ID:" + hostId; - } else { - return "HOST_IP:" + toCloudIp(); - } + public HostCompositeKey getUniqueKey() { + return HostCompositeKey.ofHost(this); } /** diff --git a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileAgentTaskDAOImplIntegrationTest.java b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileAgentTaskDAOImplIntegrationTest.java index ce0f9aeec7..0e7c93a123 100644 --- a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileAgentTaskDAOImplIntegrationTest.java +++ b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileAgentTaskDAOImplIntegrationTest.java @@ -118,7 +118,7 @@ public void testBatchSaveAgentTasks() { agentTask2.setEndTime(1572858331000L); agentTask2.setTotalTime(1000L); agentTask2.setErrorCode(88); - agentTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + agentTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); agentTaskList.add(agentTask2); fileAgentTaskDAO.batchSaveAgentTasks(agentTaskList); @@ -156,7 +156,7 @@ public void testBatchSaveAgentTasks() { assertThat(agentTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(agentTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(agentTask2Return.getErrorCode()).isEqualTo(88); - assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); } @Test @@ -193,7 +193,7 @@ public void testBatchUpdateAgentTasks() { agentTask2.setEndTime(1572858331000L); agentTask2.setTotalTime(1000L); agentTask2.setErrorCode(88); - agentTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + agentTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); agentTaskList.add(agentTask2); fileAgentTaskDAO.batchUpdateAgentTasks(agentTaskList); @@ -227,7 +227,7 @@ public void testBatchUpdateAgentTasks() { assertThat(agentTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(agentTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(agentTask2Return.getErrorCode()).isEqualTo(88); - assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); } @Test diff --git a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileExecuteObjectTaskDAOImplIntegrationTest.java b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileExecuteObjectTaskDAOImplIntegrationTest.java index 6e1fc66b69..186333b9ed 100644 --- a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileExecuteObjectTaskDAOImplIntegrationTest.java +++ b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/FileExecuteObjectTaskDAOImplIntegrationTest.java @@ -119,7 +119,7 @@ public void testBatchSaveTasks() { executeObjectTask2.setEndTime(1572858331000L); executeObjectTask2.setTotalTime(1000L); executeObjectTask2.setErrorCode(88); - executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); executeObjectTaskList.add(executeObjectTask2); fileExecuteObjectTaskDAO.batchSaveTasks(executeObjectTaskList); @@ -157,7 +157,7 @@ public void testBatchSaveTasks() { assertThat(executeObjectTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(executeObjectTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(executeObjectTask2Return.getErrorCode()).isEqualTo(88); - assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); } @Test @@ -194,7 +194,7 @@ public void testBatchUpdateAgentTasks() { executeObjectTask2.setEndTime(1572858331000L); executeObjectTask2.setTotalTime(1000L); executeObjectTask2.setErrorCode(88); - executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); executeObjectTaskList.add(executeObjectTask2); fileExecuteObjectTaskDAO.batchUpdateTasks(executeObjectTaskList); @@ -226,7 +226,7 @@ public void testBatchUpdateAgentTasks() { assertThat(executeObjectTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(executeObjectTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(executeObjectTask2Return.getErrorCode()).isEqualTo(88); - assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); } @Test diff --git a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptAgentTaskDAOImplIntegrationTest.java b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptAgentTaskDAOImplIntegrationTest.java index 33d4b9b0cb..8e6f170526 100644 --- a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptAgentTaskDAOImplIntegrationTest.java +++ b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptAgentTaskDAOImplIntegrationTest.java @@ -121,7 +121,7 @@ public void testBatchSaveAgentTasks() { agentTask2.setEndTime(1572858331000L); agentTask2.setTotalTime(1000L); agentTask2.setErrorCode(88); - agentTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + agentTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); agentTask2.setTag("bb"); agentTask2.setExitCode(2); agentTaskList.add(agentTask2); @@ -159,7 +159,7 @@ public void testBatchSaveAgentTasks() { assertThat(agentTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(agentTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(agentTask2Return.getErrorCode()).isEqualTo(88); - assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); assertThat(agentTask2Return.getTag()).isEqualTo("bb"); assertThat(agentTask2Return.getExitCode()).isEqualTo(2); } @@ -199,7 +199,7 @@ public void testBatchUpdateAgentTasks() { agentTask2.setEndTime(1572858331000L); agentTask2.setTotalTime(1000L); agentTask2.setErrorCode(88); - agentTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + agentTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); agentTask2.setTag("bb"); agentTask2.setExitCode(2); agentTaskList.add(agentTask2); @@ -235,7 +235,7 @@ public void testBatchUpdateAgentTasks() { assertThat(agentTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(agentTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(agentTask2Return.getErrorCode()).isEqualTo(88); - assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(agentTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); assertThat(agentTask2Return.getTag()).isEqualTo("bb"); assertThat(agentTask2Return.getExitCode()).isEqualTo(2); } diff --git a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptExecuteObjectTaskDAOImplIntegrationTest.java b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptExecuteObjectTaskDAOImplIntegrationTest.java index 16dd78b2f8..8456217daa 100644 --- a/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptExecuteObjectTaskDAOImplIntegrationTest.java +++ b/src/backend/job-execute/boot-job-execute/src/test/java/com/tencent/bk/job/execute/dao/impl/ScriptExecuteObjectTaskDAOImplIntegrationTest.java @@ -122,7 +122,7 @@ public void testBatchSaveTasks() { executeObjectTask2.setEndTime(1572858331000L); executeObjectTask2.setTotalTime(1000L); executeObjectTask2.setErrorCode(88); - executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); executeObjectTask2.setTag("bb"); executeObjectTask2.setExitCode(2); executeObjectTaskList.add(executeObjectTask2); @@ -162,7 +162,7 @@ public void testBatchSaveTasks() { assertThat(executeObjectTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(executeObjectTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(executeObjectTask2Return.getErrorCode()).isEqualTo(88); - assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); assertThat(executeObjectTask2Return.getTag()).isEqualTo("bb"); assertThat(executeObjectTask2Return.getExitCode()).isEqualTo(2); } @@ -202,7 +202,7 @@ public void testBatchUpdateTasks() { executeObjectTask2.setEndTime(1572858331000L); executeObjectTask2.setTotalTime(1000L); executeObjectTask2.setErrorCode(88); - executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + executeObjectTask2.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); executeObjectTask2.setTag("bb"); executeObjectTask2.setExitCode(2); executeObjectTaskList.add(executeObjectTask2); @@ -238,7 +238,7 @@ public void testBatchUpdateTasks() { assertThat(executeObjectTask2Return.getEndTime()).isEqualTo(1572858331000L); assertThat(executeObjectTask2Return.getTotalTime()).isEqualTo(1000L); assertThat(executeObjectTask2Return.getErrorCode()).isEqualTo(88); - assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.HOST_NOT_EXIST); + assertThat(executeObjectTask2Return.getStatus()).isEqualTo(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); assertThat(executeObjectTask2Return.getTag()).isEqualTo("bb"); assertThat(executeObjectTask2Return.getExitCode()).isEqualTo(2); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/consts/ExecuteObjectTaskStatusEnum.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/consts/ExecuteObjectTaskStatusEnum.java index 33c0458191..2dae160527 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/consts/ExecuteObjectTaskStatusEnum.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/consts/ExecuteObjectTaskStatusEnum.java @@ -37,9 +37,9 @@ public enum ExecuteObjectTaskStatusEnum { */ AGENT_ERROR(1), /** - * 无效主机 + * 无效执行对象 */ - HOST_NOT_EXIST(2), + INVALID_EXECUTE_OBJECT(2), /** * 上次已成功 */ diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskStartCommand.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskStartCommand.java index 299400017c..d4ebc7969d 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskStartCommand.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskStartCommand.java @@ -31,7 +31,6 @@ import com.tencent.bk.job.execute.common.constants.RunStatusEnum; import com.tencent.bk.job.execute.config.JobExecuteConfig; import com.tencent.bk.job.execute.engine.EngineDependentServiceHolder; -import com.tencent.bk.job.execute.engine.consts.ExecuteObjectTaskStatusEnum; import com.tencent.bk.job.execute.engine.evict.TaskEvictPolicyExecutor; import com.tencent.bk.job.execute.engine.listener.event.EventSource; import com.tencent.bk.job.execute.engine.listener.event.StepEvent; @@ -59,7 +58,6 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.util.StopWatch; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -102,9 +100,9 @@ public abstract class AbstractGseTaskStartCommand extends AbstractGseTaskCommand */ protected Map globalVariables = new HashMap<>(); /** - * 执行对象任务列表 + * 目标执行对象任务列表(全量,包含非法的任务) */ - protected List executeObjectTasks; + protected List targetExecuteObjectTasks; AbstractGseTaskStartCommand(EngineDependentServiceHolder engineDependentServiceHolder, @@ -225,33 +223,19 @@ private boolean startGseTaskIfNotAvailable(StopWatch watch) { } private void initExecuteObjectTasks() { - this.executeObjectTasks = executeObjectTaskService.listTasksByGseTaskId(stepInstance, gseTask.getId()); - updateUninstalledExecuteObjectTasks(this.executeObjectTasks); + targetExecuteObjectTasks = + executeObjectTaskService.listTasksByGseTaskId(stepInstance, gseTask.getId()) + .stream() + .filter(ExecuteObjectTask::isTarget) + .collect(Collectors.toList()); - executeObjectTasks.stream() - .filter(ExecuteObjectTask::isTarget) - .filter(executeObjectTask -> !executeObjectTask.getExecuteObject().isAgentIdEmpty()) + targetExecuteObjectTasks.stream() + .filter(executeObjectTask -> executeObjectTask.getExecuteObject().isExecutable()) .forEach(executeObjectTask -> this.targetExecuteObjectTaskMap.put( executeObjectTask.getExecuteObject().toExecuteObjectGseKey(), executeObjectTask)); } - private void updateUninstalledExecuteObjectTasks(Collection executeObjectTasks) { - List invalidExecuteObjectTasks = executeObjectTasks.stream() - .filter(executeObjectTask -> executeObjectTask.getExecuteObject().isAgentIdEmpty()) - .collect(Collectors.toList()); - if (CollectionUtils.isNotEmpty(invalidExecuteObjectTasks)) { - log.warn("{} contains invalid execute object tasks: {}", gseTaskInfo, invalidExecuteObjectTasks); - invalidExecuteObjectTasks.forEach(executeObjectTask -> { - executeObjectTask.setStatus(ExecuteObjectTaskStatusEnum.AGENT_NOT_INSTALLED); - executeObjectTask.setStartTime(System.currentTimeMillis()); - executeObjectTask.setEndTime(System.currentTimeMillis()); - executeObjectTask.calculateTotalTime(); - }); - executeObjectTaskService.batchUpdateTasks(executeObjectTasks); - } - } - private void initVariables() { if (taskInstance.isPlanInstance()) { List taskVariables = diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/FileGseTaskStartCommand.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/FileGseTaskStartCommand.java index 9c31c19443..7a88d40a09 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/FileGseTaskStartCommand.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/FileGseTaskStartCommand.java @@ -33,6 +33,7 @@ import com.tencent.bk.job.common.gse.v2.model.SourceFile; import com.tencent.bk.job.common.gse.v2.model.TargetFile; import com.tencent.bk.job.common.gse.v2.model.TransferFileRequest; +import com.tencent.bk.job.common.util.CollectionUtil; import com.tencent.bk.job.common.util.DataSizeConverter; import com.tencent.bk.job.common.util.FilePathUtils; import com.tencent.bk.job.common.util.date.DateUtils; @@ -99,6 +100,10 @@ public class FileGseTaskStartCommand extends AbstractGseTaskStartCommand { * 源文件与目标文件路径映射关系, 包含非法主机 */ private Map allSrcDestFileMap; + /** + * 源执行对象任务列表(全量,包含非法的任务) + */ + protected List sourceExecuteObjectTasks; public FileGseTaskStartCommand(EngineDependentServiceHolder engineDependentServiceHolder, @@ -144,7 +149,7 @@ private void parseSrcDestFileMap() { allSrcDestFileMap = JobSrcFileUtils.buildSourceDestPathMapping(allSrcFiles, targetDir, stepInstance.getFileTargetName()); allSrcDestFileMap.forEach((sreFile, destFile) -> { - if (isAgentInstalled(sreFile.getExecuteObject())) { + if (sreFile.getExecuteObject().isExecutable()) { srcDestFileMap.put(sreFile, destFile); } }); @@ -170,7 +175,7 @@ private void resolveFileSource() { private void parseSrcFiles() { allSrcFiles = JobSrcFileUtils.parseSrcFiles(stepInstance, fileStorageRootPath); srcFiles = allSrcFiles.stream() - .filter(file -> isAgentInstalled(file.getExecuteObject())) + .filter(file -> file.getExecuteObject().isExecutable()) .collect(Collectors.toSet()); // 设置源文件所在主机账号信息 setAccountInfoForSourceFiles(srcFiles); @@ -237,7 +242,7 @@ private void initFileSourceExecuteObjectTasks() { } } } - List executeObjectTasks = new ArrayList<>(); + sourceExecuteObjectTasks = new ArrayList<>(); for (ExecuteObject sourceExecuteObject : sourceExecuteObjects) { ExecuteObjectTask executeObjectTask = new ExecuteObjectTask( taskInstanceId, @@ -251,15 +256,17 @@ private void initFileSourceExecuteObjectTasks() { executeObjectTask.setGseTaskId(gseTask.getId()); if (sourceExecuteObject.isAgentIdEmpty()) { - executeObjectTask.setStatus(ExecuteObjectTaskStatusEnum.FAILED); + executeObjectTask.setStatus(ExecuteObjectTaskStatusEnum.AGENT_NOT_INSTALLED); + } else if (sourceExecuteObject.isInvalid()) { + executeObjectTask.setStatus(ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); } else { executeObjectTask.setStatus(ExecuteObjectTaskStatusEnum.WAITING); sourceExecuteObjectTaskMap.put(sourceExecuteObject.toExecuteObjectGseKey(), executeObjectTask); } - executeObjectTasks.add(executeObjectTask); + sourceExecuteObjectTasks.add(executeObjectTask); } - fileExecuteObjectTaskService.batchSaveTasks(executeObjectTasks); + fileExecuteObjectTaskService.batchSaveTasks(sourceExecuteObjectTasks); } @Override @@ -348,33 +355,50 @@ private void saveInitialFileTaskLogs() { private void addInitialFileUploadTaskLogs(Map logs) { // 每个要分发的源文件一条上传日志 for (JobFile file : allSrcFiles) { - boolean isAgentInstalled = isAgentInstalled(file.getExecuteObject()); - FileDistStatusEnum status = isAgentInstalled ? + boolean isSourceValid = !file.getExecuteObject().isInvalid(); + boolean isSourceAgentInstalled = !file.getExecuteObject().isAgentIdEmpty(); + FileDistStatusEnum status = isSourceValid && isSourceAgentInstalled ? FileDistStatusEnum.WAITING : FileDistStatusEnum.FAILED; logService.addFileTaskLog( stepInstance, logs, file.getExecuteObject(), logService.buildUploadServiceFileTaskLogDTO( - stepInstance, file, status, "--", "--", "--", - isAgentInstalled ? null : "Agent is not installed")); + stepInstance, + file, + status, + "--", + "--", + "--", + buildInitialFileTaskUploadLogContent(isSourceValid, isSourceAgentInstalled) + ) + ); } } - private boolean isAgentInstalled(ExecuteObject executeObject) { - return !executeObject.isAgentIdEmpty(); + private String buildInitialFileTaskUploadLogContent(boolean isSourceValid, + boolean isSourceAgentInstalled) { + if (!isSourceValid) { + return "Execute object is invalid"; + } else if (!isSourceAgentInstalled) { + return "Agent is not installed"; + } else { + // 源、目标正常,无需写入错误日志 + return null; + } } private void addInitialFileDownloadTaskLogs(Map logs) { // 每个目标IP从每个要分发的源文件下载的一条下载日志 - executeObjectTasks.stream() - .filter(ExecuteObjectTask::isTarget) + targetExecuteObjectTasks .forEach(targetExecuteObjectTask -> { - boolean isTargetAgentInstalled = isAgentInstalled(targetExecuteObjectTask.getExecuteObject()); + boolean isTargetValid = !targetExecuteObjectTask.getExecuteObject().isInvalid(); + boolean isTargetAgentInstalled = !targetExecuteObjectTask.getExecuteObject().isAgentIdEmpty(); for (JobFile file : allSrcFiles) { - boolean isSourceAgentInstalled = isAgentInstalled(file.getExecuteObject()); - FileDistStatusEnum status = isTargetAgentInstalled && isSourceAgentInstalled ? - FileDistStatusEnum.WAITING : FileDistStatusEnum.FAILED; + boolean isSourceValid = !file.getExecuteObject().isInvalid(); + boolean isSourceAgentInstalled = !file.getExecuteObject().isAgentIdEmpty(); + FileDistStatusEnum status = isTargetValid && isTargetAgentInstalled && isSourceValid + && isSourceAgentInstalled ? FileDistStatusEnum.WAITING : FileDistStatusEnum.FAILED; logService.addFileTaskLog( stepInstance, logs, @@ -388,14 +412,36 @@ private void addInitialFileDownloadTaskLogs(Map executionLogs) { if (log.isDebugEnabled()) { log.debug("Write file task initial logs, executionLogs: {}", executionLogs); @@ -427,10 +473,11 @@ protected void addResultHandleTask() { gseTask, srcDestFileMap, requestId, - executeObjectTasks); + CollectionUtil.mergeToArrayList(targetExecuteObjectTasks, sourceExecuteObjectTasks)); resultHandleManager.handleDeliveredTask(fileResultHandleTask); } + @Override protected boolean checkGseTaskExecutable() { if (this.targetExecuteObjectTaskMap.isEmpty()) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java index 065156976c..46b93c3016 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java @@ -611,7 +611,7 @@ protected final void addResultHandleTask() { targetExecuteObjectTaskMap, gseTask, requestId, - executeObjectTasks); + targetExecuteObjectTasks); resultHandleManager.handleDeliveredTask(scriptResultHandleTask); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepEventHandler.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepEventHandler.java index ebfe06ca85..b1a76e90f1 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepEventHandler.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepEventHandler.java @@ -282,9 +282,15 @@ private void saveExecuteObjectTasksForStartStep(Long gseTaskId, } else { // 普通步骤,启动的时候需要初始化所有ExecuteObjectTask List executeObjectTasks = new ArrayList<>( - buildInitialExecuteObjectTasks(stepInstance.getTaskInstanceId(), stepInstanceId, executeCount, - executeCount, batch, gseTaskId, - stepInstance.getTargetExecuteObjects().getExecuteObjectsCompatibly())); + buildInitialExecuteObjectTasks( + stepInstance.getTaskInstanceId(), + stepInstanceId, + executeCount, + executeCount, + batch, + gseTaskId, + stepInstance.getTargetExecuteObjects().getExecuteObjectsCompatibly()) + ); saveExecuteObjectTasks(stepInstance, executeObjectTasks); } } @@ -361,7 +367,16 @@ private List buildInitialExecuteObjectTasks(long taskInstance executeObjectTask.setActualExecuteCount(actualExecuteCount); executeObjectTask.setBatch(batch); executeObjectTask.setGseTaskId(gseTaskId); - executeObjectTask.setStatus(ExecuteObjectTaskStatusEnum.WAITING); + executeObjectTask.setStatus(executeObject.isExecutable() ? + ExecuteObjectTaskStatusEnum.WAITING : + executeObject.isAgentIdEmpty() ? + ExecuteObjectTaskStatusEnum.AGENT_NOT_INSTALLED : + ExecuteObjectTaskStatusEnum.INVALID_EXECUTE_OBJECT); + if (!executeObject.isExecutable()) { + executeObjectTask.setStartTime(System.currentTimeMillis()); + executeObjectTask.setEndTime(System.currentTimeMillis()); + executeObjectTask.setTotalTime(0L); + } executeObjectTask.setFileTaskMode(FileTaskModeEnum.DOWNLOAD); executeObjectTask.setExecuteObject(executeObject); return executeObjectTask; @@ -561,7 +576,9 @@ private boolean isStepSupportRetry(RunStatusEnum stepStatus) { || RunStatusEnum.STOP_SUCCESS == stepStatus; } - private void saveExecuteObjectTasksForRetryFail(StepInstanceBaseDTO stepInstance, int executeCount, Integer batch, + private void saveExecuteObjectTasksForRetryFail(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch, Long gseTaskId) { List retryExecuteObjectTasks = listTargetExecuteObjectTasks(stepInstance, executeCount - 1); @@ -572,8 +589,10 @@ private void saveExecuteObjectTasksForRetryFail(StepInstanceBaseDTO stepInstance } // 只有失败的目标主机才需要参与重试 if (!ExecuteObjectTaskStatusEnum.isSuccess(retryExecuteObjectTask.getStatus())) { - retryExecuteObjectTask.setActualExecuteCount(executeCount); - retryExecuteObjectTask.resetTaskInitialStatus(); + if (retryExecuteObjectTask.getExecuteObject().isExecutable()) { + retryExecuteObjectTask.setActualExecuteCount(executeCount); + retryExecuteObjectTask.resetTaskInitialStatus(); + } retryExecuteObjectTask.setGseTaskId(gseTaskId); } } @@ -582,7 +601,9 @@ private void saveExecuteObjectTasksForRetryFail(StepInstanceBaseDTO stepInstance } - private void saveExecuteObjectTasksForRetryAll(StepInstanceBaseDTO stepInstance, int executeCount, Integer batch, + private void saveExecuteObjectTasksForRetryAll(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch, Long gseTaskId) { List retryExecuteObjectTasks = listTargetExecuteObjectTasks(stepInstance, executeCount - 1); @@ -591,8 +612,11 @@ private void saveExecuteObjectTasksForRetryAll(StepInstanceBaseDTO stepInstance, if (batch != null && retryExecuteObjectTask.getBatch() != batch) { continue; } - retryExecuteObjectTask.setActualExecuteCount(executeCount); - retryExecuteObjectTask.resetTaskInitialStatus(); + if (retryExecuteObjectTask.getExecuteObject().isExecutable()) { + // 重置运行数据 + retryExecuteObjectTask.setActualExecuteCount(executeCount); + retryExecuteObjectTask.resetTaskInitialStatus(); + } retryExecuteObjectTask.setGseTaskId(gseTaskId); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java index 86ccd46fd0..3b5d0707fc 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java @@ -93,6 +93,11 @@ public class ExecuteObject implements Cloneable { @JsonIgnore private ExecuteObjectGseKey executeObjectGseKey; + /** + * 是否非法执行对象 + */ + private boolean invalid; + public ExecuteObject(Container container) { this.container = container; this.type = ExecuteObjectTypeEnum.CONTAINER; @@ -138,6 +143,7 @@ public ExecuteObject clone() { if (container != null) { clone.setContainer(container.clone()); } + clone.setInvalid(invalid); return clone; } @@ -175,6 +181,14 @@ public boolean isAgentIdEmpty() { } } + /** + * 判断是否可作为执行目标 + */ + @JsonIgnore + public boolean isExecutable() { + return !isInvalid() && !isAgentIdEmpty(); + } + public Agent toGseAgent() { Agent agent = new Agent(); if (isHostExecuteObject()) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java index be2ccab43d..63aa59183f 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java @@ -133,7 +133,7 @@ public abstract class AbstractResultHandleTask implements ContinuousScheduled */ protected GseTaskDTO gseTask; /** - * 执行对象任务列表 + * 执行对象任务列表(全量) */ protected List executeObjectTasks; /** @@ -204,9 +204,9 @@ public abstract class AbstractResultHandleTask implements ContinuousScheduled */ protected boolean gseV2Task; /** - * 是否包含非法执行对象 + * 是否存在不可执行的目标执行对象 */ - protected boolean hasInvalidExecuteObject; + protected boolean existNoExecutableTargetExecuteObject; /** * GSE 任务信息,用于日志输出 */ @@ -214,7 +214,7 @@ public abstract class AbstractResultHandleTask implements ContinuousScheduled protected final RunningJobKeepaliveManager runningJobKeepaliveManager; - private TaskContext taskContext; + private final TaskContext taskContext; protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentServiceHolder, ExecuteObjectTaskService executeObjectTaskService, @@ -265,9 +265,10 @@ protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentS } } - this.hasInvalidExecuteObject = - executeObjectTasks.stream().anyMatch( - executeObjectTask -> executeObjectTask.getExecuteObject().isAgentIdEmpty()); + this.existNoExecutableTargetExecuteObject = + executeObjectTasks.stream(). + filter(ExecuteObjectTask::isTarget) + .anyMatch(executeObjectTask -> !executeObjectTask.getExecuteObject().isExecutable()); } private String buildGseTaskInfo(Long jobInstanceId, GseTaskDTO gseTask) { @@ -686,7 +687,7 @@ protected GseTaskExecuteResult analyseFinishedExecuteResult() { GseTaskExecuteResult rst; if (isAllTargetExecuteObjectTasksSuccess()) { // 如果源/目标包含非法主机,设置任务状态为失败 - if (hasInvalidExecuteObject) { + if (existNoExecutableExecuteObject()) { log.info("Gse task contains invalid execute object, set execute result fail"); rst = GseTaskExecuteResult.FAILED; } else { @@ -702,6 +703,13 @@ protected GseTaskExecuteResult analyseFinishedExecuteResult() { return rst; } + /** + * 任务是否包含不可执行的执行对象 + */ + protected boolean existNoExecutableExecuteObject() { + return this.existNoExecutableTargetExecuteObject; + } + /** * 获取执行结果 * diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/FileResultHandleTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/FileResultHandleTask.java index 9ddc30d2f4..77188e0f69 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/FileResultHandleTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/FileResultHandleTask.java @@ -150,9 +150,9 @@ public class FileResultHandleTask extends AbstractResultHandleTask !executeObjectTask.isTarget() && + !executeObjectTask.getExecuteObject().isExecutable()); + log.info("InitFileResultHandleTask|stepInstanceId: {}|sourceExecuteObjectGseKeys: {}" - + "|targetExecuteObjectGseKeys: {}|fileUploadTaskNumMap: {}|fileDownloadTaskNumMap: {}", + + "|targetExecuteObjectGseKeys: {}|fileUploadTaskNumMap: {}|fileDownloadTaskNumMap: {}" + + "|existNoExecutableSourceExecuteObject: {}", stepInstance.getId(), sourceExecuteObjectGseKeys, targetExecuteObjectGseKeys, fileUploadTaskNumMap, - fileDownloadTaskNumMap); + fileDownloadTaskNumMap, existNoExecutableSourceExecuteObject); } private void initSrcFilesMap(Collection srcFiles) { @@ -532,6 +538,11 @@ private GseTaskExecuteResult analyseExecuteResult() { return rst; } + @Override + protected boolean existNoExecutableExecuteObject() { + return this.existNoExecutableTargetExecuteObject || this.existNoExecutableSourceExecuteObject; + } + private boolean isAllSourceExecuteObjectTasksDone() { return this.notFinishedSourceExecuteObjectGseKeys.isEmpty() && this.notFinishedTargetExecuteObjectGseKeys.isEmpty(); @@ -724,7 +735,7 @@ private void analyseExecuteObjectTaskStatus(int errorCode, ExecuteObjectTask executeObjectTask) { // 文件任务成功数=任务总数 if (successNum >= fileNum) { - if (hasInvalidSourceExecuteObject) { + if (existNoExecutableSourceExecuteObject) { // 如果包含了非法的源文件主机,即使GSE任务(已过滤非法主机)执行成功,那么对于这个主机来说,整体上任务状态是失败 executeObjectTask.setStatus(ExecuteObjectTaskStatusEnum.FAILED); } else { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectTask.java index 89e238409e..6fc70dfa73 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectTask.java @@ -50,38 +50,55 @@ public class ExecuteObjectTask { /** * 作业实例ID */ + @Setter + @Getter private long taskInstanceId; /** * 步骤实例ID */ + @Setter + @Getter private long stepInstanceId; /** * 步骤执行次数 */ + @Setter + @Getter private int executeCount; /** * 任务对应的实际的步骤执行次数(重试场景,可能任务并没有实际被执行) */ + @Setter + @Getter private Integer actualExecuteCount; /** * 滚动执行批次 */ + @Setter + @Getter private int batch; /** * GSE 任务ID */ + @Setter + @Getter private Long gseTaskId; /** * 执行对象 ID */ + @Setter + @Getter private String executeObjectId; /** * 执行对象类型 */ + @Setter + @Getter private ExecuteObjectTypeEnum executeObjectType; /** * 执行对象 */ + @Getter private ExecuteObject executeObject; /** * 主机 ID @@ -100,46 +117,61 @@ public class ExecuteObjectTask { /** * 任务状态 */ + @Getter private ExecuteObjectTaskStatusEnum status; /** * 任务开始时间 */ + @Getter private Long startTime; /** * 任务结束时间 */ + @Getter private Long endTime; /** * 耗时,毫秒 */ + @Getter private Long totalTime; /** * GSE返回错误码 */ + @Getter private int errorCode; /** * 脚本任务-执行程序退出码, 0 脚本执行成功,非 0 脚本执行失败 */ + @Getter private Integer exitCode; /** * 脚本任务-用户自定义执行结果分组 */ + @Setter + @Getter private String tag = ""; /** * 脚本任务-日志偏移量。Job 从 GSE 根据 scriptLogOffset 增量拉取执行日志 */ + @Getter private int scriptLogOffset; /** * 脚本任务-执行日志 */ + @Setter + @Getter private String scriptLogContent; /** * 文件任务类型 */ + @Setter + @Getter private FileTaskModeEnum fileTaskMode; /** * 结果是否发生变化 */ + @Setter + @Getter private volatile boolean changed; public ExecuteObjectTask(long taskInstanceId, @@ -282,74 +314,6 @@ public boolean isSuccess() { return ExecuteObjectTaskStatusEnum.isSuccess(status); } - public long getTaskInstanceId() { - return taskInstanceId; - } - - public void setTaskInstanceId(long taskInstanceId) { - this.taskInstanceId = taskInstanceId; - } - - public long getStepInstanceId() { - return stepInstanceId; - } - - public void setStepInstanceId(long stepInstanceId) { - this.stepInstanceId = stepInstanceId; - } - - public int getExecuteCount() { - return executeCount; - } - - public void setExecuteCount(int executeCount) { - this.executeCount = executeCount; - } - - public Integer getActualExecuteCount() { - return actualExecuteCount; - } - - public void setActualExecuteCount(Integer actualExecuteCount) { - this.actualExecuteCount = actualExecuteCount; - } - - public int getBatch() { - return batch; - } - - public void setBatch(int batch) { - this.batch = batch; - } - - public Long getGseTaskId() { - return gseTaskId; - } - - public void setGseTaskId(Long gseTaskId) { - this.gseTaskId = gseTaskId; - } - - public String getExecuteObjectId() { - return executeObjectId; - } - - public void setExecuteObjectId(String executeObjectId) { - this.executeObjectId = executeObjectId; - } - - public ExecuteObjectTypeEnum getExecuteObjectType() { - return executeObjectType; - } - - public void setExecuteObjectType(ExecuteObjectTypeEnum executeObjectType) { - this.executeObjectType = executeObjectType; - } - - public ExecuteObject getExecuteObject() { - return executeObject; - } - @Deprecated @CompatibleImplementation(name = "execute_object", deprecatedVersion = "3.9.x", type = CompatibleType.HISTORY_DATA, explain = "兼容老数据,数据失效后可删除") @@ -378,66 +342,6 @@ public void setAgentId(String agentId) { this.agentId = agentId; } - public ExecuteObjectTaskStatusEnum getStatus() { - return status; - } - - public Long getStartTime() { - return startTime; - } - - public Long getEndTime() { - return endTime; - } - - public Long getTotalTime() { - return totalTime; - } - - public int getErrorCode() { - return errorCode; - } - - public Integer getExitCode() { - return exitCode; - } - - public String getTag() { - return tag; - } - - public void setTag(String tag) { - this.tag = tag; - } - - public int getScriptLogOffset() { - return scriptLogOffset; - } - - public String getScriptLogContent() { - return scriptLogContent; - } - - public void setScriptLogContent(String scriptLogContent) { - this.scriptLogContent = scriptLogContent; - } - - public FileTaskModeEnum getFileTaskMode() { - return fileTaskMode; - } - - public void setFileTaskMode(FileTaskModeEnum fileTaskMode) { - this.fileTaskMode = fileTaskMode; - } - - public boolean isChanged() { - return changed; - } - - public void setChanged(boolean changed) { - this.changed = changed; - } - public void setExecuteObject(ExecuteObject executeObject) { this.executeObject = executeObject; if (executeObject != null) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java index e2fb0867f3..39b444d794 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java @@ -24,11 +24,16 @@ package com.tencent.bk.job.execute.model; +import com.tencent.bk.job.common.model.HostCompositeKey; import com.tencent.bk.job.common.model.dto.Container; import com.tencent.bk.job.common.model.dto.HostDTO; -import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.apache.commons.collections4.CollectionUtils; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -37,16 +42,19 @@ /** * 作业实例中包含的执行对象 */ -@Data +@Getter +@ToString public class TaskInstanceExecuteObjects { /** * 当前作业实例是否包含主机执行对象 */ + @Setter private boolean containsAnyHost; /** * 当前作业实例是否包含容器执行对象 */ + @Setter private boolean containsAnyContainer; /** @@ -69,12 +77,18 @@ public class TaskInstanceExecuteObjects { /** * 不存在的容器ID列表 */ + @Setter private Set notExistContainerIds; /** * 主机白名单 * key=hostId, value: 允许的操作列表 */ - Map> whiteHostAllowActions; + @Setter + private Map> whiteHostAllowActions; + /** + * 全量主机 Map + */ + private final Map hostMap = new HashMap<>(); public void addContainers(Collection containers) { if (validContainers == null) { @@ -89,4 +103,25 @@ public void addContainer(Container container) { } validContainers.add(container); } + + public void setNotExistHosts(List notExistHosts) { + this.notExistHosts = notExistHosts; + putHostMap(notExistHosts); + } + + public void setNotInAppHosts(List notInAppHosts) { + this.notInAppHosts = notInAppHosts; + putHostMap(notInAppHosts); + } + + public void setValidHosts(List validHosts) { + this.validHosts = validHosts; + putHostMap(validHosts); + } + + private void putHostMap(List hosts) { + if (CollectionUtils.isNotEmpty(hosts)) { + hosts.forEach(host -> hostMap.put(host.getUniqueKey(), host)); + } + } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java index ed82f0e2ec..4c6362da15 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java @@ -814,7 +814,8 @@ private void checkStepInstanceExecuteTargetNonEmpty(StepInstanceDTO stepInstance // 远程文件分发需要判断文件源主机是否为空 if (TaskFileTypeEnum.SERVER.getType() == fileSource.getFileType()) { ExecuteTargetDTO executeTarget = fileSource.getServers(); - if (executeTarget != null && CollectionUtils.isEmpty(executeTarget.getExecuteObjectsCompatibly())) { + if (executeTarget == null + || CollectionUtils.isEmpty(executeTarget.getExecuteObjectsCompatibly())) { log.warn("Empty file source server, stepInstanceName: {}", stepInstance.getName()); throw new FailedPreconditionException(ErrorCode.STEP_SOURCE_EXECUTE_OBJECT_EMPTY, new String[]{stepInstance.getName()}); diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java index 97026a81e4..7306aabe54 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskInstanceExecuteObjectProcessor.java @@ -40,6 +40,7 @@ import com.tencent.bk.job.common.gse.util.AgentUtils; import com.tencent.bk.job.common.gse.v2.model.resp.AgentState; import com.tencent.bk.job.common.metrics.CommonMetricTags; +import com.tencent.bk.job.common.model.HostCompositeKey; import com.tencent.bk.job.common.model.dto.Container; import com.tencent.bk.job.common.model.dto.HostDTO; import com.tencent.bk.job.common.model.dto.ResourceScope; @@ -163,8 +164,8 @@ public TaskInstanceExecuteObjects processExecuteObjects(TaskInstanceDTO taskInst boolean isSupportExecuteObjectFeature = isSupportExecuteObjectFeature(taskInstance); // 合并所有执行对象 mergeExecuteObjects(stepInstanceList, variables, isSupportExecuteObjectFeature); - // 检查执行对象是否存在 - checkExecuteObjectExist(taskInstanceExecuteObjects); + // 检查执行对象是否合法 + checkExecuteObjectExist(taskInstance, stepInstanceList, taskInstanceExecuteObjects); watch.stop(); // 如果包含主机执行对象,需要获取主机白名单 @@ -180,7 +181,7 @@ public TaskInstanceExecuteObjects processExecuteObjects(TaskInstanceDTO taskInst // 检查执行对象是否可用 watch.start("checkExecuteObjectAccessible"); - checkExecuteObjectAccessible(appId, stepInstanceList, taskInstanceExecuteObjects); + checkExecuteObjectAccessible(taskInstance, stepInstanceList, taskInstanceExecuteObjects); watch.stop(); return taskInstanceExecuteObjects; @@ -467,19 +468,7 @@ private void fillTaskInstanceHostDetail(TaskInstanceDTO taskInstance, fillHostAgent(taskInstance, taskInstanceExecuteObjects); - Map hostMap = new HashMap<>(); - if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getValidHosts())) { - taskInstanceExecuteObjects.getValidHosts().forEach(host -> { - hostMap.put("hostId:" + host.getHostId(), host); - hostMap.put("hostIp:" + host.toCloudIp(), host); - }); - } - if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotInAppHosts())) { - taskInstanceExecuteObjects.getNotInAppHosts().forEach(host -> { - hostMap.put("hostId:" + host.getHostId(), host); - hostMap.put("hostIp:" + host.toCloudIp(), host); - }); - } + Map hostMap = taskInstanceExecuteObjects.getHostMap(); for (StepInstanceDTO stepInstance : stepInstanceList) { if (!stepInstance.isStepContainsExecuteObject()) { @@ -539,11 +528,11 @@ private void setHostAgentId(boolean isUsingGseV2, HostDTO host, Set inv } } - private void fillTargetHostDetail(StepInstanceDTO stepInstance, Map hostMap) { + private void fillTargetHostDetail(StepInstanceDTO stepInstance, Map hostMap) { fillHostsDetail(stepInstance.getTargetExecuteObjects(), hostMap); } - private void fillFileSourceHostDetail(StepInstanceDTO stepInstance, Map hostMap) { + private void fillFileSourceHostDetail(StepInstanceDTO stepInstance, Map hostMap) { if (stepInstance.getExecuteType() == SEND_FILE) { List fileSourceList = stepInstance.getFileSourceList(); if (fileSourceList != null) { @@ -554,7 +543,7 @@ private void fillFileSourceHostDetail(StepInstanceDTO stepInstance, Map hostMap) { + private void fillHostsDetail(ExecuteTargetDTO executeTargetDTO, Map hostMap) { if (executeTargetDTO != null) { fillHostsDetail(executeTargetDTO.getStaticIpList(), hostMap); if (CollectionUtils.isNotEmpty(executeTargetDTO.getDynamicServerGroups())) { @@ -567,17 +556,9 @@ private void fillHostsDetail(ExecuteTargetDTO executeTargetDTO, Map hosts, Map hostMap) { + private void fillHostsDetail(Collection hosts, Map hostMap) { if (CollectionUtils.isNotEmpty(hosts)) { - hosts.forEach(host -> { - HostDTO hostDetail; - if (host.getHostId() != null) { - hostDetail = hostMap.get("hostId:" + host.getHostId()); - } else { - hostDetail = hostMap.get("hostIp:" + host.toCloudIp()); - } - host.updateByHost(hostDetail); - }); + hosts.forEach(host -> host.updateByHost(hostMap.get(host.getUniqueKey()))); } } @@ -782,27 +763,85 @@ private void mergeExecuteObjects(List stepInstanceList, } } - private void checkExecuteObjectExist(TaskInstanceExecuteObjects taskInstanceExecuteObjects) { - List notExistExecuteObjectList = new ArrayList<>(); + private void checkExecuteObjectExist(TaskInstanceDTO taskInstance, + List stepInstanceList, + TaskInstanceExecuteObjects taskInstanceExecuteObjects) { + List invalidExecuteObjects = new ArrayList<>(); + + // 处理主机执行对象 if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotExistHosts())) { - notExistExecuteObjectList.addAll(taskInstanceExecuteObjects.getNotExistHosts().stream() - .map(this::printHostIdOrIp).collect(Collectors.toList())); + if (shouldIgnoreInvalidHost(taskInstance)) { + // 忽略主机不存在错误,并标识执行对象的 invalid 属性为 true + markExecuteObjectInvalid(stepInstanceList, taskInstanceExecuteObjects.getNotExistHosts()); + } else { + invalidExecuteObjects.addAll(taskInstanceExecuteObjects.getNotExistHosts().stream() + .map(this::printHostIdOrIp).collect(Collectors.toList())); + } } + + // 处理容器执行对象 if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotExistContainerIds())) { - notExistExecuteObjectList.addAll( + invalidExecuteObjects.addAll( taskInstanceExecuteObjects.getNotExistContainerIds().stream() .map(containerId -> "(container_id:" + containerId + ")") .collect(Collectors.toList())); } - if (CollectionUtils.isNotEmpty(notExistExecuteObjectList)) { - String executeObjectStr = StringUtils.join(notExistExecuteObjectList, ","); - log.warn("The following execute object are not exist, notExistExecuteObjectList={}", - notExistExecuteObjectList); + + if (CollectionUtils.isNotEmpty(invalidExecuteObjects)) { + String executeObjectStr = StringUtils.join(invalidExecuteObjects, ","); + log.warn("The following execute object are not exist, invalidExecuteObjects={}", + invalidExecuteObjects); throw new FailedPreconditionException(ErrorCode.EXECUTE_OBJECT_NOT_EXIST, - new Object[]{notExistExecuteObjectList.size(), executeObjectStr}); + new Object[]{invalidExecuteObjects.size(), executeObjectStr}); } } + private void markExecuteObjectInvalid(List stepInstanceList, + List invalidHost) { + for (StepInstanceDTO stepInstance : stepInstanceList) { + if (!stepInstance.isStepContainsExecuteObject()) { + continue; + } + // 检查目标主机 + stepInstance.getTargetExecuteObjects().getExecuteObjectsCompatibly().stream() + .filter(ExecuteObject::isHostExecuteObject) + .forEach(executeObject -> { + if (invalidHost.contains(executeObject.getHost())) { + executeObject.setInvalid(true); + } + }); + // 如果是文件分发任务,检查文件源 + if (stepInstance.isFileStep()) { + List fileSourceList = stepInstance.getFileSourceList(); + if (CollectionUtils.isEmpty(fileSourceList)) { + return; + } + for (FileSourceDTO fileSource : fileSourceList) { + // 远程文件分发需要校验文件源主机;其他类型不需要 + if (fileSource.getFileType().equals(TaskFileTypeEnum.SERVER.getType())) { + ExecuteTargetDTO executeTarget = fileSource.getServers(); + if (executeTarget == null || + CollectionUtils.isEmpty(executeTarget.getExecuteObjectsCompatibly())) { + continue; + } + executeTarget.getExecuteObjectsCompatibly().stream() + .filter(ExecuteObject::isHostExecuteObject) + .forEach(executeObject -> { + if (invalidHost.contains(executeObject.getHost())) { + executeObject.setInvalid(true); + } + }); + } + } + } + } + } + + private boolean shouldIgnoreInvalidHost(TaskInstanceDTO taskInstance) { + // 定时任务忽略非法主机,继续执行 + return TaskStartupModeEnum.getStartupMode(taskInstance.getStartupMode()) == TaskStartupModeEnum.CRON; + } + private void throwHostInvalidException(Long appId, Collection invalidHosts) { ServiceApplicationDTO application = applicationService.getAppById(appId); String appName = application.getName(); @@ -816,11 +855,11 @@ private void throwHostInvalidException(Long appId, Collection invalidHo /** * 判断执行对象是否可以被当前作业使用 * - * @param appId 业务 ID + * @param taskInstance 作业实例 * @param stepInstanceList 作业步骤列表 * @param taskInstanceExecuteObjects 作业实例中包含的执行对象 */ - private void checkExecuteObjectAccessible(long appId, + private void checkExecuteObjectAccessible(TaskInstanceDTO taskInstance, List stepInstanceList, TaskInstanceExecuteObjects taskInstanceExecuteObjects) { if (CollectionUtils.isEmpty(taskInstanceExecuteObjects.getNotInAppHosts())) { @@ -833,6 +872,7 @@ private void checkExecuteObjectAccessible(long appId, .collect(Collectors.toMap(HostDTO::getHostId, host -> host, (host1, host2) -> host2)); // 非法的主机 + boolean shouldIgnoreInvalidHost = shouldIgnoreInvalidHost(taskInstance); Set invalidHosts = new HashSet<>(); for (StepInstanceDTO stepInstance : stepInstanceList) { if (!stepInstance.isStepContainsExecuteObject()) { @@ -844,17 +884,22 @@ private void checkExecuteObjectAccessible(long appId, .filter(ExecuteObject::isHostExecuteObject) .forEach(executeObject -> { if (isHostUnAccessible(stepType, executeObject.getHost(), notInAppHostMap, whileHostAllowActions)) { - invalidHosts.add(executeObject.getHost()); + if (shouldIgnoreInvalidHost) { + executeObject.setInvalid(true); + } else { + invalidHosts.add(executeObject.getHost()); + } } }); // 如果是文件分发任务,检查文件源 - checkFileSourceHostAccessible(invalidHosts, stepInstance, stepType, notInAppHostMap, whileHostAllowActions); + checkFileSourceHostAccessible(invalidHosts, stepInstance, stepType, notInAppHostMap, + whileHostAllowActions, shouldIgnoreInvalidHost); } if (CollectionUtils.isNotEmpty(invalidHosts)) { // 检查是否在白名单配置 - log.warn("Found hosts not in target app: {}!", appId); - throwHostInvalidException(appId, invalidHosts); + log.warn("Found hosts not in target app: {}!", taskInstance.getAppId()); + throwHostInvalidException(taskInstance.getAppId(), invalidHosts); } } @@ -862,7 +907,8 @@ private void checkFileSourceHostAccessible(Set invalidHosts, StepInstanceDTO stepInstance, TaskStepTypeEnum stepType, Map notInAppHostMap, - Map> whileHostAllowActions) { + Map> whileHostAllowActions, + boolean ignoreInvalidHost) { if (!stepInstance.isFileStep()) { return; } @@ -882,7 +928,11 @@ private void checkFileSourceHostAccessible(Set invalidHosts, .forEach(executeObject -> { if (isHostUnAccessible(stepType, executeObject.getHost(), notInAppHostMap, whileHostAllowActions)) { - invalidHosts.add(executeObject.getHost()); + if (ignoreInvalidHost) { + executeObject.setInvalid(true); + } else { + invalidHosts.add(executeObject.getHost()); + } } }); } diff --git a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message.properties b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message.properties index 2cc04420cb..38d7a51635 100644 --- a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message.properties +++ b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message.properties @@ -74,7 +74,7 @@ agent.task.status.gse_file_task_error=文件传输错误 agent.task.status.gse_task_error=任务执行出错 agent.task.status.gse_task_terminate_success=任务强制终止成功 agent.task.status.gse_task_terminate_failed=任务强制终止失败 -agent.task.status.host_not_exist=无效主机 +agent.task.status.invalid_execute_object=无效执行对象 agent.task.status.unknown=未知 agent.task.status.agent_not_installed=Agent 未安装 diff --git a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en.properties b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en.properties index 0449b22021..b7774ef8d2 100644 --- a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en.properties +++ b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en.properties @@ -74,7 +74,7 @@ agent.task.status.gse_file_task_error=File Transmission Error agent.task.status.gse_task_error=Execution Error agent.task.status.gse_task_terminate_success=Force Terminated agent.task.status.gse_task_terminate_failed=Force Terminate Failed -agent.task.status.host_not_exist=Invalid host +agent.task.status.invalid_execute_object=Invalid Execute Object agent.task.status.unknown=Unknown agent.task.status.agent_not_installed=Agent not installed diff --git a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en_US.properties b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en_US.properties index 0449b22021..b7774ef8d2 100644 --- a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en_US.properties +++ b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_en_US.properties @@ -74,7 +74,7 @@ agent.task.status.gse_file_task_error=File Transmission Error agent.task.status.gse_task_error=Execution Error agent.task.status.gse_task_terminate_success=Force Terminated agent.task.status.gse_task_terminate_failed=Force Terminate Failed -agent.task.status.host_not_exist=Invalid host +agent.task.status.invalid_execute_object=Invalid Execute Object agent.task.status.unknown=Unknown agent.task.status.agent_not_installed=Agent not installed diff --git a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh.properties b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh.properties index 2cc04420cb..38d7a51635 100644 --- a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh.properties +++ b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh.properties @@ -74,7 +74,7 @@ agent.task.status.gse_file_task_error=文件传输错误 agent.task.status.gse_task_error=任务执行出错 agent.task.status.gse_task_terminate_success=任务强制终止成功 agent.task.status.gse_task_terminate_failed=任务强制终止失败 -agent.task.status.host_not_exist=无效主机 +agent.task.status.invalid_execute_object=无效执行对象 agent.task.status.unknown=未知 agent.task.status.agent_not_installed=Agent 未安装 diff --git a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh_CN.properties b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh_CN.properties index 2cc04420cb..38d7a51635 100644 --- a/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh_CN.properties +++ b/src/backend/job-execute/service-job-execute/src/main/resources/i18n/message_zh_CN.properties @@ -74,7 +74,7 @@ agent.task.status.gse_file_task_error=文件传输错误 agent.task.status.gse_task_error=任务执行出错 agent.task.status.gse_task_terminate_success=任务强制终止成功 agent.task.status.gse_task_terminate_failed=任务强制终止失败 -agent.task.status.host_not_exist=无效主机 +agent.task.status.invalid_execute_object=无效执行对象 agent.task.status.unknown=未知 agent.task.status.agent_not_installed=Agent 未安装