Skip to content

Commit

Permalink
Incrementally compute checksums and include zero checksums.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Jun 10, 2024
1 parent 511f6d7 commit f6e94a1
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 40 deletions.
11 changes: 10 additions & 1 deletion packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,16 @@ export interface SyncRulesBucketStorage {
options?: BucketDataBatchOptions
): AsyncIterable<util.SyncBucketData>;

getChecksums(checkpoint: util.OpId, buckets: string[]): Promise<util.BucketChecksum[]>;
/**
* Compute checksums for a given list of buckets.
*
* If fromCheckpoint is specified, the result is a diff. Otherwise, it is the full checksum.
*/
getChecksums(
checkpoint: util.OpId,
fromCheckpoint: util.OpId | null,
buckets: string[]
): Promise<util.BucketChecksum[]>;

/**
* Terminate the sync rules.
Expand Down
17 changes: 14 additions & 3 deletions packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,18 +316,29 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
}
}

async getChecksums(checkpoint: util.OpId, buckets: string[]): Promise<util.BucketChecksum[]> {
async getChecksums(
checkpoint: util.OpId,
fromCheckpoint: util.OpId | null,
buckets: string[]
): Promise<util.BucketChecksum[]> {
if (buckets.length == 0) {
return [];
}

if (fromCheckpoint == checkpoint) {
return [];
}

const start = fromCheckpoint ? BigInt(fromCheckpoint) : new bson.MinKey();

const filters: any[] = [];
for (let name of buckets) {
filters.push({
_id: {
$gt: {
g: this.group_id,
b: name,
o: new bson.MinKey()
o: start
},
$lte: {
g: this.group_id,
Expand Down Expand Up @@ -358,7 +369,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
return {
bucket: doc._id,
count: doc.count,
checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 4294967295
checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff
};
});
}
Expand Down
42 changes: 26 additions & 16 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ async function* streamResponseInner(
// This starts with the state from the client. May contain buckets that the user do not have access to (anymore).
let dataBuckets = new Map<string, string>();

let last_checksums: util.BucketChecksum[] | null = null;
let last_write_checkpoint: bigint | null = null;
let lastChecksums: { checkpoint: util.OpId; checksums: Map<string, util.BucketChecksum> } | null = null;
let lastWriteCheckpoint: bigint | null = null;

const { raw_data, binary_data } = params;

Expand Down Expand Up @@ -113,59 +113,69 @@ async function* streamResponseInner(
throw new Error(`Too many buckets: ${allBuckets.length}`);
}

let checksums: util.BucketChecksum[] | undefined = undefined;

let dataBucketsNew = new Map<string, string>();
for (let bucket of allBuckets) {
dataBucketsNew.set(bucket, dataBuckets.get(bucket) ?? '0');
}
dataBuckets = dataBucketsNew;

checksums = await storage.getChecksums(checkpoint, [...dataBuckets.keys()]);
const bucketList = [...dataBuckets.keys()];
const checksumDiff = await storage.getChecksums(checkpoint, lastChecksums?.checkpoint ?? null, bucketList);

if (last_checksums) {
const diff = util.checksumsDiff(last_checksums, checksums);
if (lastChecksums) {
const diff = util.checksumsDiff(lastChecksums.checksums, bucketList, checksumDiff);

if (
last_write_checkpoint == writeCheckpoint &&
diff.removed_buckets.length == 0 &&
diff.updated_buckets.length == 0
lastWriteCheckpoint == writeCheckpoint &&
diff.removedBuckets.length == 0 &&
diff.updatedBuckets.length == 0
) {
// No changes - don't send anything to the client
continue;
}

let message = `Updated checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `;
message += `buckets: ${allBuckets.length} | `;
message += `updated: ${limitedBuckets(diff.updated_buckets, 20)} | `;
message += `removed: ${limitedBuckets(diff.removed_buckets, 20)} | `;
message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `;
message += `removed: ${limitedBuckets(diff.removedBuckets, 20)} | `;
micro.logger.info(message);

const checksum_line: util.StreamingSyncCheckpointDiff = {
checkpoint_diff: {
last_op_id: checkpoint,
write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
...diff
removed_buckets: diff.removedBuckets,
updated_buckets: diff.updatedBuckets
}
};

yield checksum_line;

lastChecksums = {
checkpoint,
checksums: diff.nextBuckets
};
} else {
const nextBuckets = util.fillEmptyChecksums(bucketList, checksumDiff);
lastChecksums = {
checkpoint,
checksums: nextBuckets
};

let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `;
message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`;
micro.logger.info(message);
const checksum_line: util.StreamingSyncCheckpoint = {
checkpoint: {
last_op_id: checkpoint,
write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
buckets: checksums
buckets: [...nextBuckets.values()]
}
};
yield checksum_line;
}

last_checksums = checksums;
last_write_checkpoint = writeCheckpoint;
lastWriteCheckpoint = writeCheckpoint;

yield* bucketDataInBatches(storage, checkpoint, dataBuckets, raw_data, binary_data, signal);

Expand Down
86 changes: 71 additions & 15 deletions packages/service-core/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,85 @@ export function timestampToOpId(ts: bigint): OpId {
return ts.toString(10);
}

export function checksumsDiff(previous: BucketChecksum[], current: BucketChecksum[]) {
const updated_buckets: BucketChecksum[] = [];
export function fillEmptyChecksums(currentBuckets: string[], checksums: BucketChecksum[]) {
// All current values
const nextBuckets = new Map<string, BucketChecksum>();

const previousBuckets = new Map<string, BucketChecksum>();
for (let checksum of previous) {
previousBuckets.set(checksum.bucket, checksum);
for (let checksum of checksums) {
// Added checksum
nextBuckets.set(checksum.bucket, checksum);
}
for (let checksum of current) {
if (!previousBuckets.has(checksum.bucket)) {
updated_buckets.push(checksum);

for (let bucket of currentBuckets) {
if (!nextBuckets.has(bucket)) {
// Empty diff - empty bucket
const checksum: BucketChecksum = {
bucket,
checksum: 0,
count: 0
};
nextBuckets.set(bucket, checksum);
}
}

return nextBuckets;
}

export function checksumsDiff(
previousBuckets: Map<string, BucketChecksum>,
currentBuckets: string[],
checksumDiff: BucketChecksum[]
) {
// All changed ones
const updatedBuckets = new Map<string, BucketChecksum>();
// All current values
const nextBuckets = new Map<string, BucketChecksum>();

for (let cdiff of checksumDiff) {
const p = previousBuckets.get(cdiff.bucket);
if (p == null) {
// Added
updatedBuckets.set(cdiff.bucket, cdiff);
nextBuckets.set(cdiff.bucket, cdiff);
} else {
const p = previousBuckets.get(checksum.bucket);
if (p?.checksum != checksum.checksum || p?.count != checksum.count) {
updated_buckets.push(checksum);
// Updated
const checksum: BucketChecksum = {
bucket: cdiff.bucket,
count: p.count + cdiff.count,
checksum: (p.checksum + cdiff.checksum) & 0xffffffff
};
updatedBuckets.set(checksum.bucket, checksum);
nextBuckets.set(checksum.bucket, checksum);
previousBuckets.delete(cdiff.bucket);
}
}

for (let bucket of currentBuckets) {
if (!updatedBuckets.has(bucket)) {
// Empty diff - either empty bucket, or unchanged
const p = previousBuckets.get(bucket);
if (p == null) {
// Emtpy bucket
const checksum: BucketChecksum = {
bucket,
checksum: 0,
count: 0
};
updatedBuckets.set(bucket, checksum);
nextBuckets.set(bucket, checksum);
} else {
// Unchanged bucket
nextBuckets.set(bucket, p);
previousBuckets.delete(bucket);
}
previousBuckets.delete(checksum.bucket);
}
}

const removed_buckets: string[] = [...previousBuckets.keys()];
const removedBuckets: string[] = [...previousBuckets.keys()];
return {
updated_buckets,
removed_buckets
updatedBuckets: [...updatedBuckets.values()],
removedBuckets,
nextBuckets
};
}

Expand Down
16 changes: 14 additions & 2 deletions packages/service-core/test/src/__snapshots__/sync.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ exports[`sync - mongodb > expiring token 1`] = `
[
{
"checkpoint": {
"buckets": [],
"buckets": [
{
"bucket": "mybucket[]",
"checksum": 0,
"count": 0,
},
],
"last_op_id": "0",
"write_checkpoint": undefined,
},
Expand Down Expand Up @@ -135,7 +141,13 @@ exports[`sync - mongodb > sync updates to global data 1`] = `
[
{
"checkpoint": {
"buckets": [],
"buckets": [
{
"bucket": "mybucket[]",
"checksum": 0,
"count": 0,
},
],
"last_op_id": "0",
"write_checkpoint": undefined,
},
Expand Down
6 changes: 3 additions & 3 deletions packages/service-core/test/src/data_storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ bucket_definitions:
{ op: 'REMOVE', object_id: 'test1', checksum: c2 }
]);

const checksums = await storage.getChecksums(checkpoint, ['global[]']);
const checksums = await storage.getChecksums(checkpoint, null, ['global[]']);
expect(checksums).toEqual([
{
bucket: 'global[]',
Expand Down Expand Up @@ -599,7 +599,7 @@ bucket_definitions:
{ op: 'REMOVE', object_id: 'test1', checksum: c2 }
]);

const checksums = await storage.getChecksums(checkpoint, ['global[]']);
const checksums = await storage.getChecksums(checkpoint, null, ['global[]']);
expect(checksums).toEqual([
{
bucket: 'global[]',
Expand Down Expand Up @@ -713,7 +713,7 @@ bucket_definitions:
{ op: 'REMOVE', object_id: 'test1', checksum: c2 }
]);

const checksums = await storage.getChecksums(checkpoint, ['global[]']);
const checksums = await storage.getChecksums(checkpoint, null, ['global[]']);
expect(checksums).toEqual([
{
bucket: 'global[]',
Expand Down

0 comments on commit f6e94a1

Please # to comment.