Skip to content

Commit

Permalink
feat: 后台 Job 任务支持在非全业务集下执行 (closed TencentBlueKing#2158)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huayeaaa committed Apr 25, 2024
1 parent 2732949 commit efe6a88
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 35 deletions.
5 changes: 5 additions & 0 deletions apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,11 @@ def get_meta(data) -> Dict[str, Any]:
meta["STEPS"] = []
return meta

@staticmethod
def get_job_meta(data) -> Dict[str, Any]:
meta: Dict[str, Any] = data.get_one_of_inputs("meta", {})
return meta

@classmethod
def get_common_data(cls, data):
"""
Expand Down
47 changes: 32 additions & 15 deletions apps/backend/components/collections/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ def request_single_job_and_create_map(
script_language = (constants.ScriptLanguageType.SHELL.value, constants.ScriptLanguageType.BAT.value)[
os_type == constants.OsType.WINDOWS
]
meta: Dict[str, Union[str, int]] = job_params.pop("meta")
scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID)
scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value)
job_params.update(
{
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_biz_id": scope_id,
"bk_scope_type": scope_type,
"bk_scope_id": scope_id,
"script_language": script_language,
"script_content": process_parms(job_params.get("script_content", "")),
"script_param": process_parms(job_params.get("script_param", "")),
Expand Down Expand Up @@ -188,17 +191,20 @@ def generate_api_params_log(
job_params.pop("target_server", None)
return json.dumps(job_params, indent=2)

def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) -> List[int]:
def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap, meta: Dict[str, Any]) -> List[int]:
"""
处理作业平台执行结果
:param job_sub_map: 作业平台ID映射
:param meta: 注入实例的meta信息
:return: succeed_sub_inst_ids
"""
scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID)
scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value)
ip_results = JobApi.get_job_instance_status(
{
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_biz_id": scope_id,
"bk_scope_type": scope_type,
"bk_scope_id": scope_id,
"job_instance_id": job_sub_map.job_instance_id,
"return_ip_result": True,
}
Expand Down Expand Up @@ -262,17 +268,20 @@ def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) ->
succeed_sub_inst_ids.append(sub_inst.id)
return succeed_sub_inst_ids

def request_get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap):
def request_get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap, meta: Dict[str, Any]):
"""
查询作业平台执行状态
:param job_sub_map:
:param meta: 注入实例的meta信息
:return:
"""
scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID)
scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value)
result = JobApi.get_job_instance_status(
{
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_biz_id": scope_id,
"bk_scope_type": scope_type,
"bk_scope_id": scope_id,
"job_instance_id": job_sub_map.job_instance_id,
"return_ip_result": False,
}
Expand All @@ -290,7 +299,7 @@ def request_get_job_instance_status(self, job_sub_map: models.JobSubscriptionIns
return

# 其它都认为存在失败的情况,需要具体查作业平台的接口查IP详情
self.handler_job_result(job_sub_map)
self.handler_job_result(job_sub_map, meta)

job_sub_map.status = job_status
job_sub_map.save()
Expand All @@ -316,11 +325,12 @@ def skip_polling_result_by_os_types(self, os_types: Optional[List[str]] = None):
).update(status=constants.BkJobStatus.SUCCEEDED)

def _schedule(self, data, parent_data, callback_data=None):
job_meta = self.get_job_meta(data)
polling_time = data.get_one_of_outputs("polling_time") or 0
skip_polling_result = data.get_one_of_inputs("skip_polling_result", default=False)
# 查询未完成的作业, 批量查询作业状态并更新DB
multi_params = [
{"job_sub_map": job_sub_map}
{"job_sub_map": job_sub_map, "meta": job_meta}
for job_sub_map in models.JobSubscriptionInstanceMap.objects.filter(
node_id=self.id, status=constants.BkJobStatus.PENDING
)
Expand All @@ -346,7 +356,7 @@ def _schedule(self, data, parent_data, callback_data=None):
node_id=self.id, status=constants.BkJobStatus.PENDING
)
handler_job_result_params_list = [
{"job_sub_map": pending_job_sub_map} for pending_job_sub_map in pending_job_sub_maps
{"job_sub_map": pending_job_sub_map, "meta": job_meta} for pending_job_sub_map in pending_job_sub_maps
]
# 挽救策略,查询作业中已完成的节点,避免全部误判为超时失败
succeed_sub_inst_ids: Set[int] = set(
Expand Down Expand Up @@ -427,7 +437,7 @@ def script_name(self):
return ""

def _execute(self, data, parent_data, common_data: CommonData):

job_meta = self.get_job_meta(data)
timeout = data.get_one_of_inputs("timeout")
# 批量请求作业平台的参数
multi_job_params_map: Dict[str, Dict[str, Any]] = defaultdict(lambda: defaultdict(list))
Expand All @@ -450,6 +460,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
sub_inst=sub_inst,
host_infos=target_servers,
)
multi_job_params_map[md5_key]["job_params"]["meta"] = job_meta
else:
multi_job_params_map[md5_key] = {
"job_func": JobApi.fast_execute_script,
Expand All @@ -461,6 +472,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
"script_param": script_param,
"timeout": timeout,
"os_type": self.get_job_param_os_type(host_obj),
"meta": job_meta,
},
}

Expand Down Expand Up @@ -495,6 +507,7 @@ def inputs_format(self):
]

def _execute(self, data, parent_data, common_data: CommonData):
job_meta = self.get_job_meta(data)
timeout = data.get_one_of_inputs("timeout")
# 批量请求作业平台的参数
multi_job_params_map: Dict[str, Dict[str, Any]] = {}
Expand All @@ -518,6 +531,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
host_infos=target_servers,
sub_inst=sub_inst,
)
multi_job_params_map[md5_key]["job_params"]["meta"] = job_meta
else:
multi_job_params_map[md5_key] = {
"job_func": JobApi.fast_transfer_file,
Expand All @@ -529,6 +543,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
"file_source_list": [{"file_list": file_list}],
"timeout": timeout,
"os_type": self.get_job_param_os_type(host_obj),
"meta": job_meta,
},
}

Expand Down Expand Up @@ -591,6 +606,7 @@ def cal_job_unique_key(self, config_info_list: List[Dict[str, Any]], file_target
return f"{'-'.join(sorted(config_unique_keys))}-{file_target_path}"

def _execute(self, data, parent_data, common_data: CommonData):
job_meta = self.get_job_meta(data)
timeout = data.get_one_of_inputs("timeout")
# 批量请求作业平台的参数
multi_job_params_map: Dict[str, Dict[str, Any]] = {}
Expand All @@ -609,6 +625,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
host_obj=host_obj,
sub_inst=sub_inst,
)
multi_job_params_map[job_unique_key]["job_params"]["meta"] = job_meta
else:
file_source_list = []
for config_info in config_info_list:
Expand Down
49 changes: 34 additions & 15 deletions apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ class TransferPackageService(JobV3BaseService, PluginBaseService):
"""调用作业平台传输插件包"""

def _execute(self, data, parent_data, common_data: PluginCommonData):
job_meta = self.get_job_meta(data)
process_statuses = common_data.process_statuses
group_id_instance_map = common_data.group_id_instance_map
host_id_obj_map = common_data.host_id_obj_map
Expand Down Expand Up @@ -567,6 +568,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
"file_source_list": [{"file_list": file_list}],
"os_type": job["os_type"],
"target_server": {"ip_list": job["ip_list"], "host_id_list": job["host_id_list"]},
"meta": job_meta,
},
}
)
Expand Down Expand Up @@ -621,6 +623,7 @@ def need_skipped(self, process_status: models.ProcessStatus, common_data: Plugin
return False

def _execute(self, data, parent_data, common_data: PluginCommonData):
job_meta = self.get_job_meta(data)
process_statuses = common_data.process_statuses
timeout = data.get_one_of_inputs("timeout")
group_id_instance_map = common_data.group_id_instance_map
Expand Down Expand Up @@ -652,6 +655,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}
)
multi_job_params_map[key]["job_params"]["target_server"]["host_id_list"].append(host.bk_host_id)
multi_job_params_map[key]["job_params"]["meta"] = job_meta
else:
multi_job_params_map[key] = {
"job_func": JobApi.fast_execute_script,
Expand All @@ -666,6 +670,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
"script_param": script_param,
"timeout": timeout,
"os_type": host.os_type,
"meta": job_meta,
},
}
self.run_job_or_finish_schedule(multi_job_params_map)
Expand Down Expand Up @@ -827,15 +832,17 @@ def allocate_port_to_process_status(
subscription_instance: models.SubscriptionInstanceRecord,
job_instance_id: int,
step_instance_id: int,
meta: Dict[str, Any],
):
"""根据job返回日志分配端口"""
bk_host_id = process_status.bk_host_id

scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID)
scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value)
# 查询并解析该主机已被占用的端口号
instance_log_base_params: Dict[str, Union[str, int]] = {
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_biz_id": scope_id,
"bk_scope_type": scope_type,
"bk_scope_id": scope_id,
"job_instance_id": job_instance_id,
"step_instance_id": step_instance_id,
}
Expand Down Expand Up @@ -870,18 +877,22 @@ def allocate_port_to_process_status(
[subscription_instance.id], _("主机[{}]在ip->[{}]上无可用端口").format(host.inner_ip, listen_ip)
)

def get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap, common_data: PluginCommonData):
def get_job_instance_status(
self, job_sub_map: models.JobSubscriptionInstanceMap, common_data: PluginCommonData, meta: Dict[str, Any]
):
"""查询作业平台执行状态"""
bk_host_ids = common_data.bk_host_ids
process_statuses = common_data.process_statuses
group_id_instance_map = common_data.group_id_instance_map
host_id_obj_map: Dict[int, models.Host] = models.Host.host_id_obj_map(bk_host_id__in=bk_host_ids)

scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID)
scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value)
result = JobApi.get_job_instance_status(
{
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_biz_id": scope_id,
"bk_scope_type": scope_type,
"bk_scope_id": scope_id,
"job_instance_id": job_sub_map.job_instance_id,
"return_ip_result": True,
}
Expand Down Expand Up @@ -910,6 +921,7 @@ def get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap
"subscription_instance": subscription_instance,
"job_instance_id": job_sub_map.job_instance_id,
"step_instance_id": step_instance_id,
"meta": meta,
}
)
request_multi_thread(self.allocate_port_to_process_status, multi_allocate_params)
Expand All @@ -918,8 +930,9 @@ def get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap

def _schedule(self, data, parent_data, callback_data=None):
# 查询未完成的作业, 批量查询作业状态并更新DB
job_meta = self.get_job_meta(data)
multi_params = [
{"job_sub_map": job_sub_map, "common_data": self.get_common_data(data)}
{"job_sub_map": job_sub_map, "common_data": self.get_common_data(data), "meta": job_meta}
for job_sub_map in models.JobSubscriptionInstanceMap.objects.filter(
node_id=self.id, status=constants.BkJobStatus.PENDING
)
Expand All @@ -939,6 +952,7 @@ class RenderAndPushConfigService(PluginBaseService, JobV3BaseService):
"""

def _execute(self, data, parent_data, common_data: PluginCommonData):
job_meta = self.get_job_meta(data)
subscription_step_id = data.get_one_of_inputs("subscription_step_id")
process_statuses = common_data.process_statuses
policy_step_adapter = common_data.policy_step_adapter
Expand Down Expand Up @@ -993,6 +1007,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
host_obj=target_host,
sub_inst=subscription_instance,
)
multi_job_params_map[key]["job_params"]["meta"] = job_meta
else:
multi_job_params_map[key] = {
"job_func": JobApi.push_config_file,
Expand All @@ -1011,6 +1026,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
},
"file_target_path": file_target_path,
"file_list": [{"file_name": file_name, "content": process_parms(file_content)}],
"meta": job_meta,
},
}

Expand Down Expand Up @@ -1366,12 +1382,15 @@ def _schedule(self, data, parent_data, callback_data=None):
job_sub_inst_map = models.JobSubscriptionInstanceMap.objects.filter(node_id=self.id).first()
subscription_instance_id = job_sub_inst_map.subscription_instance_ids[0]
job_instance_id = job_sub_inst_map.job_instance_id
job_meta = self.get_job_meta(data)
scope_id = job_meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID)
scope_type = job_meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value)

result = JobApi.get_job_instance_status(
{
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_biz_id": scope_id,
"bk_scope_type": scope_type,
"bk_scope_id": scope_id,
"job_instance_id": job_instance_id,
"return_ip_result": True,
}
Expand All @@ -1389,9 +1408,9 @@ def _schedule(self, data, parent_data, callback_data=None):

instance_log_base_params: Dict[str, Union[str, int]] = {
"job_instance_id": job_instance_id,
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_biz_id": scope_id,
"bk_scope_type": scope_type,
"bk_scope_id": scope_id,
"bk_username": settings.BACKEND_JOB_OPERATOR,
"step_instance_id": result["step_instance_list"][0]["step_instance_id"],
}
Expand Down
9 changes: 6 additions & 3 deletions apps/backend/subscription/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from apps.node_man import tools as node_man_tools
from apps.node_man.handlers.cmdb import CmdbHandler
from apps.prometheus import metrics
from apps.utils import translation
from apps.utils import md5, translation
from pipeline import builder
from pipeline.builder import Data, NodeOutput, ServiceActivity, Var
from pipeline.core.pipeline import Pipeline
Expand Down Expand Up @@ -211,16 +211,19 @@ def create_pipeline(
}

sub_insts_gby_metadata: Dict[str, List[models.SubscriptionInstanceRecord]] = defaultdict(list)
md5_value__metadata = {}
for instance_id, step_actions in instances_action.items():
if instance_id not in subscription_instance_map:
continue
sub_inst = subscription_instance_map[instance_id]
# metadata 包含:meta-任务元数据、step_actions-操作步骤及类型
metadata = {"meta": sub_inst.instance_info["meta"], "step_actions": step_actions}
metadata_md5_value = md5.count_md5(metadata)
if metadata_md5_value not in md5_value__metadata:
md5_value__metadata[metadata_md5_value] = metadata
# 聚合同 metadata 的任务
sub_insts_gby_metadata[json.dumps(metadata)].append(sub_inst)
sub_insts_gby_metadata[json.dumps(md5_value__metadata[metadata_md5_value])].append(sub_inst)

#
# # 把同类型操作进行聚合
# action_instances = defaultdict(list)
# for instance_id, step_actions in instances_action.items():
Expand Down
Loading

0 comments on commit efe6a88

Please sign in to comment.