Skip to content

Commit

Permalink
convert more emitters
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Jul 29, 2024
1 parent 6c5786d commit 74f3e7a
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 60 deletions.
24 changes: 11 additions & 13 deletions packages/cli/src/TypedEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,33 @@ import debounce from 'lodash/debounce';

type EventName = string;

type Listeners<L> = {
[E in Extract<keyof L, EventName>]: (...args: unknown[]) => unknown;
type Payloads<L> = {
[E in Extract<keyof L, EventName>]: unknown;
};

export class TypedEmitter<L extends Listeners<L>> extends EventEmitter {
type Listener<P> = (payload: P) => void;

export class TypedEmitter<L extends Payloads<L>> extends EventEmitter {
protected debounceWait = 300;

override on<U extends Extract<keyof L, EventName>>(event: U, listener: L[U]) {
override on<U extends Extract<keyof L, EventName>>(event: U, listener: Listener<L[U]>) {
return super.on(event, listener);
}

override once<U extends Extract<keyof L, EventName>>(event: U, listener: L[U]) {
override once<U extends Extract<keyof L, EventName>>(event: U, listener: Listener<L[U]>) {
return super.once(event, listener);
}

override off<U extends Extract<keyof L, EventName>>(event: U, listener: L[U]) {
override off<U extends Extract<keyof L, EventName>>(event: U, listener: Listener<L[U]>) {
return super.off(event, listener);
}

override emit<U extends Extract<keyof L, EventName>>(
event: U,
...args: Parameters<L[U]>
): boolean {
return super.emit(event, ...args);
override emit<U extends Extract<keyof L, EventName>>(event: U, payload?: L[U]): boolean {
return super.emit(event, payload);
}

protected debouncedEmit = debounce(
<U extends Extract<keyof L, EventName>>(event: U, ...args: Parameters<L[U]>) =>
super.emit(event, ...args),
<U extends Extract<keyof L, EventName>>(event: U, payload?: L[U]) => super.emit(event, payload),
this.debounceWait,
);
}
9 changes: 4 additions & 5 deletions packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,10 @@ export async function executeWebhook(
NodeExecuteFunctions,
executionMode,
);
Container.get(WorkflowStatisticsService).emit(
'nodeFetchedData',
workflow.id,
workflowStartNode,
);
Container.get(WorkflowStatisticsService).emit('nodeFetchedData', {
workflowId: workflow.id,
startNode: workflowStartNode,
});
} catch (err) {
// Send error response to webhook caller
const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';
Expand Down
22 changes: 10 additions & 12 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,17 +525,16 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
);
}
} finally {
workflowStatisticsService.emit(
'workflowExecutionCompleted',
this.workflowData,
workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData,
fullRunData,
);
});
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', workflowId, node);
async (workflowId: string, startNode: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, startNode });
},
],
};
Expand Down Expand Up @@ -636,11 +635,10 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
this.retryOf,
);
} finally {
workflowStatisticsService.emit(
'workflowExecutionCompleted',
this.workflowData,
workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData,
fullRunData,
);
});
}
},
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
Expand Down Expand Up @@ -675,8 +673,8 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', workflowId, node);
async (workflowId: string, startNode: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, startNode });
},
],
};
Expand Down
14 changes: 2 additions & 12 deletions packages/cli/src/eventbus/event.service.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
import { EventEmitter } from 'node:events';
import { Service } from 'typedi';
import { TypedEmitter } from '@/TypedEmitter';
import type { Event } from './event.types';

@Service()
export class EventService extends EventEmitter {
emit<K extends keyof Event>(eventName: K, arg?: Event[K]) {
super.emit(eventName, arg);
return true;
}

on<K extends keyof Event>(eventName: K, handler: (arg: Event[K]) => void) {
super.on(eventName, handler);
return this;
}
}
export class EventService extends TypedEmitter<Event> {}
7 changes: 4 additions & 3 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { EventEmitter } from 'events';
import { ServerResponse } from 'http';
import type { Server } from 'http';
import type { Socket } from 'net';
Expand All @@ -17,6 +16,7 @@ import { OrchestrationService } from '@/services/orchestration.service';
import { SSEPush } from './sse.push';
import { WebSocketPush } from './websocket.push';
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
import { TypedEmitter } from '@/TypedEmitter';

const useWebSockets = config.getEnv('push.backend') === 'websocket';

Expand All @@ -28,7 +28,9 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket';
* @emits message when a message is received from a client
*/
@Service()
export class Push extends EventEmitter {
export class Push extends TypedEmitter<{
editorUiConnected: string;
}> {
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);

constructor(private readonly orchestrationService: OrchestrationService) {
Expand All @@ -37,7 +39,6 @@ export class Push extends EventEmitter {

handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
const {
user,
ws,
query: { pushRef },
} = req;
Expand Down
14 changes: 6 additions & 8 deletions packages/cli/src/services/cache/cache.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import EventEmitter from 'node:events';

import Container, { Service } from 'typedi';
import { caching } from 'cache-manager';
import { ApplicationError, jsonStringify } from 'n8n-workflow';
Expand All @@ -10,14 +8,18 @@ import { MalformedRefreshValueError } from '@/errors/cache-errors/malformed-refr
import type {
TaggedRedisCache,
TaggedMemoryCache,
CacheEvent,
MaybeHash,
Hash,
} from '@/services/cache/cache.types';
import { TIME } from '@/constants';
import { TypedEmitter } from '@/TypedEmitter';

@Service()
export class CacheService extends EventEmitter {
export class CacheService extends TypedEmitter<{
'metrics.cache.hit': never;
'metrics.cache.miss': never;
'metrics.cache.update': never;
}> {
private cache: TaggedRedisCache | TaggedMemoryCache;

async init() {
Expand Down Expand Up @@ -66,10 +68,6 @@ export class CacheService extends EventEmitter {
await this.cache.store.reset();
}

emit(event: CacheEvent, ...args: unknown[]) {
return super.emit(event, ...args);
}

isRedis() {
return this.cache.kind === 'redis';
}
Expand Down
2 changes: 0 additions & 2 deletions packages/cli/src/services/cache/cache.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,3 @@ export type TaggedMemoryCache = MemoryCache & { kind: 'memory' };
export type Hash<T = unknown> = Record<string, T>;

export type MaybeHash<T> = Hash<T> | undefined;

export type CacheEvent = `metrics.cache.${'hit' | 'miss' | 'update'}`;
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import { EventEmitter } from 'node:events';
import config from '@/config';
import { Service } from 'typedi';
import { TIME } from '@/constants';
import { ErrorReporterProxy as EventReporter } from 'n8n-workflow';
import { Logger } from '@/Logger';
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import { RedisClientService } from '@/services/redis/redis-client.service';
import { TypedEmitter } from '@/TypedEmitter';

@Service()
export class MultiMainSetup extends EventEmitter {
export class MultiMainSetup extends TypedEmitter<{
'leader-stepdown': never;
'leader-takeover': never;
}> {
constructor(
private readonly logger: Logger,
private readonly redisPublisher: RedisServicePubSubPublisher,
Expand Down
25 changes: 22 additions & 3 deletions packages/cli/src/services/workflow-statistics.service.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
import { EventEmitter } from 'events';
import { Container, Service } from 'typedi';
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { WorkflowStatisticsRepository } from '@db/repositories/workflowStatistics.repository';
import { UserService } from '@/services/user.service';
import { Logger } from '@/Logger';
import { OwnershipService } from './ownership.service';
import { TypedEmitter } from '@/TypedEmitter';

interface SuccessMetrics {
project_id: string;
workflow_id: string;
user_id: string | undefined;
}

interface DataLoadMetrics {
user_id: string | undefined;
project_id: string;
workflow_id: string;
node_type: string;
node_id: string;
}

@Service()
export class WorkflowStatisticsService extends EventEmitter {
export class WorkflowStatisticsService extends TypedEmitter<{
nodeFetchedData: { workflowId: string; startNode: INode };
workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun };
'telemetry.onFirstProductionWorkflowSuccess': SuccessMetrics;
'telemetry.onFirstWorkflowDataLoad': DataLoadMetrics;
}> {
constructor(
private readonly logger: Logger,
private readonly repository: WorkflowStatisticsRepository,
Expand Down Expand Up @@ -53,7 +72,7 @@ export class WorkflowStatisticsService extends EventEmitter {
if (project.type === 'personal') {
const owner = await Container.get(OwnershipService).getProjectOwnerCached(project.id);

const metrics = {
const metrics: SuccessMetrics = {
project_id: project.id,
workflow_id: workflowId,
user_id: owner?.id,
Expand Down

0 comments on commit 74f3e7a

Please # to comment.