[Postgres] Revert "Use DECLARE CURSOR / FETCH for table snapshots." #160

Merged
merged 2 commits on Dec 10, 2024
5 changes: 5 additions & 0 deletions .changeset/clever-tomatoes-sip.md
@@ -0,0 +1,5 @@
---
'@powersync/service-module-postgres': patch
---

Revert Postgres snapshot strategy.
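
For context, the WalStream.ts diff below swaps the initial table snapshot back from a DECLARE CURSOR / FETCH paging loop to a single streaming SELECT. The sketch below contrasts the two shapes; it reuses the `db.query` / `db.stream` calls and `table.escapedIdentifier` from the code in this PR, but is simplified (row handling is elided and the cursor variable names are adjusted for the sketch):

```ts
// Strategy removed by this revert: page through the table with a server-side cursor,
// fetching up to 10 000 rows per round trip.
await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`);
let hasRemainingData = true;
while (hasRemainingData) {
  hasRemainingData = false;
  const fetchCursor = db.stream({ statement: `FETCH 10000 FROM powersync_cursor` });
  for await (const chunk of fetchCursor) {
    // ...map and save rows; any chunk that contains rows sets hasRemainingData back to true
  }
}

// Strategy restored by this revert: stream the whole table from a single SELECT.
// pgwire still delivers the result in small chunks, so rows are processed incrementally.
const tableCursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
for await (const chunk of tableCursor) {
  // ...map and save rows
}
```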
111 changes: 42 additions & 69 deletions modules/module-postgres/src/replication/WalStream.ts
@@ -349,18 +349,10 @@ WHERE oid = $1::regclass`,
logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`);
continue;
}
let tableLsnNotBefore: string;
await db.query('BEGIN');
try {
await this.snapshotTable(batch, db, table);

const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
tableLsnNotBefore = rs.rows[0][0];
} finally {
// Read-only transaction, commit does not actually do anything.
await db.query('COMMIT');
}
await this.snapshotTable(batch, db, table);

const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
const tableLsnNotBefore = rs.rows[0][0];
await batch.markSnapshotDone([table], tableLsnNotBefore);
await touch();
}
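
In the hunk above, the revert drops the explicit read-only transaction that wrapped the per-table snapshot (its COMMIT was a no-op, per the removed comment); the snapshot now runs directly and the WAL position is read straight afterwards. A condensed sketch of the resulting per-table flow, assuming the same `db`, `batch`, and `table` objects as the surrounding code:

```ts
// Condensed from the lines added in the hunk above (not the verbatim source).
await this.snapshotTable(batch, db, table);

// Read the current WAL position once the table data has been snapshotted;
// markSnapshotDone records the table's snapshot as complete at that LSN.
const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
const tableLsnNotBefore = rs.rows[0][0];
await batch.markSnapshotDone([table], tableLsnNotBefore);
```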
@@ -386,70 +378,51 @@ WHERE oid = $1::regclass`,
const estimatedCount = await this.estimatedCount(db, table);
let at = 0;
let lastLogIndex = 0;

// We do streaming on two levels:
// 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time.
// 2. Fine level: Stream chunks from each fetch call.
await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`);

const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
let columns: { i: number; name: string }[] = [];
let hasRemainingData = true;
while (hasRemainingData) {
// Fetch 10k at a time.
// The balance here is between latency overhead per FETCH call,
// and not spending too much time on each FETCH call.
// We aim for a couple of seconds on each FETCH call.
const cursor = db.stream({
statement: `FETCH 10000 FROM powersync_cursor`
});
hasRemainingData = false;
// pgwire streams rows in chunks.
// These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
// There are typically 100-200 rows per chunk.
for await (let chunk of cursor) {
if (chunk.tag == 'RowDescription') {
// We get a RowDescription for each FETCH call, but they should
// all be the same.
let i = 0;
columns = chunk.payload.map((c) => {
return { i: i++, name: c.name };
});
continue;
}

const rows = chunk.rows.map((row) => {
let q: DatabaseInputRow = {};
for (let c of columns) {
q[c.name] = row[c.i];
}
return q;
// pgwire streams rows in chunks.
// These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.

for await (let chunk of cursor) {
if (chunk.tag == 'RowDescription') {
let i = 0;
columns = chunk.payload.map((c) => {
return { i: i++, name: c.name };
});
if (rows.length > 0 && at - lastLogIndex >= 5000) {
logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
lastLogIndex = at;
hasRemainingData = true;
}
if (this.abort_signal.aborted) {
throw new Error(`Aborted initial replication of ${this.slot_name}`);
}
continue;
}

for (const record of WalStream.getQueryData(rows)) {
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
});
const rows = chunk.rows.map((row) => {
let q: DatabaseInputRow = {};
for (let c of columns) {
q[c.name] = row[c.i];
}
return q;
});
if (rows.length > 0 && at - lastLogIndex >= 5000) {
logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
lastLogIndex = at;
}
if (this.abort_signal.aborted) {
throw new Error(`Aborted initial replication of ${this.slot_name}`);
}

at += rows.length;
Metrics.getInstance().rows_replicated_total.add(rows.length);

await touch();
for (const record of WalStream.getQueryData(rows)) {
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
});
}

at += rows.length;
Metrics.getInstance().rows_replicated_total.add(rows.length);

await touch();
}

await batch.flush();
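
Because the rendered diff interleaves removed and restored lines, here is the restored snapshot loop reassembled from the additions above. It is a simplified sketch: progress logging, the abort check, metrics, and touch() are omitted.

```ts
// Simplified sketch of the restored snapshot loop, reassembled from the additions above.
// Stream the whole table in one query; pgwire yields the result in small chunks.
const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
let columns: { i: number; name: string }[] = [];

for await (let chunk of cursor) {
  if (chunk.tag == 'RowDescription') {
    // Remember each column's position so positional rows can be keyed by name.
    let i = 0;
    columns = chunk.payload.map((c) => {
      return { i: i++, name: c.name };
    });
    continue;
  }

  // Turn positional row arrays into { column: value } records.
  const rows = chunk.rows.map((row) => {
    let q: DatabaseInputRow = {};
    for (let c of columns) {
      q[c.name] = row[c.i];
    }
    return q;
  });

  // Save each record as an INSERT; batch.save auto-flushes when the batch reaches its size limit.
  for (const record of WalStream.getQueryData(rows)) {
    await batch.save({
      tag: storage.SaveOperationTag.INSERT,
      sourceTable: table,
      before: undefined,
      beforeReplicaId: undefined,
      after: record,
      afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
    });
  }
}

await batch.flush();
```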