Skip to content

Commit

Permalink
Refactor watchWriteCheckpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Feb 6, 2025
1 parent b5d3a8b commit d2e1051
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 19 deletions.
12 changes: 9 additions & 3 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
import { LRUCache } from 'lru-cache/min';
import * as timers from 'timers/promises';

import { storage, sync, utils } from '@powersync/service-core';
import { storage, sync, utils, WatchWriteCheckpointOptions } from '@powersync/service-core';

import { DisposableObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
import { v4 as uuid } from 'uuid';
Expand Down Expand Up @@ -492,7 +492,8 @@ export class MongoBucketStorage
/**
* User-specific watch on the latest checkpoint and/or write checkpoint.
*/
async *watchWriteCheckpoint(user_id: string, signal: AbortSignal): AsyncIterable<storage.WriteCheckpoint> {
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.WriteCheckpoint> {
const { user_id, signal, filter } = options;
let lastCheckpoint: utils.OpId | null = null;
let lastWriteCheckpoint: bigint | null = null;

Expand Down Expand Up @@ -529,7 +530,12 @@ export class MongoBucketStorage
lastWriteCheckpoint = currentWriteCheckpoint;
lastCheckpoint = checkpoint;

yield { base: cp, writeCheckpoint: currentWriteCheckpoint };
yield {
base: cp,
writeCheckpoint: currentWriteCheckpoint,
storage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(options.parseOptions)
};
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as framework from '@powersync/lib-services-framework';
import { storage, sync, utils } from '@powersync/service-core';
import { storage, sync, utils, WatchWriteCheckpointOptions } from '@powersync/service-core';
import * as pg_wire from '@powersync/service-jpgwire';
import * as sync_rules from '@powersync/service-sync-rules';
import crypto from 'crypto';
Expand Down Expand Up @@ -392,10 +392,12 @@ export class PostgresBucketStorageFactory
return this.makeActiveCheckpoint(activeCheckpoint);
}

async *watchWriteCheckpoint(user_id: string, signal: AbortSignal): AsyncIterable<storage.WriteCheckpoint> {
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.WriteCheckpoint> {
let lastCheckpoint: utils.OpId | null = null;
let lastWriteCheckpoint: bigint | null = null;

const { signal, user_id, parseOptions } = options;

const iter = wrapWithAbort(this.sharedIterator, signal);
for await (const cp of iter) {
const { checkpoint, lsn } = cp;
Expand All @@ -409,6 +411,8 @@ export class PostgresBucketStorageFactory
continue;
}

const syncRules = bucketStorage.getParsedSyncRules(parseOptions);

const lsnFilters: Record<string, string> = lsn ? { 1: lsn } : {};

const currentWriteCheckpoint = await bucketStorage.lastWriteCheckpoint({
Expand All @@ -429,7 +433,7 @@ export class PostgresBucketStorageFactory
lastWriteCheckpoint = currentWriteCheckpoint;
lastCheckpoint = checkpoint;

yield { base: cp, writeCheckpoint: currentWriteCheckpoint };
yield { base: cp, writeCheckpoint: currentWriteCheckpoint, syncRules, storage: bucketStorage };
}
}

Expand Down
21 changes: 20 additions & 1 deletion packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient<Buck
/**
* Yields the latest user write checkpoint whenever the sync checkpoint updates.
*/
watchWriteCheckpoint(user_id: string, signal: AbortSignal): AsyncIterable<WriteCheckpoint>;
watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<WriteCheckpoint>;

/**
* Get storage size of active sync rules.
Expand All @@ -150,6 +150,22 @@ export interface ReplicationCheckpoint {
readonly lsn: string | null;
}

export interface WatchWriteCheckpointOptions {
/** user_id and client_id combined. */
user_id: string;

signal: AbortSignal;

parseOptions: ParseSyncRulesOptions;

filter?: (event: WatchFilterEvent) => boolean;
}

export interface WatchFilterEvent {
bucket?: string;
invalidate?: boolean;
}

export interface ActiveCheckpoint extends ReplicationCheckpoint {
hasSyncRules(): boolean;

Expand All @@ -159,6 +175,9 @@ export interface ActiveCheckpoint extends ReplicationCheckpoint {
export interface WriteCheckpoint {
base: ActiveCheckpoint;
writeCheckpoint: bigint | null;

syncRules: SqlSyncRules;
storage: SyncRulesBucketStorage;
}

export interface StorageMetrics {
Expand Down
17 changes: 5 additions & 12 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async function* streamResponseInner(
}

const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id);
const stream = storage.watchWriteCheckpoint(checkpointUserId, signal);
const stream = storage.watchWriteCheckpoint({ user_id: checkpointUserId, signal, parseOptions });

const checksumState = new BucketChecksumState(parseOptions, syncParams, initialBuckets);

Expand All @@ -121,7 +121,7 @@ async function* streamResponseInner(
continue;
}

const { checkpointLine, bucketsToFetch, storage, currentBucketPositions } = line;
const { checkpointLine, bucketsToFetch, currentBucketPositions } = line;

yield checkpointLine;

Expand All @@ -135,7 +135,7 @@ async function* streamResponseInner(

const bucketDataWithPriority = (endIndex?: number) => {
return bucketDataInBatches({
storage,
storage: next.storage,
checkpoint: next.base.checkpoint,
bucketsToFetch: bucketsToFetch.slice(firstBucketInSamePriority, endIndex),
dataBuckets: currentBucketPositions,
Expand Down Expand Up @@ -362,7 +362,6 @@ function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number)
interface CheckpointLine {
checkpointLine: util.StreamingSyncCheckpointDiff | util.StreamingSyncCheckpoint;
bucketsToFetch: BucketDescription[];
storage: storage.SyncRulesBucketStorage;
currentBucketPositions: Map<string, BucketSyncState>;
}

Expand Down Expand Up @@ -392,13 +391,8 @@ export class BucketChecksumState {
const { writeCheckpoint, base } = next;
const user_id = this.parameterState.syncParams.user_id;

const storage = await base.getBucketStorage();
if (storage == null) {
// Sync rules not available / deleted in the meantime - try again with the next checkpoint.
return null;
}

const syncRules = storage.getParsedSyncRules(this.parseOptions);
const storage = next.storage;
const syncRules = next.syncRules;

const { buckets: allBuckets } = await this.parameterState.getCheckpointUpdate(syncRules, storage, base.checkpoint);

Expand Down Expand Up @@ -480,7 +474,6 @@ export class BucketChecksumState {
return {
checkpointLine,
bucketsToFetch,
storage,
currentBucketPositions: this.dataBuckets
};
}
Expand Down

0 comments on commit d2e1051

Please # to comment.