From 8bd6ccb0dec9b7c6672f369afaf2db3c3c55af71 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sat, 1 Mar 2025 18:34:39 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E5=AE=8C=E5=96=84=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=92=8C=E6=B6=88=E6=81=AF=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/actions/__init__.py | 12 ++++++++++- app/actions/add_download.py | 2 +- app/actions/add_subscribe.py | 2 +- app/actions/fetch_medias.py | 2 +- app/actions/fetch_rss.py | 4 ++-- app/actions/fetch_torrents.py | 2 +- app/actions/filter_medias.py | 2 +- app/actions/filter_torrents.py | 2 +- app/actions/scan_file.py | 7 +++--- app/actions/scrape_file.py | 2 +- app/actions/send_event.py | 22 +++++++------------ app/actions/send_message.py | 39 +++++++++++++++++++++------------- app/chain/workflow.py | 26 ++++++++++++++++++----- app/core/workflow.py | 9 ++++---- app/schemas/types.py | 2 ++ app/schemas/workflow.py | 17 ++++++++++----- 16 files changed, 95 insertions(+), 57 deletions(-) diff --git a/app/actions/__init__.py b/app/actions/__init__.py index 95116fa2b..6b1c41fdf 100644 --- a/app/actions/__init__.py +++ b/app/actions/__init__.py @@ -15,6 +15,8 @@ class BaseAction(ABC): # 完成标志 _done_flag = False + # 执行信息 + _message = "" @classmethod @property @@ -49,10 +51,18 @@ def success(self) -> bool: """ pass - def job_done(self): + @property + def message(self) -> str: + """ + 执行信息 + """ + return self._message + + def job_done(self, message: str = None): """ 标记动作完成 """ + self._message = message self._done_flag = True @abstractmethod diff --git a/app/actions/add_download.py b/app/actions/add_download.py index 072cef029..b5486d281 100644 --- a/app/actions/add_download.py +++ b/app/actions/add_download.py @@ -104,5 +104,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Ac [DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads] ) - self.job_done() + self.job_done(f"已添加 {len(self._added_downloads)} 个下载任务") return context diff --git a/app/actions/add_subscribe.py b/app/actions/add_subscribe.py index c542c2e97..779b8d236 100644 --- a/app/actions/add_subscribe.py +++ b/app/actions/add_subscribe.py @@ -77,5 +77,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act for sid in self._added_subscribes: context.subscribes.append(self.subscribeoper.get(sid)) - self.job_done() + self.job_done(f"已添加 {len(self._added_subscribes)} 个订阅") return context diff --git a/app/actions/fetch_medias.py b/app/actions/fetch_medias.py index f60307d44..039108379 100644 --- a/app/actions/fetch_medias.py +++ b/app/actions/fetch_medias.py @@ -166,5 +166,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act if self._medias: context.medias.extend(self._medias) - self.job_done() + self.job_done(f"获取到 {len(self._medias)} 条媒数据") return context diff --git a/app/actions/fetch_rss.py b/app/actions/fetch_rss.py index d11cad796..1c3fd5d3e 100644 --- a/app/actions/fetch_rss.py +++ b/app/actions/fetch_rss.py @@ -105,8 +105,8 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo)) if self._rss_torrents: - logger.info(f"已获取 {len(self._rss_torrents)} 个RSS资源") + logger.info(f"获取到 {len(self._rss_torrents)} 个RSS资源") context.torrents.extend(self._rss_torrents) - self.job_done() + self.job_done(f"获取到 {len(self._rss_torrents)} 个资源") return context diff --git a/app/actions/fetch_torrents.py b/app/actions/fetch_torrents.py index 2666ecb29..eee5a1914 100644 --- a/app/actions/fetch_torrents.py +++ b/app/actions/fetch_torrents.py @@ -97,5 +97,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act context.torrents.extend(self._torrents) logger.info(f"共搜索到 {len(self._torrents)} 条资源") - self.job_done() + self.job_done(f"搜索到 {len(self._torrents)} 个资源") return context diff --git a/app/actions/filter_medias.py b/app/actions/filter_medias.py index 24c74ce03..a1e3b5fe2 100644 --- a/app/actions/filter_medias.py +++ b/app/actions/filter_medias.py @@ -64,5 +64,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act if self._medias: context.medias = self._medias - self.job_done() + self.job_done(f"过滤后剩余 {len(self._medias)} 条媒体数据") return context diff --git a/app/actions/filter_torrents.py b/app/actions/filter_torrents.py index c8f1c4f35..912df8f05 100644 --- a/app/actions/filter_torrents.py +++ b/app/actions/filter_torrents.py @@ -80,5 +80,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act context.torrents = self._torrents - self.job_done() + self.job_done(f"过滤后剩余 {len(self._torrents)} 个资源") return context diff --git a/app/actions/scan_file.py b/app/actions/scan_file.py index 82b7f2563..224b2ce67 100644 --- a/app/actions/scan_file.py +++ b/app/actions/scan_file.py @@ -1,14 +1,13 @@ -import copy from pathlib import Path from typing import Optional from pydantic import Field from app.actions import BaseAction -from app.core.config import global_vars, settings -from app.schemas import ActionParams, ActionContext from app.chain.storage import StorageChain +from app.core.config import global_vars, settings from app.log import logger +from app.schemas import ActionParams, ActionContext class ScanFileParams(ActionParams): @@ -74,5 +73,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act if self._fileitems: context.fileitems.extend(self._fileitems) - self.job_done() + self.job_done(f"扫描到 {len(self._fileitems)} 个文件") return context diff --git a/app/actions/scrape_file.py b/app/actions/scrape_file.py index 3c55b4b27..a0123ff88 100644 --- a/app/actions/scrape_file.py +++ b/app/actions/scrape_file.py @@ -68,5 +68,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo) self._scraped_files.append(fileitem) - self.job_done() + self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件") return context diff --git a/app/actions/send_event.py b/app/actions/send_event.py index 07c6d6d05..0b8ceecd1 100644 --- a/app/actions/send_event.py +++ b/app/actions/send_event.py @@ -1,9 +1,7 @@ -import copy - from app.actions import BaseAction -from app.core.config import global_vars -from app.schemas import ActionParams, ActionContext from app.core.event import eventmanager +from app.schemas import ActionParams, ActionContext +from app.schemas.types import ChainEventType class SendEventParams(ActionParams): @@ -26,7 +24,7 @@ def name(cls) -> str: @classmethod @property def description(cls) -> str: - return "发送队列中的所有事件" + return "发送任务执行事件" @classmethod @property @@ -39,16 +37,12 @@ def success(self) -> bool: def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext: """ - 发送events中的事件 + 发送工作流事件,以更插件干预工作流执行 """ - if context.events: - # 按优先级排序,优先级高的先发送 - context.events.sort(key=lambda x: x.priority, reverse=True) - for event in copy.deepcopy(context.events): - if global_vars.is_workflow_stopped(workflow_id): - break - eventmanager.send_event(etype=event.event_type, data=event.event_data) - context.events.remove(event) + # 触发资源下载事件,更新执行上下文 + event = eventmanager.send_event(ChainEventType.WorkflowExecution, context) + if event and event.event_data: + context = event.event_data self.job_done() return context diff --git a/app/actions/send_message.py b/app/actions/send_message.py index d6a6b1de1..16a960fbc 100644 --- a/app/actions/send_message.py +++ b/app/actions/send_message.py @@ -1,11 +1,9 @@ -import copy from typing import List, Optional, Union from pydantic import Field from app.actions import BaseAction, ActionChain -from app.core.config import global_vars -from app.schemas import ActionParams, ActionContext +from app.schemas import ActionParams, ActionContext, Notification class SendMessageParams(ActionParams): @@ -33,7 +31,7 @@ def name(cls) -> str: @classmethod @property def description(cls) -> str: - return "发送队列中的所有消息" + return "发送任务执行消息" @classmethod @property @@ -48,17 +46,28 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act """ 发送messages中的消息 """ - for message in copy.deepcopy(context.messages): - if global_vars.is_workflow_stopped(workflow_id): - break - if params.client: - message.source = params.client - if params.userid: - message.userid = params.userid - self.chain.post_message(message) - context.messages.remove(message) - - context.messages = [] + params = SendMessageParams(**params) + msg_text = f"当前进度:{context.progress}%" + index = 1 + if context.execute_history: + for history in context.execute_history: + if not history.message: + continue + msg_text += f"\n{index}. {history.action}:{history.message}" + index += 1 + # 发送消息 + if not params.client: + params.client = [None] + for client in params.client: + self.chain.post_message( + Notification( + source=client, + userid=params.userid, + title="【工作流执行结果】", + text=msg_text, + link="#/workflow" + ) + ) self.job_done() return context diff --git a/app/chain/workflow.py b/app/chain/workflow.py index c5961cf52..d237eae96 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -14,7 +14,7 @@ from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper from app.log import logger -from app.schemas import ActionContext, ActionFlow, Action +from app.schemas import ActionContext, ActionFlow, Action, ActionExecution class WorkflowExecutor: @@ -33,6 +33,8 @@ def __init__(self, workflow: Workflow, step_callback: Callable = None): self.step_callback = step_callback self.actions = {action['id']: Action(**action) for action in workflow.actions} self.flows = [ActionFlow(**flow) for flow in workflow.flows] + self.total_actions = len(self.actions) + self.finished_actions = 0 self.success = True self.errmsg = "" @@ -115,21 +117,35 @@ def execute(self): ) future.add_done_callback(self.on_node_complete) - def execute_node(self, workflow_id: int, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]: + def execute_node(self, workflow_id: int, node_id: int, + context: ActionContext) -> Tuple[Action, bool, str, ActionContext]: """ 执行单个节点操作,返回修改后的上下文和节点ID """ action = self.actions[node_id] - state, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context) - return action, state, result_ctx + state, message, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context) + return action, state, message, result_ctx def on_node_complete(self, future): """ 节点完成回调:更新上下文、处理后继节点 """ - action, state, result_ctx = future.result() + action, state, message, result_ctx = future.result() try: + self.finished_actions += 1 + # 更新当前进度 + self.context.progress = round(self.finished_actions / self.total_actions) * 100 + + # 补充执行历史 + self.context.execute_history.append( + ActionExecution( + action=action.name, + result=state, + message=message + ) + ) + # 节点执行失败 if not state: self.success = False diff --git a/app/core/workflow.py b/app/core/workflow.py index 01e640c3b..18553ffe6 100644 --- a/app/core/workflow.py +++ b/app/core/workflow.py @@ -55,7 +55,8 @@ def stop(self): """ pass - def excute(self, workflow_id: int, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]: + def excute(self, workflow_id: int, action: Action, + context: ActionContext = None) -> Tuple[bool, str, ActionContext]: """ 执行工作流动作 """ @@ -70,7 +71,7 @@ def excute(self, workflow_id: int, action: Action, context: ActionContext = None result_context = action_obj.execute(workflow_id, action.data, context) except Exception as err: logger.error(f"{action.name} 执行失败: {err}") - return False, context + return False, f"{err}", context loop = action.data.get("loop") loop_interval = action.data.get("loop_interval") if loop and loop_interval: @@ -87,10 +88,10 @@ def excute(self, workflow_id: int, action: Action, context: ActionContext = None logger.info(f"{action.name} 执行成功") else: logger.error(f"{action.name} 执行失败!") - return action_obj.success, result_context + return action_obj.success, action_obj.message, result_context else: logger.error(f"未找到动作: {action.type} - {action.name}") - return False, context + return False, " ", context def list_actions(self) -> List[dict]: """ diff --git a/app/schemas/types.py b/app/schemas/types.py index b24fd786d..f177f8e7f 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -87,6 +87,8 @@ class ChainEventType(Enum): MediaRecognizeConvert = "media.recognize.convert" # 推荐数据源 RecommendSource = "recommend.source" + # 工作流执行 + WorkflowExecution = "workflow.execution" # 系统配置Key字典 diff --git a/app/schemas/workflow.py b/app/schemas/workflow.py index 4985c5a5b..4934c1cb8 100644 --- a/app/schemas/workflow.py +++ b/app/schemas/workflow.py @@ -3,12 +3,10 @@ from pydantic import BaseModel, Field from app.schemas.context import Context, MediaInfo -from app.schemas.file import FileItem from app.schemas.download import DownloadTask +from app.schemas.file import FileItem from app.schemas.site import Site from app.schemas.subscribe import Subscribe -from app.schemas.message import Notification -from app.schemas.event import Event class Workflow(BaseModel): @@ -52,6 +50,15 @@ class Action(BaseModel): data: Optional[dict] = Field({}, description="参数") +class ActionExecution(BaseModel): + """ + 动作执行情况 + """ + action: Optional[str] = Field(None, description="当前动作(名称)") + result: Optional[bool] = Field(None, description="执行结果") + message: Optional[str] = Field(None, description="执行消息") + + class ActionContext(BaseModel): """ 动作基础上下文,各动作通用数据 @@ -63,8 +70,8 @@ class ActionContext(BaseModel): downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表") sites: Optional[List[Site]] = Field([], description="站点列表") subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表") - messages: Optional[List[Notification]] = Field([], description="消息列表") - events: Optional[List[Event]] = Field([], description="事件列表") + execute_history: Optional[List[ActionExecution]] = Field([], description="执行历史") + progress: Optional[int] = Field(0, description="执行进度(%)") class ActionFlow(BaseModel):