Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Feb 3, 2025
1 parent 9b35264 commit db229a3
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,28 @@ describe('Execution Lifecycle Hooks', () => {
});
};

const statisticsTests = () => {
describe('statistics events', () => {
it('workflowExecuteAfter should emit workflowExecutionCompleted statistics event', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);

expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
workflowData,
fullRunData: successfulRun,
});
});

it('nodeFetchedData should handle nodeFetchedData statistics event', async () => {
await hooks.executeHookFunctions('nodeFetchedData', [workflowId, node]);

expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', {
workflowId,
node,
});
});
});
};

describe('getWorkflowHooksMain', () => {
const createHooks = () =>
getWorkflowHooksMain({ executionMode, workflowData, pushRef, retryOf }, executionId);
Expand All @@ -195,6 +217,7 @@ describe('Execution Lifecycle Hooks', () => {
workflowEventTests();
nodeEventsTests();
externalHooksTests();
statisticsTests();

it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks);
Expand Down Expand Up @@ -485,26 +508,6 @@ describe('Execution Lifecycle Hooks', () => {
});
});

describe('statistics events', () => {
it('workflowExecuteAfter should emit workflowExecutionCompleted statistics event', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);

expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
workflowData,
fullRunData: successfulRun,
});
});

it('nodeFetchedData should handle nodeFetchedData statistics event', async () => {
await hooks.executeHookFunctions('nodeFetchedData', [workflowId, node]);

expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', {
workflowId,
node,
});
});
});

describe("when pushRef isn't set", () => {
beforeEach(() => {
hooks = getWorkflowHooksMain({ executionMode, workflowData, retryOf }, executionId);
Expand Down Expand Up @@ -617,6 +620,7 @@ describe('Execution Lifecycle Hooks', () => {

nodeEventsTests();
externalHooksTests();
statisticsTests();

it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks);
Expand Down Expand Up @@ -712,6 +716,7 @@ describe('Execution Lifecycle Hooks', () => {
workflowEventTests();
nodeEventsTests();
externalHooksTests();
statisticsTests();

it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { mock } from 'jest-mock-extended';
import { ErrorReporter } from 'n8n-core';
import { Logger } from 'n8n-core';
import type { IRunExecutionData, ITaskData } from 'n8n-workflow';
Expand All @@ -17,13 +18,13 @@ describe('saveExecutionProgress', () => {
jest.resetAllMocks();
});

const commonArgs: [string, string, string, ITaskData, IRunExecutionData] = [
'some-workflow-id',
'some-execution-id',
'My Node',
{} as ITaskData,
{} as IRunExecutionData,
];
const workflowId = 'some-workflow-id';
const executionId = 'some-execution-id';
const nodeName = 'My Node';
const taskData = mock<ITaskData>();
const runExecutionData = mock<IRunExecutionData>();

const commonArgs = [workflowId, executionId, nodeName, taskData, runExecutionData] as const;

test('should not try to update non-existent executions', async () => {
executionRepository.findSingleExecution.mockResolvedValue(undefined);
Expand Down Expand Up @@ -59,9 +60,11 @@ describe('saveExecutionProgress', () => {
});

test('should not try to update finished executions', async () => {
executionRepository.findSingleExecution.mockResolvedValue({
finished: true,
} as IExecutionResponse);
executionRepository.findSingleExecution.mockResolvedValue(
mock<IExecutionResponse>({
finished: true,
}),
);

await saveExecutionProgress(...commonArgs);

Expand All @@ -76,11 +79,11 @@ describe('saveExecutionProgress', () => {

expect(fullExecutionData).toEqual({
data: {
executionData: undefined,
executionData: runExecutionData.executionData,
resultData: {
lastNodeExecuted: 'My Node',
lastNodeExecuted: nodeName,
runData: {
'My Node': [{}],
[nodeName]: [taskData],
},
},
startData: {},
Expand All @@ -89,7 +92,7 @@ describe('saveExecutionProgress', () => {
});

expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
'some-execution-id',
executionId,
fullExecutionData,
);

Expand All @@ -102,7 +105,7 @@ describe('saveExecutionProgress', () => {
startData: {},
resultData: {
runData: {
'My Node': [{}],
[nodeName]: [{}],
},
},
},
Expand All @@ -113,11 +116,11 @@ describe('saveExecutionProgress', () => {

expect(fullExecutionData).toEqual({
data: {
executionData: undefined,
executionData: runExecutionData.executionData,
resultData: {
lastNodeExecuted: 'My Node',
lastNodeExecuted: nodeName,
runData: {
'My Node': [{}, {}],
[nodeName]: [{}, taskData],
},
},
startData: {},
Expand All @@ -126,7 +129,7 @@ describe('saveExecutionProgress', () => {
});

expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
'some-execution-id',
executionId,
fullExecutionData,
);
});
Expand All @@ -144,6 +147,6 @@ describe('saveExecutionProgress', () => {

await saveExecutionProgress(...commonArgs);

expect(fullExecutionData.data.resultData.lastNodeExecuted).toEqual('My Node');
expect(fullExecutionData.data.resultData.lastNodeExecuted).toEqual(nodeName);
});
});
28 changes: 16 additions & 12 deletions packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import {
prepareExecutionDataForDbUpdate,
updateExistingExecution,
} from './shared/shared-hook-functions';
import { type ExecutionSavingSettings, toSaveSettings } from './to-save-settings';
import { type ExecutionSaveSettings, toSaveSettings } from './to-save-settings';

type HooksSetupParameters = {
saveSettings: ExecutionSavingSettings;
saveSettings: ExecutionSaveSettings;
pushRef?: string;
retryOf?: string;
};
Expand Down Expand Up @@ -238,6 +238,17 @@ function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks {
};
}

function hookFunctionsStatistics(): IWorkflowExecuteHooks {
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
return {
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}

/**
* Returns hook functions to save workflow execution and call error workflow
*/
Expand Down Expand Up @@ -358,11 +369,6 @@ function hookFunctionsSave({
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}

Expand Down Expand Up @@ -452,11 +458,6 @@ function hookFunctionsSaveWorker({
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}

Expand All @@ -477,6 +478,7 @@ export function getWorkflowHooksIntegrated(
hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSave({ saveSettings }),
hookFunctionsSaveProgress({ saveSettings }),
hookFunctionsStatistics(),
hookFunctionsExternalHooks(),
);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
Expand All @@ -498,6 +500,7 @@ export function getWorkflowHooksWorkerExecuter(
hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSaveWorker(optionalParameters),
hookFunctionsSaveProgress(optionalParameters),
hookFunctionsStatistics(),
hookFunctionsExternalHooks(),
];

Expand Down Expand Up @@ -590,6 +593,7 @@ export function getWorkflowHooksMain(
hookFunctionsSave(optionalParameters),
hookFunctionsPush(optionalParameters),
hookFunctionsSaveProgress(optionalParameters),
hookFunctionsStatistics(),
hookFunctionsExternalHooks(),
);
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export async function saveExecutionProgress(

// We do this to prevent crashes and executions ending in `unknown` state.
logger.error(
`Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`,
`Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsSaveProgress, nodeExecuteAfter)`,
{ error, executionId, workflowId },
);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/execution-lifecycle/to-save-settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { IWorkflowSettings } from 'n8n-workflow';

import config from '@/config';

export type ExecutionSavingSettings = {
export type ExecutionSaveSettings = {
error: boolean | 'all' | 'none';
success: boolean | 'all' | 'none';
manual: boolean;
Expand All @@ -17,7 +17,7 @@ export type ExecutionSavingSettings = {
* - `manual`: Whether to save successful or failed manual executions.
* - `progress`: Whether to save execution progress, i.e. after each node's execution.
*/
export function toSaveSettings(workflowSettings: IWorkflowSettings = {}): ExecutionSavingSettings {
export function toSaveSettings(workflowSettings: IWorkflowSettings = {}): ExecutionSaveSettings {
const DEFAULTS = {
ERROR: config.getEnv('executions.saveDataOnError'),
SUCCESS: config.getEnv('executions.saveDataOnSuccess'),
Expand Down

0 comments on commit db229a3

Please sign in to comment.