From 7a218c91128e48543f7a30d5dca44ee3251a25eb Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Mon, 7 Aug 2023 19:55:46 +0800 Subject: [PATCH 1/2] style(engine): change variable task to anction --- packages/engine/__test__/01_index.test.js | 1 + packages/engine/__test__/02_recorder.test.js | 9 +- .../engine/__test__/05_customNode.test.js | 5 +- .../__test__/06_parallelAndSerial.test.js | 13 +- packages/engine/src/FlowModel.ts | 74 +++++------ packages/engine/src/Scheduler.ts | 125 +++++++++--------- packages/engine/src/constant/constant.ts | 4 +- packages/engine/src/index.ts | 23 ++-- packages/engine/src/nodes/BaseNode.ts | 41 +++--- packages/engine/src/recorder/index.ts | 32 ++--- packages/engine/src/types.d.ts | 28 ++-- packages/engine/src/util/ID.ts | 9 +- 12 files changed, 184 insertions(+), 180 deletions(-) diff --git a/packages/engine/__test__/01_index.test.js b/packages/engine/__test__/01_index.test.js index 3b1f166c9..2f7ff5d45 100644 --- a/packages/engine/__test__/01_index.test.js +++ b/packages/engine/__test__/01_index.test.js @@ -2,6 +2,7 @@ import Engine from '../src/index'; describe('@logicflow/engine', () => { test('Execution Process Completed, Returning Data Containing executionId', async () => { + // TODO: context在初始化engine时传入 const engine = new Engine(); const flowData = { graphData: { diff --git a/packages/engine/__test__/02_recorder.test.js b/packages/engine/__test__/02_recorder.test.js index 814494474..f41f834e0 100644 --- a/packages/engine/__test__/02_recorder.test.js +++ b/packages/engine/__test__/02_recorder.test.js @@ -33,18 +33,19 @@ describe('@logicflow/engine Recorder', () => { /** * [ * { - * taskId: '', - * nodeId: '', - * instanceId: '', + * actionId: '', // 某一个节点在某一次执行时生成的Id + * nodeId: '', // 流程图节点Id + * executionId: '', // 某一次执行的Id * nodeType: '', * timestamp: '', * properties: {}, * } * ] */ + // TODO: 给个例子自定义执行记录 const execution = await engine.getExecutionRecord(executionId); expect(execution.length).toBe(2); - expect(execution[1]).toHaveProperty('taskId'); + expect(execution[1]).toHaveProperty('actionId'); expect(execution[1]).toHaveProperty('nodeId'); expect(execution[1]).toHaveProperty('executionId'); expect(execution[1]).toHaveProperty('nodeType'); diff --git a/packages/engine/__test__/05_customNode.test.js b/packages/engine/__test__/05_customNode.test.js index 749c8ba44..a0ee383e1 100644 --- a/packages/engine/__test__/05_customNode.test.js +++ b/packages/engine/__test__/05_customNode.test.js @@ -10,6 +10,9 @@ describe('@logicflow/engine Customize Node', () => { } }; } + async onResume({ data }) { + this.globalData.formId = data.formId; + } } const engine = new Engine(); engine.register({ @@ -67,7 +70,7 @@ describe('@logicflow/engine Customize Node', () => { const result2 = await engine.resume({ executionId: result.executionId, nodeId: result.nodeId, - taskId: result.taskId, + actionId: result.actionId, data: { formId: 'form_2' } diff --git a/packages/engine/__test__/06_parallelAndSerial.test.js b/packages/engine/__test__/06_parallelAndSerial.test.js index 1e015113d..bd6410ffd 100644 --- a/packages/engine/__test__/06_parallelAndSerial.test.js +++ b/packages/engine/__test__/06_parallelAndSerial.test.js @@ -1,6 +1,6 @@ import Engine, { TaskNode } from '../src/index'; -describe('@logicflow/engine parallel and serial', () => { +describe('@logicflow/engine parallel execution', () => { class FetchNode extends TaskNode { async action() { await this.fetch() @@ -77,12 +77,5 @@ describe('@logicflow/engine parallel and serial', () => { expect(execution.length).toBe(4); expect(execution[3].nodeId).toEqual('node2') }); - test('When the process is executed twice, the second execution will start only after the first execution is completed.', async () => { - const r = engine.execute(); - const r2 = engine.execute(); - const result = await Promise.all([r, r2]); - const execution1 = await engine.getExecutionRecord(result[0].executionId); - const execution2 = await engine.getExecutionRecord(result[1].executionId); - expect(execution2[0].timestamp >= execution1[3].timestamp).toBe(true) - }); -}) \ No newline at end of file +}) +// TODO: 增加某个节点出现异常和interrupt 后,控制其他分支节点是否要继续执行的测试用例。 diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 98a47d3c9..42ef560fe 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -9,23 +9,22 @@ import { import { createExecId } from './util/ID'; import Scheduler from './Scheduler'; import { ErrorCode, getErrorMsg } from './constant/LogCode'; -import type { TaskParam } from './types.d'; +import type { ActionParam } from './types.d'; export type FlowResult = { result?: Record; -} & TaskParam; +} & ActionParam; -export type TaskParams = { +export type ActionParams = { executionId?: string; - taskId?: string; + actionId?: string; nodeId?: string; - data?: Record; }; export type ExecParams = { callback?: (result: FlowResult) => void; onError?: (error: Error) => void; -} & TaskParams; +} & ActionParams; export default class FlowModel { /** @@ -39,7 +38,7 @@ export default class FlowModel { /** * 待执行的队列,当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 */ - executeQueue: ExecParams[]; + executeList: ExecParams[]; /** * 当前正在执行。当监听到调度器执行完成时,出触发执行参数中的回调,告知外部执行完成。 */ @@ -92,7 +91,7 @@ export default class FlowModel { // 流程包含的节点类型 this.nodeModelMap = nodeModelMap; // 需要执行的队列 - this.executeQueue = []; + this.executeList = []; // 执行中的任务 this.executingInstance = null; // 外部传入的上下文,最终会传递给每个节点 @@ -107,10 +106,10 @@ export default class FlowModel { recorder, }); this.scheduler.on(EVENT_INSTANCE_COMPLETE, (result) => { - this.onTaskFinished(result); + this.onExecuteFinished(result); }); this.scheduler.on(EVENT_INSTANCE_INTERRUPTED, (result) => { - this.onTaskFinished(result); + this.onExecuteFinished(result); }); } public setStartNodeType(startNodeType) { @@ -197,38 +196,29 @@ export default class FlowModel { * 一个流程存在着两个开始节点,A和B,A和B的下一个节点都是C,C的下两个节点是D和E。 * 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。 * 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。 + * TODO: 去掉此处的对列,直接使用调度器的队列。 */ public async execute(params: ExecParams) { - this.executeQueue.push(params); - if (this.isRunning) { - return; - } - this.isRunning = true; - this.createExecution(); + this.createExecution(params); } public async resume(params: ExecParams) { - this.executeQueue.push(params); - if (this.isRunning) { - return; - } - this.isRunning = true; - this.createExecution(); + this.createExecution(params); } /** - * 创建节点实例, 每个节点实例都会有一个唯一的taskId。 - * 通过executionId、nodeId、taskId可以唯一确定一个节点的某一次执行。 + * 创建节点实例, 每个节点实例都会有一个唯一的actionId。 + * 通过executionId、nodeId、actionId可以唯一确定一个节点的某一次执行。 * @param nodeId 节点Id * @returns 节点示例 */ - public createTask(nodeId: string) { + public createAction(nodeId: string) { const nodeConfig = this.nodeConfigMap.get(nodeId); const NodeModel = this.nodeModelMap.get(nodeConfig.type); - const task = new NodeModel({ + const action = new NodeModel({ nodeConfig, globalData: this.globalData, context: this.context, }); - return task; + return action; } /** * 更新流程全局数据 @@ -243,16 +233,12 @@ export default class FlowModel { * 在执行完成后,通知外部此次执行完成。 * 如果还存在待执行的任务,那么继续执行。 */ - private onTaskFinished(result) { - const { callback } = this.executingInstance; - if (callback) { - callback(result); - } - this.executingInstance = null; - if (this.executeQueue.length > 0) { - this.createExecution(); - } else { - this.isRunning = false; + private onExecuteFinished(result) { + const index = this.executeList.findIndex(i => i.executionId === result.executionId); + if (index !== -1) { + const { callback } = this.executeList[index]; + this.executeList.splice(index, 1); + callback && callback(result); } } /** @@ -262,20 +248,20 @@ export default class FlowModel { * 若存在nodeId,那么表示从指定节点开始执行。 * 若都不存在,那么新建一个executionId,从开始节点开始执行。 */ - private createExecution() { - const execParams = this.executeQueue.shift(); - this.executingInstance = execParams; + private createExecution(execParams) { + this.executeList.push(execParams); // 如果有taskId,那么表示恢复执行 - if (execParams.taskId && execParams.executionId && execParams.nodeId) { + if (execParams.actionId && execParams.executionId && execParams.nodeId) { this.scheduler.resume({ executionId: execParams.executionId, - taskId: execParams.taskId, + actionId: execParams.actionId, nodeId: execParams.nodeId, data: execParams.data, }); return; } - const executionId = execParams.executionId || createExecId(); + const executionId = createExecId(); + execParams.executionId = executionId; if (execParams.nodeId) { const nodeConfig = this.nodeConfigMap.get(execParams.nodeId); if (!nodeConfig) { @@ -285,7 +271,7 @@ export default class FlowModel { this.startNodes = [nodeConfig]; } this.startNodes.forEach((startNode) => { - this.scheduler.addTask({ + this.scheduler.addAction({ executionId, nodeId: startNode.id, }); diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index b3ca9a05e..aec76b83f 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -4,23 +4,22 @@ import { EVENT_INSTANCE_INTERRUPTED, FlowStatus, } from './constant/constant'; -import { createTaskId } from './util/ID'; +import { createActionId } from './util/ID'; import type { - ActionResult, - TaskParam, + ActionParam, NodeParam, ResumeParam, NodeExecResult, } from './types.d'; import type FlowModel from './FlowModel'; -import type { NextTaskParam } from './nodes/BaseNode'; +import type { NextActionParam } from './nodes/BaseNode'; import type Recorder from './recorder'; -type TaskParamMap = Map; +type ActionParamMap = Map; -type TaskResult = { +type ActionResult = { extraInfo?: Record; -} & NextTaskParam; +} & NextActionParam; type ExecutionId = string; @@ -38,7 +37,7 @@ export default class Scheduler extends EventEmitter { * 在每个节点执行完成后,会从集合中删除。 * 同时会判断此集合中是否还存在和此节点相同的executionId,如果不存在,说明此流程已经执行完成。 */ - taskRunningMap: Map; + actionRunningMap: Map; /** * 流程模型,用于创建节点模型。 */ @@ -51,7 +50,7 @@ export default class Scheduler extends EventEmitter { constructor(config) { super(); this.nodeQueueMap = new Map(); - this.taskRunningMap = new Map(); + this.actionRunningMap = new Map(); this.flowModel = config.flowModel; this.recorder = config.recorder; } @@ -60,13 +59,13 @@ export default class Scheduler extends EventEmitter { * 1. 由流程模型将所有的开始节点添加到队列中。 * 2. 当一个节点执行完成后,将后续的节点添加到队列中。 */ - public addTask(nodeParam: NodeParam) { + public addAction(nodeParam: NodeParam) { const { executionId } = nodeParam; if (!this.nodeQueueMap.has(executionId)) { this.nodeQueueMap.set(executionId, []); } - const currentTaskQueue = this.nodeQueueMap.get(executionId); - currentTaskQueue.push(nodeParam); + const currentActionQueue = this.nodeQueueMap.get(executionId); + currentActionQueue.push(nodeParam); } /** * 调度器执行下一个任务 @@ -77,27 +76,28 @@ export default class Scheduler extends EventEmitter { public run(runParams: { executionId: string; nodeId?: string; - taskId?: string; + actionId?: string; }) { const nodeQueue = this.nodeQueueMap.get(runParams.executionId); if (nodeQueue.length > 0) { this.nodeQueueMap.set(runParams.executionId, []); + // TODO: 并发执行,考虑用对列来实现一样的效果,可能会更好理解。 for (let i = 0; i < nodeQueue.length; i++) { const currentNode = nodeQueue[i]; - const taskId = createTaskId(); - const taskParam = { + const actionId = createActionId(); + const actionParam = { ...currentNode, - taskId, + actionId, }; - this.pushTaskToRunningMap(taskParam); - this.exec(taskParam); + this.pushActionToRunningMap(actionParam); + this.exec(actionParam); } - } else if (!this.hasRunningTask(runParams.executionId)) { - // 当一个流程在nodeQueueMap和taskRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 + } else if (!this.hasRunningAction(runParams.executionId)) { + // 当一个流程在nodeQueueMap和actionRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 this.emit(EVENT_INSTANCE_COMPLETE, { executionId: runParams.executionId, nodeId: runParams.nodeId, - taskId: runParams.taskId, + actionId: runParams.actionId, status: FlowStatus.COMPLETED, }); } @@ -107,58 +107,58 @@ export default class Scheduler extends EventEmitter { * 可以自定义节点手动实现流程中断,然后通过此方法恢复流程的执行。 */ public async resume(resumeParam: ResumeParam) { - this.pushTaskToRunningMap({ + this.pushActionToRunningMap({ executionId: resumeParam.executionId, nodeId: resumeParam.nodeId, - taskId: resumeParam.taskId, + actionId: resumeParam.actionId, }); - const model = this.flowModel.createTask(resumeParam.nodeId); + const model = this.flowModel.createAction(resumeParam.nodeId); await model.resume({ ...resumeParam, next: this.next.bind(this), }); } - private pushTaskToRunningMap(taskParam) { - const { executionId, taskId } = taskParam; - if (!this.taskRunningMap.has(executionId)) { - const runningMap = new Map(); - this.taskRunningMap.set(executionId, runningMap); + private pushActionToRunningMap(actionParam) { + const { executionId, actionId } = actionParam; + if (!this.actionRunningMap.has(executionId)) { + const runningMap = new Map(); + this.actionRunningMap.set(executionId, runningMap); } - this.taskRunningMap.get(executionId).set(taskId, taskParam); + this.actionRunningMap.get(executionId).set(actionId, actionParam); } - private removeTaskFromRunningMap(taskParam: TaskParam) { - const { executionId, taskId } = taskParam; - if (!taskId) return; - const runningMap = this.taskRunningMap.get(executionId); + private removeActionFromRunningMap(actionParam: ActionParam) { + const { executionId, actionId } = actionParam; + if (!actionId) return; + const runningMap = this.actionRunningMap.get(executionId); if (!runningMap) return; - runningMap.delete(taskId); + runningMap.delete(actionId); } - private hasRunningTask(executionId) { - const runningMap = this.taskRunningMap.get(executionId); + private hasRunningAction(executionId) { + const runningMap = this.actionRunningMap.get(executionId); if (!runningMap) return false; if (runningMap.size === 0) { - this.taskRunningMap.delete(executionId); + this.actionRunningMap.delete(executionId); return false; } return true; } - private async exec(taskParam: TaskParam) { - const model = this.flowModel.createTask(taskParam.nodeId); + private async exec(actionParam: ActionParam) { + const model = this.flowModel.createAction(actionParam.nodeId); const execResult = await model.execute({ - executionId: taskParam.executionId, - taskId: taskParam.taskId, - nodeId: taskParam.nodeId, + executionId: actionParam.executionId, + actionId: actionParam.actionId, + nodeId: actionParam.nodeId, next: this.next.bind(this), }); if (execResult && execResult.status === FlowStatus.INTERRUPTED) { this.interrupted({ execResult, - taskParam, + actionParam, }); - this.saveTaskResult({ - executionId: taskParam.executionId, - nodeId: taskParam.nodeId, - taskId: taskParam.taskId, + this.saveActionResult({ + executionId: actionParam.executionId, + nodeId: actionParam.nodeId, + actionId: actionParam.actionId, nodeType: execResult.nodeType, properties: execResult.properties, outgoing: [], @@ -167,42 +167,43 @@ export default class Scheduler extends EventEmitter { detail: execResult.detail, }, }); - this.removeTaskFromRunningMap(taskParam); + this.removeActionFromRunningMap(actionParam); } + // TODO: 考虑停下所有的任务 } private interrupted({ execResult, - taskParam, - } : { execResult: NodeExecResult, taskParam: TaskParam}) { + actionParam, + } : { execResult: NodeExecResult, actionParam: ActionParam}) { this.emit(EVENT_INSTANCE_INTERRUPTED, { - executionId: taskParam.executionId, + executionId: actionParam.executionId, status: FlowStatus.INTERRUPTED, - nodeId: taskParam.nodeId, - taskId: taskParam.taskId, + nodeId: actionParam.nodeId, + actionId: actionParam.actionId, detail: execResult.detail, }); } - private next(data: NextTaskParam) { + private next(data: NextActionParam) { if (data.outgoing && data.outgoing.length > 0) { data.outgoing.forEach((item) => { - this.addTask({ + this.addAction({ executionId: data.executionId, nodeId: item.target, }); }); } - this.saveTaskResult(data); - this.removeTaskFromRunningMap(data); + this.saveActionResult(data); + this.removeActionFromRunningMap(data); this.run({ executionId: data.executionId, nodeId: data.nodeId, - taskId: data.taskId, + actionId: data.actionId, }); } - private saveTaskResult(data: TaskResult) { - this.recorder.addTask({ + private saveActionResult(data: ActionResult) { + this.recorder.addActionRecord({ executionId: data.executionId, - taskId: data.taskId, + actionId: data.actionId, nodeId: data.nodeId, nodeType: data.nodeType, timestamp: Date.now(), diff --git a/packages/engine/src/constant/constant.ts b/packages/engine/src/constant/constant.ts index ddc96a178..9a6cec0b4 100644 --- a/packages/engine/src/constant/constant.ts +++ b/packages/engine/src/constant/constant.ts @@ -13,8 +13,8 @@ export enum FlowStatus { ERROR = 'error', } -// task status -export enum TaskStatus { +// action status +export enum ActionStatus { SUCCESS = 'success', ERROR = 'error', INTERRUPTED = 'interrupted', diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index f4ec55cfa..4a5694bc3 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -1,19 +1,22 @@ import type { ResumeParams, GraphConfigData } from './types.d'; -import FlowModel, { TaskParams } from './FlowModel'; +import FlowModel, { ActionParams } from './FlowModel'; import StartNode from './nodes/StartNode'; import TaskNode from './nodes/TaskNode'; import Recorder from './recorder'; +import { createEngineId } from './util/ID'; +import { NodeConstructor } from './nodes/BaseNode'; export default class Engine { global: Record; graphData: GraphConfigData; - nodeModelMap: Map; + nodeModelMap: Map; flowModel: FlowModel; + id: string; recorder: Recorder; constructor() { this.nodeModelMap = new Map(); this.recorder = new Recorder(); - // register node + this.id = createEngineId(); this.register({ type: StartNode.nodeTypeName, model: StartNode, @@ -25,7 +28,7 @@ export default class Engine { } /** * 注册节点 - * @param nodeConfig { type: 'custom-node', model: Class } + * @param nodeConfig { type: 'custom-node', model: NodeClass } */ register(nodeConfig) { this.nodeModelMap.set(nodeConfig.type, nodeConfig.model); @@ -35,8 +38,8 @@ export default class Engine { * 注意:由于执行记录不会主动删除,所以需要自行清理。 * nodejs环境建议自定义为持久化存储。 * engine.setCustomRecorder({ - * async addTask(task) {} - * async getTask(taskId) {} + * async addActionRecord(task) {} + * async getTask(actionId) {} * async getExecutionTasks(executionId) {} * clear() {} * }); @@ -66,7 +69,7 @@ export default class Engine { /** * 执行流程,允许多次调用。 */ - async execute(execParam?: TaskParams) { + async execute(execParam?: ActionParams) { return new Promise((resolve, reject) => { if (!execParam) { execParam = {}; @@ -96,10 +99,10 @@ export default class Engine { }); } async getExecutionRecord(executionId) { - const tasks = await this.recorder.getExecutionTasks(executionId); + const tasks = await this.recorder.getExecutionActions(executionId); const records = []; for (let i = 0; i < tasks.length; i++) { - records.push(this.recorder.getTask(tasks[i])); + records.push(this.recorder.getActionRecord(tasks[i])); } return Promise.all(records); } @@ -112,5 +115,5 @@ export { }; export type { - TaskParams, + ActionParams, }; diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts index ef177b6a8..33986dfbc 100644 --- a/packages/engine/src/nodes/BaseNode.ts +++ b/packages/engine/src/nodes/BaseNode.ts @@ -1,4 +1,4 @@ -import { TaskStatus } from '../constant/constant'; +import { ActionStatus } from '../constant/constant'; import { getExpressionResult } from '../expression'; import type { ActionResult, @@ -13,7 +13,7 @@ export interface BaseNodeInterface { nodeId: string; type: string; readonly baseType: string; - execute(taskParam): Promise; + execute(actionParam): Promise; } export type NodeConstructor = { @@ -22,6 +22,17 @@ export type NodeConstructor = { context: Record; globalData: Record; }): BaseNode; + action(params: { + executionId: string; + actionId: string; + nodeId: string; + }): Promise; + onResume(params: { + executionId: string; + actionId: string; + nodeId: string; + data?: Record; + }): Promise; }; export type IncomingConfig = { @@ -44,10 +55,10 @@ export type NodeConfig = { outgoing: OutgoingConfig[]; }; -export type NextTaskParam = { +export type NextActionParam = { executionId: string; nodeId: string; - taskId: string; + actionId: string; nodeType: string; outgoing: OutgoingConfig[]; properties?: Record; @@ -90,19 +101,19 @@ export default class BaseNode implements BaseNodeInterface { this.baseType = 'base'; } /** - * 节点的每一次执行都会生成一个唯一的taskId + * 节点的每一次执行都会生成一个唯一的actionId */ public async execute(params: ExecParams): Promise { const r = await this.action({ executionId: params.executionId, - taskId: params.taskId, + actionId: params.actionId, nodeId: this.nodeId, }); - if (!r || r.status === TaskStatus.SUCCESS) { + if (!r || r.status === ActionStatus.SUCCESS) { const outgoing = await this.getOutgoing(); params.next({ executionId: params.executionId, - taskId: params.taskId, + actionId: params.actionId, nodeId: this.nodeId, nodeType: this.type, properties: this.properties, @@ -113,7 +124,7 @@ export default class BaseNode implements BaseNodeInterface { status: r && r.status, detail: r && r.detail, executionId: params.executionId, - taskId: params.taskId, + actionId: params.actionId, nodeId: this.nodeId, nodeType: this.type, properties: this.properties, @@ -128,12 +139,12 @@ export default class BaseNode implements BaseNodeInterface { await this.onResume({ executionId: params.executionId, nodeId: params.nodeId, - taskId: params.taskId, + actionId: params.actionId, data: params.data, }); params.next({ executionId: params.executionId, - taskId: params.taskId, + actionId: params.actionId, nodeId: this.nodeId, nodeType: this.type, properties: this.properties, @@ -173,12 +184,12 @@ export default class BaseNode implements BaseNodeInterface { * 节点的执行逻辑 * @overridable 可以自定义节点重写此方法。 * @param params.executionId 流程执行记录ID - * @param params.taskId 此节点执行记录ID + * @param params.actionId 此节点执行记录ID * @param params.nodeId 节点ID */ public async action(params: { executionId: string; - taskId: string; + actionId: string; nodeId: string; }): Promise { return undefined; @@ -187,12 +198,12 @@ export default class BaseNode implements BaseNodeInterface { * 节点的重新恢复执行逻辑 * @overridable 可以自定义节点重写此方法。 * @param params.executionId 流程执行记录ID - * @param params.taskId 此节点执行记录ID + * @param params.actionId 此节点执行记录ID * @param params.nodeId 节点ID */ public async onResume(params: { executionId: string, - taskId: string, + actionId: string, nodeId: string, data?: Record, }): Promise { diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts index 6cdfcdb46..d501d94ac 100644 --- a/packages/engine/src/recorder/index.ts +++ b/packages/engine/src/recorder/index.ts @@ -8,9 +8,9 @@ const LOGICFLOW_ENGINE_INSTANCES = 'LOGICFLOW_ENGINE_INSTANCES'; export default class Recorder implements RecorderInterface { /* - * @param {Object} task + * @param {Object} action * { - * taskId: '', + * actionId: '', * nodeId: '', * executionId: '', * nodeType: '', @@ -18,19 +18,19 @@ export default class Recorder implements RecorderInterface { * properties: {}, * } */ - async addTask(task: RecorderData) { - const { executionId, taskId } = task; - const instanceData = this.getExecutionTasks(executionId); + async addActionRecord(action: RecorderData) { + const { executionId, actionId } = action; + const instanceData = this.getExecutionActions(executionId); if (!instanceData) { this.pushExecution(executionId); } - this.pushTaskToExecution(executionId, taskId); - storage.setItem(taskId, task); + this.pushActionToExecution(executionId, actionId); + storage.setItem(actionId, action); } - async getTask(taskId: string): Promise { - return storage.getItem(taskId); + async getActionRecord(actionId: string): Promise { + return storage.getItem(actionId); } - async getExecutionTasks(executionId) { + async getExecutionActions(executionId) { return storage.getItem(executionId); } clear() { @@ -38,8 +38,8 @@ export default class Recorder implements RecorderInterface { instance.forEach((executionId) => { storage.removeItem(executionId); const instanceData = storage.getItem(executionId) || []; - instanceData.forEach((taskId) => { - storage.removeItem(taskId); + instanceData.forEach((actionId) => { + storage.removeItem(actionId); }); }); storage.removeItem(LOGICFLOW_ENGINE_INSTANCES); @@ -49,9 +49,9 @@ export default class Recorder implements RecorderInterface { instance.push(executionId); storage.setItem(LOGICFLOW_ENGINE_INSTANCES, instance); } - private pushTaskToExecution(executionId, taskId) { - const tasks = storage.getItem(executionId) || []; - tasks.push(taskId); - storage.setItem(executionId, tasks); + private pushActionToExecution(executionId, actionId) { + const actions = storage.getItem(executionId) || []; + actions.push(actionId); + storage.setItem(executionId, actions); } } diff --git a/packages/engine/src/types.d.ts b/packages/engine/src/types.d.ts index e87283785..352b754e9 100644 --- a/packages/engine/src/types.d.ts +++ b/packages/engine/src/types.d.ts @@ -8,45 +8,45 @@ export type NodeParam = { /** * 执行节点的参数 */ -export type TaskParam = { - taskId: string; +export type ActionParam = { + actionId: string; } & NodeParam; export type ExecParams = { - next: (data: NextTaskParam) => void; -} & TaskParam; + next: (data: NextActionParam) => void; +} & ActionParam; export type ResumeParam = { data: Record; -} & TaskParam; +} & ActionParam; export type ExecResumeParams = { - next: (data: NextTaskParam) => void; + next: (data: NextActionParam) => void; } & ResumeParam; export type RecorderData = { nodeType: string; timestamp: number; properties?: Record; -} & TaskParam; +} & ActionParam; export interface RecorderInterface { - addTask: (task: RecorderData) => Promise; - getTask: (taskId: string) => Promise; - getExecutionTasks: (executionId: string) => Promise; + addActionRecord: (task: RecorderData) => Promise; + getActionRecord: (actionId: string) => Promise; + getExecutionActions: (executionId: string) => Promise; clear: () => void; }; -export type TaskStatus = 'success' | 'error' | 'interrupted' | ''; +export type ActionStatus = 'success' | 'error' | 'interrupted' | ''; export type ActionResult = { - status?: TaskStatus; + status?: ActionStatus; detail?: Record; }; export type NodeExecResult = { executionId: string, - taskId: string, + actionId: string, nodeId: string, nodeType: string, properties?: Record, @@ -54,7 +54,7 @@ export type NodeExecResult = { export type ResumeParams = { executionId: string; - taskId: string; + actionId: string; nodeId: string; data?: Record; } diff --git a/packages/engine/src/util/ID.ts b/packages/engine/src/util/ID.ts index a58491d36..f949db320 100644 --- a/packages/engine/src/util/ID.ts +++ b/packages/engine/src/util/ID.ts @@ -5,7 +5,12 @@ export const createExecId = (): string => { return `exec-${uuid}`; }; -export const createTaskId = (): string => { +export const createActionId = (): string => { const uuid = uuidv4(); - return `task-${uuid}`; + return `action-${uuid}`; +}; + +export const createEngineId = (): string => { + const uuid = uuidv4(); + return `engine-${uuid}`; }; From 69f058c74e9564b9c2847268028558354eb2bad3 Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Tue, 8 Aug 2023 16:54:47 +0800 Subject: [PATCH 2/2] refactor(engine): improve readablity --- packages/engine/__test__/01_index.test.js | 8 +- packages/engine/__test__/02_recorder.test.js | 40 +++++ packages/engine/__test__/03_condition.test.js | 4 + packages/engine/__test__/04_execute.test.js | 16 ++ .../engine/__test__/05_customNode.test.js | 89 +++++++---- ...l.test.js => 06_parallelExecution.test.js} | 5 +- .../__test__/07_interruptedAndResume.test.js | 139 ++++++++++++++++++ packages/engine/src/Scheduler.ts | 27 ++-- packages/engine/src/index.ts | 35 ++++- packages/engine/src/nodes/BaseNode.ts | 2 +- packages/engine/src/recorder/index.ts | 22 ++- packages/engine/src/types.d.ts | 4 + 12 files changed, 333 insertions(+), 58 deletions(-) rename packages/engine/__test__/{06_parallelAndSerial.test.js => 06_parallelExecution.test.js} (93%) create mode 100644 packages/engine/__test__/07_interruptedAndResume.test.js diff --git a/packages/engine/__test__/01_index.test.js b/packages/engine/__test__/01_index.test.js index 2f7ff5d45..b8a17270d 100644 --- a/packages/engine/__test__/01_index.test.js +++ b/packages/engine/__test__/01_index.test.js @@ -3,8 +3,13 @@ import Engine from '../src/index'; describe('@logicflow/engine', () => { test('Execution Process Completed, Returning Data Containing executionId', async () => { // TODO: context在初始化engine时传入 - const engine = new Engine(); + const engine = new Engine({ + context: {}, + }); const flowData = { + /** + * node1 |--> node2 + */ graphData: { nodes: [ { @@ -24,7 +29,6 @@ describe('@logicflow/engine', () => { } ] }, - context: {}, globalData: {}, } const flowModel = engine.load(flowData); diff --git a/packages/engine/__test__/02_recorder.test.js b/packages/engine/__test__/02_recorder.test.js index f41f834e0..d5457e6f8 100644 --- a/packages/engine/__test__/02_recorder.test.js +++ b/packages/engine/__test__/02_recorder.test.js @@ -4,6 +4,9 @@ describe('@logicflow/engine Recorder', () => { test('Using the getExecutionRecord API, receive the complete execution record of the process.', async () => { const engine = new Engine(); const flowData = { + /** + * node1 |--> node2 + */ graphData: { nodes: [ { @@ -54,4 +57,41 @@ describe('@logicflow/engine Recorder', () => { expect(execution[1].nodeId).toBe('node2'); expect(execution[1].nodeType).toBe('TaskNode'); }); + test('The execution record cannot be obtained when the number of executions exceeds the maximum number of executions.', async () => { + const engine = new Engine(); + const flowData = { + /** + * node1 |--> node2 + */ + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: {} + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + } + ] + }, + global: {}, + } + engine.load(flowData); + engine.recorder.setMaxRecorderNumber(2); + const result = await engine.execute(); + await engine.execute(); + await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + expect(execution).toBe(null); + }) }); \ No newline at end of file diff --git a/packages/engine/__test__/03_condition.test.js b/packages/engine/__test__/03_condition.test.js index 25a18c5b6..d161870d6 100644 --- a/packages/engine/__test__/03_condition.test.js +++ b/packages/engine/__test__/03_condition.test.js @@ -4,6 +4,10 @@ describe('@logicflow/engine condition', () => { test('The process will not continue its execution if the condition expression evaluates to false.', async () => { const engine = new Engine(); const flowData = { + /** + * node1 |--> node2 + * |--> node3 + */ graphData: { nodes: [ { diff --git a/packages/engine/__test__/04_execute.test.js b/packages/engine/__test__/04_execute.test.js index 535818aaf..fdb0f058e 100644 --- a/packages/engine/__test__/04_execute.test.js +++ b/packages/engine/__test__/04_execute.test.js @@ -4,6 +4,10 @@ describe('@logicflow/engine Execute', () => { test('When there are multiple start nodes in a process, all of them are executed by default.', async () => { const engine = new Engine(); const flowData = { + /** + * node1 |--> node2 + * node3 |--> node4 + */ graphData: { nodes: [ { @@ -59,6 +63,10 @@ describe('@logicflow/engine Execute', () => { test('When there are multiple start nodes in a process, you can specify which start node to execute.', async () => { const engine = new Engine(); const flowData = { + /** + * node1 |--> node2 + * node3 |--> node4 + */ graphData: { nodes: [ { @@ -118,6 +126,10 @@ describe('@logicflow/engine Execute', () => { test('When attempting to execute a non-existent start node in a process, an execution exception is raised.', async () => { const engine = new Engine(); const flowData = { + /** + * node1 |--> node2 + * node3 |--> node4 + */ graphData: { nodes: [ { @@ -177,6 +189,10 @@ describe('@logicflow/engine Execute', () => { test('When there are multiple start nodes in a process, parallel execution is supported, and each start node generates a unique execution ID', async () => { const engine = new Engine(); const flowData = { + /** + * node1 |--> node2 + * node3 |--> node4 + */ graphData: { nodes: [ { diff --git a/packages/engine/__test__/05_customNode.test.js b/packages/engine/__test__/05_customNode.test.js index a0ee383e1..66b7d7ac3 100644 --- a/packages/engine/__test__/05_customNode.test.js +++ b/packages/engine/__test__/05_customNode.test.js @@ -1,25 +1,51 @@ import Engine, { TaskNode } from '../src/index'; describe('@logicflow/engine Customize Node', () => { - class UserTask extends TaskNode { + class DataNode extends TaskNode { async action() { - return { - status: 'interrupted', - detail: { - formId: 'form_1' - } - }; + this.globalData['dataSource'] = { + time: this.context.getTime(), + } } - async onResume({ data }) { - this.globalData.formId = data.formId; + } + class Mod2Node extends TaskNode { + async action() { + const dataSource = this.globalData['dataSource']; + if (dataSource && dataSource.time) { + dataSource.time % 2 === 0 ? this.globalData['output'] = 'even' : this.globalData['output'] = 'odd'; + } + } + } + class OutputNode extends TaskNode { + async action() { + const output = this.globalData['output']; + this.properties['output'] = output; } } - const engine = new Engine(); + const engine = new Engine({ + context: { + getTime() { + return new Date().getTime(); + } + } + }); + engine.register({ + type: 'DataNode', + model: DataNode, + }) + engine.register({ + type: 'Mod2Node', + model: Mod2Node, + }) engine.register({ - type: 'UserTask', - model: UserTask, + type: 'OutputNode', + model: OutputNode, }) + const flowData = { + /** + * node1 |--> node2(DataNode) |--> node3(Mod2Node) |--> node4(OutputNode) + */ graphData: { nodes: [ { @@ -30,14 +56,19 @@ describe('@logicflow/engine Customize Node', () => { }, { id: 'node2', - type: 'UserTask', + type: 'DataNode', properties: {} }, { id: 'node3', - type: 'TaskNode', + type: 'Mod2Node', + properties: {} + }, + { + id: 'node4', + type: 'OutputNode', properties: {} - } + }, ], edges: [ { @@ -53,29 +84,23 @@ describe('@logicflow/engine Customize Node', () => { targetNodeId: 'node3', properties: { } - } + }, + { + id: 'edge3', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + }, ] }, globalData: { }, } engine.load(flowData); - test('After executing the process, receive the flow status as "interrupted" and include detailed information returned by the custom node.', async () => { + test('When the process is completed, the output field in the properties attribute of the last node is odd or even.', async () => { const result = await engine.execute(); - expect(result.status).toBe('interrupted'); - expect(result.detail.formId).toEqual('form_1'); - }); - test('After a process is interrupted, you can resume its execution using the API.', async () => { - const result = await engine.execute(); - const result2 = await engine.resume({ - executionId: result.executionId, - nodeId: result.nodeId, - actionId: result.actionId, - data: { - formId: 'form_2' - } - }) - expect(result2.status).toBe('completed') - expect(result2.nodeId).toEqual('node3') + const execution = await engine.getExecutionRecord(result.executionId); + expect(['odd', 'even'].indexOf(execution[execution.length - 1].properties.output) !== -1).toBe(true); }); }); diff --git a/packages/engine/__test__/06_parallelAndSerial.test.js b/packages/engine/__test__/06_parallelExecution.test.js similarity index 93% rename from packages/engine/__test__/06_parallelAndSerial.test.js rename to packages/engine/__test__/06_parallelExecution.test.js index bd6410ffd..fe829a80b 100644 --- a/packages/engine/__test__/06_parallelAndSerial.test.js +++ b/packages/engine/__test__/06_parallelExecution.test.js @@ -19,6 +19,10 @@ describe('@logicflow/engine parallel execution', () => { model: FetchNode, }) const flowData = { + /** + * node1 |--> node2(FetchTask) + * |--> node3 |--> node4 + */ graphData: { nodes: [ { @@ -78,4 +82,3 @@ describe('@logicflow/engine parallel execution', () => { expect(execution[3].nodeId).toEqual('node2') }); }) -// TODO: 增加某个节点出现异常和interrupt 后,控制其他分支节点是否要继续执行的测试用例。 diff --git a/packages/engine/__test__/07_interruptedAndResume.test.js b/packages/engine/__test__/07_interruptedAndResume.test.js new file mode 100644 index 000000000..ff4bec984 --- /dev/null +++ b/packages/engine/__test__/07_interruptedAndResume.test.js @@ -0,0 +1,139 @@ +import Engine, { TaskNode } from '../src/index'; + +describe('@logicflow/engine interrupted and resume', () => { + class UserTask extends TaskNode { + async action() { + this.globalData['a'] = 1; + return { + status: 'interrupted', + detail: { + formId: 'form_1' + } + }; + } + async onResume({ data }) { + this.globalData.formId = data.formId; + } + } + class AsyncNode extends TaskNode { + async action() { + await this.wait(500); + } + wait(time) { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, time); + }) + } + } + const engine = new Engine(); + engine.register({ + type: 'UserTask', + model: UserTask, + }) + engine.register({ + type: 'AsyncNode', + model: AsyncNode, + }) + const flowData = { + /** + * node1 |--> node2(UserTask) |--> node3 + * |--> node4(AsyncNode) |--> node5 + */ + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'UserTask', + properties: {} + }, + { + id: 'node3', + type: 'TaskNode', + properties: {} + }, + { + id: 'node4', + type: 'AsyncNode', + properties: {} + }, + { + id: 'node5', + type: 'TaskNode', + properties: {} + }, + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node2', + targetNodeId: 'node3', + properties: { + conditionExpression: 'a === 1' + } + }, + { + id: 'edge3', + sourceNodeId: 'node1', + targetNodeId: 'node4', + properties: { + } + }, + { + id: 'edge4', + sourceNodeId: 'node4', + targetNodeId: 'node5', + properties: {} + } + ] + }, + globalData: { + a: 0 + }, + } + engine.load(flowData); + test('After executing the process, receive the flow status as "interrupted" and include detailed information returned by the custom node.', async () => { + const result = await engine.execute(); + expect(result.status).toBe('interrupted'); + expect(result.detail.formId).toEqual('form_1'); + }); + test('After a process is interrupted, you can resume its execution using the API.', async () => { + const result = await engine.execute(); + await wait(500); + const result2 = await engine.resume({ + executionId: result.executionId, + nodeId: result.nodeId, + actionId: result.actionId, + data: { + formId: 'form_2' + } + }) + expect(result2.status).toBe('completed') + expect(result2.nodeId).toEqual('node3') + const execution = await engine.getExecutionRecord(result2.executionId); + // interrupted node have two execution record + expect(execution.length).toEqual(6); + }); +}); + +function wait(time) { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, time); + }) +} diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index aec76b83f..3451e5684 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -79,20 +79,19 @@ export default class Scheduler extends EventEmitter { actionId?: string; }) { const nodeQueue = this.nodeQueueMap.get(runParams.executionId); - if (nodeQueue.length > 0) { - this.nodeQueueMap.set(runParams.executionId, []); - // TODO: 并发执行,考虑用对列来实现一样的效果,可能会更好理解。 - for (let i = 0; i < nodeQueue.length; i++) { - const currentNode = nodeQueue[i]; - const actionId = createActionId(); - const actionParam = { - ...currentNode, - actionId, - }; - this.pushActionToRunningMap(actionParam); - this.exec(actionParam); - } - } else if (!this.hasRunningAction(runParams.executionId)) { + // 将同一个executionId当前待执行的节点一起执行 + // 避免出现某一个节点执行时间过长,导致其他节点等待时间过长。 + while (nodeQueue.length) { + const currentNode = nodeQueue.pop(); + const actionId = createActionId(); + const actionParam = { + ...currentNode, + actionId, + }; + this.pushActionToRunningMap(actionParam); + this.exec(actionParam); + } + if (!this.hasRunningAction(runParams.executionId)) { // 当一个流程在nodeQueueMap和actionRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 this.emit(EVENT_INSTANCE_COMPLETE, { executionId: runParams.executionId, diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 4a5694bc3..1f1339aaa 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -1,4 +1,4 @@ -import type { ResumeParams, GraphConfigData } from './types.d'; +import type { ResumeParams, GraphConfigData, EngineConstructorOptions } from './types.d'; import FlowModel, { ActionParams } from './FlowModel'; import StartNode from './nodes/StartNode'; import TaskNode from './nodes/TaskNode'; @@ -7,16 +7,17 @@ import { createEngineId } from './util/ID'; import { NodeConstructor } from './nodes/BaseNode'; export default class Engine { + id: string; global: Record; graphData: GraphConfigData; nodeModelMap: Map; flowModel: FlowModel; - id: string; recorder: Recorder; - constructor() { + context: Record; + constructor(options?: EngineConstructorOptions) { this.nodeModelMap = new Map(); - this.recorder = new Recorder(); this.id = createEngineId(); + this.recorder = new Recorder(); this.register({ type: StartNode.nodeTypeName, model: StartNode, @@ -25,6 +26,7 @@ export default class Engine { type: TaskNode.nodeTypeName, model: TaskNode, }); + this.context = options?.context || {}; } /** * 注册节点 @@ -54,12 +56,11 @@ export default class Engine { graphData, startNodeType = 'StartNode', globalData = {}, - context = {}, }) { this.flowModel = new FlowModel({ nodeModelMap: this.nodeModelMap, recorder: this.recorder, - context, + context: this.context, globalData, startNodeType, }); @@ -85,6 +86,12 @@ export default class Engine { }); }); } + /** + * 恢复执行 + * 注意此方法只能恢复节点后面的执行,不能恢复流程其他分支的执行。 + * 同理,中断执行也只能中断节点后面的执行,不会中断其他分支的执行。 + * 在实际项目中,如果存在中断节点,建议流程所有的节点都是排他网关,这样可以保证执行的过程不存在分支。 + */ async resume(resumeParam: ResumeParams) { return new Promise((resolve, reject) => { this.flowModel.resume({ @@ -100,12 +107,28 @@ export default class Engine { } async getExecutionRecord(executionId) { const tasks = await this.recorder.getExecutionActions(executionId); + if (!tasks) { + return null; + } const records = []; for (let i = 0; i < tasks.length; i++) { records.push(this.recorder.getActionRecord(tasks[i])); } return Promise.all(records); } + getGlobalData() { + return this.flowModel?.globalData; + } + setGlobalData(data) { + if (this.flowModel) { + this.flowModel.globalData = data; + } + } + updateGlobalData(data) { + if (this.flowModel) { + Object.assign(this.flowModel.globalData, data); + } + } } export { diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts index 33986dfbc..56d780a5e 100644 --- a/packages/engine/src/nodes/BaseNode.ts +++ b/packages/engine/src/nodes/BaseNode.ts @@ -95,7 +95,7 @@ export default class BaseNode implements BaseNodeInterface { this.incoming = nodeConfig.incoming; this.nodeId = nodeConfig.id; this.type = nodeConfig.type; - this.properties = nodeConfig.properties; + this.properties = nodeConfig.properties || {}; this.context = context; this.globalData = globalData; this.baseType = 'base'; diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts index d501d94ac..f58742ef3 100644 --- a/packages/engine/src/recorder/index.ts +++ b/packages/engine/src/recorder/index.ts @@ -5,8 +5,15 @@ import type { import storage from '../util/storage'; const LOGICFLOW_ENGINE_INSTANCES = 'LOGICFLOW_ENGINE_INSTANCES'; - +const MAX_RECORDER = 100; export default class Recorder implements RecorderInterface { + maxRecorder: number; + constructor() { + this.maxRecorder = MAX_RECORDER; + } + setMaxRecorderNumber(maxRecorder: number) { + this.maxRecorder = maxRecorder; + } /* * @param {Object} action * { @@ -20,7 +27,7 @@ export default class Recorder implements RecorderInterface { */ async addActionRecord(action: RecorderData) { const { executionId, actionId } = action; - const instanceData = this.getExecutionActions(executionId); + const instanceData = await this.getExecutionActions(executionId); if (!instanceData) { this.pushExecution(executionId); } @@ -46,9 +53,20 @@ export default class Recorder implements RecorderInterface { } private pushExecution(executionId) { const instance = storage.getItem(LOGICFLOW_ENGINE_INSTANCES) || []; + if (instance.length >= this.maxRecorder) { + const removeItem = instance.shift(); + this.popExecution(removeItem); + } instance.push(executionId); storage.setItem(LOGICFLOW_ENGINE_INSTANCES, instance); } + private popExecution(executionId) { + const instanceData = storage.getItem(executionId) || []; + instanceData.forEach((actionId) => { + storage.removeItem(actionId); + }); + storage.removeItem(executionId); + } private pushActionToExecution(executionId, actionId) { const actions = storage.getItem(executionId) || []; actions.push(actionId); diff --git a/packages/engine/src/types.d.ts b/packages/engine/src/types.d.ts index 352b754e9..6d376d556 100644 --- a/packages/engine/src/types.d.ts +++ b/packages/engine/src/types.d.ts @@ -110,3 +110,7 @@ export declare type EdgeConfig = { zIndex?: number; properties?: Record; }; + +export declare type EngineConstructorOptions = { + context?: Record; +}