Skip to content

Commit

Permalink
move instanceRole to InstanceSettings
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Jul 30, 2024
1 parent a2d0884 commit 5669e21
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 48 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/License.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class License {
* This ensures the mains do not cause a 429 (too many requests) on license init.
*/
if (config.getEnv('multiMainSetup.enabled')) {
return autoRenewEnabled && config.getEnv('instanceRole') === 'leader';
return autoRenewEnabled && this.instanceSettings.isLeader;
}

return autoRenewEnabled;
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class Start extends BaseCommand {
await this.initOrchestration();
this.logger.debug('Orchestration init complete');

if (!config.getEnv('license.autoRenewEnabled') && config.getEnv('instanceRole') === 'leader') {
if (!config.getEnv('license.autoRenewEnabled') && this.instanceSettings.isLeader) {
this.logger.warn(
'Automatic license renewal is disabled. The license will not renew automatically, and access to licensed features may be lost!',
);
Expand All @@ -208,7 +208,7 @@ export class Start extends BaseCommand {

async initOrchestration() {
if (config.getEnv('executions.mode') === 'regular') {
config.set('instanceRole', 'leader');
this.instanceSettings.markAsLeader();
return;
}

Expand Down
6 changes: 0 additions & 6 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -883,12 +883,6 @@ export const schema = {
},
},

instanceRole: {
doc: 'Always `leader` in single-main setup. `leader` or `follower` in multi-main setup.',
format: ['unset', 'leader', 'follower'] as const,
default: 'unset', // only until Start.initOrchestration
},

multiMainSetup: {
enabled: {
doc: 'Whether to enable multi-main setup for queue mode (license required)',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Container from 'typedi';
import { stringify } from 'flatted';
import { randomInt } from 'n8n-workflow';
import { InstanceSettings } from 'n8n-core';

import { mockInstance } from '@test/mocking';
import { createWorkflow } from '@test-integration/db/workflows';
Expand All @@ -21,38 +22,37 @@ import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNod
import { IN_PROGRESS_EXECUTION_DATA, OOM_WORKFLOW } from './constants';
import { setupMessages } from './utils';

import type { EventService } from '@/eventbus/event.service';
import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses';
import type { Logger } from '@/Logger';

describe('ExecutionRecoveryService', () => {
let push: Push;
const push = mockInstance(Push);
mockInstance(InternalHooks);
const instanceSettings = new InstanceSettings();

let executionRecoveryService: ExecutionRecoveryService;
let orchestrationService: OrchestrationService;
let executionRepository: ExecutionRepository;

beforeAll(async () => {
await testDb.init();
push = mockInstance(Push);
executionRepository = Container.get(ExecutionRepository);
orchestrationService = Container.get(OrchestrationService);

mockInstance(InternalHooks);
executionRecoveryService = new ExecutionRecoveryService(
mock<Logger>(),
mock(),
instanceSettings,
push,
executionRepository,
orchestrationService,
mock<EventService>(),
mock(),
);
});

beforeEach(() => {
config.set('instanceRole', 'leader');
instanceSettings.markAsLeader();
});

afterEach(async () => {
config.load(config.default);
jest.restoreAllMocks();
await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']);
executionRecoveryService.shutdown();
Expand All @@ -69,7 +69,6 @@ describe('ExecutionRecoveryService', () => {
* Arrange
*/
config.set('executions.mode', 'queue');
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
Expand All @@ -88,7 +87,7 @@ describe('ExecutionRecoveryService', () => {
* Arrange
*/
config.set('executions.mode', 'queue');
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false);
instanceSettings.markAsFollower();
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
Expand Down Expand Up @@ -130,7 +129,7 @@ describe('ExecutionRecoveryService', () => {
/**
* Arrange
*/
config.set('instanceRole', 'follower');
instanceSettings.markAsFollower();
// @ts-expect-error Private method
const amendSpy = jest.spyOn(executionRecoveryService, 'amend');
const messages = setupMessages('123', 'Some workflow');
Expand Down
10 changes: 6 additions & 4 deletions packages/cli/src/executions/execution-recovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO
import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected
import type { DateTime } from 'luxon';
import type { IRun, ITaskData } from 'n8n-workflow';
import { InstanceSettings } from 'n8n-core';
import type { EventMessageTypes } from '../eventbus/EventMessageClasses';
import type { IExecutionResponse } from '@/Interfaces';
import { NodeCrashedError } from '@/errors/node-crashed.error';
Expand All @@ -25,6 +26,7 @@ import { EventService } from '@/eventbus/event.service';
export class ExecutionRecoveryService {
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly push: Push,
private readonly executionRepository: ExecutionRepository,
private readonly orchestrationService: OrchestrationService,
Expand All @@ -37,10 +39,10 @@ export class ExecutionRecoveryService {
init() {
if (config.getEnv('executions.mode') === 'regular') return;

const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService;

const { isLeader } = this.instanceSettings;
if (isLeader) this.scheduleQueueRecovery();

const { isMultiMainSetupEnabled } = this.orchestrationService;
if (isMultiMainSetupEnabled) {
this.orchestrationService.multiMainSetup
.on('leader-takeover', () => this.scheduleQueueRecovery())
Expand All @@ -59,7 +61,7 @@ export class ExecutionRecoveryService {
* Recover key properties of a truncated execution using event logs.
*/
async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) {
if (this.orchestrationService.isFollower) return;
if (this.instanceSettings.isFollower) return;

const amendedExecution = await this.amend(executionId, messages);

Expand Down Expand Up @@ -332,7 +334,7 @@ export class ExecutionRecoveryService {
private shouldScheduleQueueRecovery() {
return (
config.getEnv('executions.mode') === 'queue' &&
config.getEnv('instanceRole') === 'leader' &&
this.instanceSettings.isLeader &&
!this.isShuttingDown
);
}
Expand Down
10 changes: 7 additions & 3 deletions packages/cli/src/services/orchestration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/Redis
import { RedisService } from './redis.service';
import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee';
import type { WorkflowActivateMode } from 'n8n-workflow';
import { InstanceSettings } from 'n8n-core';

@Service()
export class OrchestrationService {
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly redisService: RedisService,
readonly multiMainSetup: MultiMainSetup,
) {}
Expand Down Expand Up @@ -43,12 +45,14 @@ export class OrchestrationService {
return config.getEnv('redis.queueModeId');
}

/** @deprecated use InstanceSettings.isLeader */
get isLeader() {
return config.getEnv('instanceRole') === 'leader';
return this.instanceSettings.isLeader;
}

/** @deprecated use InstanceSettings.isFollower */
get isFollower() {
return config.getEnv('instanceRole') !== 'leader';
return this.instanceSettings.isFollower;
}

sanityCheck() {
Expand All @@ -63,7 +67,7 @@ export class OrchestrationService {
if (this.isMultiMainSetupEnabled) {
await this.multiMainSetup.init();
} else {
config.set('instanceRole', 'leader');
this.instanceSettings.markAsLeader();
}

this.isInitialized = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import config from '@/config';
import { Service } from 'typedi';
import { TIME } from '@/constants';
import { InstanceSettings } from 'n8n-core';
import { ErrorReporterProxy as EventReporter } from 'n8n-workflow';
import { Logger } from '@/Logger';
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
Expand All @@ -16,6 +17,7 @@ type MultiMainEvents = {
export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly redisPublisher: RedisServicePubSubPublisher,
private readonly redisClientService: RedisClientService,
) {
Expand Down Expand Up @@ -50,7 +52,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
async shutdown() {
clearInterval(this.leaderCheckInterval);

const isLeader = config.getEnv('instanceRole') === 'leader';
const { isLeader } = this.instanceSettings;

if (isLeader) await this.redisPublisher.clear(this.leaderKey);
}
Expand All @@ -69,8 +71,8 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
if (leaderId && leaderId !== this.instanceId) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`);

if (config.getEnv('instanceRole') === 'leader') {
config.set('instanceRole', 'follower');
if (this.instanceSettings.isLeader) {
this.instanceSettings.markAsFollower();

this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery

Expand All @@ -85,7 +87,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
`[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`,
);

config.set('instanceRole', 'follower');
this.instanceSettings.markAsFollower();

/**
* Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery
Expand All @@ -106,7 +108,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
if (keySetSuccessfully) {
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`);

config.set('instanceRole', 'leader');
this.instanceSettings.markAsLeader();

await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);

Expand All @@ -115,7 +117,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
*/
this.emit('leader-takeover');
} else {
config.set('instanceRole', 'follower');
this.instanceSettings.markAsFollower();
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/cli/src/services/pruning.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core';
import { BinaryDataService, InstanceSettings } from 'n8n-core';
import { inTest, TIME } from '@/constants';
import config from '@/config';
import { ExecutionRepository } from '@db/repositories/execution.repository';
Expand All @@ -25,6 +25,7 @@ export class PruningService {

constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService,
private readonly orchestrationService: OrchestrationService,
Expand Down Expand Up @@ -56,7 +57,7 @@ export class PruningService {
if (
config.getEnv('multiMainSetup.enabled') &&
config.getEnv('generic.instanceType') === 'main' &&
config.getEnv('instanceRole') === 'follower'
this.instanceSettings.isFollower
) {
return false;
}
Expand Down
8 changes: 5 additions & 3 deletions packages/cli/test/integration/pruning.service.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import config from '@/config';
import { BinaryDataService } from 'n8n-core';
import { BinaryDataService, InstanceSettings } from 'n8n-core';
import type { ExecutionStatus } from 'n8n-workflow';
import Container from 'typedi';

Expand All @@ -15,10 +15,11 @@ import { mockInstance } from '../shared/mocking';
import { createWorkflow } from './shared/db/workflows';
import { createExecution, createSuccessfulExecution } from './shared/db/executions';
import { mock } from 'jest-mock-extended';
import type { OrchestrationService } from '@/services/orchestration.service';

describe('softDeleteOnPruningCycle()', () => {
let pruningService: PruningService;
const instanceSettings = new InstanceSettings();
instanceSettings.markAsLeader();

const now = new Date();
const yesterday = new Date(Date.now() - TIME.DAY);
Expand All @@ -29,9 +30,10 @@ describe('softDeleteOnPruningCycle()', () => {

pruningService = new PruningService(
mockInstance(Logger),
instanceSettings,
Container.get(ExecutionRepository),
mockInstance(BinaryDataService),
mock<OrchestrationService>(),
mock(),
);

workflow = await createWorkflow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ describe('EnterpriseWorkflowService', () => {
Container.get(CredentialsRepository),
mock(),
mock(),
mock(),
mock(),
);
});

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test/unit/WaitTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jest.useFakeTimers();
describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);
const orchestrationService = new OrchestrationService(mock(), mock(), mock(), multiMainSetup);

const execution = mock<IExecutionResponse>({
id: '123',
Expand Down
Loading

0 comments on commit 5669e21

Please # to comment.