diff --git a/.changeset/clever-tomatoes-sip.md b/.changeset/clever-tomatoes-sip.md
new file mode 100644
index 00000000..10feef5b
--- /dev/null
+++ b/.changeset/clever-tomatoes-sip.md
@@ -0,0 +1,5 @@
+---
+'@powersync/service-module-postgres': patch
+---
+
+Revert Postgres snapshot strategy.
diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index 8aaf1b73..b1f690c8 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -349,18 +349,10 @@ WHERE oid = $1::regclass`,
             logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`);
             continue;
           }
 
-          let tableLsnNotBefore: string;
-          await db.query('BEGIN');
-          try {
-            await this.snapshotTable(batch, db, table);
-
-            const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-            tableLsnNotBefore = rs.rows[0][0];
-          } finally {
-            // Read-only transaction, commit does not actually do anything.
-            await db.query('COMMIT');
-          }
+          await this.snapshotTable(batch, db, table);
+          const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
+          const tableLsnNotBefore = rs.rows[0][0];
           await batch.markSnapshotDone([table], tableLsnNotBefore);
           await touch();
         }
@@ -386,70 +378,51 @@ WHERE oid = $1::regclass`,
     const estimatedCount = await this.estimatedCount(db, table);
     let at = 0;
     let lastLogIndex = 0;
-
-    // We do streaming on two levels:
-    // 1. Coarse level: DELCARE CURSOR, FETCH 10000 at a time.
-    // 2. Fine level: Stream chunks from each fetch call.
-    await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`);
-
+    const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
     let columns: { i: number; name: string }[] = [];
-    let hasRemainingData = true;
-    while (hasRemainingData) {
-      // Fetch 10k at a time.
-      // The balance here is between latency overhead per FETCH call,
-      // and not spending too much time on each FETCH call.
-      // We aim for a couple of seconds on each FETCH call.
-      const cursor = db.stream({
-        statement: `FETCH 10000 FROM powersync_cursor`
-      });
-      hasRemainingData = false;
-      // pgwire streams rows in chunks.
-      // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
-      // There are typically 100-200 rows per chunk.
-      for await (let chunk of cursor) {
-        if (chunk.tag == 'RowDescription') {
-          // We get a RowDescription for each FETCH call, but they should
-          // all be the same.
-          let i = 0;
-          columns = chunk.payload.map((c) => {
-            return { i: i++, name: c.name };
-          });
-          continue;
-        }
-
-        const rows = chunk.rows.map((row) => {
-          let q: DatabaseInputRow = {};
-          for (let c of columns) {
-            q[c.name] = row[c.i];
-          }
-          return q;
+    // pgwire streams rows in chunks.
+    // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
+
+    for await (let chunk of cursor) {
+      if (chunk.tag == 'RowDescription') {
+        let i = 0;
+        columns = chunk.payload.map((c) => {
+          return { i: i++, name: c.name };
         });
-        if (rows.length > 0 && at - lastLogIndex >= 5000) {
-          logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
-          lastLogIndex = at;
-          hasRemainingData = true;
-        }
-        if (this.abort_signal.aborted) {
-          throw new Error(`Aborted initial replication of ${this.slot_name}`);
-        }
+        continue;
+      }
 
-        for (const record of WalStream.getQueryData(rows)) {
-          // This auto-flushes when the batch reaches its size limit
-          await batch.save({
-            tag: storage.SaveOperationTag.INSERT,
-            sourceTable: table,
-            before: undefined,
-            beforeReplicaId: undefined,
-            after: record,
-            afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
-          });
+      const rows = chunk.rows.map((row) => {
+        let q: DatabaseInputRow = {};
+        for (let c of columns) {
+          q[c.name] = row[c.i];
         }
+        return q;
+      });
+      if (rows.length > 0 && at - lastLogIndex >= 5000) {
+        logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
+        lastLogIndex = at;
+      }
+      if (this.abort_signal.aborted) {
+        throw new Error(`Aborted initial replication of ${this.slot_name}`);
+      }
 
-        at += rows.length;
-        Metrics.getInstance().rows_replicated_total.add(rows.length);
-
-        await touch();
+      for (const record of WalStream.getQueryData(rows)) {
+        // This auto-flushes when the batch reaches its size limit
+        await batch.save({
+          tag: storage.SaveOperationTag.INSERT,
+          sourceTable: table,
+          before: undefined,
+          beforeReplicaId: undefined,
+          after: record,
+          afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
+        });
       }
+
+      at += rows.length;
+      Metrics.getInstance().rows_replicated_total.add(rows.length);
+
+      await touch();
     }
 
     await batch.flush();
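For reference, the restored path streams a single `SELECT *` per table and relies on pgwire's `RowDescription` message to turn positional row arrays into keyed records before saving them. A minimal sketch of that mapping, under assumed chunk shapes (`SnapshotChunk`, `SnapshotColumn` and `mapChunkRows` are illustrative names, not part of the module or of pgwire):

// Illustrative sketch only; the chunk shapes below are assumptions, not pgwire's actual types.
interface SnapshotColumn { i: number; name: string; }
type SnapshotChunk =
  | { tag: 'RowDescription'; payload: { name: string }[] }
  | { tag: 'DataRow'; rows: unknown[][] };

// Convert positional rows into keyed records using the most recent RowDescription seen.
function mapChunkRows(chunks: Iterable<SnapshotChunk>): Record<string, unknown>[] {
  let columns: SnapshotColumn[] = [];
  const records: Record<string, unknown>[] = [];
  for (const chunk of chunks) {
    if (chunk.tag === 'RowDescription') {
      columns = chunk.payload.map((c, i) => ({ i, name: c.name }));
      continue;
    }
    for (const row of chunk.rows) {
      const record: Record<string, unknown> = {};
      for (const c of columns) {
        record[c.name] = row[c.i];
      }
      records.push(record);
    }
  }
  return records;
}

In the patched code, each mapped record is passed to `batch.save(...)` with an INSERT tag, and the LSN read from `pg_current_wal_lsn()` after the table copy is handed to `markSnapshotDone` as `tableLsnNotBefore`.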