From f6e94a106de50680a3c1178294169e1b605709e6 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Jun 2024 15:45:23 +0200 Subject: [PATCH] Incrementally compute checksums and include zero checksums. --- .../service-core/src/storage/BucketStorage.ts | 11 ++- .../storage/mongo/MongoSyncBucketStorage.ts | 17 +++- packages/service-core/src/sync/sync.ts | 42 +++++---- packages/service-core/src/util/utils.ts | 86 +++++++++++++++---- .../test/src/__snapshots__/sync.test.ts.snap | 16 +++- .../test/src/data_storage.test.ts | 6 +- 6 files changed, 138 insertions(+), 40 deletions(-) diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 091bdda9..99754d4a 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -230,7 +230,16 @@ export interface SyncRulesBucketStorage { options?: BucketDataBatchOptions ): AsyncIterable; - getChecksums(checkpoint: util.OpId, buckets: string[]): Promise; + /** + * Compute checksums for a given list of buckets. + * + * If fromCheckpoint is specified, the result is a diff. Otherwise, it is the full checksum. + */ + getChecksums( + checkpoint: util.OpId, + fromCheckpoint: util.OpId | null, + buckets: string[] + ): Promise; /** * Terminate the sync rules. diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 012a22f5..d07f10ac 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -316,10 +316,21 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { } } - async getChecksums(checkpoint: util.OpId, buckets: string[]): Promise { + async getChecksums( + checkpoint: util.OpId, + fromCheckpoint: util.OpId | null, + buckets: string[] + ): Promise { if (buckets.length == 0) { return []; } + + if (fromCheckpoint == checkpoint) { + return []; + } + + const start = fromCheckpoint ? BigInt(fromCheckpoint) : new bson.MinKey(); + const filters: any[] = []; for (let name of buckets) { filters.push({ @@ -327,7 +338,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { $gt: { g: this.group_id, b: name, - o: new bson.MinKey() + o: start }, $lte: { g: this.group_id, @@ -358,7 +369,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { return { bucket: doc._id, count: doc.count, - checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 4294967295 + checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff }; }); } diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 618a21a7..032eda9b 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -78,8 +78,8 @@ async function* streamResponseInner( // This starts with the state from the client. May contain buckets that the user do not have access to (anymore). let dataBuckets = new Map(); - let last_checksums: util.BucketChecksum[] | null = null; - let last_write_checkpoint: bigint | null = null; + let lastChecksums: { checkpoint: util.OpId; checksums: Map } | null = null; + let lastWriteCheckpoint: bigint | null = null; const { raw_data, binary_data } = params; @@ -113,23 +113,22 @@ async function* streamResponseInner( throw new Error(`Too many buckets: ${allBuckets.length}`); } - let checksums: util.BucketChecksum[] | undefined = undefined; - let dataBucketsNew = new Map(); for (let bucket of allBuckets) { dataBucketsNew.set(bucket, dataBuckets.get(bucket) ?? '0'); } dataBuckets = dataBucketsNew; - checksums = await storage.getChecksums(checkpoint, [...dataBuckets.keys()]); + const bucketList = [...dataBuckets.keys()]; + const checksumDiff = await storage.getChecksums(checkpoint, lastChecksums?.checkpoint ?? null, bucketList); - if (last_checksums) { - const diff = util.checksumsDiff(last_checksums, checksums); + if (lastChecksums) { + const diff = util.checksumsDiff(lastChecksums.checksums, bucketList, checksumDiff); if ( - last_write_checkpoint == writeCheckpoint && - diff.removed_buckets.length == 0 && - diff.updated_buckets.length == 0 + lastWriteCheckpoint == writeCheckpoint && + diff.removedBuckets.length == 0 && + diff.updatedBuckets.length == 0 ) { // No changes - don't send anything to the client continue; @@ -137,20 +136,32 @@ async function* streamResponseInner( let message = `Updated checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} | `; - message += `updated: ${limitedBuckets(diff.updated_buckets, 20)} | `; - message += `removed: ${limitedBuckets(diff.removed_buckets, 20)} | `; + message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `; + message += `removed: ${limitedBuckets(diff.removedBuckets, 20)} | `; micro.logger.info(message); const checksum_line: util.StreamingSyncCheckpointDiff = { checkpoint_diff: { last_op_id: checkpoint, write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, - ...diff + removed_buckets: diff.removedBuckets, + updated_buckets: diff.updatedBuckets } }; yield checksum_line; + + lastChecksums = { + checkpoint, + checksums: diff.nextBuckets + }; } else { + const nextBuckets = util.fillEmptyChecksums(bucketList, checksumDiff); + lastChecksums = { + checkpoint, + checksums: nextBuckets + }; + let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; micro.logger.info(message); @@ -158,14 +169,13 @@ async function* streamResponseInner( checkpoint: { last_op_id: checkpoint, write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, - buckets: checksums + buckets: [...nextBuckets.values()] } }; yield checksum_line; } - last_checksums = checksums; - last_write_checkpoint = writeCheckpoint; + lastWriteCheckpoint = writeCheckpoint; yield* bucketDataInBatches(storage, checkpoint, dataBuckets, raw_data, binary_data, signal); diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 4a7bfe46..d5533e8d 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -30,29 +30,85 @@ export function timestampToOpId(ts: bigint): OpId { return ts.toString(10); } -export function checksumsDiff(previous: BucketChecksum[], current: BucketChecksum[]) { - const updated_buckets: BucketChecksum[] = []; +export function fillEmptyChecksums(currentBuckets: string[], checksums: BucketChecksum[]) { + // All current values + const nextBuckets = new Map(); - const previousBuckets = new Map(); - for (let checksum of previous) { - previousBuckets.set(checksum.bucket, checksum); + for (let checksum of checksums) { + // Added checksum + nextBuckets.set(checksum.bucket, checksum); } - for (let checksum of current) { - if (!previousBuckets.has(checksum.bucket)) { - updated_buckets.push(checksum); + + for (let bucket of currentBuckets) { + if (!nextBuckets.has(bucket)) { + // Empty diff - empty bucket + const checksum: BucketChecksum = { + bucket, + checksum: 0, + count: 0 + }; + nextBuckets.set(bucket, checksum); + } + } + + return nextBuckets; +} + +export function checksumsDiff( + previousBuckets: Map, + currentBuckets: string[], + checksumDiff: BucketChecksum[] +) { + // All changed ones + const updatedBuckets = new Map(); + // All current values + const nextBuckets = new Map(); + + for (let cdiff of checksumDiff) { + const p = previousBuckets.get(cdiff.bucket); + if (p == null) { + // Added + updatedBuckets.set(cdiff.bucket, cdiff); + nextBuckets.set(cdiff.bucket, cdiff); } else { - const p = previousBuckets.get(checksum.bucket); - if (p?.checksum != checksum.checksum || p?.count != checksum.count) { - updated_buckets.push(checksum); + // Updated + const checksum: BucketChecksum = { + bucket: cdiff.bucket, + count: p.count + cdiff.count, + checksum: (p.checksum + cdiff.checksum) & 0xffffffff + }; + updatedBuckets.set(checksum.bucket, checksum); + nextBuckets.set(checksum.bucket, checksum); + previousBuckets.delete(cdiff.bucket); + } + } + + for (let bucket of currentBuckets) { + if (!updatedBuckets.has(bucket)) { + // Empty diff - either empty bucket, or unchanged + const p = previousBuckets.get(bucket); + if (p == null) { + // Emtpy bucket + const checksum: BucketChecksum = { + bucket, + checksum: 0, + count: 0 + }; + updatedBuckets.set(bucket, checksum); + nextBuckets.set(bucket, checksum); + } else { + // Unchanged bucket + nextBuckets.set(bucket, p); + previousBuckets.delete(bucket); } - previousBuckets.delete(checksum.bucket); } } - const removed_buckets: string[] = [...previousBuckets.keys()]; + const removedBuckets: string[] = [...previousBuckets.keys()]; return { - updated_buckets, - removed_buckets + updatedBuckets: [...updatedBuckets.values()], + removedBuckets, + nextBuckets }; } diff --git a/packages/service-core/test/src/__snapshots__/sync.test.ts.snap b/packages/service-core/test/src/__snapshots__/sync.test.ts.snap index 56160802..17648be3 100644 --- a/packages/service-core/test/src/__snapshots__/sync.test.ts.snap +++ b/packages/service-core/test/src/__snapshots__/sync.test.ts.snap @@ -12,7 +12,13 @@ exports[`sync - mongodb > expiring token 1`] = ` [ { "checkpoint": { - "buckets": [], + "buckets": [ + { + "bucket": "mybucket[]", + "checksum": 0, + "count": 0, + }, + ], "last_op_id": "0", "write_checkpoint": undefined, }, @@ -135,7 +141,13 @@ exports[`sync - mongodb > sync updates to global data 1`] = ` [ { "checkpoint": { - "buckets": [], + "buckets": [ + { + "bucket": "mybucket[]", + "checksum": 0, + "count": 0, + }, + ], "last_op_id": "0", "write_checkpoint": undefined, }, diff --git a/packages/service-core/test/src/data_storage.test.ts b/packages/service-core/test/src/data_storage.test.ts index c25d32cd..dc9cf1d1 100644 --- a/packages/service-core/test/src/data_storage.test.ts +++ b/packages/service-core/test/src/data_storage.test.ts @@ -252,7 +252,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, ['global[]']); + const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); expect(checksums).toEqual([ { bucket: 'global[]', @@ -599,7 +599,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, ['global[]']); + const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); expect(checksums).toEqual([ { bucket: 'global[]', @@ -713,7 +713,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, ['global[]']); + const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); expect(checksums).toEqual([ { bucket: 'global[]',