Skip to content

Commit

Permalink
feat(engine): add comments and sync storage execution
Browse files Browse the repository at this point in the history
  • Loading branch information
towersxu committed Jul 27, 2023
1 parent f3e905b commit 6fa0904
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 40 deletions.
81 changes: 57 additions & 24 deletions packages/engine/src/FlowModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ export default class FlowModel {
* 流程支持的节点类型
*/
nodeModelMap: Map<string, NodeConstructor>;
/**
* 每一次执行流程都会生成一个唯一的executionId。
*/
executionId: string;
/**
* 调度器,用于调度节点的执行。
*/
Expand Down Expand Up @@ -120,8 +116,43 @@ export default class FlowModel {
public setStartNodeType(startNodeType) {
this.startNodeType = startNodeType;
}
/**
* 解析LogicFlow图数据,将nodes和edges转换成节点格式。
* 例如:
* graphData: {
* nodes: [
* { id: 'node1', type: 'StartNode', properties: {} },
* { id: 'node2', type: 'TaskNode', properties: {} },
* ],
* edges: [
* { id: 'edge1', sourceNodeId: 'node1', targetNodeId: 'node2', properties: {} },
* ]
* }
* 转换成:
* nodeConfigMap: {
* node1: {
* id: 'node1',
* type: 'StartNode',
* properties: {},
* incoming: [],
* outgoing: [{ id: 'edge1', properties: {}, target: 'node2' }]
* },
* node2: {
* id: 'node2',
* type: 'TaskNode',
* properties: {},
* incoming: [{ id: 'edge1', properties: {}, source: 'node1' }],
* outgoing: [],
* }
* }
* 此格式方便后续执行时,根据节点id快速找到节点和执行初始化节点模型。
* 同时此方法还会找到所有的开始节点,方便后续执行时,从开始节点开始执行。
* @param graphData 流程图数据
*/
public load(graphData) {
const { nodes = [], edges = [] } = graphData;
this.startNodes = [];
this.nodeConfigMap = new Map();
nodes.forEach((node) => {
if (this.nodeModelMap.has(node.type)) {
const nodeConfig = {
Expand Down Expand Up @@ -159,11 +190,10 @@ export default class FlowModel {
});
}
/**
* 执行流程
* 同一次执行,这次执行内部的节点执行顺序为并行。
* 多次执行,多次执行之间为串行。
* 允许一个流程多次执行,效率更高。
* 例如:
* 执行流程, 每次执行都会生成一个唯一的executionId,用于区分不同的执行。
* 同一次执行,这次执行内部的节点执行顺序为并行。内部并行是为了避免异步节点阻塞其他节点的执行。
* 多次执行,多次执行之间为串行,这里选择串行的原因是避免多次执行之间的数据冲突。
* example:
* 一个流程存在着两个开始节点,A和B,A和B的下一个节点都是C,C的下两个节点是D和E。
* 外部分别触发了A和B的执行,那么A和B的执行是串行的(也就是需要A执行完成后再执行B),但是D和E的执行是并行的。
* 如果希望A和B的执行是并行的,就不能使用同一个流程模型执行,应该初始化两个。
Expand All @@ -185,7 +215,8 @@ export default class FlowModel {
this.createExecution();
}
/**
* 创建节点实例
* 创建节点实例, 每个节点实例都会有一个唯一的taskId。
* 通过executionId、nodeId、taskId可以唯一确定一个节点的某一次执行。
* @param nodeId 节点Id
* @returns 节点示例
*/
Expand All @@ -208,11 +239,11 @@ export default class FlowModel {
...data,
};
}
/**
* 在执行完成后,通知外部此次执行完成。
* 如果还存在待执行的任务,那么继续执行。
*/
private onTaskFinished(result) {
const { executionId } = result;
if (executionId !== this.executionId) {
return;
}
const { callback } = this.executingInstance;
if (callback) {
callback(result);
Expand All @@ -224,24 +255,27 @@ export default class FlowModel {
this.isRunning = false;
}
}
/**
* 从待执行队列中取出需要执行的内容。
* 会依次判断是否有taskId、nodeId、executionId。
* 若存在taskId,那么表示恢复执行。
* 若存在nodeId,那么表示从指定节点开始执行。
* 若都不存在,那么新建一个executionId,从开始节点开始执行。
*/
private createExecution() {
const execParams = this.executeQueue.shift();
this.executingInstance = execParams;
if (execParams.executionId) {
this.executionId = execParams.executionId;
} else {
this.executionId = createExecId();
}
// 如果有taskId,那么表示恢复执行
if (execParams.taskId) {
if (execParams.taskId && execParams.executionId && execParams.nodeId) {
this.scheduler.resume({
executionId: this.executionId,
executionId: execParams.executionId,
taskId: execParams.taskId,
nodeId: execParams.nodeId,
data: execParams.data,
});
return;
}
const executionId = execParams.executionId || createExecId();
if (execParams.nodeId) {
const nodeConfig = this.nodeConfigMap.get(execParams.nodeId);
if (!nodeConfig) {
Expand All @@ -252,13 +286,12 @@ export default class FlowModel {
}
this.startNodes.forEach((startNode) => {
this.scheduler.addTask({
executionId: this.executionId,
executionId,
nodeId: startNode.id,
});
// 所有的开始节点都执行
});
this.scheduler.run({
executionId: this.executionId,
executionId,
});
}
}
38 changes: 23 additions & 15 deletions packages/engine/src/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,38 @@ type TaskResult = {
extraInfo?: Record<string, any>;
} & NextTaskParam;

type ExecutionId = string;

/**
* 调度器
* 通过一个队列维护需要执行的节点,一个集合维护正在执行的节点
*/
export default class Scheduler extends EventEmitter {
nodeQueueMap: Map<string, NodeParam[]>;
taskRunningMap: Map<string, TaskParamMap>;
/**
* 当前需要执行的节点队列
*/
nodeQueueMap: Map<ExecutionId, NodeParam[]>;
/**
* 当前正在执行的节点集合
* 在每个节点执行完成后,会从集合中删除。
* 同时会判断此集合中是否还存在和此节点相同的executionId,如果不存在,说明此流程已经执行完成。
*/
taskRunningMap: Map<ExecutionId, TaskParamMap>;
/**
* 流程模型,用于创建节点模型。
*/
flowModel: FlowModel;
/**
* 执行记录存储器
* 用于存储节点执行的结果。
*/
recorder: Recorder;
currentTask: TaskParam | null;
constructor(config) {
super();
this.nodeQueueMap = new Map();
this.taskRunningMap = new Map();
this.flowModel = config.flowModel;
this.recorder = config.recorder;
this.currentTask = null;
}
/**
* 添加一个任务到队列中。
Expand Down Expand Up @@ -103,10 +118,6 @@ export default class Scheduler extends EventEmitter {
next: this.next.bind(this),
});
}
// 流程执行过程中出错,停止执行
stop(data) {
console.log('stop', data);
}
private pushTaskToRunningMap(taskParam) {
const { executionId, taskId } = taskParam;
if (!this.taskRunningMap.has(executionId)) {
Expand Down Expand Up @@ -171,7 +182,7 @@ export default class Scheduler extends EventEmitter {
detail: execResult.detail,
});
}
private async next(data: NextTaskParam) {
private next(data: NextTaskParam) {
if (data.outgoing && data.outgoing.length > 0) {
data.outgoing.forEach((item) => {
this.addTask({
Expand All @@ -180,19 +191,16 @@ export default class Scheduler extends EventEmitter {
});
});
}
await this.saveTaskResult(data);
this.saveTaskResult(data);
this.removeTaskFromRunningMap(data);
this.run({
executionId: data.executionId,
nodeId: data.nodeId,
taskId: data.taskId,
});
}
/**
* 为了防止多次添加导致
*/
private async saveTaskResult(data: TaskResult) {
await this.recorder.addTask({
private saveTaskResult(data: TaskResult) {
this.recorder.addTask({
executionId: data.executionId,
taskId: data.taskId,
nodeId: data.nodeId,
Expand Down
2 changes: 1 addition & 1 deletion packages/engine/src/recorder/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export default class Recorder implements RecorderInterface {
*/
async addTask(task: RecorderData) {
const { executionId, taskId } = task;
const instanceData = await this.getExecutionTasks(executionId);
const instanceData = this.getExecutionTasks(executionId);
if (!instanceData) {
this.pushExecution(executionId);
}
Expand Down

0 comments on commit 6fa0904

Please sign in to comment.