From 6fa0904ddee88254d4af5c246ff0247c988dfdd2 Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Thu, 27 Jul 2023 15:13:12 +0800 Subject: [PATCH] feat(engine): add comments and sync storage execution --- packages/engine/src/FlowModel.ts | 81 +++++++++++++++++++-------- packages/engine/src/Scheduler.ts | 38 ++++++++----- packages/engine/src/recorder/index.ts | 2 +- 3 files changed, 81 insertions(+), 40 deletions(-) diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 74eca8233..98a47d3c9 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -32,10 +32,6 @@ export default class FlowModel { * 流程支持的节点类型 */ nodeModelMap: Map; - /** - * 每一次执行流程都会生成一个唯一的executionId。 - */ - executionId: string; /** * 调度器,用于调度节点的执行。 */ @@ -120,8 +116,43 @@ export default class FlowModel { public setStartNodeType(startNodeType) { this.startNodeType = startNodeType; } + /** + * 解析LogicFlow图数据,将nodes和edges转换成节点格式。 + * 例如: + * graphData: { + * nodes: [ + * { id: 'node1', type: 'StartNode', properties: {} }, + * { id: 'node2', type: 'TaskNode', properties: {} }, + * ], + * edges: [ + * { id: 'edge1', sourceNodeId: 'node1', targetNodeId: 'node2', properties: {} }, + * ] + * } + * 转换成: + * nodeConfigMap: { + * node1: { + * id: 'node1', + * type: 'StartNode', + * properties: {}, + * incoming: [], + * outgoing: [{ id: 'edge1', properties: {}, target: 'node2' }] + * }, + * node2: { + * id: 'node2', + * type: 'TaskNode', + * properties: {}, + * incoming: [{ id: 'edge1', properties: {}, source: 'node1' }], + * outgoing: [], + * } + * } + * 此格式方便后续执行时,根据节点id快速找到节点和执行初始化节点模型。 + * 同时此方法还会找到所有的开始节点,方便后续执行时,从开始节点开始执行。 + * @param graphData 流程图数据 + */ public load(graphData) { const { nodes = [], edges = [] } = graphData; + this.startNodes = []; + this.nodeConfigMap = new Map(); nodes.forEach((node) => { if (this.nodeModelMap.has(node.type)) { const nodeConfig = { @@ -159,11 +190,10 @@ export default class FlowModel { }); } /** - * 执行流程 - * 同一次执行,这次执行内部的节点执行顺序为并行。 - * 多次执行,多次执行之间为串行。 - * 允许一个流程多次执行,效率更高。 - * 例如: + * 执行流程, 每次执行都会生成一个唯一的executionId,用于区分不同的执行。 + * 同一次执行,这次执行内部的节点执行顺序为并行。内部并行是为了避免异步节点阻塞其他节点的执行。 + * 多次执行,多次执行之间为串行,这里选择串行的原因是避免多次执行之间的数据冲突。 + * example: * 一个流程存在着两个开始节点,A和B,A和B的下一个节点都是C,C的下两个节点是D和E。 * 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。 * 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。 @@ -185,7 +215,8 @@ export default class FlowModel { this.createExecution(); } /** - * 创建节点实例 + * 创建节点实例, 每个节点实例都会有一个唯一的taskId。 + * 通过executionId、nodeId、taskId可以唯一确定一个节点的某一次执行。 * @param nodeId 节点Id * @returns 节点示例 */ @@ -208,11 +239,11 @@ export default class FlowModel { ...data, }; } + /** + * 在执行完成后,通知外部此次执行完成。 + * 如果还存在待执行的任务,那么继续执行。 + */ private onTaskFinished(result) { - const { executionId } = result; - if (executionId !== this.executionId) { - return; - } const { callback } = this.executingInstance; if (callback) { callback(result); @@ -224,24 +255,27 @@ export default class FlowModel { this.isRunning = false; } } + /** + * 从待执行队列中取出需要执行的内容。 + * 会依次判断是否有taskId、nodeId、executionId。 + * 若存在taskId,那么表示恢复执行。 + * 若存在nodeId,那么表示从指定节点开始执行。 + * 若都不存在,那么新建一个executionId,从开始节点开始执行。 + */ private createExecution() { const execParams = this.executeQueue.shift(); this.executingInstance = execParams; - if (execParams.executionId) { - this.executionId = execParams.executionId; - } else { - this.executionId = createExecId(); - } // 如果有taskId,那么表示恢复执行 - if (execParams.taskId) { + if (execParams.taskId && execParams.executionId && execParams.nodeId) { this.scheduler.resume({ - executionId: this.executionId, + executionId: execParams.executionId, taskId: execParams.taskId, nodeId: execParams.nodeId, data: execParams.data, }); return; } + const executionId = execParams.executionId || createExecId(); if (execParams.nodeId) { const nodeConfig = this.nodeConfigMap.get(execParams.nodeId); if (!nodeConfig) { @@ -252,13 +286,12 @@ export default class FlowModel { } this.startNodes.forEach((startNode) => { this.scheduler.addTask({ - executionId: this.executionId, + executionId, nodeId: startNode.id, }); - // 所有的开始节点都执行 }); this.scheduler.run({ - executionId: this.executionId, + executionId, }); } } diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index 6014396bd..b3ca9a05e 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -22,23 +22,38 @@ type TaskResult = { extraInfo?: Record; } & NextTaskParam; +type ExecutionId = string; + /** * 调度器 * 通过一个队列维护需要执行的节点,一个集合维护正在执行的节点 */ export default class Scheduler extends EventEmitter { - nodeQueueMap: Map; - taskRunningMap: Map; + /** + * 当前需要执行的节点队列 + */ + nodeQueueMap: Map; + /** + * 当前正在执行的节点集合 + * 在每个节点执行完成后,会从集合中删除。 + * 同时会判断此集合中是否还存在和此节点相同的executionId,如果不存在,说明此流程已经执行完成。 + */ + taskRunningMap: Map; + /** + * 流程模型,用于创建节点模型。 + */ flowModel: FlowModel; + /** + * 执行记录存储器 + * 用于存储节点执行的结果。 + */ recorder: Recorder; - currentTask: TaskParam | null; constructor(config) { super(); this.nodeQueueMap = new Map(); this.taskRunningMap = new Map(); this.flowModel = config.flowModel; this.recorder = config.recorder; - this.currentTask = null; } /** * 添加一个任务到队列中。 @@ -103,10 +118,6 @@ export default class Scheduler extends EventEmitter { next: this.next.bind(this), }); } - // 流程执行过程中出错,停止执行 - stop(data) { - console.log('stop', data); - } private pushTaskToRunningMap(taskParam) { const { executionId, taskId } = taskParam; if (!this.taskRunningMap.has(executionId)) { @@ -171,7 +182,7 @@ export default class Scheduler extends EventEmitter { detail: execResult.detail, }); } - private async next(data: NextTaskParam) { + private next(data: NextTaskParam) { if (data.outgoing && data.outgoing.length > 0) { data.outgoing.forEach((item) => { this.addTask({ @@ -180,7 +191,7 @@ export default class Scheduler extends EventEmitter { }); }); } - await this.saveTaskResult(data); + this.saveTaskResult(data); this.removeTaskFromRunningMap(data); this.run({ executionId: data.executionId, @@ -188,11 +199,8 @@ export default class Scheduler extends EventEmitter { taskId: data.taskId, }); } - /** - * 为了防止多次添加导致 - */ - private async saveTaskResult(data: TaskResult) { - await this.recorder.addTask({ + private saveTaskResult(data: TaskResult) { + this.recorder.addTask({ executionId: data.executionId, taskId: data.taskId, nodeId: data.nodeId, diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts index 7221fc155..6cdfcdb46 100644 --- a/packages/engine/src/recorder/index.ts +++ b/packages/engine/src/recorder/index.ts @@ -20,7 +20,7 @@ export default class Recorder implements RecorderInterface { */ async addTask(task: RecorderData) { const { executionId, taskId } = task; - const instanceData = await this.getExecutionTasks(executionId); + const instanceData = this.getExecutionTasks(executionId); if (!instanceData) { this.pushExecution(executionId); }