Skip to content

Commit

Permalink
feat(engine): support node return error
Browse files Browse the repository at this point in the history
  • Loading branch information
towersxu authored and boyongjiong committed Aug 24, 2023
1 parent d24533c commit 7373bf2
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 39 deletions.
67 changes: 67 additions & 0 deletions packages/engine/__test__/08_error.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import Engine, { TaskNode } from '../src/index';

describe('@logicflow/engine error', () => {
class DataNode extends TaskNode {
async action() {
this.globalData['dataSource'] = {
time: this.context.getTime(),
}
return {
status: 'error',
detail: {
errorMsg: this.context.getTime(),
}
}
}
}
const engine = new Engine({
context: {
getTime() {
return new Date().getTime();
}
}
});
engine.register({
type: 'DataNode',
model: DataNode,
})
const flowData = {
/** node1 |--> node2(DataNode) */
graphData: {
nodes: [
{
id: 'node1',
type: 'StartNode',
properties: {
text: '开始',
},
},
{
id: 'node2',
type: 'DataNode',
properties: {
text: '数据节点',
},
}
],
edges: [
{
id: 'edge1',
type: 'line',
sourceNodeId: 'node1',
targetNodeId: 'node2',
properties: {}

}
]
},
};
engine.load(flowData);
test('return error status', async () => {
const executeData = await engine.execute(flowData);
expect(executeData.status).toEqual('error');
const execution = await engine.getExecutionRecord(executeData.executionId);
expect(execution.length).toBe(2);
expect(execution[1].status).toEqual('error');
});
});
13 changes: 9 additions & 4 deletions packages/engine/src/FlowModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import type {
} from './nodes/BaseNode';
import type Recorder from './recorder';
import {
EVENT_INSTANCE_COMPLETE, EVENT_INSTANCE_INTERRUPTED,
EVENT_INSTANCE_COMPLETE,
EVENT_INSTANCE_INTERRUPTED,
EVENT_INSTANCE_ERROR,
} from './constant/constant';
import { createExecId } from './util/ID';
import Scheduler from './Scheduler';
import { ErrorCode, getErrorMsg } from './constant/LogCode';
import type { ActionParam } from './types.d';
import type { ActionParam, NextActionParam } from './types.d';

export type FlowResult = {
result?: Record<string, any>;
} & ActionParam;
} & ActionParam | NextActionParam;

export type ActionParams = {
executionId?: string;
Expand All @@ -22,7 +24,7 @@ export type ActionParams = {
};

export type ExecParams = {
callback?: (result: FlowResult) => void;
callback?: (result: NextActionParam) => void;
onError?: (error: Error) => void;
} & ActionParams;

Expand Down Expand Up @@ -111,6 +113,9 @@ export default class FlowModel {
this.scheduler.on(EVENT_INSTANCE_INTERRUPTED, (result) => {
this.onExecuteFinished(result);
});
this.scheduler.on(EVENT_INSTANCE_ERROR, (result) => {
this.onExecuteFinished(result);
});
}
public setStartNodeType(startNodeType) {
this.startNodeType = startNodeType;
Expand Down
47 changes: 23 additions & 24 deletions packages/engine/src/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import EventEmitter from './EventEmitter';
import {
EVENT_INSTANCE_COMPLETE,
EVENT_INSTANCE_INTERRUPTED,
EVENT_INSTANCE_ERROR,
FlowStatus,
} from './constant/constant';
import { createActionId } from './util/ID';
Expand Down Expand Up @@ -70,8 +71,7 @@ export default class Scheduler extends EventEmitter {
*/
public run(runParams: {
executionId: string;
nodeId?: string;
actionId?: string;
[key: string]: any;
}) {
const nodeQueue = this.nodeQueueMap.get(runParams.executionId);
// 将同一个executionId当前待执行的节点一起执行
Expand All @@ -89,9 +89,7 @@ export default class Scheduler extends EventEmitter {
if (!this.hasRunningAction(runParams.executionId)) {
// 当一个流程在nodeQueueMap和actionRunningMap中都不存在执行的节点时,说明这个流程已经执行完成。
this.emit(EVENT_INSTANCE_COMPLETE, {
executionId: runParams.executionId,
nodeId: runParams.nodeId,
actionId: runParams.actionId,
...runParams,
status: FlowStatus.COMPLETED,
});
}
Expand Down Expand Up @@ -145,10 +143,21 @@ export default class Scheduler extends EventEmitter {
next: this.next.bind(this),
});
if (execResult && execResult.status === FlowStatus.INTERRUPTED) {
this.interrupted({
execResult,
actionParam,
this.interrupted(execResult);
this.saveActionResult({
executionId: actionParam.executionId,
nodeId: actionParam.nodeId,
actionId: actionParam.actionId,
nodeType: execResult.nodeType,
properties: execResult.properties,
outgoing: execResult.outgoing,
status: execResult.status,
detail: execResult.detail,
});
this.removeActionFromRunningMap(actionParam);
}
if (execResult && execResult.status === FlowStatus.ERROR) {
this.error(execResult);
this.saveActionResult({
executionId: actionParam.executionId,
nodeId: actionParam.nodeId,
Expand All @@ -163,17 +172,11 @@ export default class Scheduler extends EventEmitter {
}
// TODO: 考虑停下所有的任务
}
private interrupted({
execResult,
actionParam,
} : { execResult: NextActionParam, actionParam: ActionParam}) {
this.emit(EVENT_INSTANCE_INTERRUPTED, {
executionId: actionParam.executionId,
status: FlowStatus.INTERRUPTED,
nodeId: actionParam.nodeId,
actionId: actionParam.actionId,
detail: execResult.detail,
});
private interrupted(execResult: NextActionParam) {
this.emit(EVENT_INSTANCE_INTERRUPTED, execResult);
}
private error(execResult : NextActionParam) {
this.emit(EVENT_INSTANCE_ERROR, execResult);
}
private next(data: NextActionParam) {
if (data.outgoing && data.outgoing.length > 0) {
Expand All @@ -188,11 +191,7 @@ export default class Scheduler extends EventEmitter {
}
this.saveActionResult(data);
this.removeActionFromRunningMap(data);
this.run({
executionId: data.executionId,
nodeId: data.nodeId,
actionId: data.actionId,
});
this.run(data);
}
private saveActionResult(data: NextActionParam) {
this.recorder.addActionRecord({
Expand Down
2 changes: 2 additions & 0 deletions packages/engine/src/constant/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ export const BASE_START_NODE = 'start';
// event name
export const EVENT_INSTANCE_COMPLETE = 'instance:complete';
export const EVENT_INSTANCE_INTERRUPTED = 'instance:interrupted';
export const EVENT_INSTANCE_ERROR = 'instance:error';

// flow status
export enum FlowStatus {
COMPLETED = 'completed',
INTERRUPTED = 'interrupted',
RUNNING = 'running',
PENDING = 'pending',
ERROR = 'error',
}

Expand Down
9 changes: 7 additions & 2 deletions packages/engine/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import type { ResumeParams, GraphConfigData, EngineConstructorOptions } from './types.d';
import type {
ResumeParams,
GraphConfigData,
EngineConstructorOptions,
NextActionParam,
} from './types.d';
import FlowModel, { ActionParams } from './FlowModel';
import StartNode from './nodes/StartNode';
import TaskNode from './nodes/TaskNode';
Expand Down Expand Up @@ -72,7 +77,7 @@ export default class Engine {
/**
* 执行流程,允许多次调用。
*/
async execute(execParam?: ActionParams) {
async execute(execParam?: ActionParams): Promise<NextActionParam> {
return new Promise((resolve, reject) => {
if (!execParam) {
execParam = {};
Expand Down
14 changes: 10 additions & 4 deletions packages/engine/src/nodes/BaseNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ActionStatus } from '../constant/constant';
import { getExpressionResult } from '../expression';
import type {
NextActionParam,
ActionResult,
ExecResumeParams,
ExecParams,
OutgoingConfig,
Expand Down Expand Up @@ -94,7 +95,8 @@ export default class BaseNode implements BaseNodeInterface {
actionId: params.actionId,
nodeId: this.nodeId,
});
if (!r || r.status === ActionStatus.SUCCESS) {
const status = r ? r.status : 'success';
if (status === ActionStatus.SUCCESS) {
const outgoing = await this.getOutgoing();
const detail = r ? r.detail : {};
params.next({
Expand All @@ -109,7 +111,7 @@ export default class BaseNode implements BaseNodeInterface {
});
}
return {
status: r && r.status,
status,
detail: r && r.detail,
executionId: params.executionId,
actionId: params.actionId,
Expand Down Expand Up @@ -138,6 +140,7 @@ export default class BaseNode implements BaseNodeInterface {
nodeType: this.type,
properties: this.properties,
outgoing,
status: ActionStatus.SUCCESS,
});
return undefined;
}
Expand Down Expand Up @@ -175,13 +178,16 @@ export default class BaseNode implements BaseNodeInterface {
* @param params.executionId 流程执行记录ID
* @param params.actionId 此节点执行记录ID
* @param params.nodeId 节点ID
* @returns 返回下一步的执行参数
* 当不返回时,表示此节点执行成功,流程会继续执行下一步。
* 当返回时,返回格式
*/
public async action(params: {
executionId: string;
actionId: string;
nodeId: string;
}): Promise<NextActionParam> {
return undefined;
}): Promise<ActionResult> {
return null;
}
/**
* 节点的重新恢复执行逻辑
Expand Down
13 changes: 8 additions & 5 deletions packages/engine/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export type ActionParam = {
actionId: string;
} & NodeParam;

export type ActionStatus = 'success' | 'error' | 'interrupted' | '';
export type ActionStatus = 'success' | 'error' | 'interrupted' | 'pending';

export type OutgoingConfig = {
id: string;
Expand All @@ -21,6 +21,11 @@ export type OutgoingConfig = {
result?: string;
};

export type ActionResult = {
status: ActionStatus;
detail?: Record<string, any>;
} | void;

export type NextActionParam = {
executionId: string;
nodeId: string;
Expand All @@ -29,10 +34,8 @@ export type NextActionParam = {
outgoing: OutgoingConfig[];
properties?: Record<string, any>;
detail?: Record<string, any>;
status?: ActionStatus;
};

type ActionResult = NextActionParam;
status: ActionStatus;
}

export type ExecParams = {
next: (data: NextActionParam) => void;
Expand Down

0 comments on commit 7373bf2

Please sign in to comment.