Skip to content

Commit

Permalink
fix(core): Ensure that 'workflow-post-execute' event has userId whene…
Browse files Browse the repository at this point in the history
…ver it's available (#13326)
  • Loading branch information
netroy authored and tomi committed Feb 20, 2025
1 parent 4da7ed7 commit 088872b
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ describe('Execution Lifecycle Hooks', () => {
const expressionError = new ExpressionError('Error');
const pushRef = 'test-push-ref';
const retryOf = 'test-retry-of';
const userId = 'test-user-id';

const now = new Date('2025-01-13T18:25:50.267Z');
jest.useFakeTimers({ now });
Expand All @@ -110,7 +111,7 @@ describe('Execution Lifecycle Hooks', () => {
};
});

const workflowEventTests = () => {
const workflowEventTests = (expectedUserId?: string) => {
describe('workflowExecuteBefore', () => {
it('should emit workflow-pre-execute events', async () => {
await lifecycleHooks.runHook('workflowExecuteBefore', [workflow, runExecutionData]);
Expand All @@ -130,6 +131,7 @@ describe('Execution Lifecycle Hooks', () => {
executionId,
runData: successfulRun,
workflow: workflowData,
userId: expectedUserId,
});
});

Expand Down Expand Up @@ -214,15 +216,15 @@ describe('Execution Lifecycle Hooks', () => {
describe('getLifecycleHooksForRegularMain', () => {
const createHooks = (executionMode: WorkflowExecuteMode = 'manual') =>
getLifecycleHooksForRegularMain(
{ executionMode, workflowData, pushRef, retryOf },
{ executionMode, workflowData, pushRef, retryOf, userId },
executionId,
);

beforeEach(() => {
lifecycleHooks = createHooks();
});

workflowEventTests();
workflowEventTests(userId);
nodeEventsTests();
externalHooksTests();
statisticsTests();
Expand Down Expand Up @@ -527,13 +529,19 @@ describe('Execution Lifecycle Hooks', () => {

describe('getLifecycleHooksForScalingMain', () => {
beforeEach(() => {
lifecycleHooks = getLifecycleHooksForScalingMain('manual', executionId, workflowData, {
pushRef,
retryOf,
});
lifecycleHooks = getLifecycleHooksForScalingMain(
{
executionMode: 'manual',
workflowData,
pushRef,
retryOf,
userId,
},
executionId,
);
});

workflowEventTests();
workflowEventTests(userId);
externalHooksTests();

it('should setup the correct set of hooks', () => {
Expand Down Expand Up @@ -566,13 +574,13 @@ describe('Execution Lifecycle Hooks', () => {
saveDataErrorExecution: 'all',
};
const lifecycleHooks = getLifecycleHooksForScalingMain(
'webhook',
executionId,
workflowData,
{
executionMode: 'webhook',
workflowData,
pushRef,
retryOf,
},
executionId,
);

await lifecycleHooks.runHook('workflowExecuteAfter', [successfulRun, {}]);
Expand All @@ -589,13 +597,13 @@ describe('Execution Lifecycle Hooks', () => {
saveDataErrorExecution: 'none',
};
const lifecycleHooks = getLifecycleHooksForScalingMain(
'webhook',
executionId,
workflowData,
{
executionMode: 'webhook',
workflowData,
pushRef,
retryOf,
},
executionId,
);

await lifecycleHooks.runHook('workflowExecuteAfter', [failedRun, {}]);
Expand All @@ -610,10 +618,10 @@ describe('Execution Lifecycle Hooks', () => {

describe('getLifecycleHooksForScalingWorker', () => {
const createHooks = (executionMode: WorkflowExecuteMode = 'manual') =>
getLifecycleHooksForScalingWorker(executionMode, executionId, workflowData, {
pushRef,
retryOf,
});
getLifecycleHooksForScalingWorker(
{ executionMode, workflowData, pushRef, retryOf },
executionId,
);

beforeEach(() => {
lifecycleHooks = createHooks();
Expand Down
26 changes: 12 additions & 14 deletions packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,22 +376,21 @@ export function getLifecycleHooksForSubExecutions(
* Returns ExecutionLifecycleHooks instance for worker in scaling mode.
*/
export function getLifecycleHooksForScalingWorker(
mode: WorkflowExecuteMode,
data: IWorkflowExecutionDataProcess,
executionId: string,
workflowData: IWorkflowBase,
{ pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): ExecutionLifecycleHooks {
const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData);
const { pushRef, retryOf, executionMode, workflowData } = data;
const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData);
const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings };
const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
hookFunctionsNodeEvents(hooks);
hookFunctionsFinalizeExecutionStatus(hooks);
hookFunctionsSaveWorker(hooks, optionalParameters);
hookFunctionsSaveProgress(hooks, optionalParameters);
hookFunctionsStatistics(hooks);
hookFunctionsExternalHooks(hooks);

if (mode === 'manual' && Container.get(InstanceSettings).isWorker) {
if (executionMode === 'manual' && Container.get(InstanceSettings).isWorker) {
hookFunctionsPush(hooks, optionalParameters);
}

Expand All @@ -402,17 +401,16 @@ export function getLifecycleHooksForScalingWorker(
* Returns ExecutionLifecycleHooks instance for main process if workflow runs via worker
*/
export function getLifecycleHooksForScalingMain(
mode: WorkflowExecuteMode,
data: IWorkflowExecutionDataProcess,
executionId: string,
workflowData: IWorkflowBase,
{ pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): ExecutionLifecycleHooks {
const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData);
const { pushRef, retryOf, executionMode, workflowData, userId } = data;
const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData);
const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings };
const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
const executionRepository = Container.get(ExecutionRepository);

hookFunctionsWorkflowEvents(hooks);
hookFunctionsWorkflowEvents(hooks, userId);
hookFunctionsSaveProgress(hooks, optionalParameters);
hookFunctionsExternalHooks(hooks);
hookFunctionsFinalizeExecutionStatus(hooks);
Expand Down Expand Up @@ -466,11 +464,11 @@ export function getLifecycleHooksForRegularMain(
data: IWorkflowExecutionDataProcess,
executionId: string,
): ExecutionLifecycleHooks {
const { pushRef, retryOf, executionMode, workflowData } = data;
const { pushRef, retryOf, executionMode, workflowData, userId } = data;
const hooks = new ExecutionLifecycleHooks(executionMode, executionId, workflowData);
const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
hookFunctionsWorkflowEvents(hooks);
hookFunctionsWorkflowEvents(hooks, userId);
hookFunctionsNodeEvents(hooks);
hookFunctionsFinalizeExecutionStatus(hooks);
hookFunctionsSave(hooks, optionalParameters);
Expand Down
11 changes: 7 additions & 4 deletions packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ export class JobProcessor {
const { pushRef } = job.data;

const lifecycleHooks = getLifecycleHooksForScalingWorker(
execution.mode,
job.data.executionId,
execution.workflowData,
{ retryOf: execution.retryOf ?? undefined, pushRef },
{
executionMode: execution.mode,
workflowData: execution.workflowData,
retryOf: execution.retryOf,
pushRef,
},
executionId,
);
additionalData.hooks = lifecycleHooks;

Expand Down
31 changes: 4 additions & 27 deletions packages/cli/src/workflow-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,27 +345,15 @@ export class WorkflowRunner {
try {
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });

lifecycleHooks = getLifecycleHooksForScalingMain(
data.executionMode,
executionId,
data.workflowData,
{
retryOf: data.retryOf ?? undefined,
},
);
lifecycleHooks = getLifecycleHooksForScalingMain(data, executionId);

// Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed.
await lifecycleHooks.runHook('workflowExecuteBefore', [undefined, data.executionData]);
} catch (error) {
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker(
data.executionMode,
executionId,
data.workflowData,
{ retryOf: data.retryOf ?? undefined },
);
const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
await this.processError(error, new Date(), data.executionMode, executionId, lifecycleHooks);
throw error;
}
Expand All @@ -378,13 +366,7 @@ export class WorkflowRunner {

// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker(
data.executionMode,
executionId,
data.workflowData,
{ retryOf: data.retryOf ?? undefined },
);

const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
const error = new ExecutionCancelledError(executionId);
await this.processError(
error,
Expand All @@ -409,12 +391,7 @@ export class WorkflowRunner {

// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker(
data.executionMode,
executionId,
data.workflowData,
{ retryOf: data.retryOf ?? undefined },
);
const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);

await this.processError(
error,
Expand Down

0 comments on commit 088872b

Please # to comment.