From f41e353887fef4269510d25fa87b73da4cf925f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 18 Feb 2025 10:23:57 +0100 Subject: [PATCH] fix(core): Ensure that 'workflow-post-execute' event has userId whenever it's available (#13326) --- .../execution-lifecycle-hooks.test.ts | 44 +++++++++++-------- .../execution-lifecycle-hooks.ts | 26 +++++------ packages/cli/src/scaling/job-processor.ts | 11 +++-- packages/cli/src/workflow-runner.ts | 31 ++----------- 4 files changed, 49 insertions(+), 63 deletions(-) diff --git a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts index 747cd775c0b0a..c7e4ecd8dd843 100644 --- a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts @@ -88,6 +88,7 @@ describe('Execution Lifecycle Hooks', () => { const expressionError = new ExpressionError('Error'); const pushRef = 'test-push-ref'; const retryOf = 'test-retry-of'; + const userId = 'test-user-id'; const now = new Date('2025-01-13T18:25:50.267Z'); jest.useFakeTimers({ now }); @@ -110,7 +111,7 @@ describe('Execution Lifecycle Hooks', () => { }; }); - const workflowEventTests = () => { + const workflowEventTests = (expectedUserId?: string) => { describe('workflowExecuteBefore', () => { it('should emit workflow-pre-execute events', async () => { await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]); @@ -130,6 +131,7 @@ describe('Execution Lifecycle Hooks', () => { executionId, runData: successfulRun, workflow: workflowData, + userId: expectedUserId, }); }); @@ -214,7 +216,7 @@ describe('Execution Lifecycle Hooks', () => { describe('getLifecycleHooksForRegularMain', () => { const createHooks = (executionMode: WorkflowExecuteMode = 'manual') => getLifecycleHooksForRegularMain( - { executionMode, workflowData, pushRef, retryOf }, + { executionMode, workflowData, pushRef, retryOf, userId }, executionId, ); @@ -222,7 +224,7 @@ describe('Execution Lifecycle Hooks', () => { lifecycleHooks = createHooks(); }); - workflowEventTests(); + workflowEventTests(userId); nodeEventsTests(); externalHooksTests(); statisticsTests(); @@ -527,13 +529,19 @@ describe('Execution Lifecycle Hooks', () => { describe('getLifecycleHooksForScalingMain', () => { beforeEach(() => { - lifecycleHooks = getLifecycleHooksForScalingMain('manual', executionId, workflowData, { - pushRef, - retryOf, - }); + lifecycleHooks = getLifecycleHooksForScalingMain( + { + executionMode: 'manual', + workflowData, + pushRef, + retryOf, + userId, + }, + executionId, + ); }); - workflowEventTests(); + workflowEventTests(userId); externalHooksTests(); it('should setup the correct set of hooks', () => { @@ -566,13 +574,13 @@ describe('Execution Lifecycle Hooks', () => { saveDataErrorExecution: 'all', }; const lifecycleHooks = getLifecycleHooksForScalingMain( - 'webhook', - executionId, - workflowData, { + executionMode: 'webhook', + workflowData, pushRef, retryOf, }, + executionId, ); await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]); @@ -589,13 +597,13 @@ describe('Execution Lifecycle Hooks', () => { saveDataErrorExecution: 'none', }; const lifecycleHooks = getLifecycleHooksForScalingMain( - 'webhook', - executionId, - workflowData, { + executionMode: 'webhook', + workflowData, pushRef, retryOf, }, + executionId, ); await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]); @@ -610,10 +618,10 @@ describe('Execution Lifecycle Hooks', () => { describe('getLifecycleHooksForScalingWorker', () => { const createHooks = (executionMode: WorkflowExecuteMode = 'manual') => - getLifecycleHooksForScalingWorker(executionMode, executionId, workflowData, { - pushRef, - retryOf, - }); + getLifecycleHooksForScalingWorker( + { executionMode, workflowData, pushRef, retryOf }, + executionId, + ); beforeEach(() => { lifecycleHooks = createHooks(); diff --git a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts index 237bd25edb2d1..fec17f2dec80a 100644 --- a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts +++ b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts @@ -376,14 +376,13 @@ export function getLifecycleHooksForSubExecutions( * Returns ExecutionLifecycleHooks instance for worker in scaling mode. */ export function getLifecycleHooksForScalingWorker( - mode: WorkflowExecuteMode, + data: IWorkflowExecutionDataProcess, executionId: string, - workflowData: IWorkflowBase, - { pushRef, retryOf }: Omit = {}, ): ExecutionLifecycleHooks { - const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); + const { pushRef, retryOf, executionMode, workflowData } = data; + const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData); const saveSettings = toSaveSettings(workflowData.settings); - const optionalParameters = { pushRef, retryOf, saveSettings }; + const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings }; hookFunctionsNodeEvents(hooks); hookFunctionsFinalizeExecutionStatus(hooks); hookFunctionsSaveWorker(hooks, optionalParameters); @@ -391,7 +390,7 @@ export function getLifecycleHooksForScalingWorker( hookFunctionsStatistics(hooks); hookFunctionsExternalHooks(hooks); - if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { + if (executionMode === 'manual' && Container.get(InstanceSettings).isWorker) { hookFunctionsPush(hooks, optionalParameters); } @@ -402,17 +401,16 @@ export function getLifecycleHooksForScalingWorker( * Returns ExecutionLifecycleHooks instance for main process if workflow runs via worker */ export function getLifecycleHooksForScalingMain( - mode: WorkflowExecuteMode, + data: IWorkflowExecutionDataProcess, executionId: string, - workflowData: IWorkflowBase, - { pushRef, retryOf }: Omit = {}, ): ExecutionLifecycleHooks { - const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); + const { pushRef, retryOf, executionMode, workflowData, userId } = data; + const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData); const saveSettings = toSaveSettings(workflowData.settings); - const optionalParameters = { pushRef, retryOf, saveSettings }; + const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings }; const executionRepository = Container.get(ExecutionRepository); - hookFunctionsWorkflowEvents(hooks); + hookFunctionsWorkflowEvents(hooks, userId); hookFunctionsSaveProgress(hooks, optionalParameters); hookFunctionsExternalHooks(hooks); hookFunctionsFinalizeExecutionStatus(hooks); @@ -466,11 +464,11 @@ export function getLifecycleHooksForRegularMain( data: IWorkflowExecutionDataProcess, executionId: string, ): ExecutionLifecycleHooks { - const { pushRef, retryOf, executionMode, workflowData } = data; + const { pushRef, retryOf, executionMode, workflowData, userId } = data; const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData); const saveSettings = toSaveSettings(workflowData.settings); const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings }; - hookFunctionsWorkflowEvents(hooks); + hookFunctionsWorkflowEvents(hooks, userId); hookFunctionsNodeEvents(hooks); hookFunctionsFinalizeExecutionStatus(hooks); hookFunctionsSave(hooks, optionalParameters); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 18a6f6b39f400..91d051b240cc2 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -132,10 +132,13 @@ export class JobProcessor { const { pushRef } = job.data; const lifecycleHooks = getLifecycleHooksForScalingWorker( - execution.mode, - job.data.executionId, - execution.workflowData, - { retryOf: execution.retryOf ?? undefined, pushRef }, + { + executionMode: execution.mode, + workflowData: execution.workflowData, + retryOf: execution.retryOf, + pushRef, + }, + executionId, ); additionalData.hooks = lifecycleHooks; diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 82beb20648b14..88530890ae40a 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -345,14 +345,7 @@ export class WorkflowRunner { try { job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); - lifecycleHooks = getLifecycleHooksForScalingMain( - data.executionMode, - executionId, - data.workflowData, - { - retryOf: data.retryOf ?? undefined, - }, - ); + lifecycleHooks = getLifecycleHooksForScalingMain(data, executionId); // Normally also workflow should be supplied here but as it only used for sending // data to editor-UI is not needed. @@ -360,12 +353,7 @@ export class WorkflowRunner { } catch (error) { // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // "workflowExecuteAfter" which we require. - const lifecycleHooks = getLifecycleHooksForScalingWorker( - data.executionMode, - executionId, - data.workflowData, - { retryOf: data.retryOf ?? undefined }, - ); + const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId); await this.processError(error, new Date(), data.executionMode, executionId, lifecycleHooks); throw error; } @@ -378,13 +366,7 @@ export class WorkflowRunner { // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // "workflowExecuteAfter" which we require. - const lifecycleHooks = getLifecycleHooksForScalingWorker( - data.executionMode, - executionId, - data.workflowData, - { retryOf: data.retryOf ?? undefined }, - ); - + const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId); const error = new ExecutionCancelledError(executionId); await this.processError( error, @@ -409,12 +391,7 @@ export class WorkflowRunner { // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the // "workflowExecuteAfter" which we require. - const lifecycleHooks = getLifecycleHooksForScalingWorker( - data.executionMode, - executionId, - data.workflowData, - { retryOf: data.retryOf ?? undefined }, - ); + const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId); await this.processError( error,