From 7373bf2da4a90b2b7610a267259b33daa77fd9de Mon Sep 17 00:00:00 2001 From: xutao <987427795@qq.com> Date: Thu, 24 Aug 2023 11:35:05 +0800 Subject: [PATCH] feat(engine): support node return error --- packages/engine/__test__/08_error.test.js | 67 +++++++++++++++++++++++ packages/engine/src/FlowModel.ts | 13 +++-- packages/engine/src/Scheduler.ts | 47 ++++++++-------- packages/engine/src/constant/constant.ts | 2 + packages/engine/src/index.ts | 9 ++- packages/engine/src/nodes/BaseNode.ts | 14 +++-- packages/engine/src/types.d.ts | 13 +++-- 7 files changed, 126 insertions(+), 39 deletions(-) create mode 100644 packages/engine/__test__/08_error.test.js diff --git a/packages/engine/__test__/08_error.test.js b/packages/engine/__test__/08_error.test.js new file mode 100644 index 000000000..1060d7ee6 --- /dev/null +++ b/packages/engine/__test__/08_error.test.js @@ -0,0 +1,67 @@ +import Engine, { TaskNode } from '../src/index'; + +describe('@logicflow/engine error', () => { + class DataNode extends TaskNode { + async action() { + this.globalData['dataSource'] = { + time: this.context.getTime(), + } + return { + status: 'error', + detail: { + errorMsg: this.context.getTime(), + } + } + } + } + const engine = new Engine({ + context: { + getTime() { + return new Date().getTime(); + } + } + }); + engine.register({ + type: 'DataNode', + model: DataNode, + }) + const flowData = { + /** node1 |--> node2(DataNode) */ + graphData: { + nodes: [ + { + id: 'node1', + type: 'StartNode', + properties: { + text: '开始', + }, + }, + { + id: 'node2', + type: 'DataNode', + properties: { + text: '数据节点', + }, + } + ], + edges: [ + { + id: 'edge1', + type: 'line', + sourceNodeId: 'node1', + targetNodeId: 'node2', + properties: {} + + } + ] + }, + }; + engine.load(flowData); + test('return error status', async () => { + const executeData = await engine.execute(flowData); + expect(executeData.status).toEqual('error'); + const execution = await engine.getExecutionRecord(executeData.executionId); + expect(execution.length).toBe(2); + expect(execution[1].status).toEqual('error'); + }); +}); \ No newline at end of file diff --git a/packages/engine/src/FlowModel.ts b/packages/engine/src/FlowModel.ts index 42ef560fe..cfaf07334 100644 --- a/packages/engine/src/FlowModel.ts +++ b/packages/engine/src/FlowModel.ts @@ -4,16 +4,18 @@ import type { } from './nodes/BaseNode'; import type Recorder from './recorder'; import { - EVENT_INSTANCE_COMPLETE, EVENT_INSTANCE_INTERRUPTED, + EVENT_INSTANCE_COMPLETE, + EVENT_INSTANCE_INTERRUPTED, + EVENT_INSTANCE_ERROR, } from './constant/constant'; import { createExecId } from './util/ID'; import Scheduler from './Scheduler'; import { ErrorCode, getErrorMsg } from './constant/LogCode'; -import type { ActionParam } from './types.d'; +import type { ActionParam, NextActionParam } from './types.d'; export type FlowResult = { result?: Record; -} & ActionParam; +} & ActionParam | NextActionParam; export type ActionParams = { executionId?: string; @@ -22,7 +24,7 @@ export type ActionParams = { }; export type ExecParams = { - callback?: (result: FlowResult) => void; + callback?: (result: NextActionParam) => void; onError?: (error: Error) => void; } & ActionParams; @@ -111,6 +113,9 @@ export default class FlowModel { this.scheduler.on(EVENT_INSTANCE_INTERRUPTED, (result) => { this.onExecuteFinished(result); }); + this.scheduler.on(EVENT_INSTANCE_ERROR, (result) => { + this.onExecuteFinished(result); + }); } public setStartNodeType(startNodeType) { this.startNodeType = startNodeType; diff --git a/packages/engine/src/Scheduler.ts b/packages/engine/src/Scheduler.ts index b53302a12..9b187e51f 100644 --- a/packages/engine/src/Scheduler.ts +++ b/packages/engine/src/Scheduler.ts @@ -2,6 +2,7 @@ import EventEmitter from './EventEmitter'; import { EVENT_INSTANCE_COMPLETE, EVENT_INSTANCE_INTERRUPTED, + EVENT_INSTANCE_ERROR, FlowStatus, } from './constant/constant'; import { createActionId } from './util/ID'; @@ -70,8 +71,7 @@ export default class Scheduler extends EventEmitter { */ public run(runParams: { executionId: string; - nodeId?: string; - actionId?: string; + [key: string]: any; }) { const nodeQueue = this.nodeQueueMap.get(runParams.executionId); // 将同一个executionId当前待执行的节点一起执行 @@ -89,9 +89,7 @@ export default class Scheduler extends EventEmitter { if (!this.hasRunningAction(runParams.executionId)) { // 当一个流程在nodeQueueMap和actionRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。 this.emit(EVENT_INSTANCE_COMPLETE, { - executionId: runParams.executionId, - nodeId: runParams.nodeId, - actionId: runParams.actionId, + ...runParams, status: FlowStatus.COMPLETED, }); } @@ -145,10 +143,21 @@ export default class Scheduler extends EventEmitter { next: this.next.bind(this), }); if (execResult && execResult.status === FlowStatus.INTERRUPTED) { - this.interrupted({ - execResult, - actionParam, + this.interrupted(execResult); + this.saveActionResult({ + executionId: actionParam.executionId, + nodeId: actionParam.nodeId, + actionId: actionParam.actionId, + nodeType: execResult.nodeType, + properties: execResult.properties, + outgoing: execResult.outgoing, + status: execResult.status, + detail: execResult.detail, }); + this.removeActionFromRunningMap(actionParam); + } + if (execResult && execResult.status === FlowStatus.ERROR) { + this.error(execResult); this.saveActionResult({ executionId: actionParam.executionId, nodeId: actionParam.nodeId, @@ -163,17 +172,11 @@ export default class Scheduler extends EventEmitter { } // TODO: 考虑停下所有的任务 } - private interrupted({ - execResult, - actionParam, - } : { execResult: NextActionParam, actionParam: ActionParam}) { - this.emit(EVENT_INSTANCE_INTERRUPTED, { - executionId: actionParam.executionId, - status: FlowStatus.INTERRUPTED, - nodeId: actionParam.nodeId, - actionId: actionParam.actionId, - detail: execResult.detail, - }); + private interrupted(execResult: NextActionParam) { + this.emit(EVENT_INSTANCE_INTERRUPTED, execResult); + } + private error(execResult : NextActionParam) { + this.emit(EVENT_INSTANCE_ERROR, execResult); } private next(data: NextActionParam) { if (data.outgoing && data.outgoing.length > 0) { @@ -188,11 +191,7 @@ export default class Scheduler extends EventEmitter { } this.saveActionResult(data); this.removeActionFromRunningMap(data); - this.run({ - executionId: data.executionId, - nodeId: data.nodeId, - actionId: data.actionId, - }); + this.run(data); } private saveActionResult(data: NextActionParam) { this.recorder.addActionRecord({ diff --git a/packages/engine/src/constant/constant.ts b/packages/engine/src/constant/constant.ts index 9a6cec0b4..f0b38277e 100644 --- a/packages/engine/src/constant/constant.ts +++ b/packages/engine/src/constant/constant.ts @@ -4,12 +4,14 @@ export const BASE_START_NODE = 'start'; // event name export const EVENT_INSTANCE_COMPLETE = 'instance:complete'; export const EVENT_INSTANCE_INTERRUPTED = 'instance:interrupted'; +export const EVENT_INSTANCE_ERROR = 'instance:error'; // flow status export enum FlowStatus { COMPLETED = 'completed', INTERRUPTED = 'interrupted', RUNNING = 'running', + PENDING = 'pending', ERROR = 'error', } diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 293040142..a706b5160 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -1,4 +1,9 @@ -import type { ResumeParams, GraphConfigData, EngineConstructorOptions } from './types.d'; +import type { + ResumeParams, + GraphConfigData, + EngineConstructorOptions, + NextActionParam, +} from './types.d'; import FlowModel, { ActionParams } from './FlowModel'; import StartNode from './nodes/StartNode'; import TaskNode from './nodes/TaskNode'; @@ -72,7 +77,7 @@ export default class Engine { /** * 执行流程,允许多次调用。 */ - async execute(execParam?: ActionParams) { + async execute(execParam?: ActionParams): Promise { return new Promise((resolve, reject) => { if (!execParam) { execParam = {}; diff --git a/packages/engine/src/nodes/BaseNode.ts b/packages/engine/src/nodes/BaseNode.ts index 5f0974a79..da1e0816d 100644 --- a/packages/engine/src/nodes/BaseNode.ts +++ b/packages/engine/src/nodes/BaseNode.ts @@ -2,6 +2,7 @@ import { ActionStatus } from '../constant/constant'; import { getExpressionResult } from '../expression'; import type { NextActionParam, + ActionResult, ExecResumeParams, ExecParams, OutgoingConfig, @@ -94,7 +95,8 @@ export default class BaseNode implements BaseNodeInterface { actionId: params.actionId, nodeId: this.nodeId, }); - if (!r || r.status === ActionStatus.SUCCESS) { + const status = r ? r.status : 'success'; + if (status === ActionStatus.SUCCESS) { const outgoing = await this.getOutgoing(); const detail = r ? r.detail : {}; params.next({ @@ -109,7 +111,7 @@ export default class BaseNode implements BaseNodeInterface { }); } return { - status: r && r.status, + status, detail: r && r.detail, executionId: params.executionId, actionId: params.actionId, @@ -138,6 +140,7 @@ export default class BaseNode implements BaseNodeInterface { nodeType: this.type, properties: this.properties, outgoing, + status: ActionStatus.SUCCESS, }); return undefined; } @@ -175,13 +178,16 @@ export default class BaseNode implements BaseNodeInterface { * @param params.executionId 流程执行记录ID * @param params.actionId 此节点执行记录ID * @param params.nodeId 节点ID + * @returns 返回下一步的执行参数 + * 当不返回时,表示此节点执行成功,流程会继续执行下一步。 + * 当返回时,返回格式 */ public async action(params: { executionId: string; actionId: string; nodeId: string; - }): Promise { - return undefined; + }): Promise { + return null; } /** * 节点的重新恢复执行逻辑 diff --git a/packages/engine/src/types.d.ts b/packages/engine/src/types.d.ts index bf6827ae7..8b8b06e00 100644 --- a/packages/engine/src/types.d.ts +++ b/packages/engine/src/types.d.ts @@ -12,7 +12,7 @@ export type ActionParam = { actionId: string; } & NodeParam; -export type ActionStatus = 'success' | 'error' | 'interrupted' | ''; +export type ActionStatus = 'success' | 'error' | 'interrupted' | 'pending'; export type OutgoingConfig = { id: string; @@ -21,6 +21,11 @@ export type OutgoingConfig = { result?: string; }; +export type ActionResult = { + status: ActionStatus; + detail?: Record; +} | void; + export type NextActionParam = { executionId: string; nodeId: string; @@ -29,10 +34,8 @@ export type NextActionParam = { outgoing: OutgoingConfig[]; properties?: Record; detail?: Record; - status?: ActionStatus; -}; - -type ActionResult = NextActionParam; + status: ActionStatus; +} export type ExecParams = { next: (data: NextActionParam) => void;