Skip to content

Commit

Permalink
fix 完善事件和消息发送
Browse files Browse the repository at this point in the history
  • Loading branch information
jxxghp committed Mar 1, 2025
1 parent ed8895d commit 8bd6ccb
Show file tree
Hide file tree
Showing 16 changed files with 95 additions and 57 deletions.
12 changes: 11 additions & 1 deletion app/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class BaseAction(ABC):

# 完成标志
_done_flag = False
# 执行信息
_message = ""

@classmethod
@property
Expand Down Expand Up @@ -49,10 +51,18 @@ def success(self) -> bool:
"""
pass

def job_done(self):
@property
def message(self) -> str:
"""
执行信息
"""
return self._message

def job_done(self, message: str = None):
"""
标记动作完成
"""
self._message = message
self._done_flag = True

@abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion app/actions/add_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Ac
[DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads]
)

self.job_done()
self.job_done(f"已添加 {len(self._added_downloads)} 个下载任务")
return context
2 changes: 1 addition & 1 deletion app/actions/add_subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
for sid in self._added_subscribes:
context.subscribes.append(self.subscribeoper.get(sid))

self.job_done()
self.job_done(f"已添加 {len(self._added_subscribes)} 个订阅")
return context
2 changes: 1 addition & 1 deletion app/actions/fetch_medias.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
if self._medias:
context.medias.extend(self._medias)

self.job_done()
self.job_done(f"获取到 {len(self._medias)} 条媒数据")
return context
4 changes: 2 additions & 2 deletions app/actions/fetch_rss.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo))

if self._rss_torrents:
logger.info(f"已获取 {len(self._rss_torrents)} 个RSS资源")
logger.info(f"获取到 {len(self._rss_torrents)} 个RSS资源")
context.torrents.extend(self._rss_torrents)

self.job_done()
self.job_done(f"获取到 {len(self._rss_torrents)} 个资源")
return context
2 changes: 1 addition & 1 deletion app/actions/fetch_torrents.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
context.torrents.extend(self._torrents)
logger.info(f"共搜索到 {len(self._torrents)} 条资源")

self.job_done()
self.job_done(f"搜索到 {len(self._torrents)} 个资源")
return context
2 changes: 1 addition & 1 deletion app/actions/filter_medias.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
if self._medias:
context.medias = self._medias

self.job_done()
self.job_done(f"过滤后剩余 {len(self._medias)} 条媒体数据")
return context
2 changes: 1 addition & 1 deletion app/actions/filter_torrents.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act

context.torrents = self._torrents

self.job_done()
self.job_done(f"过滤后剩余 {len(self._torrents)} 个资源")
return context
7 changes: 3 additions & 4 deletions app/actions/scan_file.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import copy
from pathlib import Path
from typing import Optional

from pydantic import Field

from app.actions import BaseAction
from app.core.config import global_vars, settings
from app.schemas import ActionParams, ActionContext
from app.chain.storage import StorageChain
from app.core.config import global_vars, settings
from app.log import logger
from app.schemas import ActionParams, ActionContext


class ScanFileParams(ActionParams):
Expand Down Expand Up @@ -74,5 +73,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
if self._fileitems:
context.fileitems.extend(self._fileitems)

self.job_done()
self.job_done(f"扫描到 {len(self._fileitems)} 个文件")
return context
2 changes: 1 addition & 1 deletion app/actions/scrape_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)

self.job_done()
self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件")
return context
22 changes: 8 additions & 14 deletions app/actions/send_event.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import copy

from app.actions import BaseAction
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.core.event import eventmanager
from app.schemas import ActionParams, ActionContext
from app.schemas.types import ChainEventType


class SendEventParams(ActionParams):
Expand All @@ -26,7 +24,7 @@ def name(cls) -> str:
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有事件"
return "发送任务执行事件"

@classmethod
@property
Expand All @@ -39,16 +37,12 @@ def success(self) -> bool:

def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
发送events中的事件
发送工作流事件,以更插件干预工作流执行
"""
if context.events:
# 按优先级排序,优先级高的先发送
context.events.sort(key=lambda x: x.priority, reverse=True)
for event in copy.deepcopy(context.events):
if global_vars.is_workflow_stopped(workflow_id):
break
eventmanager.send_event(etype=event.event_type, data=event.event_data)
context.events.remove(event)
# 触发资源下载事件,更新执行上下文
event = eventmanager.send_event(ChainEventType.WorkflowExecution, context)
if event and event.event_data:
context = event.event_data

self.job_done()
return context
39 changes: 24 additions & 15 deletions app/actions/send_message.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import copy
from typing import List, Optional, Union

from pydantic import Field

from app.actions import BaseAction, ActionChain
from app.core.config import global_vars
from app.schemas import ActionParams, ActionContext
from app.schemas import ActionParams, ActionContext, Notification


class SendMessageParams(ActionParams):
Expand Down Expand Up @@ -33,7 +31,7 @@ def name(cls) -> str:
@classmethod
@property
def description(cls) -> str:
return "发送队列中的所有消息"
return "发送任务执行消息"

@classmethod
@property
Expand All @@ -48,17 +46,28 @@ def execute(self, workflow_id: int, params: dict, context: ActionContext) -> Act
"""
发送messages中的消息
"""
for message in copy.deepcopy(context.messages):
if global_vars.is_workflow_stopped(workflow_id):
break
if params.client:
message.source = params.client
if params.userid:
message.userid = params.userid
self.chain.post_message(message)
context.messages.remove(message)

context.messages = []
params = SendMessageParams(**params)
msg_text = f"当前进度:{context.progress}%"
index = 1
if context.execute_history:
for history in context.execute_history:
if not history.message:
continue
msg_text += f"\n{index}. {history.action}{history.message}"
index += 1
# 发送消息
if not params.client:
params.client = [None]
for client in params.client:
self.chain.post_message(
Notification(
source=client,
userid=params.userid,
title="【工作流执行结果】",
text=msg_text,
link="#/workflow"
)
)

self.job_done()
return context
26 changes: 21 additions & 5 deletions app/chain/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.schemas import ActionContext, ActionFlow, Action
from app.schemas import ActionContext, ActionFlow, Action, ActionExecution


class WorkflowExecutor:
Expand All @@ -33,6 +33,8 @@ def __init__(self, workflow: Workflow, step_callback: Callable = None):
self.step_callback = step_callback
self.actions = {action['id']: Action(**action) for action in workflow.actions}
self.flows = [ActionFlow(**flow) for flow in workflow.flows]
self.total_actions = len(self.actions)
self.finished_actions = 0

self.success = True
self.errmsg = ""
Expand Down Expand Up @@ -115,21 +117,35 @@ def execute(self):
)
future.add_done_callback(self.on_node_complete)

def execute_node(self, workflow_id: int, node_id: int, context: ActionContext) -> Tuple[Action, bool, ActionContext]:
def execute_node(self, workflow_id: int, node_id: int,
context: ActionContext) -> Tuple[Action, bool, str, ActionContext]:
"""
执行单个节点操作,返回修改后的上下文和节点ID
"""
action = self.actions[node_id]
state, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context)
return action, state, result_ctx
state, message, result_ctx = self.workflowmanager.excute(workflow_id, action, context=context)
return action, state, message, result_ctx

def on_node_complete(self, future):
"""
节点完成回调:更新上下文、处理后继节点
"""
action, state, result_ctx = future.result()
action, state, message, result_ctx = future.result()

try:
self.finished_actions += 1
# 更新当前进度
self.context.progress = round(self.finished_actions / self.total_actions) * 100

# 补充执行历史
self.context.execute_history.append(
ActionExecution(
action=action.name,
result=state,
message=message
)
)

# 节点执行失败
if not state:
self.success = False
Expand Down
9 changes: 5 additions & 4 deletions app/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def stop(self):
"""
pass

def excute(self, workflow_id: int, action: Action, context: ActionContext = None) -> Tuple[bool, ActionContext]:
def excute(self, workflow_id: int, action: Action,
context: ActionContext = None) -> Tuple[bool, str, ActionContext]:
"""
执行工作流动作
"""
Expand All @@ -70,7 +71,7 @@ def excute(self, workflow_id: int, action: Action, context: ActionContext = None
result_context = action_obj.execute(workflow_id, action.data, context)
except Exception as err:
logger.error(f"{action.name} 执行失败: {err}")
return False, context
return False, f"{err}", context
loop = action.data.get("loop")
loop_interval = action.data.get("loop_interval")
if loop and loop_interval:
Expand All @@ -87,10 +88,10 @@ def excute(self, workflow_id: int, action: Action, context: ActionContext = None
logger.info(f"{action.name} 执行成功")
else:
logger.error(f"{action.name} 执行失败!")
return action_obj.success, result_context
return action_obj.success, action_obj.message, result_context
else:
logger.error(f"未找到动作: {action.type} - {action.name}")
return False, context
return False, " ", context

def list_actions(self) -> List[dict]:
"""
Expand Down
2 changes: 2 additions & 0 deletions app/schemas/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class ChainEventType(Enum):
MediaRecognizeConvert = "media.recognize.convert"
# 推荐数据源
RecommendSource = "recommend.source"
# 工作流执行
WorkflowExecution = "workflow.execution"


# 系统配置Key字典
Expand Down
17 changes: 12 additions & 5 deletions app/schemas/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
from pydantic import BaseModel, Field

from app.schemas.context import Context, MediaInfo
from app.schemas.file import FileItem
from app.schemas.download import DownloadTask
from app.schemas.file import FileItem
from app.schemas.site import Site
from app.schemas.subscribe import Subscribe
from app.schemas.message import Notification
from app.schemas.event import Event


class Workflow(BaseModel):
Expand Down Expand Up @@ -52,6 +50,15 @@ class Action(BaseModel):
data: Optional[dict] = Field({}, description="参数")


class ActionExecution(BaseModel):
"""
动作执行情况
"""
action: Optional[str] = Field(None, description="当前动作(名称)")
result: Optional[bool] = Field(None, description="执行结果")
message: Optional[str] = Field(None, description="执行消息")


class ActionContext(BaseModel):
"""
动作基础上下文,各动作通用数据
Expand All @@ -63,8 +70,8 @@ class ActionContext(BaseModel):
downloads: Optional[List[DownloadTask]] = Field([], description="下载任务列表")
sites: Optional[List[Site]] = Field([], description="站点列表")
subscribes: Optional[List[Subscribe]] = Field([], description="订阅列表")
messages: Optional[List[Notification]] = Field([], description="消息列表")
events: Optional[List[Event]] = Field([], description="事件列表")
execute_history: Optional[List[ActionExecution]] = Field([], description="执行历史")
progress: Optional[int] = Field(0, description="执行进度(%)")


class ActionFlow(BaseModel):
Expand Down

0 comments on commit 8bd6ccb

Please sign in to comment.