diff --git a/package.json b/package.json index c6ba534c..6a5326c4 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "main": "index.js", "scripts": { "dev": "tsx watch src/start/server.ts | pino-pretty", - "build": "node ./build.js && resolve-tspaths", + "build": "tsc -noEmit && node ./build.js && resolve-tspaths", "start": "NODE_ENV=production node dist/start/server.js", "format": "prettier -c --write src/**", "lint": "prettier -v && prettier -c src/**", diff --git a/src/http/plugins/db.ts b/src/http/plugins/db.ts index 9e0b3b87..23d0853b 100644 --- a/src/http/plugins/db.ts +++ b/src/http/plugins/db.ts @@ -1,22 +1,24 @@ import fastifyPlugin from 'fastify-plugin' import { getConfig, MultitenantMigrationStrategy } from '../../config' import { - areMigrationsUpToDate, getServiceKeyUser, getTenantConfig, TenantMigrationStatus, - updateTenantMigrationsState, TenantConnection, getPostgresConnection, - progressiveMigrations, - runMigrationsOnTenant, - hasMissingSyncMigration, - DBMigration, - lastMigrationName, } from '@internal/database' import { verifyJWT } from '@internal/auth' import { logSchema } from '@internal/monitoring' import { createMutexByKey } from '@internal/concurrency' +import { + areMigrationsUpToDate, + DBMigration, + hasMissingSyncMigration, + lastMigrationName, + progressiveMigrations, + runMigrationsOnTenant, + updateTenantMigrationsState, +} from '@internal/database/migrations' declare module 'fastify' { interface FastifyRequest { diff --git a/src/http/routes/admin/migrations.ts b/src/http/routes/admin/migrations.ts index 64d4ab7c..ac495ff6 100644 --- a/src/http/routes/admin/migrations.ts +++ b/src/http/routes/admin/migrations.ts @@ -1,9 +1,10 @@ import { FastifyInstance } from 'fastify' import { Queue } from '@internal/queue' -import { multitenantKnex, runMigrationsOnAllTenants } from '@internal/database' +import { multitenantKnex } from '@internal/database' import { RunMigrationsOnTenants } from '@storage/events' import apiKey from '../../plugins/apikey' import { getConfig } from '../../../config' +import { runMigrationsOnAllTenants } from '@internal/database/migrations' const { pgQueueEnable } = getConfig() diff --git a/src/http/routes/admin/tenants.ts b/src/http/routes/admin/tenants.ts index e6986172..1f6ceda6 100644 --- a/src/http/routes/admin/tenants.ts +++ b/src/http/routes/admin/tenants.ts @@ -2,15 +2,13 @@ import { FastifyInstance, RequestGenericInterface } from 'fastify' import { FromSchema } from 'json-schema-to-ts' import apiKey from '../../plugins/apikey' import { decrypt, encrypt } from '@internal/auth' +import { deleteTenantConfig, TenantMigrationStatus, multitenantKnex } from '@internal/database' +import { dbSuperUser, storage } from '../../plugins' import { - deleteTenantConfig, - TenantMigrationStatus, - multitenantKnex, lastMigrationName, - runMigrationsOnTenant, progressiveMigrations, -} from '@internal/database' -import { dbSuperUser, storage } from '../../plugins' + runMigrationsOnTenant, +} from '@internal/database/migrations' const patchSchema = { body: { @@ -292,6 +290,9 @@ export default async function routes(fastify: FastifyInstance) { migrations_status: TenantMigrationStatus.COMPLETED, }) } catch (e) { + if (e instanceof Error) { + request.executionError = e + } progressiveMigrations.addTenant(tenantId) } } diff --git a/src/internal/database/index.ts b/src/internal/database/index.ts index 4b95f71a..2d3060a5 100644 --- a/src/internal/database/index.ts +++ b/src/internal/database/index.ts @@ -1,6 +1,5 @@ export * from './multitenant-db' export * from './tenant' export * from './connection' -export * from './migrations' export * from './client' export * from './pubsub' diff --git a/src/internal/database/migrations/migrate.ts b/src/internal/database/migrations/migrate.ts index d5abcf74..32a188f6 100644 --- a/src/internal/database/migrations/migrate.ts +++ b/src/internal/database/migrations/migrate.ts @@ -7,12 +7,12 @@ import { BasicPgClient, Migration } from 'postgres-migrations/dist/types' import { validateMigrationHashes } from 'postgres-migrations/dist/validation' import { runMigration } from 'postgres-migrations/dist/run-migration' import { searchPath } from '../connection' -import { getTenantConfig, listTenantsToMigrate } from '../tenant' +import { getTenantConfig, TenantMigrationStatus } from '../tenant' import { multitenantKnex } from '../multitenant-db' import { ProgressiveMigrations } from './progressive' import { RunMigrationsOnTenants } from '@storage/events' import { ERRORS } from '@internal/errors' -import { DBMigration } from '@internal/database' +import { DBMigration } from './types' const { multitenantDatabaseUrl, @@ -81,6 +81,86 @@ export async function lastMigrationName() { return migrations[migrations.length - 1].name as keyof typeof DBMigration } +/** + * List all tenants that needs to have the migrations run + */ +export async function* listTenantsToMigrate(signal: AbortSignal) { + let lastCursor = 0 + + while (true) { + if (signal.aborted) { + break + } + + const migrationVersion = await lastMigrationName() + + const data = await multitenantKnex + .table<{ id: string; cursor_id: number }>('tenants') + .select('id', 'cursor_id') + .where('cursor_id', '>', lastCursor) + .where((builder) => { + builder + .where((whereBuilder) => { + whereBuilder + .where('migrations_version', '!=', migrationVersion) + .whereNotIn('migrations_status', [ + TenantMigrationStatus.FAILED, + TenantMigrationStatus.FAILED_STALE, + ]) + }) + .orWhere('migrations_status', null) + }) + .orderBy('cursor_id', 'asc') + .limit(200) + + if (data.length === 0) { + break + } + + lastCursor = data[data.length - 1].cursor_id + yield data.map((tenant) => tenant.id) + } +} + +/** + * Update tenant migration version and status + * @param tenantId + * @param options + */ +export async function updateTenantMigrationsState( + tenantId: string, + options?: { state: TenantMigrationStatus } +) { + const migrationVersion = await lastMigrationName() + const state = options?.state || TenantMigrationStatus.COMPLETED + return multitenantKnex + .table('tenants') + .where('id', tenantId) + .update({ + migrations_version: [ + TenantMigrationStatus.FAILED, + TenantMigrationStatus.FAILED_STALE, + ].includes(state) + ? undefined + : migrationVersion, + migrations_status: state, + }) +} + +/** + * Determine if a tenant has the migrations up to date + * @param tenantId + */ +export async function areMigrationsUpToDate(tenantId: string) { + const latestMigrationVersion = await lastMigrationName() + const tenant = await getTenantConfig(tenantId) + + return ( + latestMigrationVersion === tenant.migrationVersion && + tenant.migrationStatus === TenantMigrationStatus.COMPLETED + ) +} + export async function hasMissingSyncMigration(tenantId: string) { const { migrationVersion, migrationStatus } = await getTenantConfig(tenantId) const migrations = await loadMigrationFilesCached('./migrations/tenant') diff --git a/src/internal/database/migrations/progressive.ts b/src/internal/database/migrations/progressive.ts index 9ae647e8..d6b11dfc 100644 --- a/src/internal/database/migrations/progressive.ts +++ b/src/internal/database/migrations/progressive.ts @@ -1,6 +1,7 @@ import { logger, logSchema } from '../../monitoring' -import { areMigrationsUpToDate, getTenantConfig, TenantMigrationStatus } from '../tenant' +import { getTenantConfig, TenantMigrationStatus } from '../tenant' import { RunMigrationsOnTenants } from '@storage/events' +import { areMigrationsUpToDate } from '@internal/database/migrations/migrate' export class ProgressiveMigrations { protected tenants: string[] = [] diff --git a/src/internal/database/tenant.ts b/src/internal/database/tenant.ts index bda661d7..3797fac6 100644 --- a/src/internal/database/tenant.ts +++ b/src/internal/database/tenant.ts @@ -7,7 +7,6 @@ import { PubSubAdapter } from '../pubsub' import { createMutexByKey } from '../concurrency' import { LRUCache } from 'lru-cache' import objectSizeOf from 'object-sizeof' -import { lastMigrationName } from './migrations/migrate' import { ERRORS } from '@internal/errors' import { DBMigration } from '@internal/database/migrations' @@ -88,86 +87,6 @@ const singleTenantServiceKey: } : undefined -/** - * List all tenants that needs to have the migrations run - */ -export async function* listTenantsToMigrate(signal: AbortSignal) { - let lastCursor = 0 - - while (true) { - if (signal.aborted) { - break - } - - const migrationVersion = await lastMigrationName() - - const data = await multitenantKnex - .table<{ id: string; cursor_id: number }>('tenants') - .select('id', 'cursor_id') - .where('cursor_id', '>', lastCursor) - .where((builder) => { - builder - .where((whereBuilder) => { - whereBuilder - .where('migrations_version', '!=', migrationVersion) - .whereNotIn('migrations_status', [ - TenantMigrationStatus.FAILED, - TenantMigrationStatus.FAILED_STALE, - ]) - }) - .orWhere('migrations_status', null) - }) - .orderBy('cursor_id', 'asc') - .limit(200) - - if (data.length === 0) { - break - } - - lastCursor = data[data.length - 1].cursor_id - yield data.map((tenant) => tenant.id) - } -} - -/** - * Update tenant migration version and status - * @param tenantId - * @param options - */ -export async function updateTenantMigrationsState( - tenantId: string, - options?: { state: TenantMigrationStatus } -) { - const migrationVersion = await lastMigrationName() - const state = options?.state || TenantMigrationStatus.COMPLETED - return multitenantKnex - .table('tenants') - .where('id', tenantId) - .update({ - migrations_version: [ - TenantMigrationStatus.FAILED, - TenantMigrationStatus.FAILED_STALE, - ].includes(state) - ? undefined - : migrationVersion, - migrations_status: state, - }) -} - -/** - * Determine if a tenant has the migrations up to date - * @param tenantId - */ -export async function areMigrationsUpToDate(tenantId: string) { - const latestMigrationVersion = await lastMigrationName() - const tenant = await getTenantConfig(tenantId) - - return ( - latestMigrationVersion === tenant.migrationVersion && - tenant.migrationStatus === TenantMigrationStatus.COMPLETED - ) -} - /** * Deletes tenants config from the in-memory cache * @param tenantId diff --git a/src/scripts/migrate-call.ts b/src/scripts/migrate-call.ts index b4f236e3..00e7ae6d 100644 --- a/src/scripts/migrate-call.ts +++ b/src/scripts/migrate-call.ts @@ -1,7 +1,7 @@ import dotenv from 'dotenv' dotenv.config() -import { runMigrationsOnTenant } from '@internal/database' +import { runMigrationsOnTenant } from '@internal/database/migrations' ;(async () => { await runMigrationsOnTenant(process.env.DATABASE_URL as string) })() diff --git a/src/start/server.ts b/src/start/server.ts index 34e69353..d298741d 100644 --- a/src/start/server.ts +++ b/src/start/server.ts @@ -5,19 +5,18 @@ import { IncomingMessage, Server, ServerResponse } from 'node:http' import build from '../app' import buildAdmin from '../admin-app' import { getConfig } from '../config' -import { - runMultitenantMigrations, - runMigrationsOnTenant, - startAsyncMigrations, - listenForTenantUpdate, - PubSub, -} from '@internal/database' +import { listenForTenantUpdate, PubSub } from '@internal/database' import { logger, logSchema } from '@internal/monitoring' import { Queue } from '@internal/queue' import { registerWorkers } from '@storage/events' import { AsyncAbortController } from '@internal/concurrency' import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown' +import { + runMigrationsOnTenant, + runMultitenantMigrations, + startAsyncMigrations, +} from '@internal/database/migrations' const shutdownSignal = new AsyncAbortController() diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index 9e95f1b9..48c984d3 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -1,6 +1,7 @@ import { Bucket, S3MultipartUpload, Obj, S3PartUpload } from '../schemas' import { ObjectMetadata } from '../backend' -import { DBMigration, TenantConnection } from '@internal/database' +import { TenantConnection } from '@internal/database' +import { DBMigration } from '@internal/database/migrations' export interface SearchObjectOption { search?: string diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 0d6cabac..5f708545 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -17,9 +17,10 @@ import { SearchObjectOption, } from './adapter' import { DatabaseError } from 'pg' -import { DBMigration, TenantConnection } from '@internal/database' +import { TenantConnection } from '@internal/database' import { DbQueryPerformance } from '@internal/monitoring/metrics' import { isUuid } from '../limits' +import { DBMigration } from '@internal/database/migrations' /** * Database diff --git a/src/storage/events/run-migrations.ts b/src/storage/events/run-migrations.ts index e2cba669..631f2e20 100644 --- a/src/storage/events/run-migrations.ts +++ b/src/storage/events/run-migrations.ts @@ -1,14 +1,13 @@ import { BaseEvent } from './base-event' -import { - areMigrationsUpToDate, - getTenantConfig, - TenantMigrationStatus, - updateTenantMigrationsState, - runMigrationsOnTenant, -} from '@internal/database' +import { getTenantConfig, TenantMigrationStatus } from '@internal/database' import { JobWithMetadata, SendOptions, WorkOptions } from 'pg-boss' import { logger, logSchema } from '@internal/monitoring' import { BasePayload } from '@internal/queue' +import { + areMigrationsUpToDate, + runMigrationsOnTenant, + updateTenantMigrationsState, +} from '@internal/database/migrations' interface RunMigrationsPayload extends BasePayload { tenantId: string