From c7d80f4b4c19cf82af9be49dd8fd44433327db58 Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Fri, 7 Jul 2023 16:42:30 +0800 Subject: [PATCH 01/11] feat: create logicflow eninge --- .eslintrc.js | 2 +- packages/core/package.json | 2 +- packages/engine/README.md | 22 ++++ packages/engine/__test__/index.test.js | 55 ++++++++ packages/engine/babel.config.js | 8 ++ packages/engine/package.json | 86 +++++++++++++ packages/engine/src/EventEmitter.ts | 64 +++++++++ packages/engine/src/FlowModel.ts | 157 +++++++++++++++++++++++ packages/engine/src/NodeManager.ts | 26 ++++ packages/engine/src/Scheduler.ts | 82 ++++++++++++ packages/engine/src/constant/LogCode.ts | 20 +++ packages/engine/src/constant/constant.ts | 4 + packages/engine/src/index.ts | 64 +++++++++ packages/engine/src/nodes/BaseNode.ts | 52 ++++++++ packages/engine/src/nodes/StartNode.ts | 9 ++ packages/engine/src/nodes/TaskNode.ts | 9 ++ packages/engine/src/util/ID.ts | 11 ++ packages/engine/tsconfig.json | 12 ++ packages/extension/package.json | 2 +- tsconfig.json | 2 + yarn.lock | 2 +- 21 files changed, 687 insertions(+), 4 deletions(-) create mode 100644 packages/engine/README.md create mode 100644 packages/engine/__test__/index.test.js create mode 100644 packages/engine/babel.config.js create mode 100644 packages/engine/package.json create mode 100644 packages/engine/src/EventEmitter.ts create mode 100644 packages/engine/src/FlowModel.ts create mode 100644 packages/engine/src/NodeManager.ts create mode 100644 packages/engine/src/Scheduler.ts create mode 100644 packages/engine/src/constant/LogCode.ts create mode 100644 packages/engine/src/constant/constant.ts create mode 100644 packages/engine/src/index.ts create mode 100644 packages/engine/src/nodes/BaseNode.ts create mode 100644 packages/engine/src/nodes/StartNode.ts create mode 100644 packages/engine/src/nodes/TaskNode.ts create mode 100644 packages/engine/src/util/ID.ts create mode 100644 packages/engine/tsconfig.json diff --git a/.eslintrc.js b/.eslintrc.js index fb01dcf14..e5c980287 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -27,7 +27,7 @@ module.exports = { pragma: 'h', }, }, - ignorePatterns: ["**/*.mjs"], + ignorePatterns: ["**/*.mjs", "**/*.test.js"], rules: { 'indent': ['error', 2, { SwitchCase: 1 }], 'linebreak-style': ['error', 'unix'], diff --git a/packages/core/package.json b/packages/core/package.json index 81129d2ec..9265ef8cd 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -8,7 +8,7 @@ "sideEffects": true, "jsdelivr": "dist/logic-flow.min.js", "license": "Apache-2.0", - "homepage": "https://docs.logic-flow.cn", + "homepage": "https://site.logic-flow.cn", "types": "types/index.d.ts", "repository": { "type": "git", diff --git a/packages/engine/README.md b/packages/engine/README.md new file mode 100644 index 000000000..fa6eef4e5 --- /dev/null +++ b/packages/engine/README.md @@ -0,0 +1,22 @@ +# engine + +一个可以在JavaScript环境执行的流程引擎 + +## 使用方式 + +```js +import LogicFlowEngine from '@logicflow/engine'; + +const flowModel = new LogicFlowEngine({ + graphData: { + nodes: [], + edges: [], + }, + global: { + // 全局数据 + } +}); + +flowModel.execute(); + +``` diff --git a/packages/engine/__test__/index.test.js b/packages/engine/__test__/index.test.js new file mode 100644 index 000000000..8ba330688 --- /dev/null +++ b/packages/engine/__test__/index.test.js @@ -0,0 +1,55 @@ +import Engine from '../src/index'; + +describe('流程引擎', () => { + // test('初始化流程引擎', () => { + // const engine = new Engine(); + // expect(engine).toBeInstanceOf(Engine); + // }); + // test('加载图数据', async () => { + // const engine = new Engine(); + // const flowData = { + // graphData: { + // nodes: [ + // { + // id: 'node1', + // type: 'StartNode', + // } + // ] + // }, + // global: {}, + // } + // const flowModel = engine.load(flowData); + // expect(flowModel.tasks.length).toBe(flowData.graphData.nodes.length); + // }); + test('执行流程', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + }, + { + id: 'node2', + type: 'TaskNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + } + ] + }, + global: {}, + } + engine.load(flowData); + const result = await engine.execute(); + expect(result).toHaveProperty('executionId'); + expect(result).toHaveProperty('taskId'); + expect(result).toHaveProperty('nodeId'); + expect(result.nodeId).toBe('node2'); + }) +}); diff --git a/packages/engine/babel.config.js b/packages/engine/babel.config.js new file mode 100644 index 000000000..73445bdcb --- /dev/null +++ b/packages/engine/babel.config.js @@ -0,0 +1,8 @@ +module.exports = { + presets: [ + ['@babel/preset-env', { targets: { node: 'current' } }], + '@babel/preset-typescript', + ], + plugins: [ + ], +}; diff --git a/packages/engine/package.json b/packages/engine/package.json new file mode 100644 index 000000000..aa7bde921 --- /dev/null +++ b/packages/engine/package.json @@ -0,0 +1,86 @@ +{ + "name": "@logicflow/engine", + "version": "0.0.1", + "description": "a process engine for javascript", + "main": "cjs/index.js", + "module": "es/index.js", + "homepage": "https://docs.logic-flow.cn", + "types": "es/index.d.ts", + "repository": { + "type": "git", + "url": "https://github.com/didi/LogicFlow", + "directory": "packages/engine" + }, + "license": "Apache-2.0", + "scripts": { + "dev": "tsc --module esnext --target es5 --outDir ./es -d -w", + "types": "tsc -d --declarationDir ./types --outDir temp && rimraf -R temp", + "build:esm": "tsc --module esnext --target es5 --outDir ./es -d", + "build:cjs": "tsc --module commonjs --target es5 --outDir ./cjs", + "build": "rimraf ./es ./cjs ./lib && npm run build:esm & npm run build:cjs", + "publish-lib": "npm run build && npm publish", + "test": "jest" + }, + "author": "", + "files": [ + "cjs", + "es", + "lib", + "types", + "readme.md" + ], + "dependencies": { + "@logicflow/core": "^1.2.9", + "uuid": "^8.2.0" + }, + "standard-version": { + "skip": { + "tag": true, + "commit": true + } + }, + "keywords": [ + "logicflow", + "workflow", + "process", + "diagram" + ], + "devDependencies": { + "@babel/core": "^7.9.0", + "@babel/plugin-proposal-decorators": "^7.12.1", + "@babel/plugin-syntax-jsx": "^7.8.3", + "@babel/plugin-transform-react-jsx": "^7.10.4", + "@babel/preset-env": "^7.9.5", + "@babel/preset-typescript": "^7.9.0", + "@commitlint/config-conventional": "^8.3.4", + "@typescript-eslint/eslint-plugin": "^4.7.0", + "@typescript-eslint/parser": "^3.2.0", + "babel-core": "^7.0.0-bridge.0", + "babel-loader": "^8.1.0", + "babel-plugin-import": "^1.13.0", + "case-sensitive-paths-webpack-plugin": "^2.3.0", + "core-js": "^3.6.5", + "cross-env": "^7.0.2", + "css-loader": "^4.2.1", + "eslint": "^7.0.0", + "eslint-config-airbnb-typescript": "^9.0.0", + "eslint-plugin-import": "^2.22.0", + "eslint-plugin-jsx-a11y": "^6.3.1", + "eslint-plugin-react": "^7.20.6", + "eslint-plugin-standard": "^4.0.1", + "eslint-webpack-plugin": "^2.1.0", + "html-webpack-plugin": "^4.2.0", + "less-loader": "^6.0.0", + "prettier": "^2.2.1", + "rimraf": "^3.0.2", + "standard-version": "^9.0.0", + "style-loader": "^1.2.0", + "typescript": "^3.8.3", + "url-loader": "^4.1.0", + "webpack": "^4.43.0", + "webpack-bundle-analyzer": "^4.1.0", + "webpack-cli": "^3.3.11", + "webpack-dev-server": "^3.10.3" + }, + "gitHead": "ab8cc6126e9dfa5c4fb18037538a374d3a0b0521" +} diff --git a/packages/engine/src/EventEmitter.ts b/packages/engine/src/EventEmitter.ts new file mode 100644 index 000000000..29f89f069 --- /dev/null +++ b/packages/engine/src/EventEmitter.ts @@ -0,0 +1,64 @@ +export default class EventEmitter { + _events: Record; + constructor() { + this._events = {}; + } + on(evKey, callback, once = false) { + evKey = evKey.trim(); + if (!this._events[evKey]) { + this._events[evKey] = []; + } + this._events[evKey].push({ + callback, + once: !!once, + }); + } + emit(evt, eventArgs) { + const events = this._events[evt] || []; + // 实际的处理 emit 方法 + const doEmit = (es) => { + let { length } = es; + for (let i = 0; i < length; i++) { + if (!es[i]) { + // eslint-disable-next-line no-continue + continue; + } + const { callback, once } = es[i]; + if (once) { + es.splice(i, 1); + if (es.length === 0) { + delete this._events[evt]; + } + length--; + i--; + } + callback.apply(this, [eventArgs]); + } + }; + doEmit(events); + } + off(evt, callback) { + if (!evt) { + // evt 为空全部清除 + this._events = {}; + } + if (!callback) { + // evt 存在,callback 为空,清除事件所有方法 + delete this._events[evt]; + } else { + // evt 存在,callback 存在,清除匹配的 + const events = this._events[evt] || []; + let { length } = events; + for (let i = 0; i < length; i++) { + if (events[i].callback === callback) { + events.splice(i, 1); + length--; + i--; + } + } + if (events.length === 0) { + delete this._events[evt]; + } + } + } +} diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts new file mode 100644 index 000000000..90c0625e5 --- /dev/null +++ b/packages/engine/src/FlowModel.ts @@ -0,0 +1,157 @@ +// import type { GraphConfigData } from '@logicflow/core'; +import type { BaseNodeInterface } from './nodes/BaseNode'; +import { + ErrorCode, + getErrorMsg, + getWarningMsg, + WarningCode, +} from './constant/LogCode'; +import { createExecId } from './util/ID'; +import Scheduler from './Scheduler'; +import NodeManager from './NodeManager'; + +export type TaskUnit = { + executionId: string; + taskId: string; + nodeId: string; +}; + +export type FlowResult = { + result?: Record; +} & TaskUnit; + +export type TaskParams = { + executionId?: string; + taskId?: string; + nodeId?: string; +}; + +export type ExecParams = { + callback?: (result: FlowResult) => void; +} & TaskParams; + +export default class FlowModel { + nodeModelMap: Map; + executionId: string; + scheduler: Scheduler; + NodeManager: NodeManager; + executeQueue: ExecParams[]; + executingMap: Map; + isRunning: boolean; + constructor(nodeModelMap) { + // 流程包含的节点类型 + this.nodeModelMap = nodeModelMap; + // 需要执行的队列 + this.executeQueue = []; + // 执行中的任务 + this.executingMap = new Map(); + this.isRunning = false; + this.NodeManager = new NodeManager(); + this.scheduler = new Scheduler({ + NodeManager: this.NodeManager, + }); + this.scheduler.on('taskFinished', (result) => { + this.onTaskFinished(result); + }); + + } + onTaskFinished(result) { + const { executionId, taskId, nodeId } = result; + const execParams = this.executingMap.get(executionId); + if (!execParams) { + return; + } + const { callback } = execParams; + if (callback) { + callback(result); + } + this.executingMap.delete(executionId); + } + load(graphData) { + const { nodes = [], edges = [] } = graphData; + nodes.forEach((node) => { + const Task = this.getNodeModel(node.type); + if (Task) { + const task = new Task(node); + this.NodeManager.addTask(task); + } else { + console.warn(`未识别的节点类型: ${node.type}`); + } + }); + edges.forEach((edge) => { + const sourceTask = this.NodeManager.getTask(edge.sourceNodeId); + const targetTask = this.NodeManager.getTask(edge.targetNodeId); + if (sourceTask) { + sourceTask.outgoing.push({ + id: edge.id, + condition: edge.properties, + target: edge.targetNodeId, + }); + } + if (targetTask && targetTask.baseType !== 'start') { + targetTask.incoming.push({ + id: edge.id, + condition: edge.properties, + source: edge.sourceNodeId, + }); + } + }); + } + /** + * 执行流程 + * 同一次执行,这次执行内部的节点执行顺序为并行。 + * 多次执行,多次执行之间为串行。 + * 允许一个流程多次执行,效率更高。 + * 例如: + * 一个流程存在着两个开始节点,A和B,A和B的下一个节点都是C,C的下两个节点是D和E。 + * 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。 + * 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。 + */ + async execute(params ?: ExecParams) { + this.executeQueue.push(params); + if (this.isRunning) { + return; + } + this.isRunning = true; + this.createExecuteInstance(); + } + async createExecuteInstance() { + const execParams = this.executeQueue.shift(); + this.executionId = createExecId(); + console.log('createExecuteInstance', execParams); + const startNodes = this.NodeManager.getStartTasks(); + startNodes.forEach((startNode) => { + this.scheduler.addTask({ + executionId: this.executionId, + taskId: startNode.taskId, + nodeId: startNode.nodeId, + }); + this.scheduler.run({ + executionId: this.executionId, + taskId: startNode.taskId, + nodeId: startNode.nodeId, + }); + }); + } + /** + * 在没有指定开始节点的情况下,创建一个新的流程实例,从流程的所有开始节点开始执行。 + */ + // async createInstance() { + // this.executionId = createExecId(); + // const startNodes = this.NodeManager.getStartTasks(); + // startNodes.forEach((startNode) => { + // this.scheduler.addTask({ + // executionId: this.executionId, + // taskId: startNode.nodeId, + // nodeId: startNode.nodeId, + // }); + // }); + // const result = await this.scheduler.run({ + // executionId: this.executionId, + // }); + // return result; + // } + getNodeModel(type) { + return this.nodeModelMap.get(type); + } +} diff --git a/packages/engine/src/NodeManager.ts b/packages/engine/src/NodeManager.ts new file mode 100644 index 000000000..2c10f5db1 --- /dev/null +++ b/packages/engine/src/NodeManager.ts @@ -0,0 +1,26 @@ +import type { BaseNodeInterface } from './nodes/BaseNode'; +import { BASE_START_NODE } from './constant/constant'; + +export default class NodeManager { + taskMap: Map; + tasks: BaseNodeInterface[]; + startTasks: BaseNodeInterface[]; + constructor() { + this.taskMap = new Map(); + this.tasks = []; + this.startTasks = []; + } + addTask(task: BaseNodeInterface) { + this.tasks.push(task); + this.taskMap.set(task.nodeId, task); + if (task.baseType === BASE_START_NODE) { + this.startTasks.push(task); + } + } + getTask(taskId: string) { + return this.taskMap.get(taskId); + } + getStartTasks() { + return this.startTasks; + } +} diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts new file mode 100644 index 000000000..016310cca --- /dev/null +++ b/packages/engine/src/Scheduler.ts @@ -0,0 +1,82 @@ +import EventEmitter from './EventEmitter'; +import { TaskUnit } from './FlowModel'; +import NodeManager from './NodeManager'; + +/** + * 调度器 + * 通过一个队列维护需要执行的节点,一个集合维护正在执行的节点 + * 当一个executionId对于的 + */ +export default class Scheduler extends EventEmitter { + taskQueueMap: Map; + taskRunningMap: Map; + NodeManager: NodeManager; + currentTask: TaskUnit | null; + constructor(config) { + super(); + this.taskQueueMap = new Map(); + this.taskRunningMap = new Map(); + this.NodeManager = config.NodeManager; + this.currentTask = null; + } + run(data) { + const { executionId } = data; + let currentTaskQueue = this.taskQueueMap.get(executionId); + if (!currentTaskQueue) { + currentTaskQueue = []; + this.taskQueueMap.set(executionId, currentTaskQueue); + } + const currentTask = currentTaskQueue.shift(); + if (currentTask) { + this.addRunningTask(currentTask); + } + } + addRunningTask(taskUnit: TaskUnit) { + const { executionId } = taskUnit; + if (!this.taskRunningMap.has(executionId)) { + this.taskRunningMap.set(executionId, []); + } + this.taskRunningMap.get(executionId).push(taskUnit); + this.exec(taskUnit); + } + async exec(taskUnit: TaskUnit) { + const task = this.NodeManager.getTask(taskUnit.nodeId); + if (!task) { + throw new Error(`task ${taskUnit.nodeId} not found`); + } + const result = await task.execute({ + ...taskUnit, + next: this.next.bind(this), + stop: this.stop.bind(this), + }); + return result; + } + + async next(data) { + console.log('next', data); + if (data.outgoing) { + data.outgoing.forEach((item) => { + this.run({ + executionId: data.executionId, + nodeId: item.target, + }); + }); + } else { + // this.emit('taskFinished') + } + } + + // 流程执行过程中出错,停止执行 + async stop(data) { + console.log('stop', data); + } + addTask(taskUnit: TaskUnit) { + console.log('addTask', taskUnit); + const { executionId } = taskUnit; + if (!this.taskQueueMap.has(executionId)) { + this.taskQueueMap.set(executionId, []); + } + const currentTaskQueue = this.taskQueueMap.get(executionId); + currentTaskQueue.push(taskUnit); + } +} diff --git a/packages/engine/src/constant/LogCode.ts b/packages/engine/src/constant/LogCode.ts new file mode 100644 index 000000000..1343ad0e8 --- /dev/null +++ b/packages/engine/src/constant/LogCode.ts @@ -0,0 +1,20 @@ +export enum ErrorCode { + // 模型数据错误 + NONE_START_NODE = 1000, +} + +export enum WarningCode { + NONE_START_NODE_IN_DATA = 2000, +} + +const errorMsgMapCn = { + [ErrorCode.NONE_START_NODE]: '未找到入度为0的节点', +}; + +const warningMsgMapCn = { + [WarningCode.NONE_START_NODE_IN_DATA]: '初始化数据中未找到入度为0的节点', +}; + +export const getErrorMsg = (code: ErrorCode) => `error[${code}]: ${errorMsgMapCn[code]}`; + +export const getWarningMsg = (code: WarningCode) => `warning[${code}]: ${warningMsgMapCn[code]}`; diff --git a/packages/engine/src/constant/constant.ts b/packages/engine/src/constant/constant.ts new file mode 100644 index 000000000..5cd4a120a --- /dev/null +++ b/packages/engine/src/constant/constant.ts @@ -0,0 +1,4 @@ +// baseType +export const BASE_START_NODE = 'start'; + +// 事件名称 diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts new file mode 100644 index 000000000..c5399dea9 --- /dev/null +++ b/packages/engine/src/index.ts @@ -0,0 +1,64 @@ +import type { GraphConfigData } from '@logicflow/core'; +import FlowModel, { TaskParams } from './FlowModel'; +import StartNode from './nodes/StartNode'; +import TaskNode from './nodes/TaskNode'; + +export default class Engine { + global: Record; + graphData: GraphConfigData; + modelMap: Map; + flowModel: FlowModel; + constructor() { + this.modelMap = new Map(); + // register node + this.register({ + type: StartNode.nodeTypeName, + model: StartNode, + }); + this.register({ + type: TaskNode.nodeTypeName, + model: TaskNode, + }); + } + /** + * 注册节点 + * @param nodeConfig { type: 'custom-node', model: Class } + */ + register(nodeConfig) { + this.modelMap.set(nodeConfig.type, nodeConfig.model); + } + /** + * 加载流程图数据 + */ + load({ graphData }) { + this.flowModel = new FlowModel(this.modelMap); + this.flowModel.load(graphData); + return this.flowModel; + } + /** + * 执行流程,允许多次调用。 + */ + async execute(execParam?: TaskParams) { + // const result = await this.flowModel.execute(flowResult); + // return result; + return new Promise((resolve) => { + if (!execParam) { + execParam = {}; + } + this.flowModel.execute({ + ...execParam, + callback: (result) => { + resolve(result); + }, + }); + }); + } +} + +export { + StartNode, +}; + +export type { + TaskParams, +}; diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts new file mode 100644 index 000000000..1fb73f43d --- /dev/null +++ b/packages/engine/src/nodes/BaseNode.ts @@ -0,0 +1,52 @@ +import { createTaskId } from '../util/ID'; + +export interface BaseNodeInterface { + outgoing: Record[]; + incoming: Record[]; + nodeId: string; + taskId: string; + type: string; + readonly baseType: string; + execute(taskUnit): Promise; +} + +export default class BaseNode implements BaseNodeInterface { + static nodeTypeName = 'BaseNode'; + outgoing: Record[]; + incoming: Record[]; + nodeId: string; + taskId: string; + type: string; + readonly baseType: string; + constructor(nodeConfig) { + this.outgoing = []; + this.incoming = []; + this.nodeId = nodeConfig.id; + this.type = nodeConfig.type; + this.baseType = 'base'; + } + /** + * 节点的每一次执行都会生成一个唯一的taskId + */ + async execute(params) { + console.log('BaseNode execute'); + this.taskId = createTaskId(); + const r = await this.action(); + console.log('outgoing', this.outgoing); + const outgoing = await this.getOutgoing(); + r && params.next({ + executionId: params.executionId, + taskId: this.taskId, + nodeId: this.nodeId, + outgoing, + }); + // return r; + } + async getOutgoing() { + return this.outgoing; + } + + async action() { + return true; + } +} diff --git a/packages/engine/src/nodes/StartNode.ts b/packages/engine/src/nodes/StartNode.ts new file mode 100644 index 000000000..6fb4d82b9 --- /dev/null +++ b/packages/engine/src/nodes/StartNode.ts @@ -0,0 +1,9 @@ +import BaseNode from './BaseNode'; + +export default class StartNode extends BaseNode { + static nodeTypeName = 'StartNode'; + readonly baseType = 'start'; + async action() { + console.log('StartNode run'); + } +} diff --git a/packages/engine/src/nodes/TaskNode.ts b/packages/engine/src/nodes/TaskNode.ts new file mode 100644 index 000000000..84d1af8b0 --- /dev/null +++ b/packages/engine/src/nodes/TaskNode.ts @@ -0,0 +1,9 @@ +import BaseNode from './BaseNode'; + +export default class TaskNode extends BaseNode { + static nodeTypeName = 'TaskNode'; + readonly baseType = 'task'; + async execute() { + console.log('TaskNode run'); + } +} diff --git a/packages/engine/src/util/ID.ts b/packages/engine/src/util/ID.ts new file mode 100644 index 000000000..a58491d36 --- /dev/null +++ b/packages/engine/src/util/ID.ts @@ -0,0 +1,11 @@ +import { v4 as uuidv4 } from 'uuid'; + +export const createExecId = (): string => { + const uuid = uuidv4(); + return `exec-${uuid}`; +}; + +export const createTaskId = (): string => { + const uuid = uuidv4(); + return `task-${uuid}`; +}; diff --git a/packages/engine/tsconfig.json b/packages/engine/tsconfig.json new file mode 100644 index 000000000..d5a17a097 --- /dev/null +++ b/packages/engine/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../tsconfig.json", + "include": [ + "src/*", "src/tools/auto-layout", "src/tools/snapshot", + ], + "exclude": [ + "node_modules", + "**/*.spec.ts", + "**/*.test.js", + "**/*.d.ts" + ] +} diff --git a/packages/extension/package.json b/packages/extension/package.json index c690bba23..77bc0e196 100644 --- a/packages/extension/package.json +++ b/packages/extension/package.json @@ -4,7 +4,7 @@ "description": "LogicFlow extension", "main": "cjs/index.js", "module": "es/index.js", - "homepage": "https://docs.logic-flow.cn", + "homepage": "https://site.logic-flow.cn", "types": "es/index.d.ts", "repository": { "type": "git", diff --git a/tsconfig.json b/tsconfig.json index 9eb7db4fb..eb51aee08 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -21,6 +21,8 @@ "exclude": [ "node_modules", "**/*.spec.ts", + "**/*.test.js", + ".eslintrc.js", "**/*.d.ts", "packages/core/types" ] diff --git a/yarn.lock b/yarn.lock index 47e0c06ba..9a613e5ee 100644 --- a/yarn.lock +++ b/yarn.lock @@ -15144,7 +15144,7 @@ prompts@2.4.0: kleur "^3.0.3" sisteransi "^1.0.5" -prompts@^2.0.1: +prompts@2.4.2, prompts@^2.0.1: version "2.4.2" resolved "https://registry.yarnpkg.com/prompts/-/prompts-2.4.2.tgz#7b57e73b3a48029ad10ebd44f74b01722a4cb069" integrity sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q== From c2a704449772445387f324924491f15e526dfc4e Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Mon, 10 Jul 2023 16:47:40 +0800 Subject: [PATCH 02/11] feat(engine): added workflow scheduling feature --- packages/engine/__test__/index.test.js | 3 - packages/engine/src/FlowModel.ts | 120 ++++++++++++++++------- packages/engine/src/Scheduler.ts | 92 ++++++++++------- packages/engine/src/constant/LogCode.ts | 2 + packages/engine/src/constant/constant.ts | 10 +- packages/engine/src/index.ts | 3 +- packages/engine/src/nodes/BaseNode.ts | 57 +++++++++-- packages/engine/src/nodes/StartNode.ts | 1 + packages/engine/src/nodes/TaskNode.ts | 3 +- 9 files changed, 206 insertions(+), 85 deletions(-) diff --git a/packages/engine/__test__/index.test.js b/packages/engine/__test__/index.test.js index 8ba330688..2827d1a9e 100644 --- a/packages/engine/__test__/index.test.js +++ b/packages/engine/__test__/index.test.js @@ -48,8 +48,5 @@ describe('流程引擎', () => { engine.load(flowData); const result = await engine.execute(); expect(result).toHaveProperty('executionId'); - expect(result).toHaveProperty('taskId'); - expect(result).toHaveProperty('nodeId'); - expect(result.nodeId).toBe('node2'); }) }); diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 90c0625e5..c1ef55ac2 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -1,18 +1,22 @@ // import type { GraphConfigData } from '@logicflow/core'; -import type { BaseNodeInterface } from './nodes/BaseNode'; +import type BaseNode from './nodes/BaseNode'; +import type { NodeConfig, NodeConstructor } from './nodes/BaseNode'; import { ErrorCode, getErrorMsg, getWarningMsg, WarningCode, } from './constant/LogCode'; +import { + EVENT_INSTANCE_COMPLETE, +} from './constant/constant'; import { createExecId } from './util/ID'; import Scheduler from './Scheduler'; import NodeManager from './NodeManager'; export type TaskUnit = { executionId: string; - taskId: string; + taskId?: string; nodeId: string; }; @@ -31,65 +35,110 @@ export type ExecParams = { } & TaskParams; export default class FlowModel { - nodeModelMap: Map; + /** + * 流程支持的节点类型 + */ + nodeModelMap: Map; + /** + * 每一次执行流程都会生成一个唯一的executionId。 + */ executionId: string; + /** + * 调度器,用于调度节点的执行。 + */ scheduler: Scheduler; NodeManager: NodeManager; + /** + * 待执行的队列,当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 + */ executeQueue: ExecParams[]; - executingMap: Map; + /** + * 当前正在执行。当监听到调度器执行完成时,出触发执行参数中的回调,告知外部执行完成。 + */ + executingInstance: ExecParams; + /** + * 当前流程模型中的所有节点,边会被转换成节点的incoming和outgoing属性。 + */ + nodeMap: Map; + /** + * 当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 + */ isRunning: boolean; - constructor(nodeModelMap) { + /** + * 开始节点类型,在执行流程时,会从这些节点开始执行。 + */ + startNodeType: string; + /** + * 当前流程中开始节点组成的数组。 + */ + startNodes: NodeConfig[] = []; + constructor(nodeModelMap: Map) { // 流程包含的节点类型 this.nodeModelMap = nodeModelMap; // 需要执行的队列 this.executeQueue = []; // 执行中的任务 - this.executingMap = new Map(); + this.executingInstance = null; + this.nodeMap = new Map(); this.isRunning = false; this.NodeManager = new NodeManager(); this.scheduler = new Scheduler({ - NodeManager: this.NodeManager, + flowModel: this, }); - this.scheduler.on('taskFinished', (result) => { + this.scheduler.on(EVENT_INSTANCE_COMPLETE, (result) => { this.onTaskFinished(result); }); - } onTaskFinished(result) { - const { executionId, taskId, nodeId } = result; - const execParams = this.executingMap.get(executionId); - if (!execParams) { + const { executionId } = result; + if (executionId !== this.executionId) { return; } - const { callback } = execParams; + const { callback } = this.executingInstance; if (callback) { callback(result); } - this.executingMap.delete(executionId); + this.executingInstance = null; + if (this.executeQueue.length) { + this.createExecuteInstance(); + } else { + this.isRunning = false; + } + } + setStartNodeType(startNodeType) { + this.startNodeType = startNodeType; } load(graphData) { const { nodes = [], edges = [] } = graphData; nodes.forEach((node) => { - const Task = this.getNodeModel(node.type); - if (Task) { - const task = new Task(node); - this.NodeManager.addTask(task); + if (this.nodeModelMap.has(node.type)) { + const nodeConfig = { + id: node.id, + type: node.type, + properties: node.properties, + incoming: [], + outgoing: [], + }; + this.nodeMap.set(node.id, nodeConfig); + if (node.type === this.startNodeType) { + this.startNodes.push(nodeConfig); + } } else { console.warn(`未识别的节点类型: ${node.type}`); } }); edges.forEach((edge) => { - const sourceTask = this.NodeManager.getTask(edge.sourceNodeId); - const targetTask = this.NodeManager.getTask(edge.targetNodeId); - if (sourceTask) { - sourceTask.outgoing.push({ + const sourceNode = this.nodeMap.get(edge.sourceNodeId); + const targetNode = this.nodeMap.get(edge.targetNodeId); + if (sourceNode) { + sourceNode.outgoing.push({ id: edge.id, condition: edge.properties, target: edge.targetNodeId, }); } - if (targetTask && targetTask.baseType !== 'start') { - targetTask.incoming.push({ + if (targetNode && targetNode.type !== this.startNodeType) { + targetNode.incoming.push({ id: edge.id, condition: edge.properties, source: edge.sourceNodeId, @@ -118,19 +167,14 @@ export default class FlowModel { async createExecuteInstance() { const execParams = this.executeQueue.shift(); this.executionId = createExecId(); - console.log('createExecuteInstance', execParams); - const startNodes = this.NodeManager.getStartTasks(); - startNodes.forEach((startNode) => { + this.executingInstance = execParams; + this.startNodes.forEach((startNode) => { this.scheduler.addTask({ executionId: this.executionId, - taskId: startNode.taskId, - nodeId: startNode.nodeId, - }); - this.scheduler.run({ - executionId: this.executionId, - taskId: startNode.taskId, - nodeId: startNode.nodeId, + nodeId: startNode.id, }); + // 所有的开始节点都执行 + this.scheduler.run(this.executionId); }); } /** @@ -151,7 +195,11 @@ export default class FlowModel { // }); // return result; // } - getNodeModel(type) { - return this.nodeModelMap.get(type); + + createTask(nodeId: string) { + const nodeConfig = this.nodeMap.get(nodeId); + const NodeModel = this.nodeModelMap.get(nodeConfig.type); + const model = new NodeModel(nodeConfig); + return model; } } diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index 016310cca..50c635912 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -1,77 +1,103 @@ import EventEmitter from './EventEmitter'; import { TaskUnit } from './FlowModel'; -import NodeManager from './NodeManager'; +import type FlowModel from './FlowModel'; +import { EVENT_INSTANCE_COMPLETE, FlowStatus } from './constant/constant'; +import type { NextTaskUnit } from './nodes/BaseNode'; +import { createTaskId } from './util/ID'; + +type TaskUnitMap = Map; /** * 调度器 * 通过一个队列维护需要执行的节点,一个集合维护正在执行的节点 - * 当一个executionId对于的 */ export default class Scheduler extends EventEmitter { taskQueueMap: Map; - taskRunningMap: Map; - NodeManager: NodeManager; + taskRunningMap: Map; + flowModel: FlowModel; currentTask: TaskUnit | null; constructor(config) { super(); this.taskQueueMap = new Map(); this.taskRunningMap = new Map(); - this.NodeManager = config.NodeManager; + this.flowModel = config.flowModel; this.currentTask = null; } - run(data) { - const { executionId } = data; - let currentTaskQueue = this.taskQueueMap.get(executionId); - if (!currentTaskQueue) { - currentTaskQueue = []; - this.taskQueueMap.set(executionId, currentTaskQueue); - } - const currentTask = currentTaskQueue.shift(); + run(executionId) { + const currentTask = this.getNextTask(executionId); if (currentTask) { - this.addRunningTask(currentTask); + const task = this.addRunningTask(currentTask); + this.exec(task); + } else if (!this.hasRunningTask(executionId)) { + // 当一个流程在taskQueueMap和taskRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 + this.emit(EVENT_INSTANCE_COMPLETE, { + executionId, + status: FlowStatus.COMPLETED, + }); } } addRunningTask(taskUnit: TaskUnit) { const { executionId } = taskUnit; + const taskId = createTaskId(); + taskUnit.taskId = taskId; if (!this.taskRunningMap.has(executionId)) { - this.taskRunningMap.set(executionId, []); + const runningMap = new Map(); + this.taskRunningMap.set(executionId, runningMap); } - this.taskRunningMap.get(executionId).push(taskUnit); - this.exec(taskUnit); + this.taskRunningMap.get(executionId).set(taskId, taskUnit); + return taskUnit; } - async exec(taskUnit: TaskUnit) { - const task = this.NodeManager.getTask(taskUnit.nodeId); - if (!task) { - throw new Error(`task ${taskUnit.nodeId} not found`); + removeRunningTask(taskUnit: TaskUnit) { + const { executionId, taskId } = taskUnit; + if (!taskId) return; + const runningMap = this.taskRunningMap.get(executionId); + if (!runningMap) return; + runningMap.delete(taskId); + } + hasRunningTask(executionId) { + const runningMap = this.taskRunningMap.get(executionId); + if (!runningMap) return false; + if (runningMap.size === 0) { + this.taskRunningMap.delete(executionId); + return false; } - const result = await task.execute({ - ...taskUnit, + return true; + } + async exec(taskUnit: TaskUnit) { + const model = this.flowModel.createTask(taskUnit.nodeId); + model.execute({ + executionId: taskUnit.executionId, + taskId: taskUnit.taskId, + nodeId: taskUnit.nodeId, next: this.next.bind(this), - stop: this.stop.bind(this), }); - return result; } - async next(data) { - console.log('next', data); - if (data.outgoing) { + async next(data: NextTaskUnit) { + if (data.outgoing && data.outgoing.length > 0) { data.outgoing.forEach((item) => { - this.run({ + this.addTask({ executionId: data.executionId, nodeId: item.target, }); }); - } else { - // this.emit('taskFinished') } + this.removeRunningTask(data); + this.run(data.executionId); + } + getNextTask(executionId) { + const currentTaskQueue = this.taskQueueMap.get(executionId); + if (!currentTaskQueue || currentTaskQueue.length === 0) { + return null; + } + const currentTask = currentTaskQueue.shift(); + return currentTask; } - // 流程执行过程中出错,停止执行 async stop(data) { console.log('stop', data); } addTask(taskUnit: TaskUnit) { - console.log('addTask', taskUnit); const { executionId } = taskUnit; if (!this.taskQueueMap.has(executionId)) { this.taskQueueMap.set(executionId, []); diff --git a/packages/engine/src/constant/LogCode.ts b/packages/engine/src/constant/LogCode.ts index 1343ad0e8..d6b5c59cc 100644 --- a/packages/engine/src/constant/LogCode.ts +++ b/packages/engine/src/constant/LogCode.ts @@ -5,6 +5,7 @@ export enum ErrorCode { export enum WarningCode { NONE_START_NODE_IN_DATA = 2000, + START_NODE_INCOMING = 2001, } const errorMsgMapCn = { @@ -13,6 +14,7 @@ const errorMsgMapCn = { const warningMsgMapCn = { [WarningCode.NONE_START_NODE_IN_DATA]: '初始化数据中未找到入度为0的节点', + [WarningCode.START_NODE_INCOMING]: '开始节点不允许被连入', }; export const getErrorMsg = (code: ErrorCode) => `error[${code}]: ${errorMsgMapCn[code]}`; diff --git a/packages/engine/src/constant/constant.ts b/packages/engine/src/constant/constant.ts index 5cd4a120a..3b6acc101 100644 --- a/packages/engine/src/constant/constant.ts +++ b/packages/engine/src/constant/constant.ts @@ -1,4 +1,12 @@ // baseType export const BASE_START_NODE = 'start'; -// 事件名称 +// event name +export const EVENT_INSTANCE_COMPLETE = 'instance:complete'; + +// flow status +export enum FlowStatus { + COMPLETED = 'completed', + RUNNING = 'running', + ERROR = 'error', +} diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index c5399dea9..950dcc553 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -30,8 +30,9 @@ export default class Engine { /** * 加载流程图数据 */ - load({ graphData }) { + load({ graphData, startNodeType = 'StartNode' }) { this.flowModel = new FlowModel(this.modelMap); + this.flowModel.setStartNodeType(startNodeType); this.flowModel.load(graphData); return this.flowModel; } diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts index 1fb73f43d..338e7fb98 100644 --- a/packages/engine/src/nodes/BaseNode.ts +++ b/packages/engine/src/nodes/BaseNode.ts @@ -10,33 +10,70 @@ export interface BaseNodeInterface { execute(taskUnit): Promise; } +export type NodeConstructor = { + new (config: NodeConfig): BaseNode; +}; + +export type IncomingConfig = { + id: string; + condition?: Record; + source: string; +}; + +export type OutgoingConfig = { + id: string; + condition?: Record; + target: string; +}; + +export type NodeConfig = { + id: string; + type: string; + properties?: Record; + incoming: IncomingConfig[]; + outgoing: OutgoingConfig[]; +}; + +export type NextTaskUnit = { + executionId: string; + nodeId: string; + taskId: string; + outgoing: OutgoingConfig[]; +}; + +export type ExecParams = { + executionId: string; + taskId: string; + nodeId: string; + next: (data: NextTaskUnit) => void; +}; + export default class BaseNode implements BaseNodeInterface { static nodeTypeName = 'BaseNode'; - outgoing: Record[]; - incoming: Record[]; + outgoing: OutgoingConfig[]; + incoming: IncomingConfig[]; + properties?: Record; nodeId: string; taskId: string; type: string; readonly baseType: string; - constructor(nodeConfig) { - this.outgoing = []; - this.incoming = []; + constructor(nodeConfig: NodeConfig) { + this.outgoing = nodeConfig.outgoing; + this.incoming = nodeConfig.incoming; this.nodeId = nodeConfig.id; this.type = nodeConfig.type; + this.properties = nodeConfig.properties; this.baseType = 'base'; } /** * 节点的每一次执行都会生成一个唯一的taskId */ - async execute(params) { - console.log('BaseNode execute'); - this.taskId = createTaskId(); + async execute(params: ExecParams) { const r = await this.action(); - console.log('outgoing', this.outgoing); const outgoing = await this.getOutgoing(); r && params.next({ executionId: params.executionId, - taskId: this.taskId, + taskId: params.taskId, nodeId: this.nodeId, outgoing, }); diff --git a/packages/engine/src/nodes/StartNode.ts b/packages/engine/src/nodes/StartNode.ts index 6fb4d82b9..a9c2793fd 100644 --- a/packages/engine/src/nodes/StartNode.ts +++ b/packages/engine/src/nodes/StartNode.ts @@ -5,5 +5,6 @@ export default class StartNode extends BaseNode { readonly baseType = 'start'; async action() { console.log('StartNode run'); + return true; } } diff --git a/packages/engine/src/nodes/TaskNode.ts b/packages/engine/src/nodes/TaskNode.ts index 84d1af8b0..b58be057d 100644 --- a/packages/engine/src/nodes/TaskNode.ts +++ b/packages/engine/src/nodes/TaskNode.ts @@ -3,7 +3,8 @@ import BaseNode from './BaseNode'; export default class TaskNode extends BaseNode { static nodeTypeName = 'TaskNode'; readonly baseType = 'task'; - async execute() { + async action() { console.log('TaskNode run'); + return true; } } From d73aa46675f35bae5362e6024232d44cbfe5bcaa Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Tue, 11 Jul 2023 10:56:57 +0800 Subject: [PATCH 03/11] feat: implemented execution record query functionality --- packages/engine/__test__/index.test.js | 44 ++++++++--------- packages/engine/__test__/recorder.test.js | 56 ++++++++++++++++++++++ packages/engine/src/FlowModel.ts | 58 +++++++++-------------- packages/engine/src/Scheduler.ts | 21 +++++++- packages/engine/src/index.ts | 38 ++++++++++++--- packages/engine/src/nodes/BaseNode.ts | 15 ++++-- packages/engine/src/nodes/StartNode.ts | 4 -- packages/engine/src/nodes/TaskNode.ts | 4 -- packages/engine/src/recorder/index.ts | 56 ++++++++++++++++++++++ packages/engine/src/util/global.ts | 22 +++++++++ packages/engine/src/util/storage.ts | 40 ++++++++++++++++ 11 files changed, 281 insertions(+), 77 deletions(-) create mode 100644 packages/engine/__test__/recorder.test.js create mode 100644 packages/engine/src/recorder/index.ts create mode 100644 packages/engine/src/util/global.ts create mode 100644 packages/engine/src/util/storage.ts diff --git a/packages/engine/__test__/index.test.js b/packages/engine/__test__/index.test.js index 2827d1a9e..2f5e7e6b5 100644 --- a/packages/engine/__test__/index.test.js +++ b/packages/engine/__test__/index.test.js @@ -1,27 +1,27 @@ import Engine from '../src/index'; describe('流程引擎', () => { - // test('初始化流程引擎', () => { - // const engine = new Engine(); - // expect(engine).toBeInstanceOf(Engine); - // }); - // test('加载图数据', async () => { - // const engine = new Engine(); - // const flowData = { - // graphData: { - // nodes: [ - // { - // id: 'node1', - // type: 'StartNode', - // } - // ] - // }, - // global: {}, - // } - // const flowModel = engine.load(flowData); - // expect(flowModel.tasks.length).toBe(flowData.graphData.nodes.length); - // }); - test('执行流程', async () => { + test('初始化流程引擎', () => { + const engine = new Engine(); + expect(engine).toBeInstanceOf(Engine); + }); + test('加载图数据', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + } + ] + }, + global: {}, + } + const flowModel = engine.load(flowData); + expect(flowModel.nodeConfigMap.size).toBe(flowData.graphData.nodes.length); + }); + test('执行流程完成, 返回数据包含executionId', async () => { const engine = new Engine(); const flowData = { graphData: { @@ -48,5 +48,5 @@ describe('流程引擎', () => { engine.load(flowData); const result = await engine.execute(); expect(result).toHaveProperty('executionId'); - }) + }); }); diff --git a/packages/engine/__test__/recorder.test.js b/packages/engine/__test__/recorder.test.js new file mode 100644 index 000000000..563f0d787 --- /dev/null +++ b/packages/engine/__test__/recorder.test.js @@ -0,0 +1,56 @@ +import Engine from '../src/index'; + +describe('流程引擎执行记录器', () => { + test('获取流程执行记录', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: {} + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + } + ] + }, + global: {}, + } + engine.load(flowData); + const result = await engine.execute(); + const executionId = result.executionId; + /** + * [ + * { + * taskId: '', + * nodeId: '', + * instanceId: '', + * nodeType: '', + * timestamp: '', + * properties: {}, + * } + * ] + */ + const execution = await engine.getExecutionRecord(executionId); + expect(execution.length).toBe(2); + expect(execution[1]).toHaveProperty('taskId'); + expect(execution[1]).toHaveProperty('nodeId'); + expect(execution[1]).toHaveProperty('executionId'); + expect(execution[1]).toHaveProperty('nodeType'); + expect(execution[1]).toHaveProperty('timestamp'); + expect(execution[1]).toHaveProperty('properties'); + expect(execution[1].nodeId).toBe('node2'); + expect(execution[1].nodeType).toBe('TaskNode'); + }); +}); \ No newline at end of file diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index c1ef55ac2..632425791 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -1,12 +1,7 @@ -// import type { GraphConfigData } from '@logicflow/core'; -import type BaseNode from './nodes/BaseNode'; -import type { NodeConfig, NodeConstructor } from './nodes/BaseNode'; -import { - ErrorCode, - getErrorMsg, - getWarningMsg, - WarningCode, -} from './constant/LogCode'; +import type { + NodeConfig, + NodeConstructor, +} from './nodes/BaseNode'; import { EVENT_INSTANCE_COMPLETE, } from './constant/constant'; @@ -59,7 +54,7 @@ export default class FlowModel { /** * 当前流程模型中的所有节点,边会被转换成节点的incoming和outgoing属性。 */ - nodeMap: Map; + nodeConfigMap: Map; /** * 当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 */ @@ -72,18 +67,25 @@ export default class FlowModel { * 当前流程中开始节点组成的数组。 */ startNodes: NodeConfig[] = []; - constructor(nodeModelMap: Map) { + constructor({ + nodeModelMap, + recorder, + }: { + nodeModelMap: Map; + recorder?: any; + }) { // 流程包含的节点类型 this.nodeModelMap = nodeModelMap; // 需要执行的队列 this.executeQueue = []; // 执行中的任务 this.executingInstance = null; - this.nodeMap = new Map(); + this.nodeConfigMap = new Map(); this.isRunning = false; this.NodeManager = new NodeManager(); this.scheduler = new Scheduler({ flowModel: this, + recorder, }); this.scheduler.on(EVENT_INSTANCE_COMPLETE, (result) => { this.onTaskFinished(result); @@ -119,7 +121,7 @@ export default class FlowModel { incoming: [], outgoing: [], }; - this.nodeMap.set(node.id, nodeConfig); + this.nodeConfigMap.set(node.id, nodeConfig); if (node.type === this.startNodeType) { this.startNodes.push(nodeConfig); } @@ -128,8 +130,8 @@ export default class FlowModel { } }); edges.forEach((edge) => { - const sourceNode = this.nodeMap.get(edge.sourceNodeId); - const targetNode = this.nodeMap.get(edge.targetNodeId); + const sourceNode = this.nodeConfigMap.get(edge.sourceNodeId); + const targetNode = this.nodeConfigMap.get(edge.targetNodeId); if (sourceNode) { sourceNode.outgoing.push({ id: edge.id, @@ -178,28 +180,14 @@ export default class FlowModel { }); } /** - * 在没有指定开始节点的情况下,创建一个新的流程实例,从流程的所有开始节点开始执行。 + * 创建节点实例 + * @param nodeId 节点Id + * @returns 节点示例 */ - // async createInstance() { - // this.executionId = createExecId(); - // const startNodes = this.NodeManager.getStartTasks(); - // startNodes.forEach((startNode) => { - // this.scheduler.addTask({ - // executionId: this.executionId, - // taskId: startNode.nodeId, - // nodeId: startNode.nodeId, - // }); - // }); - // const result = await this.scheduler.run({ - // executionId: this.executionId, - // }); - // return result; - // } - createTask(nodeId: string) { - const nodeConfig = this.nodeMap.get(nodeId); + const nodeConfig = this.nodeConfigMap.get(nodeId); const NodeModel = this.nodeModelMap.get(nodeConfig.type); - const model = new NodeModel(nodeConfig); - return model; + const task = new NodeModel(nodeConfig); + return task; } } diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index 50c635912..6ab120503 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -4,6 +4,7 @@ import type FlowModel from './FlowModel'; import { EVENT_INSTANCE_COMPLETE, FlowStatus } from './constant/constant'; import type { NextTaskUnit } from './nodes/BaseNode'; import { createTaskId } from './util/ID'; +import type Recorder from './recorder'; type TaskUnitMap = Map; @@ -15,12 +16,14 @@ export default class Scheduler extends EventEmitter { taskQueueMap: Map; taskRunningMap: Map; flowModel: FlowModel; + recorder: Recorder; currentTask: TaskUnit | null; constructor(config) { super(); this.taskQueueMap = new Map(); this.taskRunningMap = new Map(); this.flowModel = config.flowModel; + this.recorder = config.recorder; this.currentTask = null; } run(executionId) { @@ -65,14 +68,17 @@ export default class Scheduler extends EventEmitter { } async exec(taskUnit: TaskUnit) { const model = this.flowModel.createTask(taskUnit.nodeId); - model.execute({ + const r = await model.execute({ executionId: taskUnit.executionId, taskId: taskUnit.taskId, nodeId: taskUnit.nodeId, next: this.next.bind(this), }); + if (!r) this.cancel(taskUnit); + } + cancel(taskUnit: TaskUnit) { + // TODO: 流程执行异常中断 } - async next(data: NextTaskUnit) { if (data.outgoing && data.outgoing.length > 0) { data.outgoing.forEach((item) => { @@ -82,9 +88,20 @@ export default class Scheduler extends EventEmitter { }); }); } + this.saveTaskResult(data); this.removeRunningTask(data); this.run(data.executionId); } + saveTaskResult(data: NextTaskUnit) { + this.recorder.addTask({ + executionId: data.executionId, + taskId: data.taskId, + nodeId: data.nodeId, + nodeType: data.nodeType, + timestamp: Date.now(), + properties: data.properties, + }); + } getNextTask(executionId) { const currentTaskQueue = this.taskQueueMap.get(executionId); if (!currentTaskQueue || currentTaskQueue.length === 0) { diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 950dcc553..c14236096 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -2,14 +2,17 @@ import type { GraphConfigData } from '@logicflow/core'; import FlowModel, { TaskParams } from './FlowModel'; import StartNode from './nodes/StartNode'; import TaskNode from './nodes/TaskNode'; +import Recorder from './recorder'; export default class Engine { global: Record; graphData: GraphConfigData; - modelMap: Map; + nodeModelMap: Map; flowModel: FlowModel; + recorder: Recorder; constructor() { - this.modelMap = new Map(); + this.nodeModelMap = new Map(); + this.recorder = new Recorder(); // register node this.register({ type: StartNode.nodeTypeName, @@ -25,13 +28,30 @@ export default class Engine { * @param nodeConfig { type: 'custom-node', model: Class } */ register(nodeConfig) { - this.modelMap.set(nodeConfig.type, nodeConfig.model); + this.nodeModelMap.set(nodeConfig.type, nodeConfig.model); + } + /** + * 自定义执行记录的存储,默认浏览器使用 sessionStorage,nodejs 使用内存存储。 + * 注意:由于执行记录不会主动删除,所以需要自行清理。 + * nodejs环境建议自定义为持久化存储。 + * engine.setCustomRecorder({ + * async addTask(task) {} + * async getTask(taskId) {} + * async getExecutionTasks(executionId) {} + * clear() {} + * }); + */ + setCustomRecorder(recorder: Recorder) { + this.recorder = recorder; } /** * 加载流程图数据 */ load({ graphData, startNodeType = 'StartNode' }) { - this.flowModel = new FlowModel(this.modelMap); + this.flowModel = new FlowModel({ + nodeModelMap: this.nodeModelMap, + recorder: this.recorder, + }); this.flowModel.setStartNodeType(startNodeType); this.flowModel.load(graphData); return this.flowModel; @@ -40,8 +60,6 @@ export default class Engine { * 执行流程,允许多次调用。 */ async execute(execParam?: TaskParams) { - // const result = await this.flowModel.execute(flowResult); - // return result; return new Promise((resolve) => { if (!execParam) { execParam = {}; @@ -54,6 +72,14 @@ export default class Engine { }); }); } + async getExecutionRecord(executionId) { + const tasks = await this.recorder.getExecutionTasks(executionId); + const records = []; + for (let i = 0; i < tasks.length; i++) { + records.push(this.recorder.getTask(tasks[i])); + } + return Promise.all(records); + } } export { diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts index 338e7fb98..657f9ab73 100644 --- a/packages/engine/src/nodes/BaseNode.ts +++ b/packages/engine/src/nodes/BaseNode.ts @@ -7,7 +7,7 @@ export interface BaseNodeInterface { taskId: string; type: string; readonly baseType: string; - execute(taskUnit): Promise; + execute(taskUnit): Promise; } export type NodeConstructor = { @@ -38,7 +38,9 @@ export type NextTaskUnit = { executionId: string; nodeId: string; taskId: string; + nodeType: string; outgoing: OutgoingConfig[]; + properties?: Record; }; export type ExecParams = { @@ -68,21 +70,26 @@ export default class BaseNode implements BaseNodeInterface { /** * 节点的每一次执行都会生成一个唯一的taskId */ - async execute(params: ExecParams) { + async execute(params: ExecParams): Promise { const r = await this.action(); const outgoing = await this.getOutgoing(); r && params.next({ executionId: params.executionId, taskId: params.taskId, nodeId: this.nodeId, + nodeType: this.type, + properties: this.properties, outgoing, }); - // return r; + return r; } async getOutgoing() { return this.outgoing; } - + /** + * 节点的执行逻辑 + * @returns {boolean} 返回true表示执行成功,返回false表示执行失败,中断流程执行 + */ async action() { return true; } diff --git a/packages/engine/src/nodes/StartNode.ts b/packages/engine/src/nodes/StartNode.ts index a9c2793fd..3c6c34961 100644 --- a/packages/engine/src/nodes/StartNode.ts +++ b/packages/engine/src/nodes/StartNode.ts @@ -3,8 +3,4 @@ import BaseNode from './BaseNode'; export default class StartNode extends BaseNode { static nodeTypeName = 'StartNode'; readonly baseType = 'start'; - async action() { - console.log('StartNode run'); - return true; - } } diff --git a/packages/engine/src/nodes/TaskNode.ts b/packages/engine/src/nodes/TaskNode.ts index b58be057d..3f04d232d 100644 --- a/packages/engine/src/nodes/TaskNode.ts +++ b/packages/engine/src/nodes/TaskNode.ts @@ -3,8 +3,4 @@ import BaseNode from './BaseNode'; export default class TaskNode extends BaseNode { static nodeTypeName = 'TaskNode'; readonly baseType = 'task'; - async action() { - console.log('TaskNode run'); - return true; - } } diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts new file mode 100644 index 000000000..fa1872c3a --- /dev/null +++ b/packages/engine/src/recorder/index.ts @@ -0,0 +1,56 @@ +import storage from '../util/storage'; + +const LOGICFLOW_ENGINE_INSTANCES = 'LOGICFLOW_ENGINE_INSTANCES'; + +export type RecorderData = { + taskId: string; + nodeId: string; + executionId: string; + nodeType: string; + timestamp: number; + properties?: Record; +}; + +export default class Recorder { + /* + * @param {Object} task + * { + * taskId: '', + * nodeId: '', + * executionId: '', + * nodeType: '', + * timestamp: '', + * properties: {}, + * } + */ + async addTask(task: RecorderData) { + const { executionId, taskId } = task; + let instanceData = await this.getExecutionTasks(executionId); + if (!instanceData) { + instanceData = []; + const instance = storage.getItem(LOGICFLOW_ENGINE_INSTANCES) || []; + instance.push(executionId); + storage.setItem(LOGICFLOW_ENGINE_INSTANCES, instance); + } + instanceData.push(taskId); + storage.setItem(executionId, instanceData); + storage.setItem(taskId, task); + } + async getTask(taskId) { + return storage.getItem(taskId); + } + async getExecutionTasks(executionId) { + return storage.getItem(executionId); + } + clear() { + const instance = storage.getItem(LOGICFLOW_ENGINE_INSTANCES) || []; + instance.forEach((executionId) => { + storage.removeItem(executionId); + const instanceData = storage.getItem(executionId) || []; + instanceData.forEach((taskId) => { + storage.removeItem(taskId); + }); + }); + storage.removeItem(LOGICFLOW_ENGINE_INSTANCES); + } +} diff --git a/packages/engine/src/util/global.ts b/packages/engine/src/util/global.ts new file mode 100644 index 000000000..ffafa9191 --- /dev/null +++ b/packages/engine/src/util/global.ts @@ -0,0 +1,22 @@ +// The one and only way of getting global scope in all environments +// https://stackoverflow.com/q/3277182/1008999 + +const globalScope = (() => { + if (typeof window === 'object' && window.window === window) { + return window; + } + // eslint-disable-next-line no-restricted-globals + if (typeof self === 'object' && self.self === self) { + // eslint-disable-next-line no-restricted-globals + return self; + } + if (typeof global === 'object' && global.global === global) { + return global; + } + if (typeof globalThis === 'object') { + return globalThis; + } + return {} as Record; +})(); + +export default globalScope; diff --git a/packages/engine/src/util/storage.ts b/packages/engine/src/util/storage.ts new file mode 100644 index 000000000..e0e12f433 --- /dev/null +++ b/packages/engine/src/util/storage.ts @@ -0,0 +1,40 @@ +/** + * 存储执行记录 + */ +import globalScope from './global'; + +if (!globalScope.sessionStorage) { + const storage = { + data: {} as Record, + setItem(key, value) { + storage.data[key] = value; + }, + getItem(key) { + return storage.data[key]; + }, + removeItem(key) { + delete storage.data[key]; + }, + }; + globalScope.sessionStorage = storage; +} + +export default { + setItem(key, value) { + if (typeof value === 'object') { + value = JSON.stringify(value); + } + globalScope.sessionStorage.setItem(key, value); + }, + getItem(key) { + const value = globalScope.sessionStorage.getItem(key); + try { + return JSON.parse(value); + } catch (e) { + return value; + } + }, + removeItem(key) { + globalScope.sessionStorage.removeItem(key); + }, +}; From a7759f69d4f7b294397c8502da654ea7c729d930 Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Tue, 11 Jul 2023 20:02:37 +0800 Subject: [PATCH 04/11] feat: added expression evaluation functionality for Node.js and browser --- .../{index.test.js => 01_index.test.js} | 3 +- .../{recorder.test.js => 02_recorder.test.js} | 0 packages/engine/__test__/03_condition.test.js | 56 +++++++++ packages/engine/example/browser/index.html | 16 +++ packages/engine/example/browser/main.js | 52 ++++++++ packages/engine/example/nodejs/index.js | 52 ++++++++ packages/engine/package.json | 2 +- .../engine/scripts/webpack.config.base.js | 45 +++++++ .../engine/scripts/webpack.config.build.js | 19 +++ packages/engine/scripts/webpack.config.dev.js | 29 +++++ packages/engine/src/FlowModel.ts | 116 ++++++++++++------ packages/engine/src/NodeManager.ts | 26 ---- packages/engine/src/constant/LogCode.ts | 6 + packages/engine/src/expression/browserVm.ts | 32 +++++ packages/engine/src/expression/index.ts | 19 +++ packages/engine/src/expression/nodeVm.ts | 12 ++ packages/engine/src/index.ts | 16 ++- packages/engine/src/nodes/BaseNode.ts | 63 ++++++++-- packages/engine/src/util/global.ts | 23 +++- packages/engine/src/util/storage.ts | 2 +- packages/engine/tsconfig.json | 2 +- 21 files changed, 507 insertions(+), 84 deletions(-) rename packages/engine/__test__/{index.test.js => 01_index.test.js} (96%) rename packages/engine/__test__/{recorder.test.js => 02_recorder.test.js} (100%) create mode 100644 packages/engine/__test__/03_condition.test.js create mode 100644 packages/engine/example/browser/index.html create mode 100644 packages/engine/example/browser/main.js create mode 100644 packages/engine/example/nodejs/index.js create mode 100644 packages/engine/scripts/webpack.config.base.js create mode 100644 packages/engine/scripts/webpack.config.build.js create mode 100644 packages/engine/scripts/webpack.config.dev.js delete mode 100644 packages/engine/src/NodeManager.ts create mode 100644 packages/engine/src/expression/browserVm.ts create mode 100644 packages/engine/src/expression/index.ts create mode 100644 packages/engine/src/expression/nodeVm.ts diff --git a/packages/engine/__test__/index.test.js b/packages/engine/__test__/01_index.test.js similarity index 96% rename from packages/engine/__test__/index.test.js rename to packages/engine/__test__/01_index.test.js index 2f5e7e6b5..8aaf880ee 100644 --- a/packages/engine/__test__/index.test.js +++ b/packages/engine/__test__/01_index.test.js @@ -43,7 +43,8 @@ describe('流程引擎', () => { } ] }, - global: {}, + context: {}, + globalData: {}, } engine.load(flowData); const result = await engine.execute(); diff --git a/packages/engine/__test__/recorder.test.js b/packages/engine/__test__/02_recorder.test.js similarity index 100% rename from packages/engine/__test__/recorder.test.js rename to packages/engine/__test__/02_recorder.test.js diff --git a/packages/engine/__test__/03_condition.test.js b/packages/engine/__test__/03_condition.test.js new file mode 100644 index 000000000..b4b55ce3f --- /dev/null +++ b/packages/engine/__test__/03_condition.test.js @@ -0,0 +1,56 @@ +import Engine from '../src/index'; + +describe('流程引擎条件计算', () => { + test('边edge1的条件不满足, 边edge2的条件满足', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + conditionExpression: 'a === 1' + } + }, + { + id: 'edge2', + sourceNodeId: 'node1', + targetNodeId: 'node3', + properties: { + conditionExpression: 'a === 2' + } + } + ] + }, + globalData: { + a: 2 + }, + } + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + expect(execution.length).toBe(2); + expect(execution[1].nodeId).toBe('node3'); + expect(execution[1].nodeType).toBe('TaskNode'); + }); +}); \ No newline at end of file diff --git a/packages/engine/example/browser/index.html b/packages/engine/example/browser/index.html new file mode 100644 index 000000000..c42b47fc1 --- /dev/null +++ b/packages/engine/example/browser/index.html @@ -0,0 +1,16 @@ + + + + + + Page Title + + + + + +
+ + + + \ No newline at end of file diff --git a/packages/engine/example/browser/main.js b/packages/engine/example/browser/main.js new file mode 100644 index 000000000..a280e68f8 --- /dev/null +++ b/packages/engine/example/browser/main.js @@ -0,0 +1,52 @@ +const { Engine } = window + +// console.log(Engine); +async function test() { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + }, + }, + { + id: 'node2', + type: 'TaskNode', + properties: {}, + }, + { + id: 'node3', + type: 'TaskNode', + properties: {}, + }, + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + conditionExpression: 'a === 1', + }, + }, + { + id: 'edge2', + sourceNodeId: 'node1', + targetNodeId: 'node3', + properties: { + conditionExpression: 'true', + }, + }, + ], + }, + globalData: {}, + }; + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + console.log(execution); +} +test(); diff --git a/packages/engine/example/nodejs/index.js b/packages/engine/example/nodejs/index.js new file mode 100644 index 000000000..0829bc18c --- /dev/null +++ b/packages/engine/example/nodejs/index.js @@ -0,0 +1,52 @@ +const { Engine } = require('@logicflow/engine'); + +// console.log(Engine); +async function test() { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + }, + }, + { + id: 'node2', + type: 'TaskNode', + properties: {}, + }, + { + id: 'node3', + type: 'TaskNode', + properties: {}, + }, + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + conditionExpression: 'false', + }, + }, + { + id: 'edge2', + sourceNodeId: 'node1', + targetNodeId: 'node3', + properties: { + conditionExpression: 'true', + }, + }, + ], + }, + global: {}, + }; + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + console.log(execution); +} +test(); diff --git a/packages/engine/package.json b/packages/engine/package.json index aa7bde921..d3e0d34ec 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -13,7 +13,7 @@ }, "license": "Apache-2.0", "scripts": { - "dev": "tsc --module esnext --target es5 --outDir ./es -d -w", + "dev": "cross-env NODE_ENV=development webpack-dev-server --client-log-level warning --config scripts/webpack.config.dev.js", "types": "tsc -d --declarationDir ./types --outDir temp && rimraf -R temp", "build:esm": "tsc --module esnext --target es5 --outDir ./es -d", "build:cjs": "tsc --module commonjs --target es5 --outDir ./cjs", diff --git a/packages/engine/scripts/webpack.config.base.js b/packages/engine/scripts/webpack.config.base.js new file mode 100644 index 000000000..26891f1c2 --- /dev/null +++ b/packages/engine/scripts/webpack.config.base.js @@ -0,0 +1,45 @@ +const path = require('path'); + +module.exports = { + entry: path.resolve(__dirname, '../src/index.ts'), + output: { + path: path.resolve(__dirname, '../lib'), + filename: 'index.js', + libraryTarget: 'umd', + // libraryExport: 'default', // 兼容 ES6(ES2015) 的模块系统、CommonJS 和 AMD 模块规范 + }, + resolve: { + extensions: ['.ts', '.js'], + symlinks: false, + }, + module: { + rules: [ + { + test: /\.(js|ts)$/, + exclude: /node_modules/, + use: [ + { + loader: 'babel-loader', + options: { + presets: [ + [ + '@babel/preset-typescript', + { + allExtensions: true, + }, + ], + [ + '@babel/preset-env', + { + useBuiltIns: 'usage', + corejs: '3.3', + }, + ], + ], + }, + }, + ], + }, + ], + }, +}; diff --git a/packages/engine/scripts/webpack.config.build.js b/packages/engine/scripts/webpack.config.build.js new file mode 100644 index 000000000..65c5835c2 --- /dev/null +++ b/packages/engine/scripts/webpack.config.build.js @@ -0,0 +1,19 @@ +const path = require('path'); +const webpack = require('webpack'); +const CaseSensitivePathsPlugin = require('case-sensitive-paths-webpack-plugin'); +const baseWebpackConfig = require('./webpack.config.base.js'); + +module.exports = [ + { + ...baseWebpackConfig, + mode: 'production', + output: { + path: path.resolve(__dirname, '../lib'), + filename: '[name].js', + libraryTarget: 'umd', + }, + plugins: [ + new CaseSensitivePathsPlugin() + ], + }, +]; diff --git a/packages/engine/scripts/webpack.config.dev.js b/packages/engine/scripts/webpack.config.dev.js new file mode 100644 index 000000000..14cf13f8b --- /dev/null +++ b/packages/engine/scripts/webpack.config.dev.js @@ -0,0 +1,29 @@ +const path = require('path'); +const webpack = require('webpack'); +const CaseSensitivePathsPlugin = require('case-sensitive-paths-webpack-plugin'); +// const ESLintPlugin = require('eslint-webpack-plugin'); +const baseWebpackConfig = require('./webpack.config.base.js'); + +// 先不用webpack merge +module.exports = [ + { + ...baseWebpackConfig, + devtool: 'inline-source-map', + mode: 'development', + devServer: { + contentBase: path.join(__dirname, '../../'), + stats: 'errors-warnings', + port: 9094, + host: '127.0.0.1', + watchOptions: { + poll: true, + }, + }, + plugins: [ + new CaseSensitivePathsPlugin(), + // new ESLintPlugin({ + // extensions: ['ts', 'tsx'], + // }), + ], + }, +]; diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 632425791..94b5e54ab 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -2,12 +2,12 @@ import type { NodeConfig, NodeConstructor, } from './nodes/BaseNode'; +import type Recorder from './recorder'; import { EVENT_INSTANCE_COMPLETE, } from './constant/constant'; import { createExecId } from './util/ID'; import Scheduler from './Scheduler'; -import NodeManager from './NodeManager'; export type TaskUnit = { executionId: string; @@ -42,7 +42,6 @@ export default class FlowModel { * 调度器,用于调度节点的执行。 */ scheduler: Scheduler; - NodeManager: NodeManager; /** * 待执行的队列,当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 */ @@ -54,7 +53,7 @@ export default class FlowModel { /** * 当前流程模型中的所有节点,边会被转换成节点的incoming和outgoing属性。 */ - nodeConfigMap: Map; + nodeConfigMap: Map = new Map(); /** * 当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 */ @@ -67,12 +66,34 @@ export default class FlowModel { * 当前流程中开始节点组成的数组。 */ startNodes: NodeConfig[] = []; + /** + * 用于存储全局数据,可以在流程中共享。 + */ + globalData: Record = {}; + /** + * 外部传入的上下文,最终会传递给每个节点 + * 例如: + * const context = { + * request: { + * get: (url) => { + * return fetch(url); + * } + * } + * 在节点内部可以通过 this.context.request.get(url) 来调用。 + */ + context: Record; constructor({ nodeModelMap, recorder, + context = {}, + globalData = {}, + startNodeType = 'StartNode', }: { nodeModelMap: Map; - recorder?: any; + recorder: Recorder; + context?: Record; + globalData?: Record; + startNodeType?: string; }) { // 流程包含的节点类型 this.nodeModelMap = nodeModelMap; @@ -80,9 +101,13 @@ export default class FlowModel { this.executeQueue = []; // 执行中的任务 this.executingInstance = null; - this.nodeConfigMap = new Map(); + // 外部传入的上下文,最终会传递给每个节点 + this.context = context; + // 用于存储全局数据,可以在流程中共享。 + this.globalData = globalData; + // 开始节点类型,在执行流程时,会从这些节点开始执行。 + this.startNodeType = startNodeType; this.isRunning = false; - this.NodeManager = new NodeManager(); this.scheduler = new Scheduler({ flowModel: this, recorder, @@ -91,26 +116,10 @@ export default class FlowModel { this.onTaskFinished(result); }); } - onTaskFinished(result) { - const { executionId } = result; - if (executionId !== this.executionId) { - return; - } - const { callback } = this.executingInstance; - if (callback) { - callback(result); - } - this.executingInstance = null; - if (this.executeQueue.length) { - this.createExecuteInstance(); - } else { - this.isRunning = false; - } - } - setStartNodeType(startNodeType) { + public setStartNodeType(startNodeType) { this.startNodeType = startNodeType; } - load(graphData) { + public load(graphData) { const { nodes = [], edges = [] } = graphData; nodes.forEach((node) => { if (this.nodeModelMap.has(node.type)) { @@ -135,14 +144,14 @@ export default class FlowModel { if (sourceNode) { sourceNode.outgoing.push({ id: edge.id, - condition: edge.properties, + properties: edge.properties, target: edge.targetNodeId, }); } if (targetNode && targetNode.type !== this.startNodeType) { targetNode.incoming.push({ id: edge.id, - condition: edge.properties, + properties: edge.properties, source: edge.sourceNodeId, }); } @@ -158,7 +167,7 @@ export default class FlowModel { * 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。 * 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。 */ - async execute(params ?: ExecParams) { + public async execute(params ?: ExecParams) { this.executeQueue.push(params); if (this.isRunning) { return; @@ -166,7 +175,47 @@ export default class FlowModel { this.isRunning = true; this.createExecuteInstance(); } - async createExecuteInstance() { + /** + * 创建节点实例 + * @param nodeId 节点Id + * @returns 节点示例 + */ + public createTask(nodeId: string) { + const nodeConfig = this.nodeConfigMap.get(nodeId); + const NodeModel = this.nodeModelMap.get(nodeConfig.type); + const task = new NodeModel({ + nodeConfig, + globalData: this.globalData, + context: this.context, + }); + return task; + } + /** + * 更新流程全局数据 + */ + public updateGlobalData(data) { + this.globalData = { + ...this.globalData, + ...data, + }; + } + private onTaskFinished(result) { + const { executionId } = result; + if (executionId !== this.executionId) { + return; + } + const { callback } = this.executingInstance; + if (callback) { + callback(result); + } + this.executingInstance = null; + if (this.executeQueue.length > 0) { + this.createExecuteInstance(); + } else { + this.isRunning = false; + } + } + private async createExecuteInstance() { const execParams = this.executeQueue.shift(); this.executionId = createExecId(); this.executingInstance = execParams; @@ -179,15 +228,4 @@ export default class FlowModel { this.scheduler.run(this.executionId); }); } - /** - * 创建节点实例 - * @param nodeId 节点Id - * @returns 节点示例 - */ - createTask(nodeId: string) { - const nodeConfig = this.nodeConfigMap.get(nodeId); - const NodeModel = this.nodeModelMap.get(nodeConfig.type); - const task = new NodeModel(nodeConfig); - return task; - } } diff --git a/packages/engine/src/NodeManager.ts b/packages/engine/src/NodeManager.ts deleted file mode 100644 index 2c10f5db1..000000000 --- a/packages/engine/src/NodeManager.ts +++ /dev/null @@ -1,26 +0,0 @@ -import type { BaseNodeInterface } from './nodes/BaseNode'; -import { BASE_START_NODE } from './constant/constant'; - -export default class NodeManager { - taskMap: Map; - tasks: BaseNodeInterface[]; - startTasks: BaseNodeInterface[]; - constructor() { - this.taskMap = new Map(); - this.tasks = []; - this.startTasks = []; - } - addTask(task: BaseNodeInterface) { - this.tasks.push(task); - this.taskMap.set(task.nodeId, task); - if (task.baseType === BASE_START_NODE) { - this.startTasks.push(task); - } - } - getTask(taskId: string) { - return this.taskMap.get(taskId); - } - getStartTasks() { - return this.startTasks; - } -} diff --git a/packages/engine/src/constant/LogCode.ts b/packages/engine/src/constant/LogCode.ts index d6b5c59cc..3b464a6ea 100644 --- a/packages/engine/src/constant/LogCode.ts +++ b/packages/engine/src/constant/LogCode.ts @@ -1,20 +1,26 @@ export enum ErrorCode { // 模型数据错误 NONE_START_NODE = 1000, + // 表达式错误 + NO_DOCUMENT_BODY = 2001, } export enum WarningCode { NONE_START_NODE_IN_DATA = 2000, START_NODE_INCOMING = 2001, + // 表达式判断异常 + EXPRESSION_EXEC_ERROR = 3000, } const errorMsgMapCn = { [ErrorCode.NONE_START_NODE]: '未找到入度为0的节点', + [ErrorCode.NO_DOCUMENT_BODY]: '找不到document.body, 请在DOM加载完成后再执行', }; const warningMsgMapCn = { [WarningCode.NONE_START_NODE_IN_DATA]: '初始化数据中未找到入度为0的节点', [WarningCode.START_NODE_INCOMING]: '开始节点不允许被连入', + [WarningCode.EXPRESSION_EXEC_ERROR]: '表达式执行异常', }; export const getErrorMsg = (code: ErrorCode) => `error[${code}]: ${errorMsgMapCn[code]}`; diff --git a/packages/engine/src/expression/browserVm.ts b/packages/engine/src/expression/browserVm.ts new file mode 100644 index 000000000..46f0b13e3 --- /dev/null +++ b/packages/engine/src/expression/browserVm.ts @@ -0,0 +1,32 @@ +import { + ErrorCode, + WarningCode, + getErrorMsg, + getWarningMsg, +} from '../constant/LogCode'; + +const runInBrowserContext = async (code: string, globalData: any = {}) => { + const iframe = document.createElement('iframe'); + iframe.style.display = 'none'; + if (!document || !document.body) { + console.error(getErrorMsg(ErrorCode.NO_DOCUMENT_BODY)); + } + document.body.appendChild(iframe); + const iframeWindow = iframe.contentWindow as any; + const iframeEval = iframeWindow.eval; + Object.keys(globalData).forEach((key) => { + iframeWindow[key] = globalData[key]; + }); + let res = null; + try { + res = iframeEval.call(iframeWindow, code); + } catch (e) { + console.warn(getWarningMsg(WarningCode.EXPRESSION_EXEC_ERROR), { code, globalData, e }); + } + document.body.removeChild(iframe); + return res; +}; + +export { + runInBrowserContext, +}; diff --git a/packages/engine/src/expression/index.ts b/packages/engine/src/expression/index.ts new file mode 100644 index 000000000..3bbabe10a --- /dev/null +++ b/packages/engine/src/expression/index.ts @@ -0,0 +1,19 @@ +import { runInNewContext } from './nodeVm'; +import { runInBrowserContext } from './browserVm'; +import { isInNodeJS, isInBrowser, globalScope } from '../util/global'; + +const getExpressionResult = async (code: string, context: any) => { + if (isInNodeJS) { + const r = await runInNewContext(code, context); + return r; + } + if (isInBrowser) { + const r = await runInBrowserContext(code, context); + return r; + } + return globalScope.eval(code); // eslint-disable-line no-eval +}; + +export { + getExpressionResult, +}; diff --git a/packages/engine/src/expression/nodeVm.ts b/packages/engine/src/expression/nodeVm.ts new file mode 100644 index 000000000..b2f8672d0 --- /dev/null +++ b/packages/engine/src/expression/nodeVm.ts @@ -0,0 +1,12 @@ +// import vm from 'node:vm'; +const vm = require('vm'); + +const runInNewContext = async (code: string, globalData: any = {}) => { + const context = vm.createContext(globalData); + vm.runInContext(code, context); + return context; +}; + +export { + runInNewContext, +}; diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index c14236096..c743406af 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -47,12 +47,19 @@ export default class Engine { /** * 加载流程图数据 */ - load({ graphData, startNodeType = 'StartNode' }) { + load({ + graphData, + startNodeType = 'StartNode', + globalData = {}, + context = {}, + }) { this.flowModel = new FlowModel({ nodeModelMap: this.nodeModelMap, recorder: this.recorder, + context, + globalData, + startNodeType, }); - this.flowModel.setStartNodeType(startNodeType); this.flowModel.load(graphData); return this.flowModel; } @@ -83,7 +90,12 @@ export default class Engine { } export { + Engine, +}; + +export const EngineNode = { StartNode, + TaskNode, }; export type { diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts index 657f9ab73..88e32fa0a 100644 --- a/packages/engine/src/nodes/BaseNode.ts +++ b/packages/engine/src/nodes/BaseNode.ts @@ -1,29 +1,32 @@ -import { createTaskId } from '../util/ID'; +import { getExpressionResult } from '../expression'; export interface BaseNodeInterface { outgoing: Record[]; incoming: Record[]; nodeId: string; - taskId: string; type: string; readonly baseType: string; execute(taskUnit): Promise; } export type NodeConstructor = { - new (config: NodeConfig): BaseNode; + new (config: { + nodeConfig: NodeConfig; + context: Record; + globalData: Record; + }): BaseNode; }; export type IncomingConfig = { id: string; - condition?: Record; + properties?: Record; source: string; }; export type OutgoingConfig = { id: string; - condition?: Record; target: string; + properties?: Record; }; export type NodeConfig = { @@ -52,19 +55,38 @@ export type ExecParams = { export default class BaseNode implements BaseNodeInterface { static nodeTypeName = 'BaseNode'; + /** + * 节点的出边 + */ outgoing: OutgoingConfig[]; + /** + * 节点的入边 + */ incoming: IncomingConfig[]; + /** + * 节点的属性 + */ properties?: Record; nodeId: string; - taskId: string; type: string; + /** + * 节点的上下文,是调用流程时传入的上下文 + */ + context: Record; + /** + * 节点的全局数据,是调用流程时传入的全局数据。 + * 在计算表达式时,即基于全局数据进行计算。 + */ + globalData: Record; readonly baseType: string; - constructor(nodeConfig: NodeConfig) { + constructor({ nodeConfig, context, globalData }) { this.outgoing = nodeConfig.outgoing; this.incoming = nodeConfig.incoming; this.nodeId = nodeConfig.id; this.type = nodeConfig.type; this.properties = nodeConfig.properties; + this.context = context; + this.globalData = globalData; this.baseType = 'base'; } /** @@ -84,7 +106,32 @@ export default class BaseNode implements BaseNodeInterface { return r; } async getOutgoing() { - return this.outgoing; + const outgoing = []; + const expressions = []; + for (const item of this.outgoing) { + const { id, target, properties } = item; + expressions.push(this.isPass(properties)); + } + const result = await Promise.all(expressions); + result.forEach((item, index) => { + if (item) { + outgoing.push(this.outgoing[index]); + } + }); + return outgoing; + } + async isPass(properties) { + if (!properties) return true; + const { conditionExpression } = properties; + if (!conditionExpression) return true; + try { + const result = await getExpressionResult(`result${this.nodeId} = (${conditionExpression})`, { + ...this.globalData, + }); + return result[`result${this.nodeId}`]; + } catch (e) { + return false; + } } /** * 节点的执行逻辑 diff --git a/packages/engine/src/util/global.ts b/packages/engine/src/util/global.ts index ffafa9191..736c3b905 100644 --- a/packages/engine/src/util/global.ts +++ b/packages/engine/src/util/global.ts @@ -1,22 +1,35 @@ // The one and only way of getting global scope in all environments // https://stackoverflow.com/q/3277182/1008999 +const isInBrowser = typeof window === 'object' && window.window === window; + +const isInNodeJS = typeof global === 'object' && global.global === global; +// eslint-disable-next-line no-restricted-globals +const isInWebWorker = !isInBrowser && typeof self === 'object' && self.constructor; + const globalScope = (() => { - if (typeof window === 'object' && window.window === window) { + if (isInBrowser) { return window; } // eslint-disable-next-line no-restricted-globals - if (typeof self === 'object' && self.self === self) { + if (typeof self === 'object' && self.self === self) { // web workers // eslint-disable-next-line no-restricted-globals return self; } - if (typeof global === 'object' && global.global === global) { + if (isInNodeJS) { return global; } if (typeof globalThis === 'object') { return globalThis; } - return {} as Record; + return { + eval: () => undefined, + } as Record; })(); -export default globalScope; +export { + globalScope, + isInBrowser, + isInWebWorker, + isInNodeJS, +}; diff --git a/packages/engine/src/util/storage.ts b/packages/engine/src/util/storage.ts index e0e12f433..109af89e0 100644 --- a/packages/engine/src/util/storage.ts +++ b/packages/engine/src/util/storage.ts @@ -1,7 +1,7 @@ /** * 存储执行记录 */ -import globalScope from './global'; +import { globalScope } from './global'; if (!globalScope.sessionStorage) { const storage = { diff --git a/packages/engine/tsconfig.json b/packages/engine/tsconfig.json index d5a17a097..20afc0f61 100644 --- a/packages/engine/tsconfig.json +++ b/packages/engine/tsconfig.json @@ -1,7 +1,7 @@ { "extends": "../../tsconfig.json", "include": [ - "src/*", "src/tools/auto-layout", "src/tools/snapshot", + "src/*" ], "exclude": [ "node_modules", From f96b27824ff1e4b03eec271594ed128ea17e5a74 Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Tue, 11 Jul 2023 20:23:02 +0800 Subject: [PATCH 05/11] chore: add umd package --- packages/engine/package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/engine/package.json b/packages/engine/package.json index d3e0d34ec..ff0899034 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -17,7 +17,8 @@ "types": "tsc -d --declarationDir ./types --outDir temp && rimraf -R temp", "build:esm": "tsc --module esnext --target es5 --outDir ./es -d", "build:cjs": "tsc --module commonjs --target es5 --outDir ./cjs", - "build": "rimraf ./es ./cjs ./lib && npm run build:esm & npm run build:cjs", + "build:umd": "cross-env NODE_ENV=production webpack --config scripts/webpack.config.build.js", + "build": "rimraf ./es ./cjs ./lib && npm run build:esm & npm run build:cjs & npm run build:umd", "publish-lib": "npm run build && npm publish", "test": "jest" }, From 113ad880cdb564431933eeeed3b661c50fca2a9c Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Thu, 13 Jul 2023 15:07:13 +0800 Subject: [PATCH 06/11] feat: add support for specifying the start node and multiple executions --- packages/engine/__test__/01_index.test.js | 28 +- packages/engine/__test__/02_recorder.test.js | 4 +- packages/engine/__test__/03_condition.test.js | 4 +- packages/engine/__test__/04_execute.test.js | 244 ++++++++++++++++++ packages/engine/example/nodejs/index.js | 32 ++- packages/engine/package.json | 3 +- packages/engine/src/FlowModel.ts | 20 +- packages/engine/src/constant/LogCode.ts | 2 + packages/engine/src/index.ts | 5 +- 9 files changed, 297 insertions(+), 45 deletions(-) create mode 100644 packages/engine/__test__/04_execute.test.js diff --git a/packages/engine/__test__/01_index.test.js b/packages/engine/__test__/01_index.test.js index 8aaf880ee..aa8b92eab 100644 --- a/packages/engine/__test__/01_index.test.js +++ b/packages/engine/__test__/01_index.test.js @@ -1,27 +1,7 @@ import Engine from '../src/index'; -describe('流程引擎', () => { - test('初始化流程引擎', () => { - const engine = new Engine(); - expect(engine).toBeInstanceOf(Engine); - }); - test('加载图数据', async () => { - const engine = new Engine(); - const flowData = { - graphData: { - nodes: [ - { - id: 'node1', - type: 'StartNode', - } - ] - }, - global: {}, - } - const flowModel = engine.load(flowData); - expect(flowModel.nodeConfigMap.size).toBe(flowData.graphData.nodes.length); - }); - test('执行流程完成, 返回数据包含executionId', async () => { +describe('@logicflow/engine', () => { + test('Execution Process Completed, Returning Data Containing executionId', async () => { const engine = new Engine(); const flowData = { graphData: { @@ -46,8 +26,10 @@ describe('流程引擎', () => { context: {}, globalData: {}, } - engine.load(flowData); + const flowModel = engine.load(flowData); const result = await engine.execute(); + expect(engine).toBeInstanceOf(Engine); + expect(flowModel.nodeConfigMap.size).toBe(flowData.graphData.nodes.length); expect(result).toHaveProperty('executionId'); }); }); diff --git a/packages/engine/__test__/02_recorder.test.js b/packages/engine/__test__/02_recorder.test.js index 563f0d787..7d238853d 100644 --- a/packages/engine/__test__/02_recorder.test.js +++ b/packages/engine/__test__/02_recorder.test.js @@ -1,7 +1,7 @@ import Engine from '../src/index'; -describe('流程引擎执行记录器', () => { - test('获取流程执行记录', async () => { +describe('@logicflow/engine Recorder', () => { + test('Retrieve Execution Process Records.', async () => { const engine = new Engine(); const flowData = { graphData: { diff --git a/packages/engine/__test__/03_condition.test.js b/packages/engine/__test__/03_condition.test.js index b4b55ce3f..48c571077 100644 --- a/packages/engine/__test__/03_condition.test.js +++ b/packages/engine/__test__/03_condition.test.js @@ -1,7 +1,7 @@ import Engine from '../src/index'; -describe('流程引擎条件计算', () => { - test('边edge1的条件不满足, 边edge2的条件满足', async () => { +describe('@logicflow/engine condition', () => { + test('Condition of Edge edge1 not satisfied, condition of Edge edge2 satisfied.', async () => { const engine = new Engine(); const flowData = { graphData: { diff --git a/packages/engine/__test__/04_execute.test.js b/packages/engine/__test__/04_execute.test.js new file mode 100644 index 000000000..535818aaf --- /dev/null +++ b/packages/engine/__test__/04_execute.test.js @@ -0,0 +1,244 @@ +import Engine from '../src/index'; + +describe('@logicflow/engine Execute', () => { + test('When there are multiple start nodes in a process, all of them are executed by default.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + + expect(execution.length).toBe(5); + }); + test('When there are multiple start nodes in a process, you can specify which start node to execute.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + const result = await engine.execute({ + nodeId: 'node3' + }); + const execution = await engine.getExecutionRecord(result.executionId); + + expect(execution.length).toBe(2); + expect(execution[0].nodeId).toBe('node3'); + expect(execution[1].nodeId).toBe('node4'); + }); + test('When attempting to execute a non-existent start node in a process, an execution exception is raised.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + try { + await engine.execute({ + nodeId: 'node6' + }); + } catch (error) { + expect(error.message).toContain('node6') + expect(error.message).toContain('1001'); + } + }); + test('When there are multiple start nodes in a process, parallel execution is supported, and each start node generates a unique execution ID', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + const r = engine.execute({ + nodeId: 'node1' + }); + const r2 = engine.execute({ + nodeId: 'node3' + }); + const result = await Promise.all([r, r2]); + const execution1 = await engine.getExecutionRecord(result[0].executionId); + const execution2 = await engine.getExecutionRecord(result[1].executionId); + expect(execution1.length).toBe(2); + expect(execution1[0].nodeId).toBe('node1'); + expect(execution1[1].nodeId).toBe('node2'); + expect(execution2.length).toBe(2); + expect(execution2[0].nodeId).toBe('node3'); + expect(execution2[1].nodeId).toBe('node4'); + expect(result[0].executionId).not.toBe(result[1].executionId); + }); +}); \ No newline at end of file diff --git a/packages/engine/example/nodejs/index.js b/packages/engine/example/nodejs/index.js index 0829bc18c..fbe01288d 100644 --- a/packages/engine/example/nodejs/index.js +++ b/packages/engine/example/nodejs/index.js @@ -10,18 +10,27 @@ async function test() { id: 'node1', type: 'StartNode', properties: { - }, + } }, { id: 'node2', type: 'TaskNode', - properties: {}, + properties: {} }, { id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', type: 'TaskNode', - properties: {}, + properties: {} }, + { + id: 'node5', + type: 'StartNode', + } ], edges: [ { @@ -29,21 +38,18 @@ async function test() { sourceNodeId: 'node1', targetNodeId: 'node2', properties: { - conditionExpression: 'false', - }, + } }, { id: 'edge2', - sourceNodeId: 'node1', - targetNodeId: 'node3', + sourceNodeId: 'node3', + targetNodeId: 'node4', properties: { - conditionExpression: 'true', - }, - }, - ], + } + } + ] }, - global: {}, - }; + } engine.load(flowData); const result = await engine.execute(); const execution = await engine.getExecutionRecord(result.executionId); diff --git a/packages/engine/package.json b/packages/engine/package.json index ff0899034..7d2301ea5 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -20,7 +20,8 @@ "build:umd": "cross-env NODE_ENV=production webpack --config scripts/webpack.config.build.js", "build": "rimraf ./es ./cjs ./lib && npm run build:esm & npm run build:cjs & npm run build:umd", "publish-lib": "npm run build && npm publish", - "test": "jest" + "test": "jest", + "test:watch": "jest --watch" }, "author": "", "files": [ diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 94b5e54ab..2c87ab313 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -8,6 +8,7 @@ import { } from './constant/constant'; import { createExecId } from './util/ID'; import Scheduler from './Scheduler'; +import { ErrorCode, getErrorMsg } from './constant/LogCode'; export type TaskUnit = { executionId: string; @@ -27,6 +28,7 @@ export type TaskParams = { export type ExecParams = { callback?: (result: FlowResult) => void; + onError?: (error: Error) => void; } & TaskParams; export default class FlowModel { @@ -215,9 +217,21 @@ export default class FlowModel { this.isRunning = false; } } - private async createExecuteInstance() { + private createExecuteInstance() { const execParams = this.executeQueue.shift(); - this.executionId = createExecId(); + if (execParams.executionId) { + this.executionId = execParams.executionId; + } else { + this.executionId = createExecId(); + } + if (execParams.nodeId) { + const nodeConfig = this.nodeConfigMap.get(execParams.nodeId); + if (!nodeConfig) { + execParams.onError(new Error(`${getErrorMsg(ErrorCode.NONE_NODE_ID)}(${execParams.nodeId})`)); + return; + } + this.startNodes = [nodeConfig]; + } this.executingInstance = execParams; this.startNodes.forEach((startNode) => { this.scheduler.addTask({ @@ -225,7 +239,7 @@ export default class FlowModel { nodeId: startNode.id, }); // 所有的开始节点都执行 - this.scheduler.run(this.executionId); }); + this.scheduler.run(this.executionId); } } diff --git a/packages/engine/src/constant/LogCode.ts b/packages/engine/src/constant/LogCode.ts index 3b464a6ea..90ae66049 100644 --- a/packages/engine/src/constant/LogCode.ts +++ b/packages/engine/src/constant/LogCode.ts @@ -1,6 +1,7 @@ export enum ErrorCode { // 模型数据错误 NONE_START_NODE = 1000, + NONE_NODE_ID = 1001, // 表达式错误 NO_DOCUMENT_BODY = 2001, } @@ -14,6 +15,7 @@ export enum WarningCode { const errorMsgMapCn = { [ErrorCode.NONE_START_NODE]: '未找到入度为0的节点', + [ErrorCode.NONE_NODE_ID]: '流程数据中存在没有此节点', [ErrorCode.NO_DOCUMENT_BODY]: '找不到document.body, 请在DOM加载完成后再执行', }; diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index c743406af..dd7efe17a 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -67,7 +67,7 @@ export default class Engine { * 执行流程,允许多次调用。 */ async execute(execParam?: TaskParams) { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { if (!execParam) { execParam = {}; } @@ -76,6 +76,9 @@ export default class Engine { callback: (result) => { resolve(result); }, + onError: (error) => { + reject(error); + }, }); }); } From 7c4e3855ad0a7af4121de6552be61f690b4e0e6c Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Mon, 17 Jul 2023 14:28:47 +0800 Subject: [PATCH 07/11] feat(engine): add the ability to pause and resume workflows --- packages/engine/__test__/01_index.test.js | 2 + packages/engine/__test__/02_recorder.test.js | 2 +- packages/engine/__test__/03_condition.test.js | 2 +- .../engine/__test__/05_customNode.test.js | 78 ++++++++ packages/engine/example/browser/main.js | 4 +- packages/engine/src/FlowModel.ts | 47 +++-- packages/engine/src/Scheduler.ts | 185 +++++++++++++----- packages/engine/src/constant/constant.ts | 9 + packages/engine/src/index.ts | 19 +- packages/engine/src/nodes/BaseNode.ts | 96 +++++++-- packages/engine/src/recorder/index.ts | 17 +- packages/engine/src/types.d.ts | 58 ++++++ 12 files changed, 417 insertions(+), 102 deletions(-) create mode 100644 packages/engine/__test__/05_customNode.test.js create mode 100644 packages/engine/src/types.d.ts diff --git a/packages/engine/__test__/01_index.test.js b/packages/engine/__test__/01_index.test.js index aa8b92eab..3b1f166c9 100644 --- a/packages/engine/__test__/01_index.test.js +++ b/packages/engine/__test__/01_index.test.js @@ -31,5 +31,7 @@ describe('@logicflow/engine', () => { expect(engine).toBeInstanceOf(Engine); expect(flowModel.nodeConfigMap.size).toBe(flowData.graphData.nodes.length); expect(result).toHaveProperty('executionId'); + expect(result.status).toBe('completed'); + expect(result.nodeId).toEqual('node2'); }); }); diff --git a/packages/engine/__test__/02_recorder.test.js b/packages/engine/__test__/02_recorder.test.js index 7d238853d..814494474 100644 --- a/packages/engine/__test__/02_recorder.test.js +++ b/packages/engine/__test__/02_recorder.test.js @@ -1,7 +1,7 @@ import Engine from '../src/index'; describe('@logicflow/engine Recorder', () => { - test('Retrieve Execution Process Records.', async () => { + test('Using the getExecutionRecord API, receive the complete execution record of the process.', async () => { const engine = new Engine(); const flowData = { graphData: { diff --git a/packages/engine/__test__/03_condition.test.js b/packages/engine/__test__/03_condition.test.js index 48c571077..25a18c5b6 100644 --- a/packages/engine/__test__/03_condition.test.js +++ b/packages/engine/__test__/03_condition.test.js @@ -1,7 +1,7 @@ import Engine from '../src/index'; describe('@logicflow/engine condition', () => { - test('Condition of Edge edge1 not satisfied, condition of Edge edge2 satisfied.', async () => { + test('The process will not continue its execution if the condition expression evaluates to false.', async () => { const engine = new Engine(); const flowData = { graphData: { diff --git a/packages/engine/__test__/05_customNode.test.js b/packages/engine/__test__/05_customNode.test.js new file mode 100644 index 000000000..749c8ba44 --- /dev/null +++ b/packages/engine/__test__/05_customNode.test.js @@ -0,0 +1,78 @@ +import Engine, { TaskNode } from '../src/index'; + +describe('@logicflow/engine Customize Node', () => { + class UserTask extends TaskNode { + async action() { + return { + status: 'interrupted', + detail: { + formId: 'form_1' + } + }; + } + } + const engine = new Engine(); + engine.register({ + type: 'UserTask', + model: UserTask, + }) + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'UserTask', + properties: {} + }, + { + id: 'node3', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node2', + targetNodeId: 'node3', + properties: { + } + } + ] + }, + globalData: { + }, + } + engine.load(flowData); + test('After executing the process, receive the flow status as "interrupted" and include detailed information returned by the custom node.', async () => { + const result = await engine.execute(); + expect(result.status).toBe('interrupted'); + expect(result.detail.formId).toEqual('form_1'); + }); + test('After a process is interrupted, you can resume its execution using the API.', async () => { + const result = await engine.execute(); + const result2 = await engine.resume({ + executionId: result.executionId, + nodeId: result.nodeId, + taskId: result.taskId, + data: { + formId: 'form_2' + } + }) + expect(result2.status).toBe('completed') + expect(result2.nodeId).toEqual('node3') + }); +}); diff --git a/packages/engine/example/browser/main.js b/packages/engine/example/browser/main.js index a280e68f8..64a760a79 100644 --- a/packages/engine/example/browser/main.js +++ b/packages/engine/example/browser/main.js @@ -42,7 +42,9 @@ async function test() { }, ], }, - globalData: {}, + globalData: { + a: 2, + }, }; engine.load(flowData); const result = await engine.execute(); diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 2c87ab313..74eca8233 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -4,26 +4,22 @@ import type { } from './nodes/BaseNode'; import type Recorder from './recorder'; import { - EVENT_INSTANCE_COMPLETE, + EVENT_INSTANCE_COMPLETE, EVENT_INSTANCE_INTERRUPTED, } from './constant/constant'; import { createExecId } from './util/ID'; import Scheduler from './Scheduler'; import { ErrorCode, getErrorMsg } from './constant/LogCode'; - -export type TaskUnit = { - executionId: string; - taskId?: string; - nodeId: string; -}; +import type { TaskParam } from './types.d'; export type FlowResult = { result?: Record; -} & TaskUnit; +} & TaskParam; export type TaskParams = { executionId?: string; taskId?: string; nodeId?: string; + data?: Record; }; export type ExecParams = { @@ -117,6 +113,9 @@ export default class FlowModel { this.scheduler.on(EVENT_INSTANCE_COMPLETE, (result) => { this.onTaskFinished(result); }); + this.scheduler.on(EVENT_INSTANCE_INTERRUPTED, (result) => { + this.onTaskFinished(result); + }); } public setStartNodeType(startNodeType) { this.startNodeType = startNodeType; @@ -169,13 +168,21 @@ export default class FlowModel { * 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。 * 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。 */ - public async execute(params ?: ExecParams) { + public async execute(params: ExecParams) { this.executeQueue.push(params); if (this.isRunning) { return; } this.isRunning = true; - this.createExecuteInstance(); + this.createExecution(); + } + public async resume(params: ExecParams) { + this.executeQueue.push(params); + if (this.isRunning) { + return; + } + this.isRunning = true; + this.createExecution(); } /** * 创建节点实例 @@ -212,18 +219,29 @@ export default class FlowModel { } this.executingInstance = null; if (this.executeQueue.length > 0) { - this.createExecuteInstance(); + this.createExecution(); } else { this.isRunning = false; } } - private createExecuteInstance() { + private createExecution() { const execParams = this.executeQueue.shift(); + this.executingInstance = execParams; if (execParams.executionId) { this.executionId = execParams.executionId; } else { this.executionId = createExecId(); } + // 如果有taskId,那么表示恢复执行 + if (execParams.taskId) { + this.scheduler.resume({ + executionId: this.executionId, + taskId: execParams.taskId, + nodeId: execParams.nodeId, + data: execParams.data, + }); + return; + } if (execParams.nodeId) { const nodeConfig = this.nodeConfigMap.get(execParams.nodeId); if (!nodeConfig) { @@ -232,7 +250,6 @@ export default class FlowModel { } this.startNodes = [nodeConfig]; } - this.executingInstance = execParams; this.startNodes.forEach((startNode) => { this.scheduler.addTask({ executionId: this.executionId, @@ -240,6 +257,8 @@ export default class FlowModel { }); // 所有的开始节点都执行 }); - this.scheduler.run(this.executionId); + this.scheduler.run({ + executionId: this.executionId, + }); } } diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index 6ab120503..c0783a128 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -1,63 +1,124 @@ import EventEmitter from './EventEmitter'; -import { TaskUnit } from './FlowModel'; -import type FlowModel from './FlowModel'; -import { EVENT_INSTANCE_COMPLETE, FlowStatus } from './constant/constant'; -import type { NextTaskUnit } from './nodes/BaseNode'; +import { + EVENT_INSTANCE_COMPLETE, + EVENT_INSTANCE_INTERRUPTED, + FlowStatus, +} from './constant/constant'; import { createTaskId } from './util/ID'; +import type { + ActionResult, + TaskParam, + NodeParam, + ResumeParam, +} from './types.d'; +import type FlowModel from './FlowModel'; +import type { NextTaskParam } from './nodes/BaseNode'; import type Recorder from './recorder'; -type TaskUnitMap = Map; +type TaskParamMap = Map; + +type TaskResult = { + extraInfo?: Record; +} & NextTaskParam; /** * 调度器 * 通过一个队列维护需要执行的节点,一个集合维护正在执行的节点 */ export default class Scheduler extends EventEmitter { - taskQueueMap: Map; - taskRunningMap: Map; + nodeQueueMap: Map; + taskRunningMap: Map; flowModel: FlowModel; recorder: Recorder; - currentTask: TaskUnit | null; + currentTask: TaskParam | null; constructor(config) { super(); - this.taskQueueMap = new Map(); + this.nodeQueueMap = new Map(); this.taskRunningMap = new Map(); this.flowModel = config.flowModel; this.recorder = config.recorder; this.currentTask = null; } - run(executionId) { - const currentTask = this.getNextTask(executionId); - if (currentTask) { - const task = this.addRunningTask(currentTask); - this.exec(task); + /** + * 添加一个任务到队列中。 + * 1. 由流程模型将所有的开始节点添加到队列中。 + * 2. 当一个节点执行完成后,将后续的节点添加到队列中。 + */ + public addTask(nodeParam: NodeParam) { + const { executionId } = nodeParam; + if (!this.nodeQueueMap.has(executionId)) { + this.nodeQueueMap.set(executionId, []); + } + const currentTaskQueue = this.nodeQueueMap.get(executionId); + currentTaskQueue.push(nodeParam); + } + /** + * 调度器执行下一个任务 + * 1. 提供给流程模型,用户开始执行第一个任务。 + * 2. 内部任务执行完成后,调用此方法继续执行下一个任务。 + * 3. 当判断没有可以继续执行的任务后,触发流程结束事件。 + */ + public run(runParams: { + executionId: string; + nodeId?: string; + taskId?: string; + }) { + const { executionId } = runParams; + const currentNode = this.getNextNode(executionId); + if (currentNode) { + const taskId = createTaskId(); + const taskParam = { + ...currentNode, + taskId, + }; + this.pushTaskToRunningMap(taskParam); + this.exec(taskParam); } else if (!this.hasRunningTask(executionId)) { - // 当一个流程在taskQueueMap和taskRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 + // 当一个流程在nodeQueueMap和taskRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 this.emit(EVENT_INSTANCE_COMPLETE, { executionId, + nodeId: runParams.nodeId, + taskId: runParams.taskId, status: FlowStatus.COMPLETED, }); } } - addRunningTask(taskUnit: TaskUnit) { - const { executionId } = taskUnit; - const taskId = createTaskId(); - taskUnit.taskId = taskId; + /** + * 恢复某个任务的执行。 + * 可以自定义节点手动实现流程中断,然后通过此方法恢复流程的执行。 + */ + public async resume(resumeParam: ResumeParam) { + this.pushTaskToRunningMap({ + executionId: resumeParam.executionId, + nodeId: resumeParam.nodeId, + taskId: resumeParam.taskId, + }); + const model = this.flowModel.createTask(resumeParam.nodeId); + await model.resume({ + ...resumeParam, + next: this.next.bind(this), + }); + } + // 流程执行过程中出错,停止执行 + stop(data) { + console.log('stop', data); + } + private pushTaskToRunningMap(taskParam) { + const { executionId, taskId } = taskParam; if (!this.taskRunningMap.has(executionId)) { - const runningMap = new Map(); + const runningMap = new Map(); this.taskRunningMap.set(executionId, runningMap); } - this.taskRunningMap.get(executionId).set(taskId, taskUnit); - return taskUnit; + this.taskRunningMap.get(executionId).set(taskId, taskParam); } - removeRunningTask(taskUnit: TaskUnit) { - const { executionId, taskId } = taskUnit; + private removeTaskFromRunningMap(taskParam: TaskParam) { + const { executionId, taskId } = taskParam; if (!taskId) return; const runningMap = this.taskRunningMap.get(executionId); if (!runningMap) return; runningMap.delete(taskId); } - hasRunningTask(executionId) { + private hasRunningTask(executionId) { const runningMap = this.taskRunningMap.get(executionId); if (!runningMap) return false; if (runningMap.size === 0) { @@ -66,20 +127,50 @@ export default class Scheduler extends EventEmitter { } return true; } - async exec(taskUnit: TaskUnit) { - const model = this.flowModel.createTask(taskUnit.nodeId); - const r = await model.execute({ - executionId: taskUnit.executionId, - taskId: taskUnit.taskId, - nodeId: taskUnit.nodeId, + private async exec(taskParam: TaskParam) { + const model = this.flowModel.createTask(taskParam.nodeId); + const execResult = await model.execute({ + executionId: taskParam.executionId, + taskId: taskParam.taskId, + nodeId: taskParam.nodeId, next: this.next.bind(this), }); - if (!r) this.cancel(taskUnit); + if (execResult && execResult.status === FlowStatus.INTERRUPTED) { + this.interrupted({ + execResult, + taskParam, + }); + this.saveTaskResult({ + executionId: taskParam.executionId, + nodeId: taskParam.nodeId, + taskId: taskParam.taskId, + nodeType: execResult.nodeType, + properties: execResult.properties, + outgoing: [], + extraInfo: { + status: execResult.status, + detail: execResult.detail, + }, + }); + this.removeTaskFromRunningMap(taskParam); + } + } + private interrupted({ + execResult, + taskParam, + } : { execResult: ActionResult, taskParam: TaskParam}) { + this.emit(EVENT_INSTANCE_INTERRUPTED, { + executionId: taskParam.executionId, + status: FlowStatus.INTERRUPTED, + nodeId: taskParam.nodeId, + taskId: taskParam.taskId, + detail: execResult.detail, + }); } - cancel(taskUnit: TaskUnit) { + private cancel(taskParam: TaskParam) { // TODO: 流程执行异常中断 } - async next(data: NextTaskUnit) { + private async next(data: NextTaskParam) { if (data.outgoing && data.outgoing.length > 0) { data.outgoing.forEach((item) => { this.addTask({ @@ -89,10 +180,14 @@ export default class Scheduler extends EventEmitter { }); } this.saveTaskResult(data); - this.removeRunningTask(data); - this.run(data.executionId); + this.removeTaskFromRunningMap(data); + this.run({ + executionId: data.executionId, + nodeId: data.nodeId, + taskId: data.taskId, + }); } - saveTaskResult(data: NextTaskUnit) { + private saveTaskResult(data: TaskResult) { this.recorder.addTask({ executionId: data.executionId, taskId: data.taskId, @@ -102,24 +197,12 @@ export default class Scheduler extends EventEmitter { properties: data.properties, }); } - getNextTask(executionId) { - const currentTaskQueue = this.taskQueueMap.get(executionId); + private getNextNode(executionId): NodeParam | null { + const currentTaskQueue = this.nodeQueueMap.get(executionId); if (!currentTaskQueue || currentTaskQueue.length === 0) { return null; } const currentTask = currentTaskQueue.shift(); return currentTask; } - // 流程执行过程中出错,停止执行 - async stop(data) { - console.log('stop', data); - } - addTask(taskUnit: TaskUnit) { - const { executionId } = taskUnit; - if (!this.taskQueueMap.has(executionId)) { - this.taskQueueMap.set(executionId, []); - } - const currentTaskQueue = this.taskQueueMap.get(executionId); - currentTaskQueue.push(taskUnit); - } } diff --git a/packages/engine/src/constant/constant.ts b/packages/engine/src/constant/constant.ts index 3b6acc101..ddc96a178 100644 --- a/packages/engine/src/constant/constant.ts +++ b/packages/engine/src/constant/constant.ts @@ -3,10 +3,19 @@ export const BASE_START_NODE = 'start'; // event name export const EVENT_INSTANCE_COMPLETE = 'instance:complete'; +export const EVENT_INSTANCE_INTERRUPTED = 'instance:interrupted'; // flow status export enum FlowStatus { COMPLETED = 'completed', + INTERRUPTED = 'interrupted', RUNNING = 'running', ERROR = 'error', } + +// task status +export enum TaskStatus { + SUCCESS = 'success', + ERROR = 'error', + INTERRUPTED = 'interrupted', +} diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index dd7efe17a..7fb9b01e4 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -1,4 +1,5 @@ import type { GraphConfigData } from '@logicflow/core'; +import type { ResumeParams } from './types.d'; import FlowModel, { TaskParams } from './FlowModel'; import StartNode from './nodes/StartNode'; import TaskNode from './nodes/TaskNode'; @@ -82,6 +83,19 @@ export default class Engine { }); }); } + async resume(resumeParam: ResumeParams) { + return new Promise((resolve, reject) => { + this.flowModel.resume({ + ...resumeParam, + callback: (result) => { + resolve(result); + }, + onError: (error) => { + reject(error); + }, + }); + }); + } async getExecutionRecord(executionId) { const tasks = await this.recorder.getExecutionTasks(executionId); const records = []; @@ -94,11 +108,8 @@ export default class Engine { export { Engine, -}; - -export const EngineNode = { - StartNode, TaskNode, + StartNode, }; export type { diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts index 88e32fa0a..ef177b6a8 100644 --- a/packages/engine/src/nodes/BaseNode.ts +++ b/packages/engine/src/nodes/BaseNode.ts @@ -1,4 +1,11 @@ +import { TaskStatus } from '../constant/constant'; import { getExpressionResult } from '../expression'; +import type { + ActionResult, + NodeExecResult, + ExecResumeParams, + ExecParams, +} from '../types.d'; export interface BaseNodeInterface { outgoing: Record[]; @@ -6,7 +13,7 @@ export interface BaseNodeInterface { nodeId: string; type: string; readonly baseType: string; - execute(taskUnit): Promise; + execute(taskParam): Promise; } export type NodeConstructor = { @@ -37,7 +44,7 @@ export type NodeConfig = { outgoing: OutgoingConfig[]; }; -export type NextTaskUnit = { +export type NextTaskParam = { executionId: string; nodeId: string; taskId: string; @@ -46,13 +53,6 @@ export type NextTaskUnit = { properties?: Record; }; -export type ExecParams = { - executionId: string; - taskId: string; - nodeId: string; - next: (data: NextTaskUnit) => void; -}; - export default class BaseNode implements BaseNodeInterface { static nodeTypeName = 'BaseNode'; /** @@ -92,10 +92,46 @@ export default class BaseNode implements BaseNodeInterface { /** * 节点的每一次执行都会生成一个唯一的taskId */ - async execute(params: ExecParams): Promise { - const r = await this.action(); + public async execute(params: ExecParams): Promise { + const r = await this.action({ + executionId: params.executionId, + taskId: params.taskId, + nodeId: this.nodeId, + }); + if (!r || r.status === TaskStatus.SUCCESS) { + const outgoing = await this.getOutgoing(); + params.next({ + executionId: params.executionId, + taskId: params.taskId, + nodeId: this.nodeId, + nodeType: this.type, + properties: this.properties, + outgoing, + }); + } + return { + status: r && r.status, + detail: r && r.detail, + executionId: params.executionId, + taskId: params.taskId, + nodeId: this.nodeId, + nodeType: this.type, + properties: this.properties, + }; + } + /** + * 节点在执行中断后,可以通过resume方法恢复执行。 + * 自定义节点时不建议重写此方法 + */ + public async resume(params: ExecResumeParams): Promise { const outgoing = await this.getOutgoing(); - r && params.next({ + await this.onResume({ + executionId: params.executionId, + nodeId: params.nodeId, + taskId: params.taskId, + data: params.data, + }); + params.next({ executionId: params.executionId, taskId: params.taskId, nodeId: this.nodeId, @@ -103,13 +139,13 @@ export default class BaseNode implements BaseNodeInterface { properties: this.properties, outgoing, }); - return r; + return undefined; } - async getOutgoing() { + private async getOutgoing(): Promise { const outgoing = []; const expressions = []; for (const item of this.outgoing) { - const { id, target, properties } = item; + const { properties } = item; expressions.push(this.isPass(properties)); } const result = await Promise.all(expressions); @@ -120,7 +156,7 @@ export default class BaseNode implements BaseNodeInterface { }); return outgoing; } - async isPass(properties) { + private async isPass(properties) { if (!properties) return true; const { conditionExpression } = properties; if (!conditionExpression) return true; @@ -135,9 +171,31 @@ export default class BaseNode implements BaseNodeInterface { } /** * 节点的执行逻辑 - * @returns {boolean} 返回true表示执行成功,返回false表示执行失败,中断流程执行 + * @overridable 可以自定义节点重写此方法。 + * @param params.executionId 流程执行记录ID + * @param params.taskId 此节点执行记录ID + * @param params.nodeId 节点ID + */ + public async action(params: { + executionId: string; + taskId: string; + nodeId: string; + }): Promise { + return undefined; + } + /** + * 节点的重新恢复执行逻辑 + * @overridable 可以自定义节点重写此方法。 + * @param params.executionId 流程执行记录ID + * @param params.taskId 此节点执行记录ID + * @param params.nodeId 节点ID */ - async action() { - return true; + public async onResume(params: { + executionId: string, + taskId: string, + nodeId: string, + data?: Record, + }): Promise { + return undefined; } } diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts index fa1872c3a..0d372c85c 100644 --- a/packages/engine/src/recorder/index.ts +++ b/packages/engine/src/recorder/index.ts @@ -1,17 +1,12 @@ +import type { + RecorderData, + RecorderInterface, +} from '../types.d'; import storage from '../util/storage'; const LOGICFLOW_ENGINE_INSTANCES = 'LOGICFLOW_ENGINE_INSTANCES'; -export type RecorderData = { - taskId: string; - nodeId: string; - executionId: string; - nodeType: string; - timestamp: number; - properties?: Record; -}; - -export default class Recorder { +export default class Recorder implements RecorderInterface { /* * @param {Object} task * { @@ -36,7 +31,7 @@ export default class Recorder { storage.setItem(executionId, instanceData); storage.setItem(taskId, task); } - async getTask(taskId) { + async getTask(taskId: string): Promise { return storage.getItem(taskId); } async getExecutionTasks(executionId) { diff --git a/packages/engine/src/types.d.ts b/packages/engine/src/types.d.ts new file mode 100644 index 000000000..d853d45aa --- /dev/null +++ b/packages/engine/src/types.d.ts @@ -0,0 +1,58 @@ +/** + * 即将执行的节点参数 + */ +export type NodeParam = { + executionId: string; + nodeId: string; +} +/** + * 执行节点的参数 + */ +export type TaskParam = { + taskId: string; +} & NodeParam; + +export type ExecParams = { + next: (data: NextTaskParam) => void; +} & TaskParam; + +export type ResumeParam = { + data: Record; +} & TaskParam; + +export type ExecResumeParams = { + next: (data: NextTaskParam) => void; +} & ResumeParam; + +export type RecorderData = { + nodeType: string; + timestamp: number; + properties?: Record; +} & TaskParam; + +export interface RecorderInterface { + addTask: (task: RecorderData) => Promise; + getTask: (taskId: string) => Promise; + getExecutionTasks: (executionId: string) => Promise; + clear: () => void; +}; + +export type ActionResult = void | { + status: TaskStatus; + detail?: Record; +}; + +export type NodeExecResult = { + executionId: string, + taskId: string, + nodeId: string, + nodeType: string, + properties?: Record, +} & ActionResult; + +export type ResumeParams = { + executionId: string; + taskId: string; + nodeId: string; + data?: Record; +} From 8e17ea614c5c3567e532a67240cf8e0e9110b2bf Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Mon, 17 Jul 2023 16:24:29 +0800 Subject: [PATCH 08/11] feat(engie): support parallel execution within workflows --- .../__test__/06_parallelAndSerial.test.js | 88 +++++++++++++++++++ packages/engine/src/Scheduler.ts | 50 +++++------ packages/engine/src/recorder/index.ts | 20 +++-- packages/engine/src/types.d.ts | 6 +- 4 files changed, 128 insertions(+), 36 deletions(-) create mode 100644 packages/engine/__test__/06_parallelAndSerial.test.js diff --git a/packages/engine/__test__/06_parallelAndSerial.test.js b/packages/engine/__test__/06_parallelAndSerial.test.js new file mode 100644 index 000000000..1e015113d --- /dev/null +++ b/packages/engine/__test__/06_parallelAndSerial.test.js @@ -0,0 +1,88 @@ +import Engine, { TaskNode } from '../src/index'; + +describe('@logicflow/engine parallel and serial', () => { + class FetchNode extends TaskNode { + async action() { + await this.fetch() + } + fetch() { + return new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 100); + }) + } + } + const engine = new Engine(); + engine.register({ + type: 'FetchTask', + model: FetchNode, + }) + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'FetchTask', + properties: {} + }, + { + id: 'node3', + type: 'TaskNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node1', + targetNodeId: 'node3', + properties: { + } + }, + { + id: 'edge3', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + globalData: { + }, + } + engine.load(flowData); + test('When the process is executed, the asynchronous node will not block the execution of other branch nodes.', async () => { + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + expect(execution.length).toBe(4); + expect(execution[3].nodeId).toEqual('node2') + }); + test('When the process is executed twice, the second execution will start only after the first execution is completed.', async () => { + const r = engine.execute(); + const r2 = engine.execute(); + const result = await Promise.all([r, r2]); + const execution1 = await engine.getExecutionRecord(result[0].executionId); + const execution2 = await engine.getExecutionRecord(result[1].executionId); + expect(execution2[0].timestamp >= execution1[3].timestamp).toBe(true) + }); +}) \ No newline at end of file diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index c0783a128..6014396bd 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -10,6 +10,7 @@ import type { TaskParam, NodeParam, ResumeParam, + NodeExecResult, } from './types.d'; import type FlowModel from './FlowModel'; import type { NextTaskParam } from './nodes/BaseNode'; @@ -63,20 +64,23 @@ export default class Scheduler extends EventEmitter { nodeId?: string; taskId?: string; }) { - const { executionId } = runParams; - const currentNode = this.getNextNode(executionId); - if (currentNode) { - const taskId = createTaskId(); - const taskParam = { - ...currentNode, - taskId, - }; - this.pushTaskToRunningMap(taskParam); - this.exec(taskParam); - } else if (!this.hasRunningTask(executionId)) { + const nodeQueue = this.nodeQueueMap.get(runParams.executionId); + if (nodeQueue.length > 0) { + this.nodeQueueMap.set(runParams.executionId, []); + for (let i = 0; i < nodeQueue.length; i++) { + const currentNode = nodeQueue[i]; + const taskId = createTaskId(); + const taskParam = { + ...currentNode, + taskId, + }; + this.pushTaskToRunningMap(taskParam); + this.exec(taskParam); + } + } else if (!this.hasRunningTask(runParams.executionId)) { // 当一个流程在nodeQueueMap和taskRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 this.emit(EVENT_INSTANCE_COMPLETE, { - executionId, + executionId: runParams.executionId, nodeId: runParams.nodeId, taskId: runParams.taskId, status: FlowStatus.COMPLETED, @@ -158,7 +162,7 @@ export default class Scheduler extends EventEmitter { private interrupted({ execResult, taskParam, - } : { execResult: ActionResult, taskParam: TaskParam}) { + } : { execResult: NodeExecResult, taskParam: TaskParam}) { this.emit(EVENT_INSTANCE_INTERRUPTED, { executionId: taskParam.executionId, status: FlowStatus.INTERRUPTED, @@ -167,9 +171,6 @@ export default class Scheduler extends EventEmitter { detail: execResult.detail, }); } - private cancel(taskParam: TaskParam) { - // TODO: 流程执行异常中断 - } private async next(data: NextTaskParam) { if (data.outgoing && data.outgoing.length > 0) { data.outgoing.forEach((item) => { @@ -179,7 +180,7 @@ export default class Scheduler extends EventEmitter { }); }); } - this.saveTaskResult(data); + await this.saveTaskResult(data); this.removeTaskFromRunningMap(data); this.run({ executionId: data.executionId, @@ -187,8 +188,11 @@ export default class Scheduler extends EventEmitter { taskId: data.taskId, }); } - private saveTaskResult(data: TaskResult) { - this.recorder.addTask({ + /** + * 为了防止多次添加导致 + */ + private async saveTaskResult(data: TaskResult) { + await this.recorder.addTask({ executionId: data.executionId, taskId: data.taskId, nodeId: data.nodeId, @@ -197,12 +201,4 @@ export default class Scheduler extends EventEmitter { properties: data.properties, }); } - private getNextNode(executionId): NodeParam | null { - const currentTaskQueue = this.nodeQueueMap.get(executionId); - if (!currentTaskQueue || currentTaskQueue.length === 0) { - return null; - } - const currentTask = currentTaskQueue.shift(); - return currentTask; - } } diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts index 0d372c85c..7221fc155 100644 --- a/packages/engine/src/recorder/index.ts +++ b/packages/engine/src/recorder/index.ts @@ -20,15 +20,11 @@ export default class Recorder implements RecorderInterface { */ async addTask(task: RecorderData) { const { executionId, taskId } = task; - let instanceData = await this.getExecutionTasks(executionId); + const instanceData = await this.getExecutionTasks(executionId); if (!instanceData) { - instanceData = []; - const instance = storage.getItem(LOGICFLOW_ENGINE_INSTANCES) || []; - instance.push(executionId); - storage.setItem(LOGICFLOW_ENGINE_INSTANCES, instance); + this.pushExecution(executionId); } - instanceData.push(taskId); - storage.setItem(executionId, instanceData); + this.pushTaskToExecution(executionId, taskId); storage.setItem(taskId, task); } async getTask(taskId: string): Promise { @@ -48,4 +44,14 @@ export default class Recorder implements RecorderInterface { }); storage.removeItem(LOGICFLOW_ENGINE_INSTANCES); } + private pushExecution(executionId) { + const instance = storage.getItem(LOGICFLOW_ENGINE_INSTANCES) || []; + instance.push(executionId); + storage.setItem(LOGICFLOW_ENGINE_INSTANCES, instance); + } + private pushTaskToExecution(executionId, taskId) { + const tasks = storage.getItem(executionId) || []; + tasks.push(taskId); + storage.setItem(executionId, tasks); + } } diff --git a/packages/engine/src/types.d.ts b/packages/engine/src/types.d.ts index d853d45aa..5f0770eb1 100644 --- a/packages/engine/src/types.d.ts +++ b/packages/engine/src/types.d.ts @@ -37,8 +37,10 @@ export interface RecorderInterface { clear: () => void; }; -export type ActionResult = void | { - status: TaskStatus; +export type TaskStatus = 'success' | 'error' | 'interrupted' | ''; + +export type ActionResult = { + status?: TaskStatus; detail?: Record; }; From f3e905bd77f5357babaf3646e5aaed2904fb6f9b Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Tue, 25 Jul 2023 11:41:34 +0800 Subject: [PATCH 09/11] chore(engine): remove @logicflow/core from dependencies --- packages/engine/package.json | 1 - packages/engine/src/index.ts | 3 +- packages/engine/src/types.d.ts | 52 ++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/packages/engine/package.json b/packages/engine/package.json index 7d2301ea5..484038042 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -32,7 +32,6 @@ "readme.md" ], "dependencies": { - "@logicflow/core": "^1.2.9", "uuid": "^8.2.0" }, "standard-version": { diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 7fb9b01e4..f4ec55cfa 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -1,5 +1,4 @@ -import type { GraphConfigData } from '@logicflow/core'; -import type { ResumeParams } from './types.d'; +import type { ResumeParams, GraphConfigData } from './types.d'; import FlowModel, { TaskParams } from './FlowModel'; import StartNode from './nodes/StartNode'; import TaskNode from './nodes/TaskNode'; diff --git a/packages/engine/src/types.d.ts b/packages/engine/src/types.d.ts index 5f0770eb1..e87283785 100644 --- a/packages/engine/src/types.d.ts +++ b/packages/engine/src/types.d.ts @@ -58,3 +58,55 @@ export type ResumeParams = { nodeId: string; data?: Record; } + +export declare type Point = { + id?: string; + x: number; + y: number; + [key: string]: unknown; +}; + +export declare type TextConfig = { + value: string; +} & Point; +export declare type GraphConfigData = { + nodes: NodeConfig[]; + edges: EdgeConfig[]; +}; +export declare type NodeConfig = { + id?: string; + type: string; + x: number; + y: number; + text?: TextConfig | string; + zIndex?: number; + properties?: Record; +}; +export declare type EdgeConfig = { + id?: string; + /** + * 边的类型,不传默认为lf.setDefaultEdgeType(type)传入的类型。 + * LogicFlow内部默认为polyline + */ + type?: string; + sourceNodeId: string; + sourceAnchorId?: string; + targetNodeId: string; + targetAnchorId?: string; + startPoint?: { + x: number; + y: number; + }; + endPoint?: { + x: number; + y: number; + }; + text?: { + x: number; + y: number; + value: string; + } | string; + pointsList?: Point[]; + zIndex?: number; + properties?: Record; +}; From 6fa0904ddee88254d4af5c246ff0247c988dfdd2 Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Thu, 27 Jul 2023 15:13:12 +0800 Subject: [PATCH 10/11] feat(engine): add comments and sync storage execution --- packages/engine/src/FlowModel.ts | 81 +++++++++++++++++++-------- packages/engine/src/Scheduler.ts | 38 ++++++++----- packages/engine/src/recorder/index.ts | 2 +- 3 files changed, 81 insertions(+), 40 deletions(-) diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 74eca8233..98a47d3c9 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -32,10 +32,6 @@ export default class FlowModel { * 流程支持的节点类型 */ nodeModelMap: Map; - /** - * 每一次执行流程都会生成一个唯一的executionId。 - */ - executionId: string; /** * 调度器,用于调度节点的执行。 */ @@ -120,8 +116,43 @@ export default class FlowModel { public setStartNodeType(startNodeType) { this.startNodeType = startNodeType; } + /** + * 解析LogicFlow图数据,将nodes和edges转换成节点格式。 + * 例如: + * graphData: { + * nodes: [ + * { id: 'node1', type: 'StartNode', properties: {} }, + * { id: 'node2', type: 'TaskNode', properties: {} }, + * ], + * edges: [ + * { id: 'edge1', sourceNodeId: 'node1', targetNodeId: 'node2', properties: {} }, + * ] + * } + * 转换成: + * nodeConfigMap: { + * node1: { + * id: 'node1', + * type: 'StartNode', + * properties: {}, + * incoming: [], + * outgoing: [{ id: 'edge1', properties: {}, target: 'node2' }] + * }, + * node2: { + * id: 'node2', + * type: 'TaskNode', + * properties: {}, + * incoming: [{ id: 'edge1', properties: {}, source: 'node1' }], + * outgoing: [], + * } + * } + * 此格式方便后续执行时,根据节点id快速找到节点和执行初始化节点模型。 + * 同时此方法还会找到所有的开始节点,方便后续执行时,从开始节点开始执行。 + * @param graphData 流程图数据 + */ public load(graphData) { const { nodes = [], edges = [] } = graphData; + this.startNodes = []; + this.nodeConfigMap = new Map(); nodes.forEach((node) => { if (this.nodeModelMap.has(node.type)) { const nodeConfig = { @@ -159,11 +190,10 @@ export default class FlowModel { }); } /** - * 执行流程 - * 同一次执行,这次执行内部的节点执行顺序为并行。 - * 多次执行,多次执行之间为串行。 - * 允许一个流程多次执行,效率更高。 - * 例如: + * 执行流程, 每次执行都会生成一个唯一的executionId,用于区分不同的执行。 + * 同一次执行,这次执行内部的节点执行顺序为并行。内部并行是为了避免异步节点阻塞其他节点的执行。 + * 多次执行,多次执行之间为串行,这里选择串行的原因是避免多次执行之间的数据冲突。 + * example: * 一个流程存在着两个开始节点,A和B,A和B的下一个节点都是C,C的下两个节点是D和E。 * 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。 * 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。 @@ -185,7 +215,8 @@ export default class FlowModel { this.createExecution(); } /** - * 创建节点实例 + * 创建节点实例, 每个节点实例都会有一个唯一的taskId。 + * 通过executionId、nodeId、taskId可以唯一确定一个节点的某一次执行。 * @param nodeId 节点Id * @returns 节点示例 */ @@ -208,11 +239,11 @@ export default class FlowModel { ...data, }; } + /** + * 在执行完成后,通知外部此次执行完成。 + * 如果还存在待执行的任务,那么继续执行。 + */ private onTaskFinished(result) { - const { executionId } = result; - if (executionId !== this.executionId) { - return; - } const { callback } = this.executingInstance; if (callback) { callback(result); @@ -224,24 +255,27 @@ export default class FlowModel { this.isRunning = false; } } + /** + * 从待执行队列中取出需要执行的内容。 + * 会依次判断是否有taskId、nodeId、executionId。 + * 若存在taskId,那么表示恢复执行。 + * 若存在nodeId,那么表示从指定节点开始执行。 + * 若都不存在,那么新建一个executionId,从开始节点开始执行。 + */ private createExecution() { const execParams = this.executeQueue.shift(); this.executingInstance = execParams; - if (execParams.executionId) { - this.executionId = execParams.executionId; - } else { - this.executionId = createExecId(); - } // 如果有taskId,那么表示恢复执行 - if (execParams.taskId) { + if (execParams.taskId && execParams.executionId && execParams.nodeId) { this.scheduler.resume({ - executionId: this.executionId, + executionId: execParams.executionId, taskId: execParams.taskId, nodeId: execParams.nodeId, data: execParams.data, }); return; } + const executionId = execParams.executionId || createExecId(); if (execParams.nodeId) { const nodeConfig = this.nodeConfigMap.get(execParams.nodeId); if (!nodeConfig) { @@ -252,13 +286,12 @@ export default class FlowModel { } this.startNodes.forEach((startNode) => { this.scheduler.addTask({ - executionId: this.executionId, + executionId, nodeId: startNode.id, }); - // 所有的开始节点都执行 }); this.scheduler.run({ - executionId: this.executionId, + executionId, }); } } diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index 6014396bd..b3ca9a05e 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -22,23 +22,38 @@ type TaskResult = { extraInfo?: Record; } & NextTaskParam; +type ExecutionId = string; + /** * 调度器 * 通过一个队列维护需要执行的节点,一个集合维护正在执行的节点 */ export default class Scheduler extends EventEmitter { - nodeQueueMap: Map; - taskRunningMap: Map; + /** + * 当前需要执行的节点队列 + */ + nodeQueueMap: Map; + /** + * 当前正在执行的节点集合 + * 在每个节点执行完成后,会从集合中删除。 + * 同时会判断此集合中是否还存在和此节点相同的executionId,如果不存在,说明此流程已经执行完成。 + */ + taskRunningMap: Map; + /** + * 流程模型,用于创建节点模型。 + */ flowModel: FlowModel; + /** + * 执行记录存储器 + * 用于存储节点执行的结果。 + */ recorder: Recorder; - currentTask: TaskParam | null; constructor(config) { super(); this.nodeQueueMap = new Map(); this.taskRunningMap = new Map(); this.flowModel = config.flowModel; this.recorder = config.recorder; - this.currentTask = null; } /** * 添加一个任务到队列中。 @@ -103,10 +118,6 @@ export default class Scheduler extends EventEmitter { next: this.next.bind(this), }); } - // 流程执行过程中出错,停止执行 - stop(data) { - console.log('stop', data); - } private pushTaskToRunningMap(taskParam) { const { executionId, taskId } = taskParam; if (!this.taskRunningMap.has(executionId)) { @@ -171,7 +182,7 @@ export default class Scheduler extends EventEmitter { detail: execResult.detail, }); } - private async next(data: NextTaskParam) { + private next(data: NextTaskParam) { if (data.outgoing && data.outgoing.length > 0) { data.outgoing.forEach((item) => { this.addTask({ @@ -180,7 +191,7 @@ export default class Scheduler extends EventEmitter { }); }); } - await this.saveTaskResult(data); + this.saveTaskResult(data); this.removeTaskFromRunningMap(data); this.run({ executionId: data.executionId, @@ -188,11 +199,8 @@ export default class Scheduler extends EventEmitter { taskId: data.taskId, }); } - /** - * 为了防止多次添加导致 - */ - private async saveTaskResult(data: TaskResult) { - await this.recorder.addTask({ + private saveTaskResult(data: TaskResult) { + this.recorder.addTask({ executionId: data.executionId, taskId: data.taskId, nodeId: data.nodeId, diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts index 7221fc155..6cdfcdb46 100644 --- a/packages/engine/src/recorder/index.ts +++ b/packages/engine/src/recorder/index.ts @@ -20,7 +20,7 @@ export default class Recorder implements RecorderInterface { */ async addTask(task: RecorderData) { const { executionId, taskId } = task; - const instanceData = await this.getExecutionTasks(executionId); + const instanceData = this.getExecutionTasks(executionId); if (!instanceData) { this.pushExecution(executionId); } From dfc7f17a52e0fa7b978b3f2724dd2735e005914b Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Thu, 27 Jul 2023 21:10:49 +0800 Subject: [PATCH 11/11] docs: update readme --- packages/engine/README.md | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/packages/engine/README.md b/packages/engine/README.md index fa6eef4e5..c697fe4a9 100644 --- a/packages/engine/README.md +++ b/packages/engine/README.md @@ -4,19 +4,7 @@ ## 使用方式 -```js -import LogicFlowEngine from '@logicflow/engine'; - -const flowModel = new LogicFlowEngine({ - graphData: { - nodes: [], - edges: [], - }, - global: { - // 全局数据 - } -}); - -flowModel.execute(); - +```shell +npm test ``` +