Skip to content

Commit

Permalink
Add test for postgres storage
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Feb 6, 2025
1 parent 3904106 commit ab8da19
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
6 changes: 3 additions & 3 deletions modules/module-mongodb/src/common/MongoLSN.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ export type MongoLSNSpecification = {
timestamp: mongo.Timestamp;
/**
* The ResumeToken type here is an alias for `unknown`.
* The docs mention the contents should be of the form.
* We use BSON serialization to store the resume token.
* The docs mention the contents should be of the form:
* ```typescript
* {
* "_data" : <BinData|string>
* }
* ```
* We use BSON serialization to store the resume token.
*/
resume_token?: mongo.ResumeToken;
};
Expand All @@ -22,7 +22,7 @@ const DELIMINATOR = '|';

/**
* Represent a Logical Sequence Number (LSN) for MongoDB replication sources.
* This stores a combination of the cluster timestamp and optional change stream resume token.
* This stores a combination of the cluster timestamp and optional Change Stream resume token.
*/
export class MongoLSN {
static fromSerialized(comparable: string): MongoLSN {
Expand Down
36 changes: 22 additions & 14 deletions modules/module-mongodb/test/src/resume.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ import { MongoLSN, ZERO_LSN } from '@module/common/MongoLSN.js';
import { MongoManager } from '@module/replication/MongoManager.js';
import { normalizeConnectionConfig } from '@module/types/types.js';
import { isMongoServerError, mongo } from '@powersync/lib-service-mongodb';
import { BucketStorageFactory, TestStorageOptions } from '@powersync/service-core';
import { describe, expect, test, vi } from 'vitest';
import { ChangeStreamTestContext } from './change_stream_utils.js';
import { env } from './env.js';
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js';

describe('mongo resume', () => {
describe('mongo lsn', () => {
test('LSN with resume tokens should be comparable', () => {
// Values without a resume token should be comparable
expect(
Expand Down Expand Up @@ -81,19 +82,26 @@ describe('mongo resume', () => {
}).comparable.split('|')[0] // Simulate an old LSN
).true;
});
});

describe.skipIf(!env.TEST_MONGO_STORAGE)('MongoDB resume - mongo storage', () => {
defineResumeTest(INITIALIZED_MONGO_STORAGE_FACTORY);
});

/**
* This test is not specific to the storage. For simplicity it only tests against MongoDB storage.
*/
test.skipIf(!env.TEST_MONGO_STORAGE)('resuming with a different source database', async () => {
await using context = await ChangeStreamTestContext.open(INITIALIZED_MONGO_STORAGE_FACTORY);
describe.skipIf(!env.TEST_POSTGRES_STORAGE)('MongoDB resume - postgres storage', () => {
defineResumeTest(INITIALIZED_POSTGRES_STORAGE_FACTORY);
});

function defineResumeTest(factoryGenerator: (options?: TestStorageOptions) => Promise<BucketStorageFactory>) {
test('resuming with a different source database', async () => {
await using context = await ChangeStreamTestContext.open(factoryGenerator);
const { db } = context;

await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT _id as id, description, num FROM "test_data"`);
await context.updateSyncRules(/* yaml */
` bucket_definitions:
global:
data:
- SELECT _id as id, description, num FROM "test_data"`);

await context.replicateSnapshot();

Expand Down Expand Up @@ -129,7 +137,7 @@ describe('mongo resume', () => {
uri: url.toString()
})
);
const factory = await INITIALIZED_MONGO_STORAGE_FACTORY({ doNotClear: true });
const factory = await factoryGenerator({ doNotClear: true });

// Create a new context without updating the sync rules
await using context2 = new ChangeStreamTestContext(factory, connectionManager);
Expand All @@ -141,4 +149,4 @@ describe('mongo resume', () => {
// The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError
expect(isMongoServerError(error) && error.hasErrorLabel('NonResumableChangeStreamError'));
});
});
}

0 comments on commit ab8da19

Please # to comment.