From a18207b04fa138bff995e6f7cda1de29837027cc Mon Sep 17 00:00:00 2001 From: Simon Larsen Date: Wed, 26 Feb 2025 14:08:11 +0000 Subject: [PATCH] feat: add isManualExecution flag to workflow execution and related components --- .../WorkspaceNotificationRuleService.ts | 16 +- .../BaseModel/OnTriggerBaseModel.ts | 1 + .../Types/Workflow/Components/Schedule.ts | 2 + .../Types/Workflow/Components/Webhook.ts | 1 + Common/Server/Types/Workflow/TriggerCode.ts | 2 + Common/Server/Types/Workflow/Workflow.ts | 1 + Common/Server/Utils/Workspace/Slack/Slack.ts | 153 ++++++++---------- .../Server/Utils/Workspace/WorkspaceBase.ts | 4 +- Common/Types/Workflow/Component.ts | 2 + Common/Types/Workflow/Components/Webhook.ts | 32 +++- Dashboard/src/Pages/Workflow/View/Builder.tsx | 21 +-- Workflow/API/Manual.ts | 5 +- Workflow/Routes.ts | 1 + Workflow/Services/QueueWorkflow.ts | 1 + Workflow/Services/RunWorkflow.ts | 36 ++++- 15 files changed, 166 insertions(+), 112 deletions(-) diff --git a/Common/Server/Services/WorkspaceNotificationRuleService.ts b/Common/Server/Services/WorkspaceNotificationRuleService.ts index b8b57d5576c..ee0f430c7c1 100644 --- a/Common/Server/Services/WorkspaceNotificationRuleService.ts +++ b/Common/Server/Services/WorkspaceNotificationRuleService.ts @@ -29,7 +29,10 @@ import WorkspaceUserAuthTokenService from "./WorkspaceUserAuthTokenService"; import WorkspaceMessagePayload, { WorkspaceMessageBlock, } from "../../Types/Workspace/WorkspaceMessagePayload"; -import WorkspaceProjectAuthToken, { MiscData, SlackMiscData } from "../../Models/DatabaseModels/WorkspaceProjectAuthToken"; +import WorkspaceProjectAuthToken, { + MiscData, + SlackMiscData, +} from "../../Models/DatabaseModels/WorkspaceProjectAuthToken"; import WorkspaceProjectAuthTokenService from "./WorkspaceProjectAuthTokenService"; import logger from "../Utils/Logger"; @@ -52,14 +55,16 @@ export class Service extends DatabaseService { const miscData: MiscData | undefined = data.projectAuthToken.miscData; if (!miscData) { - throw new BadDataException("Misc data not found in project auth token"); + throw new BadDataException("Misc data not found in project auth token"); } - if(data.workspaceType === WorkspaceType.Slack) { - const userId: string = (miscData as SlackMiscData).botUserId; + if (data.workspaceType === WorkspaceType.Slack) { + const userId: string = (miscData as SlackMiscData).botUserId; if (!userId) { - throw new BadDataException("Bot user ID not found in project auth token"); + throw new BadDataException( + "Bot user ID not found in project auth token", + ); } return userId; @@ -68,7 +73,6 @@ export class Service extends DatabaseService { throw new BadDataException("Workspace type not supported"); } - public async createInviteAndPostToChannelsBasedOnRules(data: { projectId: ObjectID; notificationRuleEventType: NotificationRuleEventType; diff --git a/Common/Server/Types/Workflow/Components/BaseModel/OnTriggerBaseModel.ts b/Common/Server/Types/Workflow/Components/BaseModel/OnTriggerBaseModel.ts index 652149a8b67..923e3af708e 100644 --- a/Common/Server/Types/Workflow/Components/BaseModel/OnTriggerBaseModel.ts +++ b/Common/Server/Types/Workflow/Components/BaseModel/OnTriggerBaseModel.ts @@ -222,6 +222,7 @@ export default class OnTriggerBaseModel< returnValues: { data: requestData, }, + isManualExecution: false, }; promises.push(props.executeWorkflow(executeWorkflow)); diff --git a/Common/Server/Types/Workflow/Components/Schedule.ts b/Common/Server/Types/Workflow/Components/Schedule.ts index be8a3ed3540..83441dc612d 100644 --- a/Common/Server/Types/Workflow/Components/Schedule.ts +++ b/Common/Server/Types/Workflow/Components/Schedule.ts @@ -53,6 +53,7 @@ export default class WebhookTrigger extends TriggerCode { const executeWorkflow: ExecuteWorkflowType = { workflowId: new ObjectID(workflow._id!), returnValues: {}, + isManualExecution: false, }; if ( @@ -122,6 +123,7 @@ export default class WebhookTrigger extends TriggerCode { const executeWorkflow: ExecuteWorkflowType = { workflowId: new ObjectID(workflow._id!), returnValues: {}, + isManualExecution: false, }; if ( diff --git a/Common/Server/Types/Workflow/Components/Webhook.ts b/Common/Server/Types/Workflow/Components/Webhook.ts index aa4e4c6beb5..197abf4f7bd 100644 --- a/Common/Server/Types/Workflow/Components/Webhook.ts +++ b/Common/Server/Types/Workflow/Components/Webhook.ts @@ -89,6 +89,7 @@ export default class WebhookTrigger extends TriggerCode { "request-params": req.query, "request-body": req.body, }, + isManualExecution: false, }; await props.executeWorkflow(executeWorkflow); diff --git a/Common/Server/Types/Workflow/TriggerCode.ts b/Common/Server/Types/Workflow/TriggerCode.ts index ffcc0737096..e1d926385f7 100644 --- a/Common/Server/Types/Workflow/TriggerCode.ts +++ b/Common/Server/Types/Workflow/TriggerCode.ts @@ -10,6 +10,8 @@ import { Port } from "Common/Types/Workflow/Component"; export interface ExecuteWorkflowType { workflowId: ObjectID; returnValues: JSONObject; + // is this workflow triggered manually or not + isManualExecution: boolean; } export interface InitProps { diff --git a/Common/Server/Types/Workflow/Workflow.ts b/Common/Server/Types/Workflow/Workflow.ts index 7bf37d00307..413965237a8 100644 --- a/Common/Server/Types/Workflow/Workflow.ts +++ b/Common/Server/Types/Workflow/Workflow.ts @@ -6,4 +6,5 @@ export interface RunProps { workflowId: ObjectID; workflowLogId: ObjectID | null; timeout: number; + isManualExecution: boolean; } diff --git a/Common/Server/Utils/Workspace/Slack/Slack.ts b/Common/Server/Utils/Workspace/Slack/Slack.ts index ae33e220547..2541d225197 100644 --- a/Common/Server/Utils/Workspace/Slack/Slack.ts +++ b/Common/Server/Utils/Workspace/Slack/Slack.ts @@ -15,7 +15,6 @@ import WorkspaceBase, { WorkspaceChannel } from "../WorkspaceBase"; import WorkspaceType from "../../../../Types/Workspace/WorkspaceType"; export default class SlackUtil extends WorkspaceBase { - public static override async joinChannel(data: { authToken: string; channelId: string; @@ -31,9 +30,9 @@ export default class SlackUtil extends WorkspaceBase { }, { Authorization: `Bearer ${data.authToken}`, - ['Content-Type']: "application/x-www-form-urlencoded", + ["Content-Type"]: "application/x-www-form-urlencoded", }, - ); + ); logger.debug("Response from Slack API for joining channel:"); logger.debug(response); @@ -52,7 +51,6 @@ export default class SlackUtil extends WorkspaceBase { logger.debug("Channel joined successfully with data:"); logger.debug(data); - } public static override async inviteUserToChannelByChannelId(data: { @@ -72,7 +70,7 @@ export default class SlackUtil extends WorkspaceBase { }, { Authorization: `Bearer ${data.authToken}`, - ['Content-Type']: "application/x-www-form-urlencoded", + ["Content-Type"]: "application/x-www-form-urlencoded", }, ); @@ -99,8 +97,7 @@ export default class SlackUtil extends WorkspaceBase { channelName: string; workspaceUserId: string; }): Promise { - - if(data.channelName && data.channelName.startsWith("#")) { + if (data.channelName && data.channelName.startsWith("#")) { // trim # from channel name data.channelName = data.channelName.substring(1); } @@ -139,9 +136,8 @@ export default class SlackUtil extends WorkspaceBase { logger.debug(existingWorkspaceChannels); for (let channelName of data.channelNames) { - // if channel name starts with #, remove it - if(channelName && channelName.startsWith("#")) { + if (channelName && channelName.startsWith("#")) { channelName = channelName.substring(1); } @@ -168,7 +164,6 @@ export default class SlackUtil extends WorkspaceBase { return workspaceChannels; } - public static override async getWorkspaceChannelFromChannelName(data: { authToken: string; channelName: string; @@ -176,9 +171,10 @@ export default class SlackUtil extends WorkspaceBase { logger.debug("Getting workspace channel ID from channel name with data:"); logger.debug(data); - const channels: Dictionary = await this.getAllWorkspaceChannels({ - authToken: data.authToken, - }); + const channels: Dictionary = + await this.getAllWorkspaceChannels({ + authToken: data.authToken, + }); logger.debug("All workspace channels:"); logger.debug(channels); @@ -191,7 +187,7 @@ export default class SlackUtil extends WorkspaceBase { logger.debug("Workspace channel ID obtained:"); logger.debug(channels[data.channelName]!.id); - return channels[data.channelName]!; + return channels[data.channelName]!; } public static override async getWorkspaceChannelFromChannelId(data: { @@ -209,7 +205,7 @@ export default class SlackUtil extends WorkspaceBase { }, { Authorization: `Bearer ${data.authToken}`, - ['Content-Type']: "application/x-www-form-urlencoded", + ["Content-Type"]: "application/x-www-form-urlencoded", }, ); @@ -262,7 +258,7 @@ export default class SlackUtil extends WorkspaceBase { {}, { Authorization: `Bearer ${data.authToken}`, - ['Content-Type']: "application/x-www-form-urlencoded", + ["Content-Type"]: "application/x-www-form-urlencoded", }, ); @@ -329,8 +325,7 @@ export default class SlackUtil extends WorkspaceBase { const channelIdsToPostTo: Array = []; for (let channelName of data.workspaceMessagePayload.channelNames) { - - if(channelName && channelName.startsWith("#")) { + if (channelName && channelName.startsWith("#")) { // trim # from channel name channelName = channelName.substring(1); } @@ -353,19 +348,18 @@ export default class SlackUtil extends WorkspaceBase { for (const channelId of channelIdsToPostTo) { try { - - // check if the user is in the channel. + // check if the user is in the channel. const isUserInChannel = await this.isUserInChannel({ authToken: data.authToken, channelId: channelId, userId: data.userId, }); - if(!isUserInChannel) { - // add user to the channel + if (!isUserInChannel) { + // add user to the channel await this.joinChannel({ authToken: data.authToken, - channelId: channelId + channelId: channelId, }); } @@ -376,7 +370,6 @@ export default class SlackUtil extends WorkspaceBase { }); logger.debug(`Message sent to channel ID ${channelId} successfully.`); - } catch (e) { logger.error(`Error sending message to channel ID ${channelId}:`); logger.error(e); @@ -401,7 +394,7 @@ export default class SlackUtil extends WorkspaceBase { }, { Authorization: `Bearer ${data.authToken}`, - ['Content-Type']: "application/json", + ["Content-Type"]: "application/json", }, ); @@ -438,7 +431,7 @@ export default class SlackUtil extends WorkspaceBase { }, { Authorization: `Bearer ${data.authToken}`, - ['Content-Type']: "application/x-www-form-urlencoded", + ["Content-Type"]: "application/x-www-form-urlencoded", }, ); @@ -520,15 +513,12 @@ export default class SlackUtil extends WorkspaceBase { return markdownBlock; } - public static override async isUserInChannel(data: { authToken: string; channelId: string; userId: string; }): Promise { - - - const members: Array = []; + const members: Array = []; logger.debug("Checking if user is in channel with data:"); logger.debug(data); @@ -536,63 +526,61 @@ export default class SlackUtil extends WorkspaceBase { let cursor: string | undefined = undefined; do { - - // check if the user is in the channel, return true if they are, false if they are not - - const requestBody: JSONObject = { - channel: data.channelId, - limit: 1000, - }; - - if(cursor) { - requestBody["cursor"] = cursor; - } - - const response: HTTPErrorResponse | HTTPResponse = await API.post( - URL.fromString("https://slack.com/api/conversations.members"), - requestBody, - { - Authorization: `Bearer ${data.authToken}`, - ['Content-Type']: "application/x-www-form-urlencoded", - }, - ); + // check if the user is in the channel, return true if they are, false if they are not - logger.debug("Response from Slack API for getting channel members:"); - logger.debug(response); + const requestBody: JSONObject = { + channel: data.channelId, + limit: 1000, + }; - if (response instanceof HTTPErrorResponse) { - logger.error("Error response from Slack API:"); - logger.error(response); - throw response; - } + if (cursor) { + requestBody["cursor"] = cursor; + } + const response: HTTPErrorResponse | HTTPResponse = + await API.post( + URL.fromString("https://slack.com/api/conversations.members"), + requestBody, + { + Authorization: `Bearer ${data.authToken}`, + ["Content-Type"]: "application/x-www-form-urlencoded", + }, + ); - // check for ok response + logger.debug("Response from Slack API for getting channel members:"); + logger.debug(response); - if ((response.jsonData as JSONObject)?.["ok"] !== true) { - logger.error("Invalid response from Slack API:"); - logger.error(response.jsonData); - throw new BadRequestException("Invalid response"); - } + if (response instanceof HTTPErrorResponse) { + logger.error("Error response from Slack API:"); + logger.error(response); + throw response; + } - // check if the user is in the channel - const membersOnThisPage: Array = (response.jsonData as JSONObject)["members"] as Array; + // check for ok response - members.push(...membersOnThisPage); + if ((response.jsonData as JSONObject)?.["ok"] !== true) { + logger.error("Invalid response from Slack API:"); + logger.error(response.jsonData); + throw new BadRequestException("Invalid response"); + } - cursor = ((response.jsonData as JSONObject)["response_metadata"] as JSONObject)?.["next_cursor"] as string; + // check if the user is in the channel + const membersOnThisPage: Array = ( + response.jsonData as JSONObject + )["members"] as Array; - } while(cursor); + members.push(...membersOnThisPage); - + cursor = ( + (response.jsonData as JSONObject)["response_metadata"] as JSONObject + )?.["next_cursor"] as string; + } while (cursor); - if(members.includes(data.userId)) { + if (members.includes(data.userId)) { return true; } return false; - - } public static override getButtonBlock(data: { @@ -623,17 +611,18 @@ export default class SlackUtil extends WorkspaceBase { logger.debug("Sending message to channel via incoming webhook with data:"); logger.debug(data); - const apiResult: HTTPResponse | HTTPErrorResponse | null = await API.post(data.url, { - blocks: [ - { - type: "section", - text: { - type: "mrkdwn", - text: `${data.text}`, + const apiResult: HTTPResponse | HTTPErrorResponse | null = + await API.post(data.url, { + blocks: [ + { + type: "section", + text: { + type: "mrkdwn", + text: `${data.text}`, + }, }, - }, - ], - }); + ], + }); logger.debug("Response from Slack API for sending message via webhook:"); logger.debug(apiResult); diff --git a/Common/Server/Utils/Workspace/WorkspaceBase.ts b/Common/Server/Utils/Workspace/WorkspaceBase.ts index 4ba85661c21..072c44eb907 100644 --- a/Common/Server/Utils/Workspace/WorkspaceBase.ts +++ b/Common/Server/Utils/Workspace/WorkspaceBase.ts @@ -21,7 +21,6 @@ export interface WorkspaceChannel { } export default class WorkspaceBase { - public static async joinChannel(_data: { authToken: string; channelId: string; @@ -78,8 +77,7 @@ export default class WorkspaceBase { authToken: string; channelId: string; workspaceUserId: string; - }): Promise { - } + }): Promise {} public static async createChannelsIfDoesNotExist(_data: { authToken: string; diff --git a/Common/Types/Workflow/Component.ts b/Common/Types/Workflow/Component.ts index ca347e6e7b3..7bc2e4d6d70 100644 --- a/Common/Types/Workflow/Component.ts +++ b/Common/Types/Workflow/Component.ts @@ -90,6 +90,8 @@ export default interface ComponentMetadata { outPorts: Array; tableName?: string | undefined; documentationLink?: Route; + // this is used in trigger component to show the manual execution button + runWorkflowManuallyArguments?: Array | undefined; } export interface ComponentCategory { diff --git a/Common/Types/Workflow/Components/Webhook.ts b/Common/Types/Workflow/Components/Webhook.ts index 3625a707f77..3e17c62e63b 100644 --- a/Common/Types/Workflow/Components/Webhook.ts +++ b/Common/Types/Workflow/Components/Webhook.ts @@ -16,7 +16,37 @@ const components: Array = [ iconProp: IconProp.AltGlobe, componentType: ComponentType.Trigger, documentationLink: Route.fromString("/workflow/docs/Webhook.md"), - arguments: [], + + arguments: [ + + ], + runWorkflowManuallyArguments: [ + { + id: "request-headers", + name: "Request Headers", + description: "Request Headers for this request", + type: ComponentInputType.StringDictionary, + required: false, + placeholder: '{"header1": "value1", "header2": "value2", ....}', + }, + { + id: "request-params", + name: "Request Query Params", + description: "Request Query Params for this request", + type: ComponentInputType.StringDictionary, + required: false, + placeholder: '{"query1": "value1", "query2": "value2", ....}', + }, + { + id: "request-body", + name: "Request Body", + description: "Request Body", + type: ComponentInputType.JSON, + required: false, + placeholder: '{"key1": "value1", "key2": "value2", ....}', + }, + ], + returnValues: [ { id: "request-headers", diff --git a/Dashboard/src/Pages/Workflow/View/Builder.tsx b/Dashboard/src/Pages/Workflow/View/Builder.tsx index 03af336f531..97c8fe5be3f 100644 --- a/Dashboard/src/Pages/Workflow/View/Builder.tsx +++ b/Dashboard/src/Pages/Workflow/View/Builder.tsx @@ -312,16 +312,17 @@ const Delete: FunctionComponent = (): ReactElement => { }} onRun={async (component: NodeDataProp) => { try { - const result: HTTPErrorResponse | HTTPResponse = await API.post( - URL.fromString(WORKFLOW_URL.toString()).addRoute( - "/manual/run/" + modelId.toString(), - ), - { - data: component.returnValues, - }, - ); - - if(result instanceof HTTPErrorResponse) { + const result: HTTPErrorResponse | HTTPResponse = + await API.post( + URL.fromString(WORKFLOW_URL.toString()).addRoute( + "/manual/run/" + modelId.toString(), + ), + { + data: component.returnValues, + }, + ); + + if (result instanceof HTTPErrorResponse) { throw result; } diff --git a/Workflow/API/Manual.ts b/Workflow/API/Manual.ts index 451d35ea93c..9baa4e10c17 100644 --- a/Workflow/API/Manual.ts +++ b/Workflow/API/Manual.ts @@ -23,9 +23,8 @@ export default class ManualAPI { public async manuallyRunWorkflow( req: ExpressRequest, res: ExpressResponse, - next: NextFunction + next: NextFunction, ): Promise { - try { // add this workflow to the run queue and return the 200 response. @@ -40,12 +39,12 @@ export default class ManualAPI { await QueueWorkflow.addWorkflowToQueue({ workflowId: new ObjectID(req.params["workflowId"] as string), returnValues: req.body.data || {}, + isManualExecution: true, }); return Response.sendJsonObjectResponse(req, res, { status: "Scheduled", }); - } catch (err) { next(err); } diff --git a/Workflow/Routes.ts b/Workflow/Routes.ts index 9f076106062..0f10e97e552 100644 --- a/Workflow/Routes.ts +++ b/Workflow/Routes.ts @@ -52,6 +52,7 @@ const WorkflowFeatureSet: FeatureSet = { : null, arguments: job.data.data as JSONObject, timeout: WorkflowTimeoutInMs || 5000, + isManualExecution: job.data.isManualExecution || false, }); }, { concurrency: 100 }, diff --git a/Workflow/Services/QueueWorkflow.ts b/Workflow/Services/QueueWorkflow.ts index 413937264c9..9c0dbdf0be9 100644 --- a/Workflow/Services/QueueWorkflow.ts +++ b/Workflow/Services/QueueWorkflow.ts @@ -187,6 +187,7 @@ export default class QueueWorkflow { data: executeWorkflow.returnValues, workflowLogId: workflowLog?._id || null, workflowId: workflow._id, + isManualExecution: executeWorkflow.isManualExecution, // this is to check if the workflow is triggered manually or not. }, { scheduleAt: scheduleAt, diff --git a/Workflow/Services/RunWorkflow.ts b/Workflow/Services/RunWorkflow.ts index 14b6f872951..cc0ff0b6384 100644 --- a/Workflow/Services/RunWorkflow.ts +++ b/Workflow/Services/RunWorkflow.ts @@ -135,7 +135,9 @@ export default class RunWorkflow { // form a run stack. - const runStack: RunStack = await this.makeRunStack(workflow.graph); + const runStack: RunStack = await this.makeRunStack({ + graph: workflow.graph, + }); const getVariableResult: { storageMap: StorageMap; @@ -212,16 +214,34 @@ export default class RunWorkflow { this.log(args); this.log("Component Logs: " + executeComponentId); - const result: RunReturnType = await this.runComponent( - args, - stackItem.node, - setDidErrorOut, - ); + let result: RunReturnType | null = null; + + if ( + runProps.isManualExecution && + stackItem.node.componentType === ComponentType.Trigger + ) { + // skip the trigger component if this is a manual execution. + result = { + returnValues: runProps.arguments, + executePort: stackItem.node.metadata.outPorts[0] || undefined, + }; + } else { + result = await this.runComponent( + args, + stackItem.node, + setDidErrorOut, + ); + } if (didWorkflowErrorOut) { throw new BadDataException("Workflow stopped because of an error"); } + if (!result) { + this.log("No result returned from component: " + executeComponentId); + break; + } + this.log("Completed Execution Component: " + executeComponentId); this.log("Data Returned"); this.log(result.returnValues); @@ -503,7 +523,9 @@ export default class RunWorkflow { } } - public async makeRunStack(graph: JSONObject): Promise { + public async makeRunStack(data: { graph: JSONObject }): Promise { + const graph: JSONObject = data.graph; + const nodes: Array = graph["nodes"] as Array; const edges: Array = graph["edges"] as Array;