Skip to content

Commit

Permalink
Merge branch 'master' into ai-635-add-tags-or-annotations-any-tags-or…
Browse files Browse the repository at this point in the history
…-thumbs-up-thumbs-down

# Conflicts:
#	packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts
  • Loading branch information
burivuhster committed Feb 13, 2025
2 parents 7fc7701 + 0c6eb6e commit 48933e9
Show file tree
Hide file tree
Showing 136 changed files with 4,390 additions and 326 deletions.
2 changes: 1 addition & 1 deletion docker/images/n8n-custom/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN find . -type f -name "*.ts" -o -name "*.js.map" -o -name "*.vue" -o -name "t

# Deploy the `n8n` package into /compiled
RUN mkdir /compiled
RUN NODE_ENV=production DOCKER_BUILD=true pnpm --filter=n8n --prod --no-optional deploy /compiled
RUN NODE_ENV=production DOCKER_BUILD=true pnpm --filter=n8n --prod --no-optional --legacy deploy /compiled

# 2. Start with a new clean image with just the code that is needed to run n8n
FROM n8nio/base:${NODE_VERSION}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"private": true,
"engines": {
"node": ">=20.15",
"pnpm": ">=9.15"
"pnpm": ">=10.2.1"
},
"packageManager": "pnpm@9.15.5",
"packageManager": "pnpm@10.2.1",
"scripts": {
"prepare": "node scripts/prepare.mjs",
"preinstall": "node scripts/block-npm-install.js",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
type INodeType,
type INodeTypeDescription,
NodeOperationError,
parseErrorMetadata,
} from 'n8n-workflow';

import { promptTypeOptions, textFromPreviousNode } from '@utils/descriptions';
Expand Down Expand Up @@ -229,7 +230,12 @@ export class ChainRetrievalQa implements INodeType {
returnData.push({ json: { response } });
} catch (error) {
if (this.continueOnFail()) {
returnData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } });
const metadata = parseErrorMetadata(error);
returnData.push({
json: { error: error.message },
pairedItem: { item: itemIndex },
metadata,
});
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
jsonParse,
NodeConnectionType,
NodeOperationError,
parseErrorMetadata,
traverseNodeParameters,
} from 'n8n-workflow';
import { z } from 'zod';
Expand Down Expand Up @@ -92,7 +93,9 @@ export class WorkflowToolService {
} catch (error) {
const executionError = error as ExecutionError;
const errorResponse = `There was an error: "${executionError.message}"`;
void this.context.addOutputData(NodeConnectionType.AiTool, index, executionError);

const metadata = parseErrorMetadata(error);
void this.context.addOutputData(NodeConnectionType.AiTool, index, executionError, metadata);
return errorResponse;
} finally {
// @ts-expect-error this accesses a private member on the actual implementation to fix https://linear.app/n8n/issue/ADO-3186/bug-workflowtool-v2-always-uses-first-row-of-input-data
Expand Down
4 changes: 3 additions & 1 deletion packages/@n8n/nodes-langchain/utils/logWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import type {
ISupplyDataFunctions,
ITaskMetadata,
} from 'n8n-workflow';
import { NodeOperationError, NodeConnectionType } from 'n8n-workflow';
import { NodeOperationError, NodeConnectionType, parseErrorMetadata } from 'n8n-workflow';

import { logAiEvent, isToolsInstance, isBaseChatMemory, isBaseChatMessageHistory } from './helpers';
import { N8nBinaryLoader } from './N8nBinaryLoader';
Expand All @@ -41,10 +41,12 @@ export async function callMethodAsync<T>(
functionality: 'configuration-node',
});

const metadata = parseErrorMetadata(error);
parameters.executeFunctions.addOutputData(
parameters.connectionType,
parameters.currentNodeRunIndex,
error,
metadata,
);

if (error.message) {
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/__tests__/workflow-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
ITaskData,
IWaitingForExecution,
IWaitingForExecutionSource,
IWorkflowBase,
IWorkflowExecutionDataProcess,
StartNodeData,
} from 'n8n-workflow';
Expand All @@ -19,7 +20,6 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import type { ExecutionEntity } from '@/databases/entities/execution-entity';
import type { User } from '@/databases/entities/user';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { Telemetry } from '@/telemetry';
import { PermissionChecker } from '@/user-management/permission-checker';
Expand Down Expand Up @@ -53,7 +53,7 @@ beforeEach(async () => {
});

describe('processError', () => {
let workflow: WorkflowEntity;
let workflow: IWorkflowBase;
let execution: ExecutionEntity;
let hooks: core.ExecutionLifecycleHooks;

Expand Down
35 changes: 17 additions & 18 deletions packages/cli/src/active-workflow-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ import {
WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
WORKFLOW_REACTIVATE_MAX_TIMEOUT,
} from '@/constants';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { OnShutdown } from '@/decorators/on-shutdown';
import { executeErrorWorkflow } from '@/execution-lifecycle/execute-error-workflow';
import { ExecutionService } from '@/executions/execution.service';
import { ExternalHooks } from '@/external-hooks';
import type { IWorkflowDb } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { ActiveWorkflowsService } from '@/services/active-workflows.service';
Expand All @@ -55,12 +53,13 @@ import { WebhookService } from '@/webhooks/webhook.service';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import { WorkflowExecutionService } from '@/workflows/workflow-execution.service';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { formatWorkflow } from '@/workflows/workflow.formatter';

interface QueuedActivation {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
workflowData: IWorkflowBase;
}

@Service()
Expand Down Expand Up @@ -271,7 +270,7 @@ export class ActiveWorkflowManager {
* and overwrites the emit to be able to start it in subprocess
*/
getExecutePollFunctions(
workflowData: IWorkflowDb,
workflowData: IWorkflowBase,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
Expand Down Expand Up @@ -322,7 +321,7 @@ export class ActiveWorkflowManager {
* and overwrites the emit to be able to start it in subprocess
*/
getExecuteTriggerFunctions(
workflowData: IWorkflowDb,
workflowData: IWorkflowBase,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
Expand Down Expand Up @@ -379,7 +378,7 @@ export class ActiveWorkflowManager {
);
this.executeErrorWorkflow(activationError, workflowData, mode);

this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
this.addQueuedWorkflowActivation(activation, workflowData);
};
return new TriggerContext(workflow, node, additionalData, mode, activation, emit, emitError);
};
Expand Down Expand Up @@ -436,30 +435,30 @@ export class ActiveWorkflowManager {
}

private async activateWorkflow(
dbWorkflow: WorkflowEntity,
dbWorkflow: IWorkflowBase,
activationMode: 'init' | 'leadershipChange',
) {
try {
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, {
shouldPublish: false,
});
if (wasActivated) {
this.logger.info(` - ${dbWorkflow.display()})`);
this.logger.info(` - ${formatWorkflow(dbWorkflow)})`);
this.logger.info(' => Started');
this.logger.debug(`Successfully started workflow ${dbWorkflow.display()}`, {
this.logger.debug(`Successfully started workflow ${formatWorkflow(dbWorkflow)}`, {
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
});
}
} catch (error) {
this.errorReporter.error(error);
this.logger.info(
` => ERROR: Workflow ${dbWorkflow.display()} could not be activated on first try, keep on trying if not an auth issue`,
` => ERROR: Workflow ${formatWorkflow(dbWorkflow)} could not be activated on first try, keep on trying if not an auth issue`,
);

this.logger.info(` ${error.message}`);
this.logger.error(
`Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`,
`Issue on initial workflow activation try of ${formatWorkflow(dbWorkflow)} (startup)`,
{
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
Expand Down Expand Up @@ -518,7 +517,7 @@ export class ActiveWorkflowManager {
async add(
workflowId: string,
activationMode: WorkflowActivateMode,
existingWorkflow?: WorkflowEntity,
existingWorkflow?: IWorkflowBase,
{ shouldPublish } = { shouldPublish: true },
) {
if (this.instanceSettings.isMultiMain && shouldPublish) {
Expand Down Expand Up @@ -549,7 +548,7 @@ export class ActiveWorkflowManager {
}

if (shouldDisplayActivationMessage) {
this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, {
this.logger.debug(`Initializing active workflow ${formatWorkflow(dbWorkflow)} (startup)`, {
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
});
Expand All @@ -570,7 +569,7 @@ export class ActiveWorkflowManager {

if (!canBeActivated) {
throw new WorkflowActivationError(
`Workflow ${dbWorkflow.display()} has no node to start the workflow - at least one trigger, poller or webhook node is required`,
`Workflow ${formatWorkflow(dbWorkflow)} has no node to start the workflow - at least one trigger, poller or webhook node is required`,
{ level: 'warning' },
);
}
Expand Down Expand Up @@ -673,7 +672,7 @@ export class ActiveWorkflowManager {
*/
private addQueuedWorkflowActivation(
activationMode: WorkflowActivateMode,
workflowData: WorkflowEntity,
workflowData: IWorkflowBase,
) {
const workflowId = workflowData.id;
const workflowName = workflowData.name;
Expand Down Expand Up @@ -811,7 +810,7 @@ export class ActiveWorkflowManager {
* Register as active in memory a trigger- or poller-based workflow.
*/
async addTriggersAndPollers(
dbWorkflow: WorkflowEntity,
dbWorkflow: IWorkflowBase,
workflow: Workflow,
{
activationMode,
Expand All @@ -838,7 +837,7 @@ export class ActiveWorkflowManager {
);

if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) {
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);
this.logger.debug(`Adding triggers and pollers for workflow ${formatWorkflow(dbWorkflow)}`);

await this.activeWorkflows.add(
workflow.id,
Expand All @@ -850,7 +849,7 @@ export class ActiveWorkflowManager {
getPollFunctions,
);

this.logger.debug(`Workflow ${dbWorkflow.display()} activated`, {
this.logger.debug(`Workflow ${formatWorkflow(dbWorkflow)} activated`, {
workflowId: dbWorkflow.id,
workflowName: dbWorkflow.name,
});
Expand Down
11 changes: 5 additions & 6 deletions packages/cli/src/commands/execute-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import { Flags } from '@oclif/core';
import fs from 'fs';
import { diff } from 'json-diff';
import pick from 'lodash/pick';
import type { IRun, ITaskData, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import type { IRun, ITaskData, IWorkflowBase, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import os from 'os';
import { sep } from 'path';

import { ActiveExecutions } from '@/active-executions';
import type { User } from '@/databases/entities/user';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { IWorkflowDb } from '@/interfaces';
import { OwnershipService } from '@/services/ownership.service';
import { findCliWorkflowStart } from '@/utils';
import { WorkflowRunner } from '@/workflow-runner';
Expand Down Expand Up @@ -275,7 +274,7 @@ export class ExecuteBatch extends BaseCommand {
query.andWhere('workflows.id not in (:...skipIds)', { skipIds });
}

const allWorkflows = (await query.getMany()) as IWorkflowDb[];
const allWorkflows = (await query.getMany()) as IWorkflowBase[];

if (ExecuteBatch.debug) {
process.stdout.write(`Found ${allWorkflows.length} workflows to execute.\n`);
Expand Down Expand Up @@ -378,7 +377,7 @@ export class ExecuteBatch extends BaseCommand {
});
}

private async runTests(allWorkflows: IWorkflowDb[]): Promise<IResult> {
private async runTests(allWorkflows: IWorkflowBase[]): Promise<IResult> {
const result: IResult = {
totalWorkflows: allWorkflows.length,
slackMessage: '',
Expand All @@ -401,7 +400,7 @@ export class ExecuteBatch extends BaseCommand {
const promisesArray = [];
for (let i = 0; i < ExecuteBatch.concurrency; i++) {
const promise = new Promise(async (resolve) => {
let workflow: IWorkflowDb | undefined;
let workflow: IWorkflowBase | undefined;
while (allWorkflows.length > 0) {
workflow = allWorkflows.shift();
if (ExecuteBatch.cancelled) {
Expand Down Expand Up @@ -563,7 +562,7 @@ export class ExecuteBatch extends BaseCommand {
}
}

async startThread(workflowData: IWorkflowDb): Promise<IExecutionResult> {
async startThread(workflowData: IWorkflowBase): Promise<IExecutionResult> {
// This will be the object returned by the promise.
// It will be updated according to execution progress below.
const executionResult: IExecutionResult = {
Expand Down
15 changes: 8 additions & 7 deletions packages/cli/src/commands/import/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Container } from '@n8n/di';
import { Flags } from '@oclif/core';
import glob from 'fast-glob';
import fs from 'fs';
import type { IWorkflowBase, WorkflowId } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';

import { UM_FIX_INSTRUCTION } from '@/constants';
Expand Down Expand Up @@ -102,7 +103,7 @@ export class ImportWorkflowsCommand extends BaseCommand {
this.reportSuccess(workflows.length);
}

private async checkRelations(workflows: WorkflowEntity[], projectId?: string, userId?: string) {
private async checkRelations(workflows: IWorkflowBase[], projectId?: string, userId?: string) {
// The credential is not supposed to be re-owned.
if (!userId && !projectId) {
return {
Expand All @@ -112,11 +113,11 @@ export class ImportWorkflowsCommand extends BaseCommand {
}

for (const workflow of workflows) {
if (!(await this.workflowExists(workflow))) {
if (!(await this.workflowExists(workflow.id))) {
continue;
}

const { user, project: ownerProject } = await this.getWorkflowOwner(workflow);
const { user, project: ownerProject } = await this.getWorkflowOwner(workflow.id);

if (!ownerProject) {
continue;
Expand Down Expand Up @@ -155,9 +156,9 @@ export class ImportWorkflowsCommand extends BaseCommand {
this.logger.info(`Successfully imported ${total} ${total === 1 ? 'workflow.' : 'workflows.'}`);
}

private async getWorkflowOwner(workflow: WorkflowEntity) {
private async getWorkflowOwner(workflowId: WorkflowId) {
const sharing = await Container.get(SharedWorkflowRepository).findOne({
where: { workflowId: workflow.id, role: 'workflow:owner' },
where: { workflowId, role: 'workflow:owner' },
relations: { project: true },
});

Expand All @@ -175,8 +176,8 @@ export class ImportWorkflowsCommand extends BaseCommand {
return {};
}

private async workflowExists(workflow: WorkflowEntity) {
return await Container.get(WorkflowRepository).existsBy({ id: workflow.id });
private async workflowExists(workflowId: WorkflowId) {
return await Container.get(WorkflowRepository).existsBy({ id: workflowId });
}

private async readWorkflows(path: string, separate: boolean): Promise<WorkflowEntity[]> {
Expand Down
Loading

0 comments on commit 48933e9

Please # to comment.