Skip to content

Commit

Permalink
Merge pull request FlowiseAI#111 from FlowiseAI/feature/ChildProcessing
Browse files Browse the repository at this point in the history
  • Loading branch information
chungyau97 authored May 18, 2023
2 parents 275781a + 205de44 commit 0c16bca
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 67 deletions.
3 changes: 2 additions & 1 deletion packages/server/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
PORT=3000
# USERNAME=user
# PASSWORD=1234
# PASSWORD=1234
# EXECUTION_MODE=child or main
148 changes: 148 additions & 0 deletions packages/server/src/ChildProcess.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface'
import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils'

export class ChildProcess {
/**
* Stop child process when app is killed
*/
static async stopChildProcess() {
setTimeout(() => {
process.exit(0)
}, 50000)
}

/**
* Process prediction
* @param {IRunChatflowMessageValue} messageValue
* @return {Promise<void>}
*/
async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
process.on('SIGTERM', ChildProcess.stopChildProcess)
process.on('SIGINT', ChildProcess.stopChildProcess)

await sendToParentProcess('start', '_')

// Create a Queue and add our initial node in it
const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue

let nodeToExecuteData: INodeData
let addToChatFlowPool: any = {}

/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
if (endingNodeData) {
nodeToExecuteData = endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges

/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}

const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}

if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
await sendToParentProcess(
'error',
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
return
}

/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)

/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
componentNodes,
incomingInput.question,
incomingInput?.overrideConfig
)

const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) {
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
return
}

const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData

const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
addToChatFlowPool = {
chatflowid: chatflow.id,
nodeToExecuteData,
startingNodes,
overrideConfig: incomingInput?.overrideConfig
}
}

const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()

const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })

await sendToParentProcess('finish', { result, addToChatFlowPool })
}
}

/**
* Send data back to parent process
* @param {string} key Key of message
* @param {*} value Value of message
* @returns {Promise<void>}
*/
async function sendToParentProcess(key: string, value: any): Promise<void> {
// tslint:disable-line:no-any
return new Promise((resolve, reject) => {
process.send!(
{
key,
value
},
(error: Error) => {
if (error) {
return reject(error)
}
resolve()
}
)
})
}

const childProcess = new ChildProcess()

process.on('message', async (message: IChildProcessMessage) => {
if (message.key === 'start') {
await childProcess.runChildProcess(message.value)
process.exit()
}
})
12 changes: 12 additions & 0 deletions packages/server/src/Interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,15 @@ export interface IDatabaseExport {
chatflows: IChatFlow[]
apikeys: ICommonObject[]
}

export interface IRunChatflowMessageValue {
chatflow: IChatFlow
incomingInput: IncomingInput
componentNodes: IComponentNodes
endingNodeData?: INodeData
}

export interface IChildProcessMessage {
key: string
value?: any
}
Loading

0 comments on commit 0c16bca

Please sign in to comment.