diff --git a/src/admin-app.ts b/src/admin-app.ts index 6c828575..b531aeff 100644 --- a/src/admin-app.ts +++ b/src/admin-app.ts @@ -4,9 +4,11 @@ import { Registry } from 'prom-client' const build = (opts: FastifyServerOptions = {}, appInstance?: FastifyInstance): FastifyInstance => { const app = fastify(opts) + app.register(plugins.signals) app.register(plugins.adminTenantId) app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics', '/health'] })) app.register(routes.tenants, { prefix: 'tenants' }) + app.register(routes.objects, { prefix: 'tenants' }) app.register(routes.migrations, { prefix: 'migrations' }) app.register(routes.s3Credentials, { prefix: 's3' }) diff --git a/src/http/plugins/db.ts b/src/http/plugins/db.ts index 23d0853b..9fc3074d 100644 --- a/src/http/plugins/db.ts +++ b/src/http/plugins/db.ts @@ -94,6 +94,7 @@ export const db = fastifyPlugin( interface DbSuperUserPluginOptions { disableHostCheck?: boolean + maxConnections?: number } export const dbSuperUser = fastifyPlugin( @@ -113,6 +114,7 @@ export const dbSuperUser = fastifyPlugin( method: request.method, headers: request.headers, disableHostCheck: opts.disableHostCheck, + maxConnections: opts.maxConnections, operation: () => request.operation?.type, }) }) diff --git a/src/http/routes/admin/index.ts b/src/http/routes/admin/index.ts index 5a35c323..c8e9f709 100644 --- a/src/http/routes/admin/index.ts +++ b/src/http/routes/admin/index.ts @@ -1,3 +1,4 @@ export { default as migrations } from './migrations' export { default as tenants } from './tenants' export { default as s3Credentials } from './s3' +export { default as objects } from './objects' diff --git a/src/http/routes/admin/objects.ts b/src/http/routes/admin/objects.ts new file mode 100644 index 00000000..d8f0a8fc --- /dev/null +++ b/src/http/routes/admin/objects.ts @@ -0,0 +1,199 @@ +import { FastifyInstance, RequestGenericInterface } from 'fastify' +import apiKey from '../../plugins/apikey' +import { dbSuperUser, storage } from '../../plugins' +import { ObjectScanner } from '@storage/scanner/scanner' +import { FastifyReply } from 'fastify/types/reply' + +const listOrphanedObjects = { + description: 'List Orphaned Objects', + params: { + type: 'object', + properties: { + tenantId: { type: 'string' }, + bucketId: { type: 'string' }, + }, + required: ['tenantId', 'bucketId'], + }, + query: { + type: 'object', + properties: { + before: { type: 'string' }, + }, + }, +} as const + +const syncOrphanedObjects = { + description: 'Sync Orphaned Objects', + params: { + type: 'object', + properties: { + tenantId: { type: 'string' }, + bucketId: { type: 'string' }, + }, + required: ['tenantId', 'bucketId'], + }, + body: { + type: 'object', + properties: { + deleteDbKeys: { type: 'boolean' }, + deleteS3Keys: { type: 'boolean' }, + }, + }, + optional: ['deleteDbKeys', 'deleteS3Keys'], +} as const + +interface ListOrphanObjectsRequest extends RequestGenericInterface { + Params: { + tenantId: string + bucketId: string + } + Querystring: { + before?: string + } +} + +interface SyncOrphanObjectsRequest extends RequestGenericInterface { + Params: { + tenantId: string + bucketId: string + } + Body: { + deleteDbKeys?: boolean + deleteS3Keys?: boolean + before?: string + } +} + +export default async function routes(fastify: FastifyInstance) { + fastify.register(apiKey) + fastify.register(dbSuperUser, { + disableHostCheck: true, + maxConnections: 5, + }) + fastify.register(storage) + + fastify.get( + '/:tenantId/buckets/:bucketId/orphan-objects', + { + schema: 
listOrphanedObjects, + }, + async (req, reply) => { + const bucket = req.params.bucketId + let before = req.query.before ? new Date(req.query.before as string) : undefined + + if (before && isNaN(before.getTime())) { + return reply.status(400).send({ + error: 'Invalid date format', + }) + } + if (!before) { + before = new Date() + before.setHours(before.getHours() - 1) + } + + const scanner = new ObjectScanner(req.storage) + const orphanObjects = scanner.listOrphaned(bucket, { + signal: req.signals.disconnect.signal, + before: before, + }) + + reply.header('Content-Type', 'application/json; charset=utf-8') + + // Do not let the connection time out, periodically send + // a ping message to keep the connection alive + const respPing = ping(reply) + + try { + for await (const result of orphanObjects) { + if (result.value.length > 0) { + respPing.update() + reply.raw.write( + JSON.stringify({ + ...result, + event: 'data', + }) + ) + } + } + } catch (e) { + throw e + } finally { + respPing.clear() + reply.raw.end() + } + } + ) + + fastify.delete( + '/:tenantId/buckets/:bucketId/orphan-objects', + { + schema: syncOrphanedObjects, + }, + async (req, reply) => { + if (!req.body.deleteDbKeys && !req.body.deleteS3Keys) { + return reply.status(400).send({ + error: 'At least one of deleteDbKeys or deleteS3Keys must be set to true', + }) + } + + const bucket = `${req.params.bucketId}` + let before = req.body.before ? new Date(req.body.before as string) : undefined + + if (!before) { + before = new Date() + before.setHours(before.getHours() - 1) + } + + const respPing = ping(reply) + + try { + const scanner = new ObjectScanner(req.storage) + const result = scanner.deleteOrphans(bucket, { + deleteDbKeys: req.body.deleteDbKeys, + deleteS3Keys: req.body.deleteS3Keys, + signal: req.signals.disconnect.signal, + }) + + for await (const deleted of result) { + respPing.update() + reply.raw.write( + JSON.stringify({ + ...deleted, + event: 'data', + }) + ) + } + } catch (e) { + throw e + } finally { + respPing.clear() + reply.raw.end() + } + } + ) +} + +// Occasionally write a ping message to the response stream +function ping(reply: FastifyReply) { + let lastSend = undefined as Date | undefined + const clearPing = setInterval(() => { + const fiveSecondsEarly = new Date() + fiveSecondsEarly.setSeconds(fiveSecondsEarly.getSeconds() - 5) + + if (!lastSend || (lastSend && lastSend < fiveSecondsEarly)) { + lastSend = new Date() + reply.raw.write( + JSON.stringify({ + event: 'ping', + }) + ) + } + }, 1000 * 10) + + return { + clear: () => clearInterval(clearPing), + update: () => { + lastSend = new Date() + }, + } +} diff --git a/src/internal/concurrency/index.ts b/src/internal/concurrency/index.ts index 86792b45..74a5ff2b 100644 --- a/src/internal/concurrency/index.ts +++ b/src/internal/concurrency/index.ts @@ -1,2 +1,3 @@ export * from './mutex' export * from './async-abort-controller' +export * from './merge-async-itertor' diff --git a/src/internal/concurrency/merge-async-itertor.ts b/src/internal/concurrency/merge-async-itertor.ts new file mode 100644 index 00000000..8912c685 --- /dev/null +++ b/src/internal/concurrency/merge-async-itertor.ts @@ -0,0 +1,44 @@ +type MergedYield>> = { + [K in keyof Gens]: Gens[K] extends AsyncGenerator ? 
{ type: K; value: V } : never +}[keyof Gens] + +export async function* mergeAsyncGenerators>>( + gens: Gens +): AsyncGenerator> { + // Convert the input object into an array of [name, generator] tuples + const entries = Object.entries(gens) as [keyof Gens, Gens[keyof Gens]][] + + // Initialize an array to keep track of each generator's state + const iterators = entries.map(([name, gen]) => ({ + name, + iterator: gen[Symbol.asyncIterator](), + done: false, + })) + + // Continue looping as long as at least one generator is not done + while (iterators.some((it) => !it.done)) { + // Prepare an array of promises to fetch the next value from each generator + const nextPromises = iterators.map((it) => + it.done ? Promise.resolve({ done: true, value: undefined }) : it.iterator.next() + ) + + // Await all the next() promises concurrently + const results = await Promise.all(nextPromises) + + // Iterate through the results and yield values with their corresponding names + for (let i = 0; i < iterators.length; i++) { + const it = iterators[i] + const result = results[i] + + if (!it.done && !result.done) { + // Yield an object containing the generator's name and the yielded value + yield { type: it.name, value: result.value } as MergedYield + } + + if (!it.done && result.done) { + // Mark the generator as done if it has no more values + it.done = true + } + } + } +} diff --git a/src/internal/database/client.ts b/src/internal/database/client.ts index 68225733..e855f6e0 100644 --- a/src/internal/database/client.ts +++ b/src/internal/database/client.ts @@ -6,6 +6,7 @@ import { ERRORS } from '@internal/errors' interface ConnectionOptions { host: string tenantId: string + maxConnections?: number headers?: Record method?: string path?: string diff --git a/src/internal/database/connection.ts b/src/internal/database/connection.ts index 440333ed..5c612edd 100644 --- a/src/internal/database/connection.ts +++ b/src/internal/database/connection.ts @@ -69,7 +69,7 @@ export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.sp export class TenantConnection { public readonly role: string - constructor(protected readonly pool: Knex, protected readonly options: TenantConnectionOptions) { + constructor(public readonly pool: Knex, protected readonly options: TenantConnectionOptions) { this.role = options.user.payload.role || 'anon' } @@ -101,7 +101,9 @@ export class TenantConnection { searchPath: isExternalPool ? undefined : searchPath, pool: { min: 0, - max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections, + max: isExternalPool + ? options.maxConnections || 1 + : options.maxConnections || databaseMaxConnections, acquireTimeoutMillis: databaseConnectionTimeout, idleTimeoutMillis: isExternalPool ? 
options.idleTimeoutMillis || 100 diff --git a/src/internal/monitoring/logger.ts b/src/internal/monitoring/logger.ts index d95152ab..f70a1107 100644 --- a/src/internal/monitoring/logger.ts +++ b/src/internal/monitoring/logger.ts @@ -32,7 +32,7 @@ export const baseLogger = pino({ headers: whitelistHeaders(request.headers), hostname: request.hostname, remoteAddress: request.ip, - remotePort: request.socket.remotePort, + remotePort: request.socket?.remotePort, } }, }, diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts index 0584da7d..df5cd76c 100644 --- a/src/internal/queue/event.ts +++ b/src/internal/queue/event.ts @@ -88,6 +88,10 @@ export class Event> { } static batchSend[]>(messages: T) { + if (!pgQueueEnable) { + return Promise.all(messages.map((message) => message.send())) + } + return Queue.getInstance().insert( messages.map((message) => { const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {} diff --git a/src/internal/testing/generators/array.ts b/src/internal/testing/generators/array.ts new file mode 100644 index 00000000..1889480b --- /dev/null +++ b/src/internal/testing/generators/array.ts @@ -0,0 +1,25 @@ +export async function eachParallel(times: number, fn: (index: number) => Promise) { + const promises = [] + for (let i = 0; i < times; i++) { + promises.push(fn(i)) + } + + return Promise.all(promises) +} + +export function pickRandomFromArray(arr: T[]): T { + return arr[Math.floor(Math.random() * arr.length)] +} + +export function pickRandomRangeFromArray(arr: T[], range: number): T[] { + if (arr.length <= range) { + return arr + } + + const result = new Set() + while (result.size < range) { + result.add(pickRandomFromArray(arr)) + } + + return Array.from(result) +} diff --git a/src/internal/testing/seeder/base-seeder.ts b/src/internal/testing/seeder/base-seeder.ts new file mode 100644 index 00000000..032343ab --- /dev/null +++ b/src/internal/testing/seeder/base-seeder.ts @@ -0,0 +1,87 @@ +import { Persistence } from './persistence' + +export abstract class Seeder { + protected persistence: Persistence + protected records: Map + + constructor(persistence: Persistence) { + this.persistence = persistence + this.records = new Map() + } + + /** + * Retrieves all collected records. + * @returns A map of table names to their respective records. + */ + getAllRecords(): Map { + return this.records + } + + /** + * Runs the seeding operations within a transaction. + * @param operation - The seeding operations to execute. + */ + async runInTransaction(operation: () => Promise): Promise { + try { + await this.persistence.beginTransaction() + await operation() + await this.persistence.commitTransaction() + } catch (error) { + console.error('Error during seeding:', error) + console.log('Rolling back transaction...') + await this.persistence.rollbackTransaction() + throw error + } + } + + /** + * Performs batch inserts for all collected records. + */ + async batchInsertAll(): Promise { + for (const [table, records] of this.records.entries()) { + if (records.length > 0) { + console.log(`Inserting ${records.length} records into ${table}...`) + await this.persistence.insertBatch(table, records) + console.log(`Inserted ${records.length} records into ${table}.`) + } + } + } + + /** + * Adds records to the internal collection for a specific table. + * @param table - The table name. + * @param records - The records to add. 
+ */ + protected addRecords(table: string, records: T[]): void { + if (!this.records.has(table)) { + this.records.set(table, []) + } + this.records.get(table)!.push(...records) + } + + /** + * Executes a raw SQL query. + * @param query - The SQL query string. + * @param bindings - Optional bindings for parameterized queries. + * @returns The result of the query. + */ + protected async rawQuery(query: string, bindings?: any[]): Promise { + return this.persistence.rawQuery(query, bindings) + } + + /** + * Generates a list of records based on count and a generator function. + * Assigns UUIDs to each record. + * @param count - Number of records to generate. + * @param generator - Function to generate record data. + * @returns An array of generated records with assigned IDs. + */ + protected generateRecords( + count: number, + generator: (n: number) => T + ): T[] { + return Array.from({ length: count }, (_, i) => ({ + ...generator(i + 1), + })) + } +} diff --git a/src/internal/testing/seeder/index.ts b/src/internal/testing/seeder/index.ts new file mode 100644 index 00000000..ef91c6d2 --- /dev/null +++ b/src/internal/testing/seeder/index.ts @@ -0,0 +1,3 @@ +export * from './knex-persistence' +export * from './persistence' +export * from './base-seeder' diff --git a/src/internal/testing/seeder/knex-persistence.ts b/src/internal/testing/seeder/knex-persistence.ts new file mode 100644 index 00000000..0942de26 --- /dev/null +++ b/src/internal/testing/seeder/knex-persistence.ts @@ -0,0 +1,50 @@ +// src/persistence/KnexPersistence.ts +import { Persistence } from './persistence' +import knex, { Knex } from 'knex' + +export class KnexPersistence implements Persistence { + private knex: Knex + private trx: Knex.Transaction | null = null + + constructor(knexConfig: Knex.Config) { + this.knex = knex(knexConfig) + } + + async insertBatch(table: string, records: T[]): Promise { + if (this.trx) { + await this.trx(table).insert(records) + } else { + await this.knex(table).insert(records) + } + } + + async beginTransaction(): Promise { + this.trx = await this.knex.transaction() + } + + async commitTransaction(): Promise { + if (this.trx) { + await this.trx.commit() + this.trx = null + } + } + + async rollbackTransaction(): Promise { + if (this.trx) { + await this.trx.rollback() + this.trx = null + } + } + + async rawQuery(query: string, bindings: any[] = []): Promise { + if (this.trx) { + return this.trx.raw(query, bindings) + } + return this.knex.raw(query, bindings) + } + + // Optional: Destroy the Knex instance to close connections + async destroy(): Promise { + await this.knex.destroy() + } +} diff --git a/src/internal/testing/seeder/persistence.ts b/src/internal/testing/seeder/persistence.ts new file mode 100644 index 00000000..7cc6255b --- /dev/null +++ b/src/internal/testing/seeder/persistence.ts @@ -0,0 +1,7 @@ +export interface Persistence { + insertBatch(table: string, records: T[]): Promise + beginTransaction(): Promise + commitTransaction(): Promise + rollbackTransaction(): Promise + rawQuery(query: string, bindings?: any[]): Promise +} diff --git a/src/storage/backend/adapter.ts b/src/storage/backend/adapter.ts index 01195d02..561108fb 100644 --- a/src/storage/backend/adapter.ts +++ b/src/storage/backend/adapter.ts @@ -52,6 +52,19 @@ export abstract class StorageBackendAdapter { this.client = null } + async list( + bucket: string, + options?: { + prefix?: string + delimiter?: string + nextToken?: string + startAfter?: string + beforeDate?: Date + } + ): Promise<{ keys: { name: string; size: number 
}[]; nextToken?: string }> { + throw new Error('list not implemented') + } + /** * Gets an object body and metadata * @param bucketName diff --git a/src/storage/backend/file.ts b/src/storage/backend/file.ts index b44402c3..beece990 100644 --- a/src/storage/backend/file.ts +++ b/src/storage/backend/file.ts @@ -66,6 +66,18 @@ export class FileBackend implements StorageBackendAdapter { throw new Error('FILE_STORAGE_ETAG_ALGORITHM env variable must be either "mtime" or "md5"') } + async list( + bucket: string, + options?: { + prefix?: string + delimiter?: string + nextToken?: string + startAfter?: string + } + ): Promise<{ keys: { name: string; size: number }[]; nextToken?: string }> { + return Promise.resolve({ keys: [] }) + } + /** * Gets an object body and metadata * @param bucketName diff --git a/src/storage/backend/index.ts b/src/storage/backend/index.ts index ca44a4e2..46e5b5a4 100644 --- a/src/storage/backend/index.ts +++ b/src/storage/backend/index.ts @@ -1,6 +1,6 @@ import { StorageBackendAdapter } from './adapter' import { FileBackend } from './file' -import { S3Backend, S3ClientOptions } from './s3' +import { S3Backend, S3ClientOptions } from './s3/adapter' import { getConfig, StorageBackendType } from '../../config' export * from './s3' diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3/adapter.ts similarity index 88% rename from src/storage/backend/s3.ts rename to src/storage/backend/s3/adapter.ts index c4ec0414..2d6abd0e 100644 --- a/src/storage/backend/s3.ts +++ b/src/storage/backend/s3/adapter.ts @@ -8,6 +8,7 @@ import { GetObjectCommand, GetObjectCommandInput, HeadObjectCommand, + ListObjectsV2Command, ListPartsCommand, S3Client, S3ClientConfig, @@ -23,13 +24,14 @@ import { ObjectResponse, withOptionalVersion, UploadPart, -} from './adapter' +} from './../adapter' import { getSignedUrl } from '@aws-sdk/s3-request-presigner' import { ERRORS, StorageBackendError } from '@internal/errors' -import { getConfig } from '../../config' +import { getConfig } from '../../../config' import { Readable } from 'node:stream' import { createAgent, InstrumentedAgent } from '@internal/http' import { monitorStream } from '@internal/streams' +import { BackupObjectInfo, ObjectBackup } from '@storage/backend/s3/backup' const { tracingFeatures, storageS3MaxSockets, tracingEnabled } = getConfig() @@ -249,6 +251,54 @@ export class S3Backend implements StorageBackendAdapter { } } + async list( + bucket: string, + options?: { + prefix?: string + delimiter?: string + nextToken?: string + startAfter?: string + beforeDate?: Date + } + ): Promise<{ keys: { name: string; size: number }[]; nextToken?: string }> { + try { + const command = new ListObjectsV2Command({ + Bucket: bucket, + Prefix: options?.prefix, + Delimiter: options?.delimiter, + ContinuationToken: options?.nextToken || undefined, + StartAfter: options?.startAfter, + }) + const data = await this.client.send(command) + const keys = + data.Contents?.filter((ele) => { + if (options?.beforeDate) { + if (ele.LastModified && ele.LastModified < options.beforeDate) { + return ele.Key as string + } + return false + } + return ele.Key + }).map((ele) => { + if (options?.prefix) { + return { + name: (ele.Key as string).replace(options.prefix, '').replace('/', ''), + size: ele.Size as number, + } + } + + return { name: ele.Key as string, size: ele.Size as number } + }) || [] + + return { + keys, + nextToken: data.NextContinuationToken, + } + } catch (e: any) { + throw StorageBackendError.fromError(e) + } + } + /** * Deletes multiple objects * 
@param bucket @@ -464,6 +514,10 @@ export class S3Backend implements StorageBackendAdapter { } } + async backup(backupInfo: BackupObjectInfo) { + return new ObjectBackup(this.client, backupInfo).backup() + } + close() { this.agent.close() } diff --git a/src/storage/backend/s3/backup.ts b/src/storage/backend/s3/backup.ts new file mode 100644 index 00000000..224f0ffb --- /dev/null +++ b/src/storage/backend/s3/backup.ts @@ -0,0 +1,198 @@ +import { + S3Client, + CopyObjectCommand, + CreateMultipartUploadCommand, + UploadPartCopyCommand, + CompleteMultipartUploadCommand, + AbortMultipartUploadCommand, + CompletedPart, +} from '@aws-sdk/client-s3' + +const FIVE_GB = 5 * 1024 * 1024 * 1024 + +export interface BackupObjectInfo { + // Source Object Details + sourceBucket: string + sourceKey: string + + // Destination Object Details + destinationBucket: string + destinationKey: string + + size: number +} + +/** + * Class representing an object backup operation between S3 buckets. + */ +export class ObjectBackup { + private static readonly MAX_PART_SIZE = 5 * 1024 * 1024 * 1024 // 5 GB per part + private static readonly MAX_CONCURRENT_UPLOADS = 5 // Adjust based on your system's capabilities + private s3Client: S3Client + private objectInfo: BackupObjectInfo + + /** + * Creates an instance of ObjectBackup. + * @param s3Client - An instance of S3Client. + * @param objectInfo - Information about the object to be backed up. + */ + constructor(s3Client: S3Client, objectInfo: BackupObjectInfo) { + this.s3Client = s3Client + this.objectInfo = objectInfo + } + + /** + * Initiates the backup (copy) process for the specified object. + */ + public async backup(): Promise { + try { + const { size } = this.objectInfo + + if (size > FIVE_GB) { + // Perform multipart copy for large files + await this.multipartCopy() + } else { + // Perform single copy for smaller files + await this.singleCopy() + } + } catch (error) { + throw error + } + } + + /** + * Performs a single copy operation for objects <= 5GB. + */ + private async singleCopy(): Promise { + const { sourceBucket, sourceKey, destinationBucket, destinationKey } = this.objectInfo + + const copyParams = { + Bucket: destinationBucket, + Key: destinationKey, + CopySource: encodeURIComponent(`/${sourceBucket}/${sourceKey}`), + } + + const copyCommand = new CopyObjectCommand(copyParams) + await this.s3Client.send(copyCommand) + } + + /** + * Performs a multipart copy operation for objects > 5GB. + */ + private async multipartCopy(): Promise { + const { destinationBucket, destinationKey, size } = this.objectInfo + + // Step 1: Initiate Multipart Upload + const createMultipartUploadCommand = new CreateMultipartUploadCommand({ + Bucket: destinationBucket, + Key: destinationKey, + }) + + const createMultipartUploadResponse = await this.s3Client.send(createMultipartUploadCommand) + const uploadId = createMultipartUploadResponse.UploadId + + if (!uploadId) { + throw new Error('Failed to initiate multipart upload.') + } + + const maxPartSize = ObjectBackup.MAX_PART_SIZE + const numParts = Math.ceil(size / maxPartSize) + const completedParts: CompletedPart[] = [] + + try { + // Step 2: Copy Parts Concurrently + await this.copyPartsConcurrently(uploadId, numParts, size, completedParts) + + // Step 3: Sort the completed parts by PartNumber + completedParts.sort((a, b) => (a.PartNumber! < b.PartNumber! ? 
-1 : 1)) + + // Step 4: Complete Multipart Upload + const completeMultipartUploadCommand = new CompleteMultipartUploadCommand({ + Bucket: destinationBucket, + Key: destinationKey, + UploadId: uploadId, + MultipartUpload: { + Parts: completedParts, + }, + }) + + await this.s3Client.send(completeMultipartUploadCommand) + } catch (error) { + // Abort the multipart upload in case of failure + const abortMultipartUploadCommand = new AbortMultipartUploadCommand({ + Bucket: destinationBucket, + Key: destinationKey, + UploadId: uploadId, + }) + + await this.s3Client.send(abortMultipartUploadCommand) + throw error + } + } + + /** + * Copies parts of the object concurrently. + * @param uploadId - The UploadId from the initiated multipart upload. + * @param numParts - Total number of parts to copy. + * @param size - Total size of the object in bytes. + * @param completedParts - Array to store completed parts information. + */ + private async copyPartsConcurrently( + uploadId: string, + numParts: number, + size: number, + completedParts: CompletedPart[] + ): Promise { + const { sourceBucket, sourceKey, destinationBucket, destinationKey } = this.objectInfo + const partSize = ObjectBackup.MAX_PART_SIZE + let currentPart = 1 + + // Worker function to copy a single part + const copyPart = async (partNumber: number): Promise => { + const start = (partNumber - 1) * partSize + const end = partNumber * partSize < size ? partNumber * partSize - 1 : size - 1 + + const uploadPartCopyCommand = new UploadPartCopyCommand({ + Bucket: destinationBucket, + Key: destinationKey, + PartNumber: partNumber, + UploadId: uploadId, + CopySource: encodeURIComponent(`/${sourceBucket}/${sourceKey}`), + CopySourceRange: `bytes=${start}-${end}`, + }) + + const uploadPartCopyResponse = await this.s3Client.send(uploadPartCopyCommand) + + if (!uploadPartCopyResponse.CopyPartResult?.ETag) { + throw new Error(`Failed to copy part ${partNumber}. 
No ETag returned.`) + } + + completedParts.push({ + ETag: uploadPartCopyResponse.CopyPartResult.ETag, + PartNumber: partNumber, + }) + } + + // Array to hold active worker promises + const workers: Promise[] = [] + + // Start concurrent workers + for (let i = 0; i < ObjectBackup.MAX_CONCURRENT_UPLOADS && currentPart <= numParts; i++) { + const worker = (async () => { + while (currentPart <= numParts) { + const partToCopy = currentPart + currentPart += 1 + try { + await copyPart(partToCopy) + } catch (error) { + throw error + } + } + })() + workers.push(worker) + } + + // Wait for all workers to complete + await Promise.all(workers) + } +} diff --git a/src/storage/backend/s3/index.ts b/src/storage/backend/s3/index.ts new file mode 100644 index 00000000..32544c6d --- /dev/null +++ b/src/storage/backend/s3/index.ts @@ -0,0 +1 @@ +export * from './adapter' diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index 48c984d3..a81d8a12 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -49,6 +49,7 @@ export interface Database { tenantId: string reqId?: string role?: string + connection: TenantConnection tenant(): { ref: string; host: string } @@ -78,7 +79,13 @@ export interface Database { deleteBucket(bucketId: string | string[]): Promise - listObjects(bucketId: string, columns: string, limit: number): Promise + listObjects( + bucketId: string, + columns: string, + limit: number, + before?: Date, + nextToken?: string + ): Promise listObjectsV2( bucketId: string, options?: { @@ -131,12 +138,21 @@ export interface Database { deleteObject(bucketId: string, objectName: string, version?: string): Promise deleteObjects(bucketId: string, objectNames: string[], by: keyof Obj): Promise + deleteObjectVersions( + bucketId: string, + objectNames: { name: string; version: string }[] + ): Promise updateObjectMetadata(bucketId: string, objectName: string, metadata: ObjectMetadata): Promise updateObjectOwner(bucketId: string, objectName: string, owner?: string): Promise findObjects(bucketId: string, objectNames: string[], columns: string): Promise + findObjectVersions( + bucketId: string, + objectNames: { name: string; version: string }[], + columns: string + ): Promise findObject( bucketId: string, objectName: string, diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 5f708545..a8609a10 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -178,13 +178,31 @@ export class StorageKnexDB implements Database { }) } - async listObjects(bucketId: string, columns = 'id', limit = 10) { + async listObjects( + bucketId: string, + columns = 'id', + limit = 10, + before?: Date, + nextToken?: string + ) { const data = await this.runQuery('ListObjects', (knex) => { - return knex + const query = knex .from('objects') .select(columns.split(',')) .where('bucket_id', bucketId) - .limit(limit) as Promise + // @ts-expect-error knex typing is wrong, it doesn't accept a knex raw on orderBy, even though is totally legit + .orderBy(knex.raw('name COLLATE "C"')) + .limit(limit) + + if (before) { + query.andWhere('created_at', '<', before.toISOString()) + } + + if (nextToken) { + query.andWhere(knex.raw('name COLLATE "C" > ?', [nextToken])) + } + + return query as Promise }) return data @@ -439,6 +457,24 @@ export class StorageKnexDB implements Database { return objects } + async deleteObjectVersions(bucketId: string, objectNames: { name: string; version: string }[]) { + const objects = await this.runQuery('DeleteObjects', 
(knex) => { + const placeholders = objectNames.map(() => '(?, ?)').join(', ') + + // Step 2: Flatten the array of tuples into a single array of values + const flatParams = objectNames.flatMap(({ name, version }) => [name, version]) + + return knex + .from('objects') + .delete() + .where('bucket_id', bucketId) + .whereRaw(`(name, version) IN (${placeholders})`, flatParams) + .returning('*') + }) + + return objects + } + async updateObjectMetadata(bucketId: string, objectName: string, metadata: ObjectMetadata) { const [object] = await this.runQuery('UpdateObjectMetadata', (knex) => { return knex @@ -530,6 +566,24 @@ export class StorageKnexDB implements Database { return objects } + async findObjectVersions(bucketId: string, obj: { name: string; version: string }[]) { + const objects = await this.runQuery('FindObjectVersions', (knex) => { + // Step 1: Generate placeholders for each tuple + const placeholders = obj.map(() => '(?, ?)').join(', ') + + // Step 2: Flatten the array of tuples into a single array of values + const flatParams = obj.flatMap(({ name, version }) => [name, version]) + + return knex + .from('objects') + .select('objects.name', 'objects.version') + .where('bucket_id', bucketId) + .whereRaw(`(name, version) IN (${placeholders})`, flatParams) + }) + + return objects + } + async mustLockObject(bucketId: string, objectName: string, version?: string) { return this.runQuery('MustLockObject', async (knex) => { const hash = hashStringToInt(`${bucketId}/${objectName}${version ? `/${version}` : ''}`) diff --git a/src/storage/events/backup-object.ts b/src/storage/events/backup-object.ts new file mode 100644 index 00000000..ce184857 --- /dev/null +++ b/src/storage/events/backup-object.ts @@ -0,0 +1,118 @@ +import { BaseEvent } from './base-event' +import { JobWithMetadata, SendOptions, WorkOptions } from 'pg-boss' +import { BasePayload } from '@internal/queue' +import { S3Backend } from '@storage/backend' +import { getConfig } from '../../config' +import { logger, logSchema } from '@internal/monitoring' + +const { storageS3Bucket } = getConfig() + +interface BackupObjectEventPayload extends BasePayload { + name: string + bucketId: string + version: string + size: number + deleteOriginal?: boolean +} + +export class BackupObjectEvent extends BaseEvent { + static queueName = 'backup-object' + + static getWorkerOptions(): WorkOptions { + return { + teamSize: 10, + teamConcurrency: 5, + includeMetadata: true, + } + } + + static getQueueOptions(payload: BackupObjectEventPayload): SendOptions { + return { + singletonKey: `${payload.tenant.ref}/${payload.bucketId}/${payload.name}/${payload.version}`, + retryLimit: 5, + retryDelay: 5, + priority: 10, + } + } + + static async handle(job: JobWithMetadata) { + const tenantId = job.data.tenant.ref + const storage = await this.createStorage(job.data) + + if (!(storage.backend instanceof S3Backend)) { + return + } + + const s3Key = `${tenantId}/${job.data.bucketId}/${job.data.name}` + + try { + logSchema.event(logger, `[Admin]: BackupObject ${s3Key}`, { + jodId: job.id, + type: 'event', + event: 'BackupObject', + payload: JSON.stringify(job.data), + objectPath: s3Key, + resources: [`${job.data.bucketId}/${job.data.name}`], + tenantId: job.data.tenant.ref, + project: job.data.tenant.ref, + reqId: job.data.reqId, + }) + + await storage.backend.backup({ + sourceBucket: storageS3Bucket, + destinationBucket: storageS3Bucket, + sourceKey: `${s3Key}/${job.data.version}`, + destinationKey: `__internal/${s3Key}/${job.data.version}`, + size: job.data.size, 
+ }) + + if (job.data.deleteOriginal) { + logSchema.event(logger, `[Admin]: DeleteOriginalObject ${s3Key}`, { + jodId: job.id, + type: 'event', + event: 'BackupObject', + payload: JSON.stringify(job.data), + objectPath: s3Key, + resources: [`${job.data.bucketId}/${job.data.name}`], + tenantId: job.data.tenant.ref, + project: job.data.tenant.ref, + reqId: job.data.reqId, + }) + + await storage.backend.deleteObject( + storageS3Bucket, + `${tenantId}/${job.data.bucketId}/${job.data.name}`, + job.data.version + ) + } + } catch (e) { + logger.error( + { + error: e, + jodId: job.id, + type: 'event', + event: 'ObjectAdminDelete', + payload: JSON.stringify(job.data), + objectPath: s3Key, + objectVersion: job.data.version, + tenantId: job.data.tenant.ref, + project: job.data.tenant.ref, + reqId: job.data.reqId, + }, + `[Admin]: BackupObjectEvent ${s3Key} - FAILED` + ) + } finally { + storage.db + .destroyConnection() + .then(() => { + // no-op + }) + .catch((e) => { + logger.error( + { error: e }, + `[Admin]: BackupObjectEvent ${tenantId} - FAILED DISPOSING CONNECTION` + ) + }) + } + } +} diff --git a/src/storage/events/index.ts b/src/storage/events/index.ts index c90c6967..8fc58aed 100644 --- a/src/storage/events/index.ts +++ b/src/storage/events/index.ts @@ -4,5 +4,6 @@ export * from './object-created' export * from './object-updated' export * from './object-removed' export * from './object-admin-delete' +export * from './backup-object' export * from './run-migrations' export * from './workers' diff --git a/src/storage/events/webhook.ts b/src/storage/events/webhook.ts index bff37004..2676e933 100644 --- a/src/storage/events/webhook.ts +++ b/src/storage/events/webhook.ts @@ -85,7 +85,7 @@ export class Webhook extends BaseEvent { static async handle(job: Job) { if (!webhookURL) { - logger.info('skipping webhook, no WEBHOOK_URL set') + logger.debug('skipping webhook, no WEBHOOK_URL set') return job } diff --git a/src/storage/events/workers.ts b/src/storage/events/workers.ts index 775767ba..a0e9d927 100644 --- a/src/storage/events/workers.ts +++ b/src/storage/events/workers.ts @@ -1,8 +1,9 @@ import { Queue } from '@internal/queue' -import { ObjectAdminDelete, Webhook, RunMigrationsOnTenants } from './index' +import { ObjectAdminDelete, Webhook, RunMigrationsOnTenants, BackupObjectEvent } from './index' export function registerWorkers() { Queue.register(Webhook) Queue.register(ObjectAdminDelete) Queue.register(RunMigrationsOnTenants) + Queue.register(BackupObjectEvent) } diff --git a/src/storage/index.ts b/src/storage/index.ts index 4d95f069..47e06ce5 100644 --- a/src/storage/index.ts +++ b/src/storage/index.ts @@ -2,3 +2,4 @@ export * from './storage' export * as backends from './backend' export * from './database' export * from './schemas' +export * from './scanner/scanner' diff --git a/src/storage/scanner/index.ts b/src/storage/scanner/index.ts new file mode 100644 index 00000000..4c086e13 --- /dev/null +++ b/src/storage/scanner/index.ts @@ -0,0 +1 @@ +export * from './scanner' diff --git a/src/storage/scanner/scanner.ts b/src/storage/scanner/scanner.ts new file mode 100644 index 00000000..e3fca824 --- /dev/null +++ b/src/storage/scanner/scanner.ts @@ -0,0 +1,459 @@ +import { Storage } from '@storage/storage' +import { getConfig } from '../../config' +import { ERRORS } from '@internal/errors' +import { mergeAsyncGenerators } from '@internal/concurrency' +import { BackupObjectEvent } from '@storage/events/backup-object' +import { withOptionalVersion } from '@storage/backend' + +const { 
storageS3Bucket } = getConfig() + +const S3_KEYS_TMP_TABLE_NAME = 'storage._s3_remote_keys' + +interface OrphanObject { + name: string + size: number + version?: string +} + +/** + * ObjectScanner is a utility class to scan and compare objects in the database and S3 + * it traverses all objects in the database and S3 and yields orphaned keys + */ +export class ObjectScanner { + constructor(private readonly storage: Storage) {} + + /** + * List all orphaned objects in the database and S3 + * @param bucket + * @param options + */ + async *listOrphaned(bucket: string, options: { before?: Date; signal: AbortSignal }) { + const tmpTable = `${S3_KEYS_TMP_TABLE_NAME}_${Date.now()}` + const prefix = `${this.storage.db.tenantId}/${bucket}` + + const localDBKeys = this.syncS3KeysToDB(tmpTable, prefix, options) + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of localDBKeys) { + // await all of the operation finished + if (options.signal.aborted) { + throw ERRORS.Aborted('Operation was aborted') + } + } + + const s3Keys = this.listS3Orphans(tmpTable, { + bucket: bucket, + prefix: `${this.storage.db.tenantId}/${bucket}`, + signal: options.signal, + }) + + const dbKeys = this.listDBOrphans(tmpTable, { + bucket, + before: options.before, + signal: options.signal, + }) + + for await (const orphan of mergeAsyncGenerators({ + s3Orphans: s3Keys, + dbOrphans: dbKeys, + })) { + if (options.signal.aborted) { + throw ERRORS.Aborted('Operation was aborted') + } + yield orphan + } + } catch (e) { + throw e + } finally { + await this.storage.db.connection.pool.raw(`DROP TABLE IF EXISTS ${tmpTable}`) + } + } + + /** + * Delete orphaned objects in the database and S3 + * + * @param bucket + * @param options + */ + async *deleteOrphans( + bucket: string, + options: { before?: Date; deleteDbKeys?: boolean; deleteS3Keys?: boolean; signal: AbortSignal } + ) { + const prefix = `${this.storage.db.tenantId}/${bucket}` + const tmpTable = `${S3_KEYS_TMP_TABLE_NAME}_${Date.now()}` + + try { + const iterators = {} as { + dbOrphans: AsyncGenerator | undefined + s3Orphans: AsyncGenerator | undefined + } + + const s3LocalCache = this.syncS3KeysToDB(tmpTable, prefix, options) + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of s3LocalCache) { + // await all of the operation finished + if (options.signal.aborted) { + throw ERRORS.Aborted('Operation was aborted') + } + } + + if (options.deleteDbKeys) { + iterators['dbOrphans'] = this.deleteDBOrphans(tmpTable, { + ...options, + bucket, + prefix, + }) + } + + if (options.deleteS3Keys) { + iterators['s3Orphans'] = this.deleteS3Orphans(tmpTable, { + ...options, + bucket, + prefix, + }) + } + + const iterator = mergeAsyncGenerators({ + dbOrphans: iterators.dbOrphans || (async function* () {})(), + s3Orphans: iterators.s3Orphans || (async function* () {})(), + }) + + for await (const result of iterator) { + yield result + } + } catch (e) { + throw e + } finally { + await this.storage.db.connection.pool.raw(`DROP TABLE IF EXISTS ${tmpTable}`) + } + } + + /** + * List all objects in the database for a given bucket + * yields max 1000 keys at a time + * + * @param bucket + * @param options + */ + protected async *listAllDbObjects( + bucket: string, + options: { before?: Date; signal: AbortSignal } + ) { + let nextToken: string | undefined = undefined + + while (true) { + if (options.signal.aborted) { + break + } + + const storageObjects = await this.storage.db.listObjects( + bucket, + 
+        'id,name,version,metadata',
+        1000,
+        options.before,
+        nextToken
+      )
+
+      const dbKeys = storageObjects.map(({ name, version, metadata }) => {
+        if (version) {
+          return { name: `${name}`, version: version, size: (metadata?.size as number) || 0 }
+        }
+        return { name, size: (metadata?.size as number) || 0 }
+      })
+
+      if (storageObjects.length === 0) {
+        break
+      }
+
+      yield dbKeys
+
+      if (storageObjects.length < 1000) {
+        break
+      }
+
+      const lastObj = storageObjects[storageObjects.length - 1]
+
+      if (lastObj.version) {
+        nextToken = `${lastObj.name}/${lastObj.version}`
+      } else {
+        nextToken = lastObj.name
+      }
+    }
+  }
+
+  protected async *listAllCacheS3Keys(tableName: string, nextItem: string, signal: AbortSignal) {
+    while (true) {
+      if (signal.aborted) {
+        break
+      }
+      const query = this.storage.db.connection.pool
+        .table(tableName)
+        .select('key', 'size')
+        .orderBy('key', 'asc')
+
+      if (nextItem) {
+        query.where('key', '>', nextItem)
+      }
+
+      const result = await query.limit(1000)
+
+      if (result.length === 1000) {
+        // advance the cursor using the "key" column; the temp table has no "name" column
+        nextItem = result[result.length - 1].key
+      }
+
+      if (result.length === 0) {
+        break
+      }
+
+      yield result.map((k) => {
+        const keyPath = k.key.split('/')
+        const version = keyPath.pop()
+        return {
+          name: keyPath.join('/') as string,
+          version: version as string,
+          size: k.size,
+        }
+      })
+
+      if (result.length < 1000) {
+        break
+      }
+    }
+  }
+
+  protected async findCacheS3KeysById(
+    table: string,
+    keys: string[]
+    // { before }: { before?: Date }
+  ) {
+    return this.storage.db.connection.pool
+      .table(table)
+      .select<{ key: string }[]>('key')
+      .whereIn('key', keys)
+  }
+
+  protected async *syncS3KeysToDB(
+    tmpTable: string,
+    bucket: string,
+    { signal, before }: { signal: AbortSignal; before?: Date }
+  ) {
+    const s3ObjectsStream = this.listAllS3Objects(bucket, {
+      before,
+      signal,
+    })
+
+    await this.storage.db.connection.pool.raw(`
+      CREATE UNLOGGED TABLE IF NOT EXISTS ${tmpTable} (
+        key TEXT COLLATE "C" PRIMARY KEY,
+        size BIGINT NOT NULL
+      )
+    `)
+
+    for await (const s3ObjectKeys of s3ObjectsStream) {
+      const stored = await this.storage.db.connection.pool
+        .table(tmpTable)
+        .insert(
+          s3ObjectKeys.map((k) => ({
+            key: k.name,
+            size: k.size,
+          })),
+          tmpTable
+        )
+        .onConflict()
+        .ignore()
+        .returning('*')
+
+      yield stored
+    }
+  }
+
+  /**
+   * List all objects in the S3 bucket for a given prefix
+   * yields max 1000 keys at a time
+   *
+   * yields at each iteration
+   *
+   * @param prefix
+   * @param options
+   * @protected
+   */
+  protected async *listAllS3Objects(
+    prefix: string,
+    options: { before?: Date; signal: AbortSignal }
+  ) {
+    let nextToken: string | undefined = undefined
+
+    while (true) {
+      if (options.signal.aborted) {
+        break
+      }
+
+      const result = await this.storage.backend.list(storageS3Bucket, {
+        prefix,
+        nextToken,
+        beforeDate: options.before,
+      })
+
+      if (result.keys.length === 0) {
+        break
+      }
+
+      nextToken = result.nextToken
+
+      yield result.keys.filter((k) => {
+        return k.name && !k.name.endsWith('.info')
+      })
+
+      if (!nextToken) {
+        break
+      }
+    }
+  }
+
+  private async *deleteS3Orphans(
+    tmpTable: string,
+    options: {
+      bucket: string
+      prefix: string
+      signal: AbortSignal
+    }
+  ) {
+    const s3Keys = this.listS3Orphans(tmpTable, options)
+
+    for await (const s3Objects of s3Keys) {
+      if (options.signal.aborted) {
+        break
+      }
+
+      await BackupObjectEvent.batchSend(
+        s3Objects.map((obj) => {
+          return new BackupObjectEvent({
+            deleteOriginal: true,
+            name: obj.name,
+            bucketId: options.bucket,
+            tenant: this.storage.db.tenant(),
+            version:
obj.version, + size: obj.size, + reqId: this.storage.db.reqId, + }) + }) + ) + + yield s3Objects + } + } + + private async *listS3Orphans( + tmpTable: string, + options: { + bucket: string + prefix: string + signal: AbortSignal + } + ) { + const s3Keys = this.listAllCacheS3Keys(tmpTable, '', options.signal) + + for await (const tmpS3Objects of s3Keys) { + if (options.signal.aborted) { + break + } + // find in the db if keys exists + const localObjs = tmpS3Objects.map((k) => ({ + name: k.name, + version: k.version, + })) + + if (localObjs.length === 0) { + continue + } + + const dbObjects = await this.storage.db.findObjectVersions( + options.bucket, + localObjs, + 'name,version' + ) + + const s3OrphanedKeys = tmpS3Objects.filter( + (key) => + !dbObjects.find((dbKey) => dbKey.name === key.name && dbKey.version === key.version) + ) + + if (s3OrphanedKeys.length > 0) { + // delete s3 keys + yield s3OrphanedKeys + } + } + } + + private async *listDBOrphans( + tmpTable: string, + options: { + bucket: string + before?: Date + signal: AbortSignal + } + ) { + const dbS3Objects = this.listAllDbObjects(options.bucket, { + before: options.before, + signal: options.signal, + }) + + for await (const dbObjects of dbS3Objects) { + if (options.signal.aborted) { + break + } + if (dbObjects.length === 0) { + continue + } + const tmpS3List = await this.findCacheS3KeysById( + tmpTable, + dbObjects.map((o) => { + return withOptionalVersion(o.name, o.version) + }) + ) + + const dbOrphans = dbObjects.filter( + (key) => + !tmpS3List.find((tmpKey) => { + return tmpKey.key === withOptionalVersion(key.name, key.version) + }) + ) + + if (dbOrphans.length > 0) { + yield dbOrphans + } + } + } + + private async *deleteDBOrphans( + tmpTable: string, + options: { + bucket: string + prefix: string + before?: Date + signal: AbortSignal + } + ) { + const promises = [] + const orphans = this.listDBOrphans(tmpTable, { + ...options, + before: options.before, + }) + for await (const dbObjects of orphans) { + if (dbObjects.length > 0) { + promises.push( + this.storage.db.deleteObjectVersions( + options.bucket, + dbObjects.filter((o) => o.version) as { name: string; version: string }[] + ) + ) + + yield dbObjects + } + } + + await Promise.all(promises) + } +} diff --git a/src/test/scanner.test.ts b/src/test/scanner.test.ts new file mode 100644 index 00000000..fb8d8d9e --- /dev/null +++ b/src/test/scanner.test.ts @@ -0,0 +1,160 @@ +import { useStorage } from './utils/storage' +import { Readable } from 'stream' +import { eachParallel } from '@internal/testing/generators/array' +import { getConfig } from '../config' +import { randomUUID } from 'crypto' + +const { storageS3Bucket, tenantId } = getConfig() + +describe('ObjectScanner', () => { + const storage = useStorage() + + it('Will list orphaned objects', async () => { + const bucketId = 'test-' + Date.now() + const bucket = await storage.storage.createBucket({ + id: bucketId, + name: bucketId, + }) + + const maxUploads = 20 + + // Create uploads + const result = await eachParallel(maxUploads, async (i) => { + const upload = await storage.uploader.upload({ + bucketId: bucket.id, + objectName: randomUUID() + `-test-${i}.text`, + uploadType: 'standard', + file: { + body: Readable.from(Buffer.from('test')), + mimeType: 'text/plain', + cacheControl: 'no-cache', + userMetadata: {}, + isTruncated: () => false, + }, + }) + + return { name: upload.obj.name, version: upload.obj.version } + }) + + const numToDelete = 5 + const objectsToDelete = result.slice(0, numToDelete) + await 
storage.database.deleteObjects( + bucket.id, + objectsToDelete.map((o) => o.name), + 'name' + ) + + const s3ToDelete = result.slice(5, 5 + numToDelete) + await storage.adapter.deleteObjects( + storageS3Bucket, + s3ToDelete.map((o) => `${tenantId}/${bucket.id}/${o.name}/${o.version}`) + ) + + const objectsAfterDel = await storage.database.listObjects(bucket.id, 'name', 10000) + expect(objectsAfterDel).toHaveLength(maxUploads - numToDelete) + + const orphaned = storage.scanner.listOrphaned(bucket.id, { + signal: new AbortController().signal, + }) + + const deleted = { s3OrphanedKeys: [] as any[], dbOrphanedKeys: [] as any[] } + for await (const result of orphaned) { + if (result.type === 'dbOrphans') { + deleted.dbOrphanedKeys = [...deleted.dbOrphanedKeys, ...result.value] + } + + if (result.type === 's3Orphans') { + deleted.s3OrphanedKeys = [...deleted.s3OrphanedKeys, ...result.value] + } + } + expect(deleted.s3OrphanedKeys).toHaveLength(numToDelete) + expect(deleted.dbOrphanedKeys).toHaveLength(numToDelete) + }) + + it('Will delete S3 objects, if no records exists in the database', async () => { + const bucketId = 'test-' + Date.now() + const bucket = await storage.storage.createBucket({ + id: bucketId, + name: bucketId, + }) + const options = { + deleteDbKeys: false, + deleteS3Keys: true, + signal: new AbortController().signal, + } + + const maxUploads = 300 + + // Create uploads + const result = await eachParallel(maxUploads, async (i) => { + const upload = await storage.uploader.upload({ + bucketId: bucket.id, + objectName: randomUUID() + `-test-${i}.text`, + uploadType: 'standard', + file: { + body: Readable.from(Buffer.from('test')), + mimeType: 'text/plain', + cacheControl: 'no-cache', + userMetadata: {}, + isTruncated: () => false, + }, + }) + + return { name: upload.obj.name } + }) + + const numToDelete = 10 + const objectsToDelete = result.slice(0, numToDelete) + await storage.database.deleteObjects( + bucket.id, + objectsToDelete.map((o) => o.name), + 'name' + ) + + const objectsAfterDel = await storage.database.listObjects(bucket.id, 'name', 10000) + expect(objectsAfterDel).toHaveLength(maxUploads - numToDelete) + + const orphaned = storage.scanner.deleteOrphans(bucket.id, options) + + const deleted = { dbOrphans: [] as any[], s3Orphans: [] as any[] } + for await (const result of orphaned) { + if (result.type === 'dbOrphans') { + deleted.dbOrphans = [...deleted.dbOrphans, ...result.value] + } + + if (result.type === 's3Orphans') { + deleted.s3Orphans = [...deleted.s3Orphans, ...result.value] + } + } + expect(deleted.s3Orphans).toHaveLength(numToDelete) + expect(deleted.dbOrphans).toHaveLength(0) + + // Compare number of items in the bucket + const s3ObjectAll = [] + let nextToken = '' + + while (true) { + const s3Objects = await storage.adapter.list(storageS3Bucket, { + prefix: `${tenantId}/${bucket.id}`, + nextToken: nextToken, + }) + s3ObjectAll.push(...s3Objects.keys) + if (!s3Objects.nextToken) { + break + } + nextToken = s3Objects.nextToken + } + + // Check s3 files are deleted + expect(s3ObjectAll).toHaveLength(maxUploads - numToDelete) + // Compare the keys names + expect(s3ObjectAll.length).not.toContain(objectsToDelete.map((o) => `${bucket.id}/${o.name}`)) + + // Check files are backed-up + const backupFiles = await storage.adapter.list(storageS3Bucket, { + prefix: `__internal/${tenantId}/${bucket.id}`, + }) + + expect(backupFiles.keys).toHaveLength(numToDelete) + }, 30000) +}) diff --git a/src/test/utils/seeds/bucket.ts b/src/test/utils/seeds/bucket.ts new file mode 
100644 index 00000000..f0dcc124 --- /dev/null +++ b/src/test/utils/seeds/bucket.ts @@ -0,0 +1,55 @@ +// src/seeders/BucketsSeeder.ts + +import { Seeder } from '@internal/testing/seeder' +import { Bucket, Obj } from '@storage/schemas' +import { ObjectSeeder } from './object' + +export class BucketsSeeder extends Seeder { + /** + * Creates a specified number of buckets. + * @param count - Number of buckets to create. + * @param generator - Function to generate bucket data. + * @returns An array of created buckets. + */ + async createBuckets(count: number, generator: (n: number) => Bucket): Promise { + const buckets: Bucket[] = this.generateRecords(count, generator) + this.addRecords('buckets', buckets) + return buckets + } + + /** + * Creates objects associated with a given bucket. + * @param bucket - The parent bucket. + * @param count - Number of objects to create. + * @param generator - Function to generate object data. + * @returns An array of created objects. + */ + async createObjects( + bucket: Bucket, + count: number, + generator: (n: number) => Omit + ): Promise { + const objects: Obj[] = this.generateRecords(count, (n) => ({ + ...generator(n), + bucket_id: bucket.id, + })) + this.addRecords('objects', objects) + return objects + } + + /** + * Asynchronously iterates over a list of buckets and executes a callback for each. + * The callback receives an object containing child seeders. + * @param buckets - The list of buckets to iterate over. + * @param callback - The asynchronous callback to execute for each bucket. + */ + async each( + buckets: Bucket[], + callback: (context: { bucket: Bucket; objectSeeder: ObjectSeeder }) => Promise + ): Promise { + for (const bucket of buckets) { + const objectSeeder = new ObjectSeeder(this.persistence, bucket) + await callback({ bucket, objectSeeder }) + } + } +} diff --git a/src/test/utils/seeds/object.ts b/src/test/utils/seeds/object.ts new file mode 100644 index 00000000..267bdeef --- /dev/null +++ b/src/test/utils/seeds/object.ts @@ -0,0 +1,29 @@ +import { Seeder } from '@internal/testing/seeder' +import { Bucket, Obj } from '@storage/schemas' + +export class ObjectSeeder extends Seeder { + private bucket: Bucket + + constructor(persistence: Seeder['persistence'], bucket: Bucket) { + super(persistence) + this.bucket = bucket + } + + /** + * Creates a specified number of objects for the associated bucket. + * @param count - Number of objects to create. + * @param generator - Function to generate object data. + * @returns An array of created objects. + */ + async createObjects( + count: number, + generator: (n: number) => Omit + ): Promise { + const objects: Obj[] = this.generateRecords(count, (n) => ({ + ...generator(n), + bucket_id: this.bucket.id, + })) + this.addRecords('objects', objects) + return objects + } +} diff --git a/src/test/utils/seeds/seeder.ts b/src/test/utils/seeds/seeder.ts new file mode 100644 index 00000000..9bf1274f --- /dev/null +++ b/src/test/utils/seeds/seeder.ts @@ -0,0 +1,37 @@ +import { KnexPersistence } from '@internal/testing/seeder' +import { BucketsSeeder } from './bucket' +import { Knex } from 'knex' + +export class TestUtils { + private persistence: KnexPersistence + private bucketsSeeder: BucketsSeeder + + constructor(knexConfig: Knex.Config | Knex) { + this.persistence = new KnexPersistence(knexConfig) + this.bucketsSeeder = new BucketsSeeder(this.persistence) + } + + get buckets(): BucketsSeeder { + return this.bucketsSeeder + } + + /** + * Runs the seeding operations: + * 1. 
Executes the provided seeding function within a transaction. + * 2. After seeding, performs batch inserts for all collected records. + * @param operation - The seeding operations to execute. + */ + async runSeeder(operation: () => Promise): Promise { + await this.bucketsSeeder.runInTransaction(async () => { + await operation() + }) + await this.bucketsSeeder.batchInsertAll() + } + + /** + * Destroys the Knex instance to close database connections. + */ + async destroy(): Promise { + await this.persistence.destroy() + } +} diff --git a/src/test/utils/storage.ts b/src/test/utils/storage.ts new file mode 100644 index 00000000..f19eb2e1 --- /dev/null +++ b/src/test/utils/storage.ts @@ -0,0 +1,98 @@ +import { getPostgresConnection, getServiceKeyUser, TenantConnection } from '@internal/database' +import { Storage } from '@storage/storage' +import { createStorageBackend, StorageBackendAdapter } from '@storage/backend' +import { Database, StorageKnexDB } from '@storage/database' +import { ObjectScanner } from '@storage/scanner/scanner' +import { getConfig } from '../../config' +import { Uploader } from '@storage/uploader' +import { CreateBucketCommand, HeadBucketCommand, S3Client } from '@aws-sdk/client-s3' +import { isS3Error } from '@internal/errors' + +const { tenantId, storageBackendType } = getConfig() + +export function useStorage() { + let connection: TenantConnection + let storage: Storage + let adapter: StorageBackendAdapter + let database: Database + let scanner: ObjectScanner + let uploader: Uploader + + beforeAll(async () => { + const adminUser = await getServiceKeyUser(tenantId) + connection = await getPostgresConnection({ + tenantId, + user: adminUser, + superUser: adminUser, + host: 'localhost', + disableHostCheck: true, + }) + database = new StorageKnexDB(connection, { + tenantId, + host: 'localhost', + }) + adapter = createStorageBackend(storageBackendType) + storage = new Storage(adapter, database) + scanner = new ObjectScanner(storage) + uploader = new Uploader(adapter, database) + }) + + afterAll(async () => { + await connection.dispose() + }) + + return { + get connection() { + return connection + }, + get storage() { + return storage + }, + get adapter() { + return adapter + }, + get database() { + return database + }, + get scanner() { + return scanner + }, + get uploader() { + return uploader + }, + } +} + +export function createBucketIfNotExists(bucket: string, client: S3Client) { + return checkBucketExists(client, bucket).then((exists) => { + if (!exists) { + return createS3Bucket(bucket, client) + } + }) +} + +export function createS3Bucket(bucketName: string, client: S3Client) { + const createBucketCommand = new CreateBucketCommand({ + Bucket: bucketName, + }) + + return client.send(createBucketCommand) +} + +export const checkBucketExists = async (client: S3Client, bucket: string) => { + const options = { + Bucket: bucket, + } + + try { + await client.send(new HeadBucketCommand(options)) + return true + } catch (error) { + const err = error as Error + + if (err && isS3Error(err) && err.$metadata.httpStatusCode === 404) { + return false + } + throw error + } +}
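The new mergeAsyncGenerators helper drives every generator in lock-step rounds (one next() per still-active generator via Promise.all) and tags each yielded value with the key it came from, which is how the scanner distinguishes s3Orphans from dbOrphans batches. A minimal standalone sketch of how a consumer sees that shape; the generator names here are made up for illustration and are not part of the diff:

import { mergeAsyncGenerators } from '@internal/concurrency'

async function* numbers() {
  yield 1
  yield 2
}

async function* letters() {
  yield 'a'
}

async function demo() {
  // Round 1 yields { type: 'nums', value: 1 } then { type: 'chars', value: 'a' },
  // round 2 yields { type: 'nums', value: 2 }; iteration ends once both are done.
  for await (const item of mergeAsyncGenerators({ nums: numbers(), chars: letters() })) {
    if (item.type === 'nums') {
      console.log('number', item.value)
    } else {
      console.log('letter', item.value)
    }
  }
}

demo()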
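For the multipart copy in ObjectBackup, each part copies the byte range [(n - 1) * partSize, min(n * partSize, size) - 1] with a 5 GB part size. A tiny sketch of that arithmetic, kept separate from the S3 calls; the helper name is hypothetical and only mirrors the range math used in the diff:

const FIVE_GB = 5 * 1024 * 1024 * 1024

function copyRange(partNumber: number, size: number, partSize = FIVE_GB): string {
  const start = (partNumber - 1) * partSize
  const end = partNumber * partSize < size ? partNumber * partSize - 1 : size - 1
  return `bytes=${start}-${end}`
}

// e.g. a 12 GB object is copied as three parts:
// part 1 -> bytes=0-5368709119
// part 2 -> bytes=5368709120-10737418239
// part 3 -> bytes=10737418240-12884901887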
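For context on how the scanner is meant to be consumed outside the HTTP route: listOrphaned yields the same tagged batches that the admin endpoint streams. A hedged sketch assuming an already-initialised Storage instance (constructing Storage and its database/backend is not shown here):

import { Storage } from '@storage/storage'
import { ObjectScanner } from '@storage/scanner/scanner'

async function reportOrphans(storage: Storage, bucketId: string) {
  const controller = new AbortController()
  const scanner = new ObjectScanner(storage)

  // Mirror the route's default: only consider objects older than one hour.
  const before = new Date()
  before.setHours(before.getHours() - 1)

  for await (const batch of scanner.listOrphaned(bucketId, {
    before,
    signal: controller.signal,
  })) {
    // batch.type is 's3Orphans' (S3 keys with no matching DB row) or 'dbOrphans'
    // (DB rows with no matching S3 key); batch.value is the batch of keys found.
    console.log(batch.type, batch.value.length)
  }
}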
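And a rough client-side sketch of reading the streamed response from the new admin route. The base URL, port, and auth header are assumptions, not part of this diff (the apikey plugin's exact header is not shown), and the stream is a concatenation of JSON objects with no delimiter, so a real client needs an incremental JSON parser; here the raw chunks are simply logged:

async function streamOrphans(tenantId: string, bucketId: string) {
  // Assumed admin base URL and auth header name — adjust to your deployment.
  const res = await fetch(
    `http://localhost:5001/tenants/${tenantId}/buckets/${bucketId}/orphan-objects`,
    { headers: { apikey: process.env.ADMIN_API_KEY ?? '' } }
  )

  if (!res.ok || !res.body) {
    throw new Error(`unexpected response: ${res.status}`)
  }

  const decoder = new TextDecoder()
  // In Node 18+, the fetch body is an async-iterable ReadableStream.
  for await (const chunk of res.body as unknown as AsyncIterable<Uint8Array>) {
    // Each chunk holds one or more {"event":"ping"} / {"event":"data",...} objects.
    console.log(decoder.decode(chunk, { stream: true }))
  }
}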