diff --git a/modules/module-mongodb/src/common/MongoLSN.ts b/modules/module-mongodb/src/common/MongoLSN.ts index 7238dcdf..a91fc3f1 100644 --- a/modules/module-mongodb/src/common/MongoLSN.ts +++ b/modules/module-mongodb/src/common/MongoLSN.ts @@ -5,13 +5,13 @@ export type MongoLSNSpecification = { timestamp: mongo.Timestamp; /** * The ResumeToken type here is an alias for `unknown`. - * The docs mention the contents should be of the form. - * We use BSON serialization to store the resume token. + * The docs mention the contents should be of the form: * ```typescript * { * "_data" : * } * ``` + * We use BSON serialization to store the resume token. */ resume_token?: mongo.ResumeToken; }; @@ -22,7 +22,7 @@ const DELIMINATOR = '|'; /** * Represent a Logical Sequence Number (LSN) for MongoDB replication sources. - * This stores a combination of the cluster timestamp and optional change stream resume token. + * This stores a combination of the cluster timestamp and optional Change Stream resume token. */ export class MongoLSN { static fromSerialized(comparable: string): MongoLSN { diff --git a/modules/module-mongodb/test/src/resume.test.ts b/modules/module-mongodb/test/src/resume.test.ts index 6af7571c..f5e3944a 100644 --- a/modules/module-mongodb/test/src/resume.test.ts +++ b/modules/module-mongodb/test/src/resume.test.ts @@ -3,12 +3,13 @@ import { MongoLSN, ZERO_LSN } from '@module/common/MongoLSN.js'; import { MongoManager } from '@module/replication/MongoManager.js'; import { normalizeConnectionConfig } from '@module/types/types.js'; import { isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; +import { BucketStorageFactory, TestStorageOptions } from '@powersync/service-core'; import { describe, expect, test, vi } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; import { env } from './env.js'; -import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; +import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js'; -describe('mongo resume', () => { +describe('mongo lsn', () => { test('LSN with resume tokens should be comparable', () => { // Values without a resume token should be comparable expect( @@ -81,19 +82,26 @@ describe('mongo resume', () => { }).comparable.split('|')[0] // Simulate an old LSN ).true; }); +}); + +describe.skipIf(!env.TEST_MONGO_STORAGE)('MongoDB resume - mongo storage', () => { + defineResumeTest(INITIALIZED_MONGO_STORAGE_FACTORY); +}); - /** - * This test is not specific to the storage. For simplicity it only tests against MongoDB storage. - */ - test.skipIf(!env.TEST_MONGO_STORAGE)('resuming with a different source database', async () => { - await using context = await ChangeStreamTestContext.open(INITIALIZED_MONGO_STORAGE_FACTORY); +describe.skipIf(!env.TEST_POSTGRES_STORAGE)('MongoDB resume - postgres storage', () => { + defineResumeTest(INITIALIZED_POSTGRES_STORAGE_FACTORY); +}); + +function defineResumeTest(factoryGenerator: (options?: TestStorageOptions) => Promise) { + test('resuming with a different source database', async () => { + await using context = await ChangeStreamTestContext.open(factoryGenerator); const { db } = context; - await context.updateSyncRules(` - bucket_definitions: - global: - data: - - SELECT _id as id, description, num FROM "test_data"`); + await context.updateSyncRules(/* yaml */ + ` bucket_definitions: + global: + data: + - SELECT _id as id, description, num FROM "test_data"`); await context.replicateSnapshot(); @@ -129,7 +137,7 @@ describe('mongo resume', () => { uri: url.toString() }) ); - const factory = await INITIALIZED_MONGO_STORAGE_FACTORY({ doNotClear: true }); + const factory = await factoryGenerator({ doNotClear: true }); // Create a new context without updating the sync rules await using context2 = new ChangeStreamTestContext(factory, connectionManager); @@ -141,4 +149,4 @@ describe('mongo resume', () => { // The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError expect(isMongoServerError(error) && error.hasErrorLabel('NonResumableChangeStreamError')); }); -}); +}