Skip to content

Commit

Permalink
fix: cleanup imports
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jan 13, 2025
1 parent 1612b85 commit 11477bb
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 117 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "index.js",
"scripts": {
"dev": "tsx watch src/start/server.ts | pino-pretty",
"build": "node ./build.js && resolve-tspaths",
"build": "tsc -noEmit && node ./build.js && resolve-tspaths",
"start": "NODE_ENV=production node dist/start/server.js",
"format": "prettier -c --write src/**",
"lint": "prettier -v && prettier -c src/**",
Expand Down
16 changes: 9 additions & 7 deletions src/http/plugins/db.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import fastifyPlugin from 'fastify-plugin'
import { getConfig, MultitenantMigrationStrategy } from '../../config'
import {
areMigrationsUpToDate,
getServiceKeyUser,
getTenantConfig,
TenantMigrationStatus,
updateTenantMigrationsState,
TenantConnection,
getPostgresConnection,
progressiveMigrations,
runMigrationsOnTenant,
hasMissingSyncMigration,
DBMigration,
lastMigrationName,
} from '@internal/database'
import { verifyJWT } from '@internal/auth'
import { logSchema } from '@internal/monitoring'
import { createMutexByKey } from '@internal/concurrency'
import {
areMigrationsUpToDate,
DBMigration,
hasMissingSyncMigration,
lastMigrationName,
progressiveMigrations,
runMigrationsOnTenant,
updateTenantMigrationsState,
} from '@internal/database/migrations'

declare module 'fastify' {
interface FastifyRequest {
Expand Down
3 changes: 2 additions & 1 deletion src/http/routes/admin/migrations.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { FastifyInstance } from 'fastify'
import { Queue } from '@internal/queue'
import { multitenantKnex, runMigrationsOnAllTenants } from '@internal/database'
import { multitenantKnex } from '@internal/database'
import { RunMigrationsOnTenants } from '@storage/events'
import apiKey from '../../plugins/apikey'
import { getConfig } from '../../../config'
import { runMigrationsOnAllTenants } from '@internal/database/migrations'

const { pgQueueEnable } = getConfig()

Expand Down
13 changes: 7 additions & 6 deletions src/http/routes/admin/tenants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ import { FastifyInstance, RequestGenericInterface } from 'fastify'
import { FromSchema } from 'json-schema-to-ts'
import apiKey from '../../plugins/apikey'
import { decrypt, encrypt } from '@internal/auth'
import { deleteTenantConfig, TenantMigrationStatus, multitenantKnex } from '@internal/database'
import { dbSuperUser, storage } from '../../plugins'
import {
deleteTenantConfig,
TenantMigrationStatus,
multitenantKnex,
lastMigrationName,
runMigrationsOnTenant,
progressiveMigrations,
} from '@internal/database'
import { dbSuperUser, storage } from '../../plugins'
runMigrationsOnTenant,
} from '@internal/database/migrations'

const patchSchema = {
body: {
Expand Down Expand Up @@ -292,6 +290,9 @@ export default async function routes(fastify: FastifyInstance) {
migrations_status: TenantMigrationStatus.COMPLETED,
})
} catch (e) {
if (e instanceof Error) {
request.executionError = e
}
progressiveMigrations.addTenant(tenantId)
}
}
Expand Down
1 change: 0 additions & 1 deletion src/internal/database/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export * from './multitenant-db'
export * from './tenant'
export * from './connection'
export * from './migrations'
export * from './client'
export * from './pubsub'
84 changes: 82 additions & 2 deletions src/internal/database/migrations/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import { BasicPgClient, Migration } from 'postgres-migrations/dist/types'
import { validateMigrationHashes } from 'postgres-migrations/dist/validation'
import { runMigration } from 'postgres-migrations/dist/run-migration'
import { searchPath } from '../connection'
import { getTenantConfig, listTenantsToMigrate } from '../tenant'
import { getTenantConfig, TenantMigrationStatus } from '../tenant'
import { multitenantKnex } from '../multitenant-db'
import { ProgressiveMigrations } from './progressive'
import { RunMigrationsOnTenants } from '@storage/events'
import { ERRORS } from '@internal/errors'
import { DBMigration } from '@internal/database'
import { DBMigration } from './types'

const {
multitenantDatabaseUrl,
Expand Down Expand Up @@ -81,6 +81,86 @@ export async function lastMigrationName() {
return migrations[migrations.length - 1].name as keyof typeof DBMigration
}

/**
* List all tenants that needs to have the migrations run
*/
export async function* listTenantsToMigrate(signal: AbortSignal) {
let lastCursor = 0

while (true) {
if (signal.aborted) {
break
}

const migrationVersion = await lastMigrationName()

const data = await multitenantKnex
.table<{ id: string; cursor_id: number }>('tenants')
.select('id', 'cursor_id')
.where('cursor_id', '>', lastCursor)
.where((builder) => {
builder
.where((whereBuilder) => {
whereBuilder
.where('migrations_version', '!=', migrationVersion)
.whereNotIn('migrations_status', [
TenantMigrationStatus.FAILED,
TenantMigrationStatus.FAILED_STALE,
])
})
.orWhere('migrations_status', null)
})
.orderBy('cursor_id', 'asc')
.limit(200)

if (data.length === 0) {
break
}

lastCursor = data[data.length - 1].cursor_id
yield data.map((tenant) => tenant.id)
}
}

/**
* Update tenant migration version and status
* @param tenantId
* @param options
*/
export async function updateTenantMigrationsState(
tenantId: string,
options?: { state: TenantMigrationStatus }
) {
const migrationVersion = await lastMigrationName()
const state = options?.state || TenantMigrationStatus.COMPLETED
return multitenantKnex
.table('tenants')
.where('id', tenantId)
.update({
migrations_version: [
TenantMigrationStatus.FAILED,
TenantMigrationStatus.FAILED_STALE,
].includes(state)
? undefined
: migrationVersion,
migrations_status: state,
})
}

/**
* Determine if a tenant has the migrations up to date
* @param tenantId
*/
export async function areMigrationsUpToDate(tenantId: string) {
const latestMigrationVersion = await lastMigrationName()
const tenant = await getTenantConfig(tenantId)

return (
latestMigrationVersion === tenant.migrationVersion &&
tenant.migrationStatus === TenantMigrationStatus.COMPLETED
)
}

export async function hasMissingSyncMigration(tenantId: string) {
const { migrationVersion, migrationStatus } = await getTenantConfig(tenantId)
const migrations = await loadMigrationFilesCached('./migrations/tenant')
Expand Down
3 changes: 2 additions & 1 deletion src/internal/database/migrations/progressive.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger, logSchema } from '../../monitoring'
import { areMigrationsUpToDate, getTenantConfig, TenantMigrationStatus } from '../tenant'
import { getTenantConfig, TenantMigrationStatus } from '../tenant'
import { RunMigrationsOnTenants } from '@storage/events'
import { areMigrationsUpToDate } from '@internal/database/migrations/migrate'

export class ProgressiveMigrations {
protected tenants: string[] = []
Expand Down
81 changes: 0 additions & 81 deletions src/internal/database/tenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { PubSubAdapter } from '../pubsub'
import { createMutexByKey } from '../concurrency'
import { LRUCache } from 'lru-cache'
import objectSizeOf from 'object-sizeof'
import { lastMigrationName } from './migrations/migrate'
import { ERRORS } from '@internal/errors'
import { DBMigration } from '@internal/database/migrations'

Expand Down Expand Up @@ -88,86 +87,6 @@ const singleTenantServiceKey:
}
: undefined

/**
* List all tenants that needs to have the migrations run
*/
export async function* listTenantsToMigrate(signal: AbortSignal) {
let lastCursor = 0

while (true) {
if (signal.aborted) {
break
}

const migrationVersion = await lastMigrationName()

const data = await multitenantKnex
.table<{ id: string; cursor_id: number }>('tenants')
.select('id', 'cursor_id')
.where('cursor_id', '>', lastCursor)
.where((builder) => {
builder
.where((whereBuilder) => {
whereBuilder
.where('migrations_version', '!=', migrationVersion)
.whereNotIn('migrations_status', [
TenantMigrationStatus.FAILED,
TenantMigrationStatus.FAILED_STALE,
])
})
.orWhere('migrations_status', null)
})
.orderBy('cursor_id', 'asc')
.limit(200)

if (data.length === 0) {
break
}

lastCursor = data[data.length - 1].cursor_id
yield data.map((tenant) => tenant.id)
}
}

/**
* Update tenant migration version and status
* @param tenantId
* @param options
*/
export async function updateTenantMigrationsState(
tenantId: string,
options?: { state: TenantMigrationStatus }
) {
const migrationVersion = await lastMigrationName()
const state = options?.state || TenantMigrationStatus.COMPLETED
return multitenantKnex
.table('tenants')
.where('id', tenantId)
.update({
migrations_version: [
TenantMigrationStatus.FAILED,
TenantMigrationStatus.FAILED_STALE,
].includes(state)
? undefined
: migrationVersion,
migrations_status: state,
})
}

/**
* Determine if a tenant has the migrations up to date
* @param tenantId
*/
export async function areMigrationsUpToDate(tenantId: string) {
const latestMigrationVersion = await lastMigrationName()
const tenant = await getTenantConfig(tenantId)

return (
latestMigrationVersion === tenant.migrationVersion &&
tenant.migrationStatus === TenantMigrationStatus.COMPLETED
)
}

/**
* Deletes tenants config from the in-memory cache
* @param tenantId
Expand Down
2 changes: 1 addition & 1 deletion src/scripts/migrate-call.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dotenv from 'dotenv'
dotenv.config()

import { runMigrationsOnTenant } from '@internal/database'
import { runMigrationsOnTenant } from '@internal/database/migrations'
;(async () => {
await runMigrationsOnTenant(process.env.DATABASE_URL as string)
})()
13 changes: 6 additions & 7 deletions src/start/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import { IncomingMessage, Server, ServerResponse } from 'node:http'
import build from '../app'
import buildAdmin from '../admin-app'
import { getConfig } from '../config'
import {
runMultitenantMigrations,
runMigrationsOnTenant,
startAsyncMigrations,
listenForTenantUpdate,
PubSub,
} from '@internal/database'
import { listenForTenantUpdate, PubSub } from '@internal/database'
import { logger, logSchema } from '@internal/monitoring'
import { Queue } from '@internal/queue'
import { registerWorkers } from '@storage/events'
import { AsyncAbortController } from '@internal/concurrency'

import { bindShutdownSignals, createServerClosedPromise, shutdown } from './shutdown'
import {
runMigrationsOnTenant,
runMultitenantMigrations,
startAsyncMigrations,
} from '@internal/database/migrations'

const shutdownSignal = new AsyncAbortController()

Expand Down
3 changes: 2 additions & 1 deletion src/storage/database/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Bucket, S3MultipartUpload, Obj, S3PartUpload } from '../schemas'
import { ObjectMetadata } from '../backend'
import { DBMigration, TenantConnection } from '@internal/database'
import { TenantConnection } from '@internal/database'
import { DBMigration } from '@internal/database/migrations'

export interface SearchObjectOption {
search?: string
Expand Down
3 changes: 2 additions & 1 deletion src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import {
SearchObjectOption,
} from './adapter'
import { DatabaseError } from 'pg'
import { DBMigration, TenantConnection } from '@internal/database'
import { TenantConnection } from '@internal/database'
import { DbQueryPerformance } from '@internal/monitoring/metrics'
import { isUuid } from '../limits'
import { DBMigration } from '@internal/database/migrations'

/**
* Database
Expand Down
13 changes: 6 additions & 7 deletions src/storage/events/run-migrations.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { BaseEvent } from './base-event'
import {
areMigrationsUpToDate,
getTenantConfig,
TenantMigrationStatus,
updateTenantMigrationsState,
runMigrationsOnTenant,
} from '@internal/database'
import { getTenantConfig, TenantMigrationStatus } from '@internal/database'
import { JobWithMetadata, SendOptions, WorkOptions } from 'pg-boss'
import { logger, logSchema } from '@internal/monitoring'
import { BasePayload } from '@internal/queue'
import {
areMigrationsUpToDate,
runMigrationsOnTenant,
updateTenantMigrationsState,
} from '@internal/database/migrations'

interface RunMigrationsPayload extends BasePayload {
tenantId: string
Expand Down

0 comments on commit 11477bb

Please # to comment.