From 6533a19aabd645680a30eee6b645baa93ed26cf8 Mon Sep 17 00:00:00 2001 From: fenos Date: Mon, 13 Jan 2025 10:30:10 +0100 Subject: [PATCH] feat: allow disabling specific events --- .../0014-disable-tenants-events.sql | 1 + src/http/routes/admin/tenants.ts | 7 +++++++ src/internal/database/tenant.ts | 3 +++ src/internal/queue/event.ts | 21 ++++++++++++++++++- src/storage/events/webhook.ts | 15 +++++++++++++ 5 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 migrations/multitenant/0014-disable-tenants-events.sql diff --git a/migrations/multitenant/0014-disable-tenants-events.sql b/migrations/multitenant/0014-disable-tenants-events.sql new file mode 100644 index 00000000..6e58d7b0 --- /dev/null +++ b/migrations/multitenant/0014-disable-tenants-events.sql @@ -0,0 +1 @@ +ALTER TABLE tenants ADD COLUMN IF NOT EXISTS disable_events text[] NULL; \ No newline at end of file diff --git a/src/http/routes/admin/tenants.ts b/src/http/routes/admin/tenants.ts index 37a79941..1c447c37 100644 --- a/src/http/routes/admin/tenants.ts +++ b/src/http/routes/admin/tenants.ts @@ -25,6 +25,7 @@ const patchSchema = { jwks: { type: 'object', nullable: true }, serviceKey: { type: 'string' }, tracingMode: { type: 'string' }, + disableEvents: { type: 'array', items: { type: 'string' } }, features: { type: 'object', properties: { @@ -108,6 +109,7 @@ export default async function routes(fastify: FastifyInstance) { migrations_version, migrations_status, tracing_mode, + disable_events, }) => ({ id, anonKey: decrypt(anon_key), @@ -130,6 +132,7 @@ export default async function routes(fastify: FastifyInstance) { enabled: feature_s3_protocol, }, }, + disableEvents: disable_events, }) ) }) @@ -154,6 +157,7 @@ export default async function routes(fastify: FastifyInstance) { migrations_version, migrations_status, tracing_mode, + disable_events, } = tenant return { @@ -182,6 +186,7 @@ export default async function routes(fastify: FastifyInstance) { migrationVersion: migrations_version, migrationStatus: migrations_status, tracingMode: tracing_mode, + disableEvents: disable_events, } } }) @@ -248,6 +253,7 @@ export default async function routes(fastify: FastifyInstance) { databasePoolUrl, maxConnections, tracingMode, + disableEvents, } = request.body const { tenantId } = request.params @@ -272,6 +278,7 @@ export default async function routes(fastify: FastifyInstance) { ? null : features?.imageTransformation?.maxResolution, tracing_mode: tracingMode, + disable_events: disableEvents, }) .where('id', tenantId) diff --git a/src/internal/database/tenant.ts b/src/internal/database/tenant.ts index 35ec5653..bda661d7 100644 --- a/src/internal/database/tenant.ts +++ b/src/internal/database/tenant.ts @@ -34,6 +34,7 @@ interface TenantConfig { migrationStatus?: TenantMigrationStatus syncMigrationsDone?: boolean tracingMode?: string + disableEvents?: string[] } export interface Features { @@ -213,6 +214,7 @@ export async function getTenantConfig(tenantId: string): Promise { migrations_version, migrations_status, tracing_mode, + disable_events, } = tenant const serviceKey = decrypt(service_key) @@ -243,6 +245,7 @@ export async function getTenantConfig(tenantId: string): Promise { migrationStatus: migrations_status, migrationsRun: false, tracingMode: tracing_mode, + disableEvents: disable_events, } tenantConfigCache.set(tenantId, config) diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts index b408ff8f..0584da7d 100644 --- a/src/internal/queue/event.ts +++ b/src/internal/queue/event.ts @@ -3,6 +3,7 @@ import PgBoss, { BatchWorkOptions, Job, SendOptions, WorkOptions } from 'pg-boss import { getConfig } from '../../config' import { QueueJobScheduled, QueueJobSchedulingTime } from '@internal/monitoring/metrics' import { logger, logSchema } from '@internal/monitoring' +import { getTenantConfig } from '@internal/database' export interface BasePayload { $version?: string @@ -19,7 +20,7 @@ export interface SlowRetryQueueOptions { retryDelay: number } -const { pgQueueEnable, region } = getConfig() +const { pgQueueEnable, region, isMultitenant } = getConfig() export type StaticThis> = BaseEventConstructor @@ -125,9 +126,27 @@ export class Event> { throw new Error('not implemented') } + static async shouldSend(payload: any) { + if (isMultitenant) { + // Do not send an event if disabled for this specific tenant + const tenant = await getTenantConfig(payload.tenant.ref) + const disabledEvents = tenant.disableEvents || [] + if (disabledEvents.includes(this.eventName())) { + return false + } + } + return true + } + async send(): Promise { const constructor = this.constructor as typeof Event + const shouldSend = await constructor.shouldSend(this.payload) + + if (!shouldSend) { + return + } + if (!pgQueueEnable) { return constructor.handle({ id: '__sync', diff --git a/src/storage/events/webhook.ts b/src/storage/events/webhook.ts index 70c9e2cb..c186e03b 100644 --- a/src/storage/events/webhook.ts +++ b/src/storage/events/webhook.ts @@ -5,8 +5,10 @@ import HttpAgent from 'agentkeepalive' import axios from 'axios' import { getConfig } from '../../config' import { logger, logSchema } from '@internal/monitoring' +import { getTenantConfig } from '@internal/database' const { + isMultitenant, webhookURL, webhookApiKey, webhookQueuePullInterval, @@ -63,6 +65,19 @@ export class Webhook extends BaseEvent { } } + static async shouldSend(payload: WebhookEvent) { + if (isMultitenant) { + // Do not send an event if disabled for this specific tenant + const tenant = await getTenantConfig(payload.tenant.ref) + const disabledEvents = tenant.disableEvents || [] + if (disabledEvents.includes(payload.event.type)) { + return false + } + } + + return true + } + static async handle(job: Job) { if (!webhookURL) { logger.info('skipping webhook, no WEBHOOK_URL set')