diff --git a/.eslintrc.js b/.eslintrc.js index fb01dcf14..e5c980287 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -27,7 +27,7 @@ module.exports = { pragma: 'h', }, }, - ignorePatterns: ["**/*.mjs"], + ignorePatterns: ["**/*.mjs", "**/*.test.js"], rules: { 'indent': ['error', 2, { SwitchCase: 1 }], 'linebreak-style': ['error', 'unix'], diff --git a/packages/core/package.json b/packages/core/package.json index 332f0d055..454eb2d63 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -8,7 +8,7 @@ "sideEffects": true, "jsdelivr": "dist/logic-flow.min.js", "license": "Apache-2.0", - "homepage": "https://docs.logic-flow.cn", + "homepage": "https://site.logic-flow.cn", "types": "types/index.d.ts", "repository": { "type": "git", diff --git a/packages/engine/README.md b/packages/engine/README.md new file mode 100644 index 000000000..c697fe4a9 --- /dev/null +++ b/packages/engine/README.md @@ -0,0 +1,10 @@ +# engine + +一个可以在JavaScript环境执行的流程引擎 + +## 使用方式 + +```shell +npm test +``` + diff --git a/packages/engine/__test__/01_index.test.js b/packages/engine/__test__/01_index.test.js new file mode 100644 index 000000000..3b1f166c9 --- /dev/null +++ b/packages/engine/__test__/01_index.test.js @@ -0,0 +1,37 @@ +import Engine from '../src/index'; + +describe('@logicflow/engine', () => { + test('Execution Process Completed, Returning Data Containing executionId', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + }, + { + id: 'node2', + type: 'TaskNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + } + ] + }, + context: {}, + globalData: {}, + } + const flowModel = engine.load(flowData); + const result = await engine.execute(); + expect(engine).toBeInstanceOf(Engine); + expect(flowModel.nodeConfigMap.size).toBe(flowData.graphData.nodes.length); + expect(result).toHaveProperty('executionId'); + expect(result.status).toBe('completed'); + expect(result.nodeId).toEqual('node2'); + }); +}); diff --git a/packages/engine/__test__/02_recorder.test.js b/packages/engine/__test__/02_recorder.test.js new file mode 100644 index 000000000..814494474 --- /dev/null +++ b/packages/engine/__test__/02_recorder.test.js @@ -0,0 +1,56 @@ +import Engine from '../src/index'; + +describe('@logicflow/engine Recorder', () => { + test('Using the getExecutionRecord API, receive the complete execution record of the process.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: {} + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + } + ] + }, + global: {}, + } + engine.load(flowData); + const result = await engine.execute(); + const executionId = result.executionId; + /** + * [ + * { + * taskId: '', + * nodeId: '', + * instanceId: '', + * nodeType: '', + * timestamp: '', + * properties: {}, + * } + * ] + */ + const execution = await engine.getExecutionRecord(executionId); + expect(execution.length).toBe(2); + expect(execution[1]).toHaveProperty('taskId'); + expect(execution[1]).toHaveProperty('nodeId'); + expect(execution[1]).toHaveProperty('executionId'); + expect(execution[1]).toHaveProperty('nodeType'); + expect(execution[1]).toHaveProperty('timestamp'); + expect(execution[1]).toHaveProperty('properties'); + expect(execution[1].nodeId).toBe('node2'); + expect(execution[1].nodeType).toBe('TaskNode'); + }); +}); \ No newline at end of file diff --git a/packages/engine/__test__/03_condition.test.js b/packages/engine/__test__/03_condition.test.js new file mode 100644 index 000000000..25a18c5b6 --- /dev/null +++ b/packages/engine/__test__/03_condition.test.js @@ -0,0 +1,56 @@ +import Engine from '../src/index'; + +describe('@logicflow/engine condition', () => { + test('The process will not continue its execution if the condition expression evaluates to false.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + conditionExpression: 'a === 1' + } + }, + { + id: 'edge2', + sourceNodeId: 'node1', + targetNodeId: 'node3', + properties: { + conditionExpression: 'a === 2' + } + } + ] + }, + globalData: { + a: 2 + }, + } + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + expect(execution.length).toBe(2); + expect(execution[1].nodeId).toBe('node3'); + expect(execution[1].nodeType).toBe('TaskNode'); + }); +}); \ No newline at end of file diff --git a/packages/engine/__test__/04_execute.test.js b/packages/engine/__test__/04_execute.test.js new file mode 100644 index 000000000..535818aaf --- /dev/null +++ b/packages/engine/__test__/04_execute.test.js @@ -0,0 +1,244 @@ +import Engine from '../src/index'; + +describe('@logicflow/engine Execute', () => { + test('When there are multiple start nodes in a process, all of them are executed by default.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + + expect(execution.length).toBe(5); + }); + test('When there are multiple start nodes in a process, you can specify which start node to execute.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + const result = await engine.execute({ + nodeId: 'node3' + }); + const execution = await engine.getExecutionRecord(result.executionId); + + expect(execution.length).toBe(2); + expect(execution[0].nodeId).toBe('node3'); + expect(execution[1].nodeId).toBe('node4'); + }); + test('When attempting to execute a non-existent start node in a process, an execution exception is raised.', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + try { + await engine.execute({ + nodeId: 'node6' + }); + } catch (error) { + expect(error.message).toContain('node6') + expect(error.message).toContain('1001'); + } + }); + test('When there are multiple start nodes in a process, parallel execution is supported, and each start node generates a unique execution ID', async () => { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + const r = engine.execute({ + nodeId: 'node1' + }); + const r2 = engine.execute({ + nodeId: 'node3' + }); + const result = await Promise.all([r, r2]); + const execution1 = await engine.getExecutionRecord(result[0].executionId); + const execution2 = await engine.getExecutionRecord(result[1].executionId); + expect(execution1.length).toBe(2); + expect(execution1[0].nodeId).toBe('node1'); + expect(execution1[1].nodeId).toBe('node2'); + expect(execution2.length).toBe(2); + expect(execution2[0].nodeId).toBe('node3'); + expect(execution2[1].nodeId).toBe('node4'); + expect(result[0].executionId).not.toBe(result[1].executionId); + }); +}); \ No newline at end of file diff --git a/packages/engine/__test__/05_customNode.test.js b/packages/engine/__test__/05_customNode.test.js new file mode 100644 index 000000000..749c8ba44 --- /dev/null +++ b/packages/engine/__test__/05_customNode.test.js @@ -0,0 +1,78 @@ +import Engine, { TaskNode } from '../src/index'; + +describe('@logicflow/engine Customize Node', () => { + class UserTask extends TaskNode { + async action() { + return { + status: 'interrupted', + detail: { + formId: 'form_1' + } + }; + } + } + const engine = new Engine(); + engine.register({ + type: 'UserTask', + model: UserTask, + }) + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'UserTask', + properties: {} + }, + { + id: 'node3', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node2', + targetNodeId: 'node3', + properties: { + } + } + ] + }, + globalData: { + }, + } + engine.load(flowData); + test('After executing the process, receive the flow status as "interrupted" and include detailed information returned by the custom node.', async () => { + const result = await engine.execute(); + expect(result.status).toBe('interrupted'); + expect(result.detail.formId).toEqual('form_1'); + }); + test('After a process is interrupted, you can resume its execution using the API.', async () => { + const result = await engine.execute(); + const result2 = await engine.resume({ + executionId: result.executionId, + nodeId: result.nodeId, + taskId: result.taskId, + data: { + formId: 'form_2' + } + }) + expect(result2.status).toBe('completed') + expect(result2.nodeId).toEqual('node3') + }); +}); diff --git a/packages/engine/__test__/06_parallelAndSerial.test.js b/packages/engine/__test__/06_parallelAndSerial.test.js new file mode 100644 index 000000000..1e015113d --- /dev/null +++ b/packages/engine/__test__/06_parallelAndSerial.test.js @@ -0,0 +1,88 @@ +import Engine, { TaskNode } from '../src/index'; + +describe('@logicflow/engine parallel and serial', () => { + class FetchNode extends TaskNode { + async action() { + await this.fetch() + } + fetch() { + return new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 100); + }) + } + } + const engine = new Engine(); + engine.register({ + type: 'FetchTask', + model: FetchNode, + }) + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'FetchTask', + properties: {} + }, + { + id: 'node3', + type: 'TaskNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node1', + targetNodeId: 'node3', + properties: { + } + }, + { + id: 'edge3', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + globalData: { + }, + } + engine.load(flowData); + test('When the process is executed, the asynchronous node will not block the execution of other branch nodes.', async () => { + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + expect(execution.length).toBe(4); + expect(execution[3].nodeId).toEqual('node2') + }); + test('When the process is executed twice, the second execution will start only after the first execution is completed.', async () => { + const r = engine.execute(); + const r2 = engine.execute(); + const result = await Promise.all([r, r2]); + const execution1 = await engine.getExecutionRecord(result[0].executionId); + const execution2 = await engine.getExecutionRecord(result[1].executionId); + expect(execution2[0].timestamp >= execution1[3].timestamp).toBe(true) + }); +}) \ No newline at end of file diff --git a/packages/engine/babel.config.js b/packages/engine/babel.config.js new file mode 100644 index 000000000..73445bdcb --- /dev/null +++ b/packages/engine/babel.config.js @@ -0,0 +1,8 @@ +module.exports = { + presets: [ + ['@babel/preset-env', { targets: { node: 'current' } }], + '@babel/preset-typescript', + ], + plugins: [ + ], +}; diff --git a/packages/engine/example/browser/index.html b/packages/engine/example/browser/index.html new file mode 100644 index 000000000..c42b47fc1 --- /dev/null +++ b/packages/engine/example/browser/index.html @@ -0,0 +1,16 @@ + + + + + + Page Title + + + + + +
+ + + + \ No newline at end of file diff --git a/packages/engine/example/browser/main.js b/packages/engine/example/browser/main.js new file mode 100644 index 000000000..64a760a79 --- /dev/null +++ b/packages/engine/example/browser/main.js @@ -0,0 +1,54 @@ +const { Engine } = window + +// console.log(Engine); +async function test() { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + }, + }, + { + id: 'node2', + type: 'TaskNode', + properties: {}, + }, + { + id: 'node3', + type: 'TaskNode', + properties: {}, + }, + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + conditionExpression: 'a === 1', + }, + }, + { + id: 'edge2', + sourceNodeId: 'node1', + targetNodeId: 'node3', + properties: { + conditionExpression: 'true', + }, + }, + ], + }, + globalData: { + a: 2, + }, + }; + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + console.log(execution); +} +test(); diff --git a/packages/engine/example/nodejs/index.js b/packages/engine/example/nodejs/index.js new file mode 100644 index 000000000..fbe01288d --- /dev/null +++ b/packages/engine/example/nodejs/index.js @@ -0,0 +1,58 @@ +const { Engine } = require('@logicflow/engine'); + +// console.log(Engine); +async function test() { + const engine = new Engine(); + const flowData = { + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + } + }, + { + id: 'node2', + type: 'TaskNode', + properties: {} + }, + { + id: 'node3', + type: 'StartNode', + properties: {} + }, + { + id: 'node4', + type: 'TaskNode', + properties: {} + }, + { + id: 'node5', + type: 'StartNode', + } + ], + edges: [ + { + id: 'edge1', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: { + } + }, + { + id: 'edge2', + sourceNodeId: 'node3', + targetNodeId: 'node4', + properties: { + } + } + ] + }, + } + engine.load(flowData); + const result = await engine.execute(); + const execution = await engine.getExecutionRecord(result.executionId); + console.log(execution); +} +test(); diff --git a/packages/engine/package.json b/packages/engine/package.json new file mode 100644 index 000000000..484038042 --- /dev/null +++ b/packages/engine/package.json @@ -0,0 +1,87 @@ +{ + "name": "@logicflow/engine", + "version": "0.0.1", + "description": "a process engine for javascript", + "main": "cjs/index.js", + "module": "es/index.js", + "homepage": "https://docs.logic-flow.cn", + "types": "es/index.d.ts", + "repository": { + "type": "git", + "url": "https://github.com/didi/LogicFlow", + "directory": "packages/engine" + }, + "license": "Apache-2.0", + "scripts": { + "dev": "cross-env NODE_ENV=development webpack-dev-server --client-log-level warning --config scripts/webpack.config.dev.js", + "types": "tsc -d --declarationDir ./types --outDir temp && rimraf -R temp", + "build:esm": "tsc --module esnext --target es5 --outDir ./es -d", + "build:cjs": "tsc --module commonjs --target es5 --outDir ./cjs", + "build:umd": "cross-env NODE_ENV=production webpack --config scripts/webpack.config.build.js", + "build": "rimraf ./es ./cjs ./lib && npm run build:esm & npm run build:cjs & npm run build:umd", + "publish-lib": "npm run build && npm publish", + "test": "jest", + "test:watch": "jest --watch" + }, + "author": "", + "files": [ + "cjs", + "es", + "lib", + "types", + "readme.md" + ], + "dependencies": { + "uuid": "^8.2.0" + }, + "standard-version": { + "skip": { + "tag": true, + "commit": true + } + }, + "keywords": [ + "logicflow", + "workflow", + "process", + "diagram" + ], + "devDependencies": { + "@babel/core": "^7.9.0", + "@babel/plugin-proposal-decorators": "^7.12.1", + "@babel/plugin-syntax-jsx": "^7.8.3", + "@babel/plugin-transform-react-jsx": "^7.10.4", + "@babel/preset-env": "^7.9.5", + "@babel/preset-typescript": "^7.9.0", + "@commitlint/config-conventional": "^8.3.4", + "@typescript-eslint/eslint-plugin": "^4.7.0", + "@typescript-eslint/parser": "^3.2.0", + "babel-core": "^7.0.0-bridge.0", + "babel-loader": "^8.1.0", + "babel-plugin-import": "^1.13.0", + "case-sensitive-paths-webpack-plugin": "^2.3.0", + "core-js": "^3.6.5", + "cross-env": "^7.0.2", + "css-loader": "^4.2.1", + "eslint": "^7.0.0", + "eslint-config-airbnb-typescript": "^9.0.0", + "eslint-plugin-import": "^2.22.0", + "eslint-plugin-jsx-a11y": "^6.3.1", + "eslint-plugin-react": "^7.20.6", + "eslint-plugin-standard": "^4.0.1", + "eslint-webpack-plugin": "^2.1.0", + "html-webpack-plugin": "^4.2.0", + "less-loader": "^6.0.0", + "prettier": "^2.2.1", + "rimraf": "^3.0.2", + "standard-version": "^9.0.0", + "style-loader": "^1.2.0", + "typescript": "^3.8.3", + "url-loader": "^4.1.0", + "webpack": "^4.43.0", + "webpack-bundle-analyzer": "^4.1.0", + "webpack-cli": "^3.3.11", + "webpack-dev-server": "^3.10.3" + }, + "gitHead": "ab8cc6126e9dfa5c4fb18037538a374d3a0b0521" +} diff --git a/packages/engine/scripts/webpack.config.base.js b/packages/engine/scripts/webpack.config.base.js new file mode 100644 index 000000000..26891f1c2 --- /dev/null +++ b/packages/engine/scripts/webpack.config.base.js @@ -0,0 +1,45 @@ +const path = require('path'); + +module.exports = { + entry: path.resolve(__dirname, '../src/index.ts'), + output: { + path: path.resolve(__dirname, '../lib'), + filename: 'index.js', + libraryTarget: 'umd', + // libraryExport: 'default', // 兼容 ES6(ES2015) 的模块系统、CommonJS 和 AMD 模块规范 + }, + resolve: { + extensions: ['.ts', '.js'], + symlinks: false, + }, + module: { + rules: [ + { + test: /\.(js|ts)$/, + exclude: /node_modules/, + use: [ + { + loader: 'babel-loader', + options: { + presets: [ + [ + '@babel/preset-typescript', + { + allExtensions: true, + }, + ], + [ + '@babel/preset-env', + { + useBuiltIns: 'usage', + corejs: '3.3', + }, + ], + ], + }, + }, + ], + }, + ], + }, +}; diff --git a/packages/engine/scripts/webpack.config.build.js b/packages/engine/scripts/webpack.config.build.js new file mode 100644 index 000000000..65c5835c2 --- /dev/null +++ b/packages/engine/scripts/webpack.config.build.js @@ -0,0 +1,19 @@ +const path = require('path'); +const webpack = require('webpack'); +const CaseSensitivePathsPlugin = require('case-sensitive-paths-webpack-plugin'); +const baseWebpackConfig = require('./webpack.config.base.js'); + +module.exports = [ + { + ...baseWebpackConfig, + mode: 'production', + output: { + path: path.resolve(__dirname, '../lib'), + filename: '[name].js', + libraryTarget: 'umd', + }, + plugins: [ + new CaseSensitivePathsPlugin() + ], + }, +]; diff --git a/packages/engine/scripts/webpack.config.dev.js b/packages/engine/scripts/webpack.config.dev.js new file mode 100644 index 000000000..14cf13f8b --- /dev/null +++ b/packages/engine/scripts/webpack.config.dev.js @@ -0,0 +1,29 @@ +const path = require('path'); +const webpack = require('webpack'); +const CaseSensitivePathsPlugin = require('case-sensitive-paths-webpack-plugin'); +// const ESLintPlugin = require('eslint-webpack-plugin'); +const baseWebpackConfig = require('./webpack.config.base.js'); + +// 先不用webpack merge +module.exports = [ + { + ...baseWebpackConfig, + devtool: 'inline-source-map', + mode: 'development', + devServer: { + contentBase: path.join(__dirname, '../../'), + stats: 'errors-warnings', + port: 9094, + host: '127.0.0.1', + watchOptions: { + poll: true, + }, + }, + plugins: [ + new CaseSensitivePathsPlugin(), + // new ESLintPlugin({ + // extensions: ['ts', 'tsx'], + // }), + ], + }, +]; diff --git a/packages/engine/src/EventEmitter.ts b/packages/engine/src/EventEmitter.ts new file mode 100644 index 000000000..29f89f069 --- /dev/null +++ b/packages/engine/src/EventEmitter.ts @@ -0,0 +1,64 @@ +export default class EventEmitter { + _events: Record; + constructor() { + this._events = {}; + } + on(evKey, callback, once = false) { + evKey = evKey.trim(); + if (!this._events[evKey]) { + this._events[evKey] = []; + } + this._events[evKey].push({ + callback, + once: !!once, + }); + } + emit(evt, eventArgs) { + const events = this._events[evt] || []; + // 实际的处理 emit 方法 + const doEmit = (es) => { + let { length } = es; + for (let i = 0; i < length; i++) { + if (!es[i]) { + // eslint-disable-next-line no-continue + continue; + } + const { callback, once } = es[i]; + if (once) { + es.splice(i, 1); + if (es.length === 0) { + delete this._events[evt]; + } + length--; + i--; + } + callback.apply(this, [eventArgs]); + } + }; + doEmit(events); + } + off(evt, callback) { + if (!evt) { + // evt 为空全部清除 + this._events = {}; + } + if (!callback) { + // evt 存在,callback 为空,清除事件所有方法 + delete this._events[evt]; + } else { + // evt 存在,callback 存在,清除匹配的 + const events = this._events[evt] || []; + let { length } = events; + for (let i = 0; i < length; i++) { + if (events[i].callback === callback) { + events.splice(i, 1); + length--; + i--; + } + } + if (events.length === 0) { + delete this._events[evt]; + } + } + } +} diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts new file mode 100644 index 000000000..98a47d3c9 --- /dev/null +++ b/packages/engine/src/FlowModel.ts @@ -0,0 +1,297 @@ +import type { + NodeConfig, + NodeConstructor, +} from './nodes/BaseNode'; +import type Recorder from './recorder'; +import { + EVENT_INSTANCE_COMPLETE, EVENT_INSTANCE_INTERRUPTED, +} from './constant/constant'; +import { createExecId } from './util/ID'; +import Scheduler from './Scheduler'; +import { ErrorCode, getErrorMsg } from './constant/LogCode'; +import type { TaskParam } from './types.d'; + +export type FlowResult = { + result?: Record; +} & TaskParam; + +export type TaskParams = { + executionId?: string; + taskId?: string; + nodeId?: string; + data?: Record; +}; + +export type ExecParams = { + callback?: (result: FlowResult) => void; + onError?: (error: Error) => void; +} & TaskParams; + +export default class FlowModel { + /** + * 流程支持的节点类型 + */ + nodeModelMap: Map; + /** + * 调度器,用于调度节点的执行。 + */ + scheduler: Scheduler; + /** + * 待执行的队列,当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 + */ + executeQueue: ExecParams[]; + /** + * 当前正在执行。当监听到调度器执行完成时,出触发执行参数中的回调,告知外部执行完成。 + */ + executingInstance: ExecParams; + /** + * 当前流程模型中的所有节点,边会被转换成节点的incoming和outgoing属性。 + */ + nodeConfigMap: Map = new Map(); + /** + * 当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。 + */ + isRunning: boolean; + /** + * 开始节点类型,在执行流程时,会从这些节点开始执行。 + */ + startNodeType: string; + /** + * 当前流程中开始节点组成的数组。 + */ + startNodes: NodeConfig[] = []; + /** + * 用于存储全局数据,可以在流程中共享。 + */ + globalData: Record = {}; + /** + * 外部传入的上下文,最终会传递给每个节点 + * 例如: + * const context = { + * request: { + * get: (url) => { + * return fetch(url); + * } + * } + * 在节点内部可以通过 this.context.request.get(url) 来调用。 + */ + context: Record; + constructor({ + nodeModelMap, + recorder, + context = {}, + globalData = {}, + startNodeType = 'StartNode', + }: { + nodeModelMap: Map; + recorder: Recorder; + context?: Record; + globalData?: Record; + startNodeType?: string; + }) { + // 流程包含的节点类型 + this.nodeModelMap = nodeModelMap; + // 需要执行的队列 + this.executeQueue = []; + // 执行中的任务 + this.executingInstance = null; + // 外部传入的上下文,最终会传递给每个节点 + this.context = context; + // 用于存储全局数据,可以在流程中共享。 + this.globalData = globalData; + // 开始节点类型,在执行流程时,会从这些节点开始执行。 + this.startNodeType = startNodeType; + this.isRunning = false; + this.scheduler = new Scheduler({ + flowModel: this, + recorder, + }); + this.scheduler.on(EVENT_INSTANCE_COMPLETE, (result) => { + this.onTaskFinished(result); + }); + this.scheduler.on(EVENT_INSTANCE_INTERRUPTED, (result) => { + this.onTaskFinished(result); + }); + } + public setStartNodeType(startNodeType) { + this.startNodeType = startNodeType; + } + /** + * 解析LogicFlow图数据,将nodes和edges转换成节点格式。 + * 例如: + * graphData: { + * nodes: [ + * { id: 'node1', type: 'StartNode', properties: {} }, + * { id: 'node2', type: 'TaskNode', properties: {} }, + * ], + * edges: [ + * { id: 'edge1', sourceNodeId: 'node1', targetNodeId: 'node2', properties: {} }, + * ] + * } + * 转换成: + * nodeConfigMap: { + * node1: { + * id: 'node1', + * type: 'StartNode', + * properties: {}, + * incoming: [], + * outgoing: [{ id: 'edge1', properties: {}, target: 'node2' }] + * }, + * node2: { + * id: 'node2', + * type: 'TaskNode', + * properties: {}, + * incoming: [{ id: 'edge1', properties: {}, source: 'node1' }], + * outgoing: [], + * } + * } + * 此格式方便后续执行时,根据节点id快速找到节点和执行初始化节点模型。 + * 同时此方法还会找到所有的开始节点,方便后续执行时,从开始节点开始执行。 + * @param graphData 流程图数据 + */ + public load(graphData) { + const { nodes = [], edges = [] } = graphData; + this.startNodes = []; + this.nodeConfigMap = new Map(); + nodes.forEach((node) => { + if (this.nodeModelMap.has(node.type)) { + const nodeConfig = { + id: node.id, + type: node.type, + properties: node.properties, + incoming: [], + outgoing: [], + }; + this.nodeConfigMap.set(node.id, nodeConfig); + if (node.type === this.startNodeType) { + this.startNodes.push(nodeConfig); + } + } else { + console.warn(`未识别的节点类型: ${node.type}`); + } + }); + edges.forEach((edge) => { + const sourceNode = this.nodeConfigMap.get(edge.sourceNodeId); + const targetNode = this.nodeConfigMap.get(edge.targetNodeId); + if (sourceNode) { + sourceNode.outgoing.push({ + id: edge.id, + properties: edge.properties, + target: edge.targetNodeId, + }); + } + if (targetNode && targetNode.type !== this.startNodeType) { + targetNode.incoming.push({ + id: edge.id, + properties: edge.properties, + source: edge.sourceNodeId, + }); + } + }); + } + /** + * 执行流程, 每次执行都会生成一个唯一的executionId,用于区分不同的执行。 + * 同一次执行,这次执行内部的节点执行顺序为并行。内部并行是为了避免异步节点阻塞其他节点的执行。 + * 多次执行,多次执行之间为串行,这里选择串行的原因是避免多次执行之间的数据冲突。 + * example: + * 一个流程存在着两个开始节点,A和B,A和B的下一个节点都是C,C的下两个节点是D和E。 + * 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。 + * 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。 + */ + public async execute(params: ExecParams) { + this.executeQueue.push(params); + if (this.isRunning) { + return; + } + this.isRunning = true; + this.createExecution(); + } + public async resume(params: ExecParams) { + this.executeQueue.push(params); + if (this.isRunning) { + return; + } + this.isRunning = true; + this.createExecution(); + } + /** + * 创建节点实例, 每个节点实例都会有一个唯一的taskId。 + * 通过executionId、nodeId、taskId可以唯一确定一个节点的某一次执行。 + * @param nodeId 节点Id + * @returns 节点示例 + */ + public createTask(nodeId: string) { + const nodeConfig = this.nodeConfigMap.get(nodeId); + const NodeModel = this.nodeModelMap.get(nodeConfig.type); + const task = new NodeModel({ + nodeConfig, + globalData: this.globalData, + context: this.context, + }); + return task; + } + /** + * 更新流程全局数据 + */ + public updateGlobalData(data) { + this.globalData = { + ...this.globalData, + ...data, + }; + } + /** + * 在执行完成后,通知外部此次执行完成。 + * 如果还存在待执行的任务,那么继续执行。 + */ + private onTaskFinished(result) { + const { callback } = this.executingInstance; + if (callback) { + callback(result); + } + this.executingInstance = null; + if (this.executeQueue.length > 0) { + this.createExecution(); + } else { + this.isRunning = false; + } + } + /** + * 从待执行队列中取出需要执行的内容。 + * 会依次判断是否有taskId、nodeId、executionId。 + * 若存在taskId,那么表示恢复执行。 + * 若存在nodeId,那么表示从指定节点开始执行。 + * 若都不存在,那么新建一个executionId,从开始节点开始执行。 + */ + private createExecution() { + const execParams = this.executeQueue.shift(); + this.executingInstance = execParams; + // 如果有taskId,那么表示恢复执行 + if (execParams.taskId && execParams.executionId && execParams.nodeId) { + this.scheduler.resume({ + executionId: execParams.executionId, + taskId: execParams.taskId, + nodeId: execParams.nodeId, + data: execParams.data, + }); + return; + } + const executionId = execParams.executionId || createExecId(); + if (execParams.nodeId) { + const nodeConfig = this.nodeConfigMap.get(execParams.nodeId); + if (!nodeConfig) { + execParams.onError(new Error(`${getErrorMsg(ErrorCode.NONE_NODE_ID)}(${execParams.nodeId})`)); + return; + } + this.startNodes = [nodeConfig]; + } + this.startNodes.forEach((startNode) => { + this.scheduler.addTask({ + executionId, + nodeId: startNode.id, + }); + }); + this.scheduler.run({ + executionId, + }); + } +} diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts new file mode 100644 index 000000000..b3ca9a05e --- /dev/null +++ b/packages/engine/src/Scheduler.ts @@ -0,0 +1,212 @@ +import EventEmitter from './EventEmitter'; +import { + EVENT_INSTANCE_COMPLETE, + EVENT_INSTANCE_INTERRUPTED, + FlowStatus, +} from './constant/constant'; +import { createTaskId } from './util/ID'; +import type { + ActionResult, + TaskParam, + NodeParam, + ResumeParam, + NodeExecResult, +} from './types.d'; +import type FlowModel from './FlowModel'; +import type { NextTaskParam } from './nodes/BaseNode'; +import type Recorder from './recorder'; + +type TaskParamMap = Map; + +type TaskResult = { + extraInfo?: Record; +} & NextTaskParam; + +type ExecutionId = string; + +/** + * 调度器 + * 通过一个队列维护需要执行的节点,一个集合维护正在执行的节点 + */ +export default class Scheduler extends EventEmitter { + /** + * 当前需要执行的节点队列 + */ + nodeQueueMap: Map; + /** + * 当前正在执行的节点集合 + * 在每个节点执行完成后,会从集合中删除。 + * 同时会判断此集合中是否还存在和此节点相同的executionId,如果不存在,说明此流程已经执行完成。 + */ + taskRunningMap: Map; + /** + * 流程模型,用于创建节点模型。 + */ + flowModel: FlowModel; + /** + * 执行记录存储器 + * 用于存储节点执行的结果。 + */ + recorder: Recorder; + constructor(config) { + super(); + this.nodeQueueMap = new Map(); + this.taskRunningMap = new Map(); + this.flowModel = config.flowModel; + this.recorder = config.recorder; + } + /** + * 添加一个任务到队列中。 + * 1. 由流程模型将所有的开始节点添加到队列中。 + * 2. 当一个节点执行完成后,将后续的节点添加到队列中。 + */ + public addTask(nodeParam: NodeParam) { + const { executionId } = nodeParam; + if (!this.nodeQueueMap.has(executionId)) { + this.nodeQueueMap.set(executionId, []); + } + const currentTaskQueue = this.nodeQueueMap.get(executionId); + currentTaskQueue.push(nodeParam); + } + /** + * 调度器执行下一个任务 + * 1. 提供给流程模型,用户开始执行第一个任务。 + * 2. 内部任务执行完成后,调用此方法继续执行下一个任务。 + * 3. 当判断没有可以继续执行的任务后,触发流程结束事件。 + */ + public run(runParams: { + executionId: string; + nodeId?: string; + taskId?: string; + }) { + const nodeQueue = this.nodeQueueMap.get(runParams.executionId); + if (nodeQueue.length > 0) { + this.nodeQueueMap.set(runParams.executionId, []); + for (let i = 0; i < nodeQueue.length; i++) { + const currentNode = nodeQueue[i]; + const taskId = createTaskId(); + const taskParam = { + ...currentNode, + taskId, + }; + this.pushTaskToRunningMap(taskParam); + this.exec(taskParam); + } + } else if (!this.hasRunningTask(runParams.executionId)) { + // 当一个流程在nodeQueueMap和taskRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 + this.emit(EVENT_INSTANCE_COMPLETE, { + executionId: runParams.executionId, + nodeId: runParams.nodeId, + taskId: runParams.taskId, + status: FlowStatus.COMPLETED, + }); + } + } + /** + * 恢复某个任务的执行。 + * 可以自定义节点手动实现流程中断,然后通过此方法恢复流程的执行。 + */ + public async resume(resumeParam: ResumeParam) { + this.pushTaskToRunningMap({ + executionId: resumeParam.executionId, + nodeId: resumeParam.nodeId, + taskId: resumeParam.taskId, + }); + const model = this.flowModel.createTask(resumeParam.nodeId); + await model.resume({ + ...resumeParam, + next: this.next.bind(this), + }); + } + private pushTaskToRunningMap(taskParam) { + const { executionId, taskId } = taskParam; + if (!this.taskRunningMap.has(executionId)) { + const runningMap = new Map(); + this.taskRunningMap.set(executionId, runningMap); + } + this.taskRunningMap.get(executionId).set(taskId, taskParam); + } + private removeTaskFromRunningMap(taskParam: TaskParam) { + const { executionId, taskId } = taskParam; + if (!taskId) return; + const runningMap = this.taskRunningMap.get(executionId); + if (!runningMap) return; + runningMap.delete(taskId); + } + private hasRunningTask(executionId) { + const runningMap = this.taskRunningMap.get(executionId); + if (!runningMap) return false; + if (runningMap.size === 0) { + this.taskRunningMap.delete(executionId); + return false; + } + return true; + } + private async exec(taskParam: TaskParam) { + const model = this.flowModel.createTask(taskParam.nodeId); + const execResult = await model.execute({ + executionId: taskParam.executionId, + taskId: taskParam.taskId, + nodeId: taskParam.nodeId, + next: this.next.bind(this), + }); + if (execResult && execResult.status === FlowStatus.INTERRUPTED) { + this.interrupted({ + execResult, + taskParam, + }); + this.saveTaskResult({ + executionId: taskParam.executionId, + nodeId: taskParam.nodeId, + taskId: taskParam.taskId, + nodeType: execResult.nodeType, + properties: execResult.properties, + outgoing: [], + extraInfo: { + status: execResult.status, + detail: execResult.detail, + }, + }); + this.removeTaskFromRunningMap(taskParam); + } + } + private interrupted({ + execResult, + taskParam, + } : { execResult: NodeExecResult, taskParam: TaskParam}) { + this.emit(EVENT_INSTANCE_INTERRUPTED, { + executionId: taskParam.executionId, + status: FlowStatus.INTERRUPTED, + nodeId: taskParam.nodeId, + taskId: taskParam.taskId, + detail: execResult.detail, + }); + } + private next(data: NextTaskParam) { + if (data.outgoing && data.outgoing.length > 0) { + data.outgoing.forEach((item) => { + this.addTask({ + executionId: data.executionId, + nodeId: item.target, + }); + }); + } + this.saveTaskResult(data); + this.removeTaskFromRunningMap(data); + this.run({ + executionId: data.executionId, + nodeId: data.nodeId, + taskId: data.taskId, + }); + } + private saveTaskResult(data: TaskResult) { + this.recorder.addTask({ + executionId: data.executionId, + taskId: data.taskId, + nodeId: data.nodeId, + nodeType: data.nodeType, + timestamp: Date.now(), + properties: data.properties, + }); + } +} diff --git a/packages/engine/src/constant/LogCode.ts b/packages/engine/src/constant/LogCode.ts new file mode 100644 index 000000000..90ae66049 --- /dev/null +++ b/packages/engine/src/constant/LogCode.ts @@ -0,0 +1,30 @@ +export enum ErrorCode { + // 模型数据错误 + NONE_START_NODE = 1000, + NONE_NODE_ID = 1001, + // 表达式错误 + NO_DOCUMENT_BODY = 2001, +} + +export enum WarningCode { + NONE_START_NODE_IN_DATA = 2000, + START_NODE_INCOMING = 2001, + // 表达式判断异常 + EXPRESSION_EXEC_ERROR = 3000, +} + +const errorMsgMapCn = { + [ErrorCode.NONE_START_NODE]: '未找到入度为0的节点', + [ErrorCode.NONE_NODE_ID]: '流程数据中存在没有此节点', + [ErrorCode.NO_DOCUMENT_BODY]: '找不到document.body, 请在DOM加载完成后再执行', +}; + +const warningMsgMapCn = { + [WarningCode.NONE_START_NODE_IN_DATA]: '初始化数据中未找到入度为0的节点', + [WarningCode.START_NODE_INCOMING]: '开始节点不允许被连入', + [WarningCode.EXPRESSION_EXEC_ERROR]: '表达式执行异常', +}; + +export const getErrorMsg = (code: ErrorCode) => `error[${code}]: ${errorMsgMapCn[code]}`; + +export const getWarningMsg = (code: WarningCode) => `warning[${code}]: ${warningMsgMapCn[code]}`; diff --git a/packages/engine/src/constant/constant.ts b/packages/engine/src/constant/constant.ts new file mode 100644 index 000000000..ddc96a178 --- /dev/null +++ b/packages/engine/src/constant/constant.ts @@ -0,0 +1,21 @@ +// baseType +export const BASE_START_NODE = 'start'; + +// event name +export const EVENT_INSTANCE_COMPLETE = 'instance:complete'; +export const EVENT_INSTANCE_INTERRUPTED = 'instance:interrupted'; + +// flow status +export enum FlowStatus { + COMPLETED = 'completed', + INTERRUPTED = 'interrupted', + RUNNING = 'running', + ERROR = 'error', +} + +// task status +export enum TaskStatus { + SUCCESS = 'success', + ERROR = 'error', + INTERRUPTED = 'interrupted', +} diff --git a/packages/engine/src/expression/browserVm.ts b/packages/engine/src/expression/browserVm.ts new file mode 100644 index 000000000..46f0b13e3 --- /dev/null +++ b/packages/engine/src/expression/browserVm.ts @@ -0,0 +1,32 @@ +import { + ErrorCode, + WarningCode, + getErrorMsg, + getWarningMsg, +} from '../constant/LogCode'; + +const runInBrowserContext = async (code: string, globalData: any = {}) => { + const iframe = document.createElement('iframe'); + iframe.style.display = 'none'; + if (!document || !document.body) { + console.error(getErrorMsg(ErrorCode.NO_DOCUMENT_BODY)); + } + document.body.appendChild(iframe); + const iframeWindow = iframe.contentWindow as any; + const iframeEval = iframeWindow.eval; + Object.keys(globalData).forEach((key) => { + iframeWindow[key] = globalData[key]; + }); + let res = null; + try { + res = iframeEval.call(iframeWindow, code); + } catch (e) { + console.warn(getWarningMsg(WarningCode.EXPRESSION_EXEC_ERROR), { code, globalData, e }); + } + document.body.removeChild(iframe); + return res; +}; + +export { + runInBrowserContext, +}; diff --git a/packages/engine/src/expression/index.ts b/packages/engine/src/expression/index.ts new file mode 100644 index 000000000..3bbabe10a --- /dev/null +++ b/packages/engine/src/expression/index.ts @@ -0,0 +1,19 @@ +import { runInNewContext } from './nodeVm'; +import { runInBrowserContext } from './browserVm'; +import { isInNodeJS, isInBrowser, globalScope } from '../util/global'; + +const getExpressionResult = async (code: string, context: any) => { + if (isInNodeJS) { + const r = await runInNewContext(code, context); + return r; + } + if (isInBrowser) { + const r = await runInBrowserContext(code, context); + return r; + } + return globalScope.eval(code); // eslint-disable-line no-eval +}; + +export { + getExpressionResult, +}; diff --git a/packages/engine/src/expression/nodeVm.ts b/packages/engine/src/expression/nodeVm.ts new file mode 100644 index 000000000..b2f8672d0 --- /dev/null +++ b/packages/engine/src/expression/nodeVm.ts @@ -0,0 +1,12 @@ +// import vm from 'node:vm'; +const vm = require('vm'); + +const runInNewContext = async (code: string, globalData: any = {}) => { + const context = vm.createContext(globalData); + vm.runInContext(code, context); + return context; +}; + +export { + runInNewContext, +}; diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts new file mode 100644 index 000000000..f4ec55cfa --- /dev/null +++ b/packages/engine/src/index.ts @@ -0,0 +1,116 @@ +import type { ResumeParams, GraphConfigData } from './types.d'; +import FlowModel, { TaskParams } from './FlowModel'; +import StartNode from './nodes/StartNode'; +import TaskNode from './nodes/TaskNode'; +import Recorder from './recorder'; + +export default class Engine { + global: Record; + graphData: GraphConfigData; + nodeModelMap: Map; + flowModel: FlowModel; + recorder: Recorder; + constructor() { + this.nodeModelMap = new Map(); + this.recorder = new Recorder(); + // register node + this.register({ + type: StartNode.nodeTypeName, + model: StartNode, + }); + this.register({ + type: TaskNode.nodeTypeName, + model: TaskNode, + }); + } + /** + * 注册节点 + * @param nodeConfig { type: 'custom-node', model: Class } + */ + register(nodeConfig) { + this.nodeModelMap.set(nodeConfig.type, nodeConfig.model); + } + /** + * 自定义执行记录的存储,默认浏览器使用 sessionStorage,nodejs 使用内存存储。 + * 注意:由于执行记录不会主动删除,所以需要自行清理。 + * nodejs环境建议自定义为持久化存储。 + * engine.setCustomRecorder({ + * async addTask(task) {} + * async getTask(taskId) {} + * async getExecutionTasks(executionId) {} + * clear() {} + * }); + */ + setCustomRecorder(recorder: Recorder) { + this.recorder = recorder; + } + /** + * 加载流程图数据 + */ + load({ + graphData, + startNodeType = 'StartNode', + globalData = {}, + context = {}, + }) { + this.flowModel = new FlowModel({ + nodeModelMap: this.nodeModelMap, + recorder: this.recorder, + context, + globalData, + startNodeType, + }); + this.flowModel.load(graphData); + return this.flowModel; + } + /** + * 执行流程,允许多次调用。 + */ + async execute(execParam?: TaskParams) { + return new Promise((resolve, reject) => { + if (!execParam) { + execParam = {}; + } + this.flowModel.execute({ + ...execParam, + callback: (result) => { + resolve(result); + }, + onError: (error) => { + reject(error); + }, + }); + }); + } + async resume(resumeParam: ResumeParams) { + return new Promise((resolve, reject) => { + this.flowModel.resume({ + ...resumeParam, + callback: (result) => { + resolve(result); + }, + onError: (error) => { + reject(error); + }, + }); + }); + } + async getExecutionRecord(executionId) { + const tasks = await this.recorder.getExecutionTasks(executionId); + const records = []; + for (let i = 0; i < tasks.length; i++) { + records.push(this.recorder.getTask(tasks[i])); + } + return Promise.all(records); + } +} + +export { + Engine, + TaskNode, + StartNode, +}; + +export type { + TaskParams, +}; diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts new file mode 100644 index 000000000..ef177b6a8 --- /dev/null +++ b/packages/engine/src/nodes/BaseNode.ts @@ -0,0 +1,201 @@ +import { TaskStatus } from '../constant/constant'; +import { getExpressionResult } from '../expression'; +import type { + ActionResult, + NodeExecResult, + ExecResumeParams, + ExecParams, +} from '../types.d'; + +export interface BaseNodeInterface { + outgoing: Record[]; + incoming: Record[]; + nodeId: string; + type: string; + readonly baseType: string; + execute(taskParam): Promise; +} + +export type NodeConstructor = { + new (config: { + nodeConfig: NodeConfig; + context: Record; + globalData: Record; + }): BaseNode; +}; + +export type IncomingConfig = { + id: string; + properties?: Record; + source: string; +}; + +export type OutgoingConfig = { + id: string; + target: string; + properties?: Record; +}; + +export type NodeConfig = { + id: string; + type: string; + properties?: Record; + incoming: IncomingConfig[]; + outgoing: OutgoingConfig[]; +}; + +export type NextTaskParam = { + executionId: string; + nodeId: string; + taskId: string; + nodeType: string; + outgoing: OutgoingConfig[]; + properties?: Record; +}; + +export default class BaseNode implements BaseNodeInterface { + static nodeTypeName = 'BaseNode'; + /** + * 节点的出边 + */ + outgoing: OutgoingConfig[]; + /** + * 节点的入边 + */ + incoming: IncomingConfig[]; + /** + * 节点的属性 + */ + properties?: Record; + nodeId: string; + type: string; + /** + * 节点的上下文,是调用流程时传入的上下文 + */ + context: Record; + /** + * 节点的全局数据,是调用流程时传入的全局数据。 + * 在计算表达式时,即基于全局数据进行计算。 + */ + globalData: Record; + readonly baseType: string; + constructor({ nodeConfig, context, globalData }) { + this.outgoing = nodeConfig.outgoing; + this.incoming = nodeConfig.incoming; + this.nodeId = nodeConfig.id; + this.type = nodeConfig.type; + this.properties = nodeConfig.properties; + this.context = context; + this.globalData = globalData; + this.baseType = 'base'; + } + /** + * 节点的每一次执行都会生成一个唯一的taskId + */ + public async execute(params: ExecParams): Promise { + const r = await this.action({ + executionId: params.executionId, + taskId: params.taskId, + nodeId: this.nodeId, + }); + if (!r || r.status === TaskStatus.SUCCESS) { + const outgoing = await this.getOutgoing(); + params.next({ + executionId: params.executionId, + taskId: params.taskId, + nodeId: this.nodeId, + nodeType: this.type, + properties: this.properties, + outgoing, + }); + } + return { + status: r && r.status, + detail: r && r.detail, + executionId: params.executionId, + taskId: params.taskId, + nodeId: this.nodeId, + nodeType: this.type, + properties: this.properties, + }; + } + /** + * 节点在执行中断后,可以通过resume方法恢复执行。 + * 自定义节点时不建议重写此方法 + */ + public async resume(params: ExecResumeParams): Promise { + const outgoing = await this.getOutgoing(); + await this.onResume({ + executionId: params.executionId, + nodeId: params.nodeId, + taskId: params.taskId, + data: params.data, + }); + params.next({ + executionId: params.executionId, + taskId: params.taskId, + nodeId: this.nodeId, + nodeType: this.type, + properties: this.properties, + outgoing, + }); + return undefined; + } + private async getOutgoing(): Promise { + const outgoing = []; + const expressions = []; + for (const item of this.outgoing) { + const { properties } = item; + expressions.push(this.isPass(properties)); + } + const result = await Promise.all(expressions); + result.forEach((item, index) => { + if (item) { + outgoing.push(this.outgoing[index]); + } + }); + return outgoing; + } + private async isPass(properties) { + if (!properties) return true; + const { conditionExpression } = properties; + if (!conditionExpression) return true; + try { + const result = await getExpressionResult(`result${this.nodeId} = (${conditionExpression})`, { + ...this.globalData, + }); + return result[`result${this.nodeId}`]; + } catch (e) { + return false; + } + } + /** + * 节点的执行逻辑 + * @overridable 可以自定义节点重写此方法。 + * @param params.executionId 流程执行记录ID + * @param params.taskId 此节点执行记录ID + * @param params.nodeId 节点ID + */ + public async action(params: { + executionId: string; + taskId: string; + nodeId: string; + }): Promise { + return undefined; + } + /** + * 节点的重新恢复执行逻辑 + * @overridable 可以自定义节点重写此方法。 + * @param params.executionId 流程执行记录ID + * @param params.taskId 此节点执行记录ID + * @param params.nodeId 节点ID + */ + public async onResume(params: { + executionId: string, + taskId: string, + nodeId: string, + data?: Record, + }): Promise { + return undefined; + } +} diff --git a/packages/engine/src/nodes/StartNode.ts b/packages/engine/src/nodes/StartNode.ts new file mode 100644 index 000000000..3c6c34961 --- /dev/null +++ b/packages/engine/src/nodes/StartNode.ts @@ -0,0 +1,6 @@ +import BaseNode from './BaseNode'; + +export default class StartNode extends BaseNode { + static nodeTypeName = 'StartNode'; + readonly baseType = 'start'; +} diff --git a/packages/engine/src/nodes/TaskNode.ts b/packages/engine/src/nodes/TaskNode.ts new file mode 100644 index 000000000..3f04d232d --- /dev/null +++ b/packages/engine/src/nodes/TaskNode.ts @@ -0,0 +1,6 @@ +import BaseNode from './BaseNode'; + +export default class TaskNode extends BaseNode { + static nodeTypeName = 'TaskNode'; + readonly baseType = 'task'; +} diff --git a/packages/engine/src/recorder/index.ts b/packages/engine/src/recorder/index.ts new file mode 100644 index 000000000..6cdfcdb46 --- /dev/null +++ b/packages/engine/src/recorder/index.ts @@ -0,0 +1,57 @@ +import type { + RecorderData, + RecorderInterface, +} from '../types.d'; +import storage from '../util/storage'; + +const LOGICFLOW_ENGINE_INSTANCES = 'LOGICFLOW_ENGINE_INSTANCES'; + +export default class Recorder implements RecorderInterface { + /* + * @param {Object} task + * { + * taskId: '', + * nodeId: '', + * executionId: '', + * nodeType: '', + * timestamp: '', + * properties: {}, + * } + */ + async addTask(task: RecorderData) { + const { executionId, taskId } = task; + const instanceData = this.getExecutionTasks(executionId); + if (!instanceData) { + this.pushExecution(executionId); + } + this.pushTaskToExecution(executionId, taskId); + storage.setItem(taskId, task); + } + async getTask(taskId: string): Promise { + return storage.getItem(taskId); + } + async getExecutionTasks(executionId) { + return storage.getItem(executionId); + } + clear() { + const instance = storage.getItem(LOGICFLOW_ENGINE_INSTANCES) || []; + instance.forEach((executionId) => { + storage.removeItem(executionId); + const instanceData = storage.getItem(executionId) || []; + instanceData.forEach((taskId) => { + storage.removeItem(taskId); + }); + }); + storage.removeItem(LOGICFLOW_ENGINE_INSTANCES); + } + private pushExecution(executionId) { + const instance = storage.getItem(LOGICFLOW_ENGINE_INSTANCES) || []; + instance.push(executionId); + storage.setItem(LOGICFLOW_ENGINE_INSTANCES, instance); + } + private pushTaskToExecution(executionId, taskId) { + const tasks = storage.getItem(executionId) || []; + tasks.push(taskId); + storage.setItem(executionId, tasks); + } +} diff --git a/packages/engine/src/types.d.ts b/packages/engine/src/types.d.ts new file mode 100644 index 000000000..e87283785 --- /dev/null +++ b/packages/engine/src/types.d.ts @@ -0,0 +1,112 @@ +/** + * 即将执行的节点参数 + */ +export type NodeParam = { + executionId: string; + nodeId: string; +} +/** + * 执行节点的参数 + */ +export type TaskParam = { + taskId: string; +} & NodeParam; + +export type ExecParams = { + next: (data: NextTaskParam) => void; +} & TaskParam; + +export type ResumeParam = { + data: Record; +} & TaskParam; + +export type ExecResumeParams = { + next: (data: NextTaskParam) => void; +} & ResumeParam; + +export type RecorderData = { + nodeType: string; + timestamp: number; + properties?: Record; +} & TaskParam; + +export interface RecorderInterface { + addTask: (task: RecorderData) => Promise; + getTask: (taskId: string) => Promise; + getExecutionTasks: (executionId: string) => Promise; + clear: () => void; +}; + +export type TaskStatus = 'success' | 'error' | 'interrupted' | ''; + +export type ActionResult = { + status?: TaskStatus; + detail?: Record; +}; + +export type NodeExecResult = { + executionId: string, + taskId: string, + nodeId: string, + nodeType: string, + properties?: Record, +} & ActionResult; + +export type ResumeParams = { + executionId: string; + taskId: string; + nodeId: string; + data?: Record; +} + +export declare type Point = { + id?: string; + x: number; + y: number; + [key: string]: unknown; +}; + +export declare type TextConfig = { + value: string; +} & Point; +export declare type GraphConfigData = { + nodes: NodeConfig[]; + edges: EdgeConfig[]; +}; +export declare type NodeConfig = { + id?: string; + type: string; + x: number; + y: number; + text?: TextConfig | string; + zIndex?: number; + properties?: Record; +}; +export declare type EdgeConfig = { + id?: string; + /** + * 边的类型,不传默认为lf.setDefaultEdgeType(type)传入的类型。 + * LogicFlow内部默认为polyline + */ + type?: string; + sourceNodeId: string; + sourceAnchorId?: string; + targetNodeId: string; + targetAnchorId?: string; + startPoint?: { + x: number; + y: number; + }; + endPoint?: { + x: number; + y: number; + }; + text?: { + x: number; + y: number; + value: string; + } | string; + pointsList?: Point[]; + zIndex?: number; + properties?: Record; +}; diff --git a/packages/engine/src/util/ID.ts b/packages/engine/src/util/ID.ts new file mode 100644 index 000000000..a58491d36 --- /dev/null +++ b/packages/engine/src/util/ID.ts @@ -0,0 +1,11 @@ +import { v4 as uuidv4 } from 'uuid'; + +export const createExecId = (): string => { + const uuid = uuidv4(); + return `exec-${uuid}`; +}; + +export const createTaskId = (): string => { + const uuid = uuidv4(); + return `task-${uuid}`; +}; diff --git a/packages/engine/src/util/global.ts b/packages/engine/src/util/global.ts new file mode 100644 index 000000000..736c3b905 --- /dev/null +++ b/packages/engine/src/util/global.ts @@ -0,0 +1,35 @@ +// The one and only way of getting global scope in all environments +// https://stackoverflow.com/q/3277182/1008999 + +const isInBrowser = typeof window === 'object' && window.window === window; + +const isInNodeJS = typeof global === 'object' && global.global === global; +// eslint-disable-next-line no-restricted-globals +const isInWebWorker = !isInBrowser && typeof self === 'object' && self.constructor; + +const globalScope = (() => { + if (isInBrowser) { + return window; + } + // eslint-disable-next-line no-restricted-globals + if (typeof self === 'object' && self.self === self) { // web workers + // eslint-disable-next-line no-restricted-globals + return self; + } + if (isInNodeJS) { + return global; + } + if (typeof globalThis === 'object') { + return globalThis; + } + return { + eval: () => undefined, + } as Record; +})(); + +export { + globalScope, + isInBrowser, + isInWebWorker, + isInNodeJS, +}; diff --git a/packages/engine/src/util/storage.ts b/packages/engine/src/util/storage.ts new file mode 100644 index 000000000..109af89e0 --- /dev/null +++ b/packages/engine/src/util/storage.ts @@ -0,0 +1,40 @@ +/** + * 存储执行记录 + */ +import { globalScope } from './global'; + +if (!globalScope.sessionStorage) { + const storage = { + data: {} as Record, + setItem(key, value) { + storage.data[key] = value; + }, + getItem(key) { + return storage.data[key]; + }, + removeItem(key) { + delete storage.data[key]; + }, + }; + globalScope.sessionStorage = storage; +} + +export default { + setItem(key, value) { + if (typeof value === 'object') { + value = JSON.stringify(value); + } + globalScope.sessionStorage.setItem(key, value); + }, + getItem(key) { + const value = globalScope.sessionStorage.getItem(key); + try { + return JSON.parse(value); + } catch (e) { + return value; + } + }, + removeItem(key) { + globalScope.sessionStorage.removeItem(key); + }, +}; diff --git a/packages/engine/tsconfig.json b/packages/engine/tsconfig.json new file mode 100644 index 000000000..20afc0f61 --- /dev/null +++ b/packages/engine/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../tsconfig.json", + "include": [ + "src/*" + ], + "exclude": [ + "node_modules", + "**/*.spec.ts", + "**/*.test.js", + "**/*.d.ts" + ] +} diff --git a/packages/extension/package.json b/packages/extension/package.json index c75266317..bf037ab53 100644 --- a/packages/extension/package.json +++ b/packages/extension/package.json @@ -4,7 +4,7 @@ "description": "LogicFlow extension", "main": "cjs/index.js", "module": "es/index.js", - "homepage": "https://docs.logic-flow.cn", + "homepage": "https://site.logic-flow.cn", "types": "es/index.d.ts", "repository": { "type": "git", diff --git a/tsconfig.json b/tsconfig.json index 9eb7db4fb..eb51aee08 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -21,6 +21,8 @@ "exclude": [ "node_modules", "**/*.spec.ts", + "**/*.test.js", + ".eslintrc.js", "**/*.d.ts", "packages/core/types" ] diff --git a/yarn.lock b/yarn.lock index 47e0c06ba..9a613e5ee 100644 --- a/yarn.lock +++ b/yarn.lock @@ -15144,7 +15144,7 @@ prompts@2.4.0: kleur "^3.0.3" sisteransi "^1.0.5" -prompts@^2.0.1: +prompts@2.4.2, prompts@^2.0.1: version "2.4.2" resolved "https://registry.yarnpkg.com/prompts/-/prompts-2.4.2.tgz#7b57e73b3a48029ad10ebd44f74b01722a4cb069" integrity sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==