Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix(core): Don't fail task runner task if logging fails #12401

Merged
merged 5 commits into from
Dec 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -139,7 +139,8 @@ describe('JsTaskRunner', () => {
});

expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
'Hello world!',
"'Hello'",
"'world!'",
]);
},
);
@@ -173,6 +174,44 @@ describe('JsTaskRunner', () => {
}),
).resolves.toBeDefined();
});

it('should not throw when trying to log the context object', async () => {
const task = newTaskWithSettings({
code: `
console.log(this);
return {json: {}}
`,
nodeMode: 'runOnceForAllItems',
});

await expect(
execTaskWithParams({
task,
taskData: newDataRequestResponse([wrapIntoJson({})]),
}),
).resolves.toBeDefined();
});

it('should log the context object as [[ExecutionContext]]', async () => {
const rpcCallSpy = jest.spyOn(defaultTaskRunner, 'makeRpcCall').mockResolvedValue(undefined);

const task = newTaskWithSettings({
code: `
console.log(this);
return {json: {}}
`,
nodeMode: 'runOnceForAllItems',
});

await execTaskWithParams({
task,
taskData: newDataRequestResponse([wrapIntoJson({})]),
});

expect(rpcCallSpy).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
'[[ExecutionContext]]',
]);
});
});

describe('built-in methods and variables available in the context', () => {
98 changes: 57 additions & 41 deletions packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts
Original file line number Diff line number Diff line change
@@ -15,8 +15,10 @@ import type {
EnvProviderState,
IExecuteData,
INodeTypeDescription,
IWorkflowDataProxyData,
} from 'n8n-workflow';
import * as a from 'node:assert';
import { inspect } from 'node:util';
import { runInNewContext, type Context } from 'node:vm';

import type { MainConfig } from '@/config/main-config';
@@ -79,6 +81,8 @@ type CustomConsole = {
log: (...args: unknown[]) => void;
};

const noOp = () => {};

export class JsTaskRunner extends TaskRunner {
private readonly requireResolver: RequireResolver;

@@ -129,29 +133,12 @@ export class JsTaskRunner extends TaskRunner {
nodeTypes: this.nodeTypes,
});

const noOp = () => {};
const customConsole = {
// all except `log` are dummy methods that disregard without throwing, following existing Code node behavior
...Object.keys(console).reduce<Record<string, () => void>>((acc, name) => {
acc[name] = noOp;
return acc;
}, {}),
// Send log output back to the main process. It will take care of forwarding
// it to the UI or printing to console.
log: (...args: unknown[]) => {
const logOutput = args
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
.join(' ');
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
},
};

workflow.staticData = ObservableObject.create(workflow.staticData);

const result =
settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole, signal)
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole, signal);
? await this.runForAllItems(task.taskId, settings, data, workflow, signal)
: await this.runForEachItem(task.taskId, settings, data, workflow, signal);

return {
result,
@@ -200,22 +187,14 @@ export class JsTaskRunner extends TaskRunner {
settings: JSExecSettings,
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const dataProxy = this.createDataProxy(data, workflow, data.itemIndex);
const inputItems = data.connectionInputData;

const context: Context = {
require: this.requireResolver,
module: {},
console: customConsole,
const context = this.buildContext(taskId, workflow, data.node, dataProxy, {
items: inputItems,
$getWorkflowStaticData: (type: 'global' | 'node') => workflow.getStaticData(type, data.node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
};
});

try {
const result = await new Promise<TaskResultData['result']>((resolve, reject) => {
@@ -264,7 +243,6 @@ export class JsTaskRunner extends TaskRunner {
settings: JSExecSettings,
data: JsTaskData,
workflow: Workflow,
customConsole: CustomConsole,
signal: AbortSignal,
): Promise<INodeExecutionData[]> {
const inputItems = data.connectionInputData;
@@ -279,17 +257,7 @@ export class JsTaskRunner extends TaskRunner {
for (let index = chunkStartIdx; index < chunkEndIdx; index++) {
const item = inputItems[index];
const dataProxy = this.createDataProxy(data, workflow, index);
const context: Context = {
require: this.requireResolver,
module: {},
console: customConsole,
item,
$getWorkflowStaticData: (type: 'global' | 'node') =>
workflow.getStaticData(type, data.node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
};
const context = this.buildContext(taskId, workflow, data.node, dataProxy, { item });

try {
let result = await new Promise<INodeExecutionData | undefined>((resolve, reject) => {
@@ -467,4 +435,52 @@ export class JsTaskRunner extends TaskRunner {

return rpcObject;
}

private buildCustomConsole(taskId: string): CustomConsole {
return {
// all except `log` are dummy methods that disregard without throwing, following existing Code node behavior
...Object.keys(console).reduce<Record<string, () => void>>((acc, name) => {
acc[name] = noOp;
return acc;
}, {}),

// Send log output back to the main process. It will take care of forwarding
// it to the UI or printing to console.
log: (...args: unknown[]) => {
const formattedLogArgs = args.map((arg) => inspect(arg));
void this.makeRpcCall(taskId, 'logNodeOutput', formattedLogArgs);
},
};
}

/**
* Builds the 'global' context object that is passed to the script
*
* @param taskId The ID of the task. Needed for RPC calls
* @param workflow The workflow that is being executed. Needed for static data
* @param node The node that is being executed. Needed for static data
* @param dataProxy The data proxy object that provides access to built-ins
* @param additionalProperties Additional properties to add to the context
*/
private buildContext(
taskId: string,
workflow: Workflow,
node: INode,
dataProxy: IWorkflowDataProxyData,
additionalProperties: Record<string, unknown> = {},
): Context {
const context: Context = {
[inspect.custom]: () => '[[ExecutionContext]]',
require: this.requireResolver,
module: {},
console: this.buildCustomConsole(taskId),
$getWorkflowStaticData: (type: 'global' | 'node') => workflow.getStaticData(type, node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
...additionalProperties,
};

return context;
Comment on lines +472 to +484
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const context: Context = {
[inspect.custom]: () => '[[ExecutionContext]]',
require: this.requireResolver,
module: {},
console: this.buildCustomConsole(taskId),
$getWorkflowStaticData: (type: 'global' | 'node') => workflow.getStaticData(type, node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
...additionalProperties,
};
return context;
return {
[inspect.custom]: () => '[[ExecutionContext]]',
require: this.requireResolver,
module: {},
console: this.buildCustomConsole(taskId),
$getWorkflowStaticData: (type: 'global' | 'node') => workflow.getStaticData(type, node),
...this.getNativeVariables(),
...dataProxy,
...this.buildRpcCallObject(taskId),
...additionalProperties,
};

}
}
16 changes: 8 additions & 8 deletions packages/@n8n/task-runner/src/task-runner.ts
Original file line number Diff line number Diff line change
@@ -452,15 +452,15 @@ export abstract class TaskRunner extends EventEmitter {
});
});

this.send({
type: 'runner:rpc',
callId,
taskId,
name,
params,
});

try {
this.send({
type: 'runner:rpc',
callId,
taskId,
name,
params,
});

Comment on lines +456 to +463
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no catch block here? Shouldn't we error the task if RPC fails?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RPC calls are initiated by user code. Hence it's up to the user code to handle or ignore them. If the user code throws, we of course error the task.

const returnValue = await dataPromise;

return isSerializedBuffer(returnValue) ? toBuffer(returnValue) : returnValue;
Loading