Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Postgres] Improve timeouts and table snapshots #152

Merged
merged 6 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions modules/module-postgres/src/replication/PgManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import * as pgwire from '@powersync/service-jpgwire';
import { NormalizedPostgresConnectionConfig } from '../types/types.js';

/**
 * Socket idle timeout (ms) for snapshot connections — deliberately shorter
 * than the 6-minute default applied to replication connections, so that
 * broken snapshot connections are detected early instead of hanging.
 */
const SNAPSHOT_SOCKET_TIMEOUT = 30_000;

export class PgManager {
/**
* Do not use this for any transactions.
Expand Down Expand Up @@ -39,9 +44,18 @@ export class PgManager {
const p = pgwire.connectPgWire(this.options, { type: 'standard' });
this.connectionPromises.push(p);
const connection = await p;

// Use a shorter timeout for snapshot connections.
// This is to detect broken connections early, instead of waiting
// for the full 6 minutes.
// Since we are constantly using the connection, we don't need any
// custom keepalives.
(connection as any)._socket.setTimeout(SNAPSHOT_SOCKET_TIMEOUT);

// Disable statement timeout for snapshot queries.
// On Supabase, the default is 2 minutes.
await connection.query(`set session statement_timeout = 0`);

return connection;
}

Expand Down
121 changes: 75 additions & 46 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,25 +342,35 @@ WHERE oid = $1::regclass`,
await this.storage.startBatch(
{ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: true },
async (batch) => {
const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
const startLsn = rs.rows[0][0];

for (let tablePattern of sourceTables) {
const tables = await this.getQualifiedTableNames(batch, db, tablePattern);
for (let table of tables) {
if (table.snapshotComplete) {
logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`);
continue;
}
await this.snapshotTable(batch, db, table);
let tableLsnNotBefore: string;
await db.query('BEGIN');
try {
await this.snapshotTable(batch, db, table);

const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
tableLsnNotBefore = rs.rows[0][0];
} finally {
// Read-only transaction, commit does not actually do anything.
await db.query('COMMIT');
}

const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
const tableLsnNotBefore = rs.rows[0][0];
await batch.markSnapshotDone([table], tableLsnNotBefore);
await touch();
}
}
await batch.commit(startLsn);

// Always commit the initial snapshot at zero.
// This makes sure we don't skip any changes applied before starting this snapshot,
// in the case of snapshot retries.
// We could alternatively commit at the replication slot LSN.
await batch.commit(ZERO_LSN);
rkistner marked this conversation as resolved.
Show resolved Hide resolved
}
);
}
Expand All @@ -376,51 +386,70 @@ WHERE oid = $1::regclass`,
const estimatedCount = await this.estimatedCount(db, table);
let at = 0;
let lastLogIndex = 0;
const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
let columns: { i: number; name: string }[] = [];
// pgwire streams rows in chunks.
// These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.

for await (let chunk of cursor) {
if (chunk.tag == 'RowDescription') {
let i = 0;
columns = chunk.payload.map((c) => {
return { i: i++, name: c.name };
});
continue;
}

const rows = chunk.rows.map((row) => {
let q: DatabaseInputRow = {};
for (let c of columns) {
q[c.name] = row[c.i];
}
return q;
// We do streaming on two levels:
// 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time.
// 2. Fine level: Stream chunks from each fetch call.
await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`);

let columns: { i: number; name: string }[] = [];
let hasRemainingData = true;
while (hasRemainingData) {
// Fetch 10k at a time.
// The balance here is between latency overhead per FETCH call,
// and not spending too much time on each FETCH call.
// We aim for a couple of seconds on each FETCH call.
const cursor = db.stream({
statement: `FETCH 10000 FROM powersync_cursor`
});
if (rows.length > 0 && at - lastLogIndex >= 5000) {
logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
lastLogIndex = at;
}
if (this.abort_signal.aborted) {
throw new Error(`Aborted initial replication of ${this.slot_name}`);
}
hasRemainingData = false;
// pgwire streams rows in chunks.
// These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
// There are typically 100-200 rows per chunk.
for await (let chunk of cursor) {
if (chunk.tag == 'RowDescription') {
// We get a RowDescription for each FETCH call, but they should
// all be the same.
let i = 0;
columns = chunk.payload.map((c) => {
return { i: i++, name: c.name };
});
continue;
}

for (const record of WalStream.getQueryData(rows)) {
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
const rows = chunk.rows.map((row) => {
let q: DatabaseInputRow = {};
for (let c of columns) {
q[c.name] = row[c.i];
}
return q;
});
}
if (rows.length > 0 && at - lastLogIndex >= 5000) {
logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
lastLogIndex = at;
hasRemainingData = true;
}
if (this.abort_signal.aborted) {
throw new Error(`Aborted initial replication of ${this.slot_name}`);
}

for (const record of WalStream.getQueryData(rows)) {
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
});
}

at += rows.length;
Metrics.getInstance().rows_replicated_total.add(rows.length);
at += rows.length;
Metrics.getInstance().rows_replicated_total.add(rows.length);

await touch();
await touch();
}
}

await batch.flush();
Expand Down
27 changes: 26 additions & 1 deletion packages/jpgwire/src/pgwire_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ import { recordBytesRead } from './metrics.js';
// pgwire doesn't natively support configuring timeouts, but we just hardcode a default.
// Timeout idle connections after 6 minutes (we ping at least every 5 minutes).
const POWERSYNC_SOCKET_DEFAULT_TIMEOUT = 360_000;

// Timeout for the initial connection (pre-TLS)
// Must be less than the timeout for an HTTP request
const POWERSYNC_SOCKET_CONNECT_TIMEOUT = 20_000;

// TCP keepalive delay in milliseconds.
// This can help detect dead connections earlier.
const POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY = 40_000;
// END POWERSYNC

const pbkdf2 = promisify(_pbkdf2);
Expand Down Expand Up @@ -66,7 +71,20 @@ class SocketAdapter {
static async connect(host, port) {
// START POWERSYNC
// Custom timeout handling
const socket = net.connect({ host, port, timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT });
const socket = net.connect({
host,
port,

// This closes the connection if no data was sent or received for the given time,
// even if the connection is still actually alive.
timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT,

// This configures TCP keepalive.
keepAlive: true,
keepAliveInitialDelay: POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY
// Unfortunately it is not possible to set tcp_keepalive_intvl or
// tcp_keepalive_probes here.
});
try {
const timeout = setTimeout(() => {
socket.destroy(new Error(`Timeout while connecting to ${host}:${port}`));
Expand Down Expand Up @@ -102,6 +120,13 @@ class SocketAdapter {
this._writeResume();
});
}

  // START POWERSYNC CUSTOM TIMEOUT
  /**
   * Reconfigures the idle timeout (ms) on the underlying net socket,
   * overriding the default set at connect time. Delegates directly to
   * Node's net.Socket#setTimeout.
   *
   * @param timeout Idle timeout in milliseconds; 0 disables the timeout.
   */
  setTimeout(timeout) {
    this._socket.setTimeout(timeout);
  }
  // END POWERSYNC CUSTOM TIMEOUT

async startTls(host, ca) {
// START POWERSYNC CUSTOM OPTIONS HANDLING
if (!Array.isArray(ca) && typeof ca[0] == 'object') {
Expand Down
Loading