From b161264f68a4bb9182a23d5d2b4060c6ce49acf2 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Fri, 6 Dec 2024 14:43:57 +0200
Subject: [PATCH 1/5] Add TCP keepalive, and shorter snapshot connection timeouts.

---
 .../src/replication/PgManager.ts    | 14 ++++++++++
 packages/jpgwire/src/pgwire_node.js | 27 ++++++++++++++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/modules/module-postgres/src/replication/PgManager.ts b/modules/module-postgres/src/replication/PgManager.ts
index c0850966..ad1ab899 100644
--- a/modules/module-postgres/src/replication/PgManager.ts
+++ b/modules/module-postgres/src/replication/PgManager.ts
@@ -1,6 +1,11 @@
 import * as pgwire from '@powersync/service-jpgwire';
 import { NormalizedPostgresConnectionConfig } from '../types/types.js';
 
+/**
+ * Shorter timeout for snapshot connections than for replication connections.
+ */
+const SNAPSHOT_SOCKET_TIMEOUT = 30_000;
+
 export class PgManager {
   /**
    * Do not use this for any transactions.
@@ -39,9 +44,18 @@ export class PgManager {
     const p = pgwire.connectPgWire(this.options, { type: 'standard' });
     this.connectionPromises.push(p);
     const connection = await p;
+
+    // Use a shorter timeout for snapshot connections.
+    // This is to detect broken connections early, instead of waiting
+    // for the full 6 minutes.
+    // Since we are constantly using the connection, we don't need any
+    // custom keepalives.
+    (connection as any)._socket.setTimeout(SNAPSHOT_SOCKET_TIMEOUT);
+
     // Disable statement timeout for snapshot queries.
     // On Supabase, the default is 2 minutes.
     await connection.query(`set session statement_timeout = 0`);
+
     return connection;
   }
 
diff --git a/packages/jpgwire/src/pgwire_node.js b/packages/jpgwire/src/pgwire_node.js
index 77573d71..330d26f7 100644
--- a/packages/jpgwire/src/pgwire_node.js
+++ b/packages/jpgwire/src/pgwire_node.js
@@ -14,9 +14,14 @@ import { recordBytesRead } from './metrics.js';
 // pgwire doesn't natively support configuring timeouts, but we just hardcode a default.
 // Timeout idle connections after 6 minutes (we ping at least every 5 minutes).
 const POWERSYNC_SOCKET_DEFAULT_TIMEOUT = 360_000;
+
 // Timeout for the initial connection (pre-TLS)
 // Must be less than the timeout for an HTTP request
 const POWERSYNC_SOCKET_CONNECT_TIMEOUT = 20_000;
+
+// TCP keepalive delay in milliseconds.
+// This can help detect dead connections earlier.
+const POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY = 40_000;
 // END POWERSYNC
 
 const pbkdf2 = promisify(_pbkdf2);
@@ -66,7 +71,20 @@ class SocketAdapter {
   static async connect(host, port) {
     // START POWERSYNC
     // Custom timeout handling
-    const socket = net.connect({ host, port, timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT });
+    const socket = net.connect({
+      host,
+      port,
+
+      // This closes the connection if no data was sent or received for the given time,
+      // even if the connection is still actually alive.
+      timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT,
+
+      // This configures TCP keepalive.
+      keepAlive: true,
+      keepAliveInitialDelay: POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY
+      // Unfortunately it is not possible to set tcp_keepalive_intvl or
+      // tcp_keepalive_probes here.
+    });
     try {
       const timeout = setTimeout(() => {
         socket.destroy(new Error(`Timeout while connecting to ${host}:${port}`));
@@ -102,6 +120,13 @@ class SocketAdapter {
       this._writeResume();
     });
   }
+
+  // START POWERSYNC CUSTOM TIMEOUT
+  setTimeout(timeout) {
+    this._socket.setTimeout(timeout);
+  }
+  // END POWERSYNC CUSTOM TIMEOUT
+
   async startTls(host, ca) {
     // START POWERSYNC CUSTOM OPTIONS HANDLING
     if (!Array.isArray(ca) && typeof ca[0] == 'object') {

From 67b8eaf89f1da622092bc70102c2a650ff223bb1 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Fri, 6 Dec 2024 14:54:58 +0200
Subject: [PATCH 2/5] Use DECLARE CURSOR / FETCH for table snapshots.

---
 .../src/replication/WalStream.ts | 112 +++++++++++-------
 1 file changed, 70 insertions(+), 42 deletions(-)

diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index f7e0d1e0..52729d6c 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -352,10 +352,19 @@ WHERE oid = $1::regclass`,
           logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`);
           continue;
         }
-        await this.snapshotTable(batch, db, table);
+        let tableLsnNotBefore: string;
+        await db.query('BEGIN');
+        try {
+          await this.snapshotTable(batch, db, table);
+
+          const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
+          tableLsnNotBefore = rs.rows[0][0];
+        } finally {
+          // Read-only transaction, we don't need to worry about
+          // commit / rollback.
+          await db.query('END TRANSACTION');
+        }
 
-        const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-        const tableLsnNotBefore = rs.rows[0][0];
         await batch.markSnapshotDone([table], tableLsnNotBefore);
         await touch();
       }
@@ -376,51 +385,70 @@ WHERE oid = $1::regclass`,
     const estimatedCount = await this.estimatedCount(db, table);
     let at = 0;
     let lastLogIndex = 0;
-    const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
-    let columns: { i: number; name: string }[] = [];
-    // pgwire streams rows in chunks.
-    // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
-
-    for await (let chunk of cursor) {
-      if (chunk.tag == 'RowDescription') {
-        let i = 0;
-        columns = chunk.payload.map((c) => {
-          return { i: i++, name: c.name };
-        });
-        continue;
-      }
-      const rows = chunk.rows.map((row) => {
-        let q: DatabaseInputRow = {};
-        for (let c of columns) {
-          q[c.name] = row[c.i];
-        }
-        return q;
-      });
-      if (rows.length > 0 && at - lastLogIndex >= 5000) {
-        logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
-        lastLogIndex = at;
-      }
-      if (this.abort_signal.aborted) {
-        throw new Error(`Aborted initial replication of ${this.slot_name}`);
-      }
+    // We do streaming on two levels:
+    // 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time.
+    // 2. Fine level: Stream chunks from each fetch call.
+    await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`);
+
+    let columns: { i: number; name: string }[] = [];
+    let hasRemainingData = true;
+    while (hasRemainingData) {
+      // Fetch 10k at a time.
+      // The balance here is between latency overhead per FETCH call,
+      // and not spending too much time on each FETCH call.
+      // We aim for a couple of seconds on each FETCH call.
+      const cursor = db.stream({
+        statement: `FETCH 10000 FROM powersync_cursor`
+      });
+      hasRemainingData = false;
+      // pgwire streams rows in chunks.
+      // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
+      // There are typically 100-200 rows per chunk.
+      for await (let chunk of cursor) {
+        if (chunk.tag == 'RowDescription') {
+          // We get a RowDescription for each FETCH call, but they should
+          // all be the same.
+          let i = 0;
+          columns = chunk.payload.map((c) => {
+            return { i: i++, name: c.name };
+          });
+          continue;
+        }
+
+        const rows = chunk.rows.map((row) => {
+          let q: DatabaseInputRow = {};
+          for (let c of columns) {
+            q[c.name] = row[c.i];
+          }
+          return q;
+        });
+        if (rows.length > 0) {
+          // Any rows in this FETCH means the cursor may not be exhausted yet.
+          hasRemainingData = true;
+        }
+        if (rows.length > 0 && at - lastLogIndex >= 5000) {
+          logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
+          lastLogIndex = at;
+        }
+        if (this.abort_signal.aborted) {
+          throw new Error(`Aborted initial replication of ${this.slot_name}`);
+        }
 
-      for (const record of WalStream.getQueryData(rows)) {
-        // This auto-flushes when the batch reaches its size limit
-        await batch.save({
-          tag: storage.SaveOperationTag.INSERT,
-          sourceTable: table,
-          before: undefined,
-          beforeReplicaId: undefined,
-          after: record,
-          afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
-        });
-      }
+        for (const record of WalStream.getQueryData(rows)) {
+          // This auto-flushes when the batch reaches its size limit
+          await batch.save({
+            tag: storage.SaveOperationTag.INSERT,
+            sourceTable: table,
+            before: undefined,
+            beforeReplicaId: undefined,
+            after: record,
+            afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
+          });
+        }
 
-      at += rows.length;
-      Metrics.getInstance().rows_replicated_total.add(rows.length);
+        at += rows.length;
+        Metrics.getInstance().rows_replicated_total.add(rows.length);
 
-      await touch();
+        await touch();
+      }
     }
 
     await batch.flush();

From afc9edef3e44dfaa01e18d87fe23bd9db0a9dc44 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Fri, 6 Dec 2024 15:13:54 +0200
Subject: [PATCH 3/5] Fix consistency issue.

---
 .../module-postgres/src/replication/WalStream.ts | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index 52729d6c..8aaf1b73 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -342,9 +342,6 @@ WHERE oid = $1::regclass`,
     await this.storage.startBatch(
       { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: true },
       async (batch) => {
-        const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-        const startLsn = rs.rows[0][0];
-
         for (let tablePattern of sourceTables) {
           const tables = await this.getQualifiedTableNames(batch, db, tablePattern);
           for (let table of tables) {
@@ -360,16 +357,20 @@ WHERE oid = $1::regclass`,
              const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
              tableLsnNotBefore = rs.rows[0][0];
            } finally {
-             // Read-only transaction, we don't need to worry about
-             // commit / rollback.
-             await db.query('END TRANSACTION');
+             // Read-only transaction, commit does not actually do anything.
+             await db.query('COMMIT');
            }
 
            await batch.markSnapshotDone([table], tableLsnNotBefore);
            await touch();
          }
        }
-       await batch.commit(startLsn);
+
+       // Always commit the initial snapshot at zero.
+       // This makes sure we don't skip any changes applied before starting this snapshot,
+       // in the case of snapshot retries.
+       // We could alternatively commit at the replication slot LSN.
+       await batch.commit(ZERO_LSN);
       }
     );
   }

From 73cb617fad1362e986323af10c7d88847d9afa2a Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Fri, 6 Dec 2024 15:25:10 +0200
Subject: [PATCH 4/5] Add changeset.

---
 .changeset/young-rings-fold.md | 7 +++++++
 1 file changed, 7 insertions(+)
 create mode 100644 .changeset/young-rings-fold.md

diff --git a/.changeset/young-rings-fold.md b/.changeset/young-rings-fold.md
new file mode 100644
index 00000000..10591afd
--- /dev/null
+++ b/.changeset/young-rings-fold.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-postgres': minor
+'@powersync/service-jpgwire': patch
+'@powersync/service-image': patch
+---
+
+Improve timeouts and table snapshots for Postgres initial replication.

From 5e09399425fafa79bba9eb6a3edea09e21a4509c Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 9 Dec 2024 09:37:21 +0200
Subject: [PATCH 5/5] Tweak changesets.

---
 .changeset/chilled-bears-kneel.md | 3 ++-
 .changeset/young-rings-fold.md    | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/.changeset/chilled-bears-kneel.md b/.changeset/chilled-bears-kneel.md
index e06cfbf1..b50c7a23 100644
--- a/.changeset/chilled-bears-kneel.md
+++ b/.changeset/chilled-bears-kneel.md
@@ -1,6 +1,7 @@
 ---
 '@powersync/service-core': minor
 '@powersync/service-types': minor
+'@powersync/service-image': minor
 ---
 
-Add EdDSA support for signing JWTs
+Add EdDSA support for JWTs.

diff --git a/.changeset/young-rings-fold.md b/.changeset/young-rings-fold.md
index 10591afd..d6c402a3 100644
--- a/.changeset/young-rings-fold.md
+++ b/.changeset/young-rings-fold.md
@@ -1,7 +1,7 @@
 ---
 '@powersync/service-module-postgres': minor
 '@powersync/service-jpgwire': patch
-'@powersync/service-image': patch
+'@powersync/service-image': minor
 ---
 
 Improve timeouts and table snapshots for Postgres initial replication.
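
For reference, the socket configuration from patch 1 in isolation: a minimal Node.js sketch in TypeScript. The host and port are placeholder values, and the error handling is reduced to the essentials; the options themselves (timeout, keepAlive, keepAliveInitialDelay) are standard net.connect options.

    import * as net from 'node:net';

    // Placeholder connection target, for illustration only.
    const host = 'localhost';
    const port = 5432;

    const socket = net.connect({
      host,
      port,
      // After 6 minutes without reads or writes, the socket emits a
      // 'timeout' event (handled below); it does not close by itself.
      timeout: 360_000,
      // Start sending TCP keepalive probes after 40 seconds of inactivity.
      // Node only exposes the initial delay; the probe interval and count
      // come from OS settings (tcp_keepalive_intvl / tcp_keepalive_probes).
      keepAlive: true,
      keepAliveInitialDelay: 40_000
    });

    socket.on('timeout', () => {
      // The 'timeout' event does not destroy the socket; do it explicitly.
      socket.destroy(new Error('Socket idle timeout'));
    });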
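The two-level streaming pattern from patch 2, reduced to a sketch. The Connection interface below is a hypothetical stand-in for the pgwire API and snapshot_cursor is an arbitrary cursor name; only the SQL statements (DECLARE ... CURSOR FOR, FETCH n FROM ...) are the actual mechanism.

    // Hypothetical stand-in for the pgwire connection API.
    interface Connection {
      query(sql: string): Promise<void>;
      fetchRows(sql: string): Promise<unknown[][]>;
    }

    // Copy a table in bounded batches via a server-side cursor.
    // Running inside a single transaction keeps every batch on one
    // consistent snapshot of the table.
    async function snapshotTable(db: Connection, escapedTable: string): Promise<void> {
      await db.query('BEGIN');
      try {
        await db.query(`DECLARE snapshot_cursor CURSOR FOR SELECT * FROM ${escapedTable}`);
        while (true) {
          // Each FETCH returns at most 10000 rows; an empty result means
          // the cursor is exhausted.
          const rows = await db.fetchRows('FETCH 10000 FROM snapshot_cursor');
          if (rows.length === 0) {
            break;
          }
          // ... convert and save rows ...
        }
      } finally {
        // Read-only transaction: COMMIT just ends it and releases the cursor.
        await db.query('COMMIT');
      }
    }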
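The consistency bookkeeping from patch 3, as a sketch reusing the hypothetical Connection interface above. SnapshotBatch approximates the storage batch API, and the ZERO_LSN value is illustrative; the key points are capturing pg_current_wal_lsn() while each table's snapshot transaction is open, and committing the overall snapshot at the zero LSN so a retried snapshot never skips changes that happened while it ran.

    // Approximation of the storage batch API used in the patch.
    interface SnapshotBatch {
      markSnapshotDone(tables: string[], lsnNotBefore: string): Promise<void>;
      commit(lsn: string): Promise<void>;
    }

    // Illustrative zero LSN constant.
    const ZERO_LSN = '00000000/00000000';

    async function snapshotAllTables(db: Connection, batch: SnapshotBatch, tables: string[]): Promise<void> {
      for (const table of tables) {
        await db.query('BEGIN');
        try {
          // ... copy the table contents, e.g. with the cursor loop above
          // (minus its own BEGIN/COMMIT, since we are already in a transaction) ...

          // Capture the WAL position inside the snapshot transaction:
          // streamed changes for this table only apply from this LSN onwards.
          const rs = await db.fetchRows('select pg_current_wal_lsn()');
          const tableLsnNotBefore = rs[0][0] as string;
          await batch.markSnapshotDone([table], tableLsnNotBefore);
        } finally {
          await db.query('COMMIT');
        }
      }
      // Commit the whole snapshot at the zero LSN, so a retry after a
      // failure re-applies changes from the beginning instead of skipping any.
      await batch.commit(ZERO_LSN);
    }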