Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[Modules] Move Write Checkpoint APIs #110

Merged
merged 15 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slow-stingrays-kiss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': minor
---

Moved Write Checkpoint APIs to SyncBucketStorage
9 changes: 8 additions & 1 deletion packages/service-core/src/routes/endpoints/checkpointing.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger, router, schema } from '@powersync/lib-services-framework';
import * as t from 'ts-codec';

import * as framework from '@powersync/lib-services-framework';
import * as util from '../../util/util-index.js';
import { authUser } from '../auth.js';
import { routeDefinition } from '../router.js';
Expand Down Expand Up @@ -63,7 +64,13 @@ export const writeCheckpoint2 = routeDefinition({
storageEngine: { activeBucketStorage }
} = service_context;

const writeCheckpoint = await activeBucketStorage.createManagedWriteCheckpoint({
const activeSyncRules = await activeBucketStorage.getActiveSyncRulesContent();
if (!activeSyncRules) {
throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`);
}

using syncBucketStorage = activeBucketStorage.getInstance(activeSyncRules);
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
user_id: full_user_id,
heads: { '1': currentCheckpoint }
});
Expand Down
11 changes: 6 additions & 5 deletions packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import * as util from '../util/util-index.js';
import { ReplicationEventPayload } from './ReplicationEventPayload.js';
import { SourceEntityDescriptor } from './SourceEntity.js';
import { SourceTable } from './SourceTable.js';
import { BatchedCustomWriteCheckpointOptions, ReplicaId, WriteCheckpointAPI } from './storage-index.js';
import { BatchedCustomWriteCheckpointOptions, ReplicaId } from './storage-index.js';
import { SyncStorageWriteCheckpointAPI } from './WriteCheckpointAPI.js';

export interface BucketStorageFactoryListener extends DisposableListener {
syncStorageCreated: (storage: SyncRulesBucketStorage) => void;
replicationEvent: (event: ReplicationEventPayload) => void;
}

export interface BucketStorageFactory
extends DisposableObserverClient<BucketStorageFactoryListener>,
WriteCheckpointAPI {
export interface BucketStorageFactory extends DisposableObserverClient<BucketStorageFactoryListener> {
/**
* Update sync rules from configuration, if changed.
*/
Expand Down Expand Up @@ -206,7 +205,9 @@ export interface SyncRulesBucketStorageListener extends DisposableListener {
batchStarted: (batch: BucketStorageBatch) => void;
}

export interface SyncRulesBucketStorage extends DisposableObserverClient<SyncRulesBucketStorageListener> {
export interface SyncRulesBucketStorage
extends DisposableObserverClient<SyncRulesBucketStorageListener>,
SyncStorageWriteCheckpointAPI {
readonly group_id: number;
readonly slot_name: string;

Expand Down
44 changes: 6 additions & 38 deletions packages/service-core/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,7 @@ import { PowerSyncMongo, PowerSyncMongoOptions } from './mongo/db.js';
import { SyncRuleDocument, SyncRuleState } from './mongo/models.js';
import { MongoPersistedSyncRulesContent } from './mongo/MongoPersistedSyncRulesContent.js';
import { MongoSyncBucketStorage } from './mongo/MongoSyncBucketStorage.js';
import { MongoWriteCheckpointAPI } from './mongo/MongoWriteCheckpointAPI.js';
import { generateSlotName } from './mongo/util.js';
import {
CustomWriteCheckpointOptions,
DEFAULT_WRITE_CHECKPOINT_MODE,
LastWriteCheckpointFilters,
ManagedWriteCheckpointOptions,
WriteCheckpointAPI,
WriteCheckpointMode
} from './write-checkpoint.js';

export interface MongoBucketStorageOptions extends PowerSyncMongoOptions {}

Expand All @@ -47,10 +38,6 @@ export class MongoBucketStorage
// TODO: This is still Postgres specific and needs to be reworked
public readonly slot_name_prefix: string;

readonly write_checkpoint_mode: WriteCheckpointMode;

protected readonly writeCheckpointAPI: WriteCheckpointAPI;

private readonly storageCache = new LRUCache<number, MongoSyncBucketStorage>({
max: 3,
fetchMethod: async (id) => {
Expand Down Expand Up @@ -78,19 +65,13 @@ export class MongoBucketStorage
db: PowerSyncMongo,
options: {
slot_name_prefix: string;
write_checkpoint_mode?: WriteCheckpointMode;
}
) {
super();
this.client = db.client;
this.db = db;
this.session = this.client.startSession();
this.slot_name_prefix = options.slot_name_prefix;
this.write_checkpoint_mode = options.write_checkpoint_mode ?? DEFAULT_WRITE_CHECKPOINT_MODE;
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
db,
mode: this.write_checkpoint_mode
});
}

getInstance(options: PersistedSyncRulesContent): MongoSyncBucketStorage {
Expand Down Expand Up @@ -299,22 +280,6 @@ export class MongoBucketStorage
});
}

async batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void> {
return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints(checkpoints);
}

async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise<bigint> {
return this.writeCheckpointAPI.createCustomWriteCheckpoint(options);
}

async createManagedWriteCheckpoint(options: ManagedWriteCheckpointOptions): Promise<bigint> {
return this.writeCheckpointAPI.createManagedWriteCheckpoint(options);
}

async lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise<bigint | null> {
return this.writeCheckpointAPI.lastWriteCheckpoint(filters);
}

async getActiveCheckpoint(): Promise<ActiveCheckpoint> {
const doc = await this.db.sync_rules.findOne(
{
Expand Down Expand Up @@ -426,7 +391,7 @@ export class MongoBucketStorage
}
return (await this.storageCache.fetch(doc._id)) ?? null;
}
};
} satisfies ActiveCheckpoint;
}

/**
Expand Down Expand Up @@ -516,6 +481,7 @@ export class MongoBucketStorage
if (doc == null) {
continue;
}

const op = this.makeActiveCheckpoint(doc);
// Check for LSN / checkpoint changes - ignore other metadata changes
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
Expand Down Expand Up @@ -546,12 +512,14 @@ export class MongoBucketStorage
// 1. checkpoint (op_id) changes.
// 2. write checkpoint changes for the specific user
const bucketStorage = await cp.getBucketStorage();
if (!bucketStorage) {
continue;
}

const lsnFilters: Record<string, string> = lsn ? { 1: lsn } : {};

const currentWriteCheckpoint = await this.lastWriteCheckpoint({
const currentWriteCheckpoint = await bucketStorage.lastWriteCheckpoint({
user_id,
sync_rules_id: bucketStorage?.group_id,
heads: {
...lsnFilters
}
Expand Down
26 changes: 2 additions & 24 deletions packages/service-core/src/storage/StorageEngine.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { DisposableListener, DisposableObserver, logger } from '@powersync/lib-services-framework';
import { ResolvedPowerSyncConfig } from '../util/util-index.js';
import { BucketStorageFactory } from './BucketStorage.js';
import { ActiveStorage, BucketStorageProvider, StorageSettings } from './StorageProvider.js';
import { DEFAULT_WRITE_CHECKPOINT_MODE } from './write-checkpoint.js';
import { ActiveStorage, BucketStorageProvider } from './StorageProvider.js';

export type StorageEngineOptions = {
configuration: ResolvedPowerSyncConfig;
};

export const DEFAULT_STORAGE_SETTINGS: StorageSettings = {
writeCheckpointMode: DEFAULT_WRITE_CHECKPOINT_MODE
};

export interface StorageEngineListener extends DisposableListener {
storageActivated: (storage: BucketStorageFactory) => void;
}
Expand All @@ -20,11 +15,9 @@ export class StorageEngine extends DisposableObserver<StorageEngineListener> {
// TODO: This will need to revisited when we actually support multiple storage providers.
private storageProviders: Map<string, BucketStorageProvider> = new Map();
private currentActiveStorage: ActiveStorage | null = null;
private _activeSettings: StorageSettings;

constructor(private options: StorageEngineOptions) {
super();
this._activeSettings = DEFAULT_STORAGE_SETTINGS;
}

get activeBucketStorage(): BucketStorageFactory {
Expand All @@ -39,20 +32,6 @@ export class StorageEngine extends DisposableObserver<StorageEngineListener> {
return this.currentActiveStorage;
}

get activeSettings(): StorageSettings {
return { ...this._activeSettings };
}

updateSettings(settings: Partial<StorageSettings>) {
if (this.currentActiveStorage) {
throw new Error(`Storage is already active, settings cannot be modified.`);
}
this._activeSettings = {
...this._activeSettings,
...settings
};
}

/**
* Register a provider which generates a {@link BucketStorageFactory}
* given the matching config specified in the loaded {@link ResolvedPowerSyncConfig}
Expand All @@ -65,8 +44,7 @@ export class StorageEngine extends DisposableObserver<StorageEngineListener> {
logger.info('Starting Storage Engine...');
const { configuration } = this.options;
this.currentActiveStorage = await this.storageProviders.get(configuration.storage.type)!.getStorage({
resolvedConfig: configuration,
...this.activeSettings
resolvedConfig: configuration
});
this.iterateListeners((cb) => cb.storageActivated?.(this.activeBucketStorage));
logger.info(`Successfully activated storage: ${configuration.storage.type}.`);
Expand Down
10 changes: 1 addition & 9 deletions packages/service-core/src/storage/StorageProvider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as util from '../util/util-index.js';
import { BucketStorageFactory } from './BucketStorage.js';
import { WriteCheckpointMode } from './write-checkpoint.js';

export interface ActiveStorage {
storage: BucketStorageFactory;
Expand All @@ -12,14 +11,7 @@ export interface ActiveStorage {
tearDown(): Promise<boolean>;
}

/**
* Settings which can be modified by various modules in their initialization.
*/
export interface StorageSettings {
writeCheckpointMode: WriteCheckpointMode;
}

export interface GetStorageOptions extends StorageSettings {
export interface GetStorageOptions {
// TODO: This should just be the storage config. Update once the slot name prefix coupling has been removed from the storage
resolvedConfig: util.ResolvedPowerSyncConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ export enum WriteCheckpointMode {
* Raw mappings of `user_id` to `write_checkpoint`s should
* be supplied for each set of sync rules.
*/
CUSTOM = 'manual',
CUSTOM = 'custom',
/**
* Write checkpoints are stored as a mapping of `user_id` plus
* replication HEAD (lsn in Postgres) to an automatically generated
Expand All @@ -26,19 +26,19 @@ export interface CustomWriteCheckpointFilters extends BaseWriteCheckpointIdentif
sync_rules_id: number;
}

export interface CustomWriteCheckpointOptions extends CustomWriteCheckpointFilters {
export interface BatchedCustomWriteCheckpointOptions extends BaseWriteCheckpointIdentifier {
/**
* A supplied incrementing Write Checkpoint number
*/
checkpoint: bigint;
}

/**
* Options for creating a custom Write Checkpoint in a batch.
* A {@link BucketStorageBatch} is already associated with a Sync Rules instance.
* The `sync_rules_id` is not required here.
*/
export type BatchedCustomWriteCheckpointOptions = Omit<CustomWriteCheckpointOptions, 'sync_rules_id'>;
export interface CustomWriteCheckpointOptions extends BatchedCustomWriteCheckpointOptions {
/**
* Sync rules which were active when this checkpoint was created.
*/
sync_rules_id: number;
}

/**
* Managed Write Checkpoints are a mapping of User ID to replication HEAD
Expand All @@ -52,15 +52,33 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti

export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters;

export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters;
export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters;

export interface WriteCheckpointAPI {
batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void>;

createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise<bigint>;

export interface BaseWriteCheckpointAPI {
readonly writeCheckpointMode: WriteCheckpointMode;
setWriteCheckpointMode(mode: WriteCheckpointMode): void;
createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise<bigint>;
}

/**
* Write Checkpoint API to be used in conjunction with a {@link SyncRulesBucketStorage}.
* This storage corresponds with a set of sync rules. These APIs don't require specifying a
* sync rules id.
*/
export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI {
batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise<void>;
createCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): Promise<bigint>;
lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise<bigint | null>;
}

/**
* Write Checkpoint API which is interfaced directly with the storage layer. This requires
* sync rules identifiers for custom write checkpoints.
*/
export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI {
batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void>;
createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise<bigint>;
lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise<bigint | null>;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/service-core/src/storage/mongo/MongoBucketBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
SaveOptions
} from '../BucketStorage.js';
import { SourceTable } from '../SourceTable.js';
import { CustomWriteCheckpointOptions } from '../write-checkpoint.js';
import { BatchedCustomWriteCheckpointOptions, CustomWriteCheckpointOptions } from '../WriteCheckpointAPI.js';
import { PowerSyncMongo } from './db.js';
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
import { MongoIdSequence } from './MongoIdSequence.js';
Expand Down Expand Up @@ -83,7 +83,7 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
this.sync_rules = sync_rules;
}

addCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): void {
addCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): void {
this.write_checkpoint_batch.push({
...checkpoint,
sync_rules_id: this.group_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ export class MongoStorageProvider implements BucketStorageProvider {
return {
storage: new MongoBucketStorage(database, {
// TODO currently need the entire resolved config due to this
slot_name_prefix: resolvedConfig.slot_name_prefix,
write_checkpoint_mode: options.writeCheckpointMode
slot_name_prefix: resolvedConfig.slot_name_prefix
}),
shutDown: () => client.close(),
tearDown: () => {
Expand Down
Loading
Loading