Skip to content

Commit 7b10465

Browse files
authored
Merge pull request #8 from journeyapps-platform/feat/rework-teardown
Implement powersync instance teardown functionality
2 parents d6d22b5 + 285afb0 commit 7b10465

27 files changed

+346
-215
lines changed

modules/module-postgres/src/module/PostgresModule.ts

+46-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1-
import { api, auth, ConfigurationFileSyncRulesProvider, replication, system } from '@powersync/service-core';
1+
import {
2+
api,
3+
auth,
4+
ConfigurationFileSyncRulesProvider,
5+
replication,
6+
system,
7+
TearDownOptions
8+
} from '@powersync/service-core';
29
import * as jpgwire from '@powersync/service-jpgwire';
310
import * as types from '../types/types.js';
411
import { PostgresRouteAPIAdapter } from '../api/PostgresRouteAPIAdapter.js';
512
import { SupabaseKeyCollector } from '../auth/SupabaseKeyCollector.js';
613
import { WalStreamReplicator } from '../replication/WalStreamReplicator.js';
714
import { ConnectionManagerFactory } from '../replication/ConnectionManagerFactory.js';
815
import { PostgresErrorRateLimiter } from '../replication/PostgresErrorRateLimiter.js';
16+
import { cleanUpReplicationSlot } from '../replication/replication-utils.js';
17+
import { PgManager } from '../replication/PgManager.js';
918

1019
export class PostgresModule extends replication.ReplicationModule<types.PostgresConnectionConfig> {
1120
constructor() {
@@ -24,29 +33,28 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
2433
this.registerSupabaseAuth(context);
2534
}
2635

27-
jpgwire.setMetricsRecorder({
28-
addBytesRead(bytes) {
29-
context.metrics.data_replicated_bytes.add(bytes);
30-
}
31-
});
36+
if (context.metrics) {
37+
jpgwire.setMetricsRecorder({
38+
addBytesRead(bytes) {
39+
context.metrics!.data_replicated_bytes.add(bytes);
40+
}
41+
});
42+
}
3243
}
3344

34-
protected createRouteAPIAdapter(decodedConfig: types.PostgresConnectionConfig): api.RouteAPI {
35-
return new PostgresRouteAPIAdapter(this.resolveConfig(decodedConfig));
45+
protected createRouteAPIAdapter(): api.RouteAPI {
46+
return new PostgresRouteAPIAdapter(this.resolveConfig(this.decodedConfig!));
3647
}
3748

38-
protected createReplicator(
39-
decodedConfig: types.PostgresConnectionConfig,
40-
context: system.ServiceContext
41-
): replication.AbstractReplicator {
42-
const normalisedConfig = this.resolveConfig(decodedConfig);
43-
const connectionFactory = new ConnectionManagerFactory(normalisedConfig);
49+
protected createReplicator(context: system.ServiceContext): replication.AbstractReplicator {
50+
const normalisedConfig = this.resolveConfig(this.decodedConfig!);
4451
const syncRuleProvider = new ConfigurationFileSyncRulesProvider(context.configuration.sync_rules);
52+
const connectionFactory = new ConnectionManagerFactory(normalisedConfig);
4553

4654
return new WalStreamReplicator({
4755
id: this.getDefaultId(normalisedConfig.database),
4856
syncRuleProvider: syncRuleProvider,
49-
storageEngine: context.storage,
57+
storageEngine: context.storageEngine,
5058
connectionFactory: connectionFactory,
5159
rateLimiter: new PostgresErrorRateLimiter()
5260
});
@@ -62,7 +70,29 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
6270
};
6371
}
6472

65-
async teardown(): Promise<void> {}
73+
async teardown(options: TearDownOptions): Promise<void> {
74+
const normalisedConfig = this.resolveConfig(this.decodedConfig!);
75+
const connectionManager = new PgManager(normalisedConfig, {
76+
idleTimeout: 30_000,
77+
maxSize: 1
78+
});
79+
80+
try {
81+
if (options.syncRules) {
82+
// TODO: In the future, once we have more replication types, we will need to check if these syncRules are for Postgres
83+
for (let syncRules of options.syncRules) {
84+
try {
85+
await cleanUpReplicationSlot(syncRules.slot_name, connectionManager.pool);
86+
} catch (e) {
87+
// Not really much we can do here for failures, most likely the database is no longer accessible
88+
this.logger.warn(`Failed to fully clean up Postgres replication slot: ${syncRules.slot_name}`, e);
89+
}
90+
}
91+
}
92+
} finally {
93+
await connectionManager.end();
94+
}
95+
}
6696

6797
// TODO: This should rather be done by registering the key collector in some kind of auth engine
6898
private registerSupabaseAuth(context: system.ServiceContextContainer) {

modules/module-postgres/src/replication/WalStreamReplicationJob.ts

+14-14
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import { PgManager } from './PgManager.js';
44

55
import { replication } from '@powersync/service-core';
66
import { ConnectionManagerFactory } from './ConnectionManagerFactory.js';
7+
import { cleanUpReplicationSlot } from './replication-utils.js';
78

89
export interface WalStreamReplicationJobOptions extends replication.AbstractReplicationJobOptions {
910
connectionFactory: ConnectionManagerFactory;
1011
}
1112

1213
export class WalStreamReplicationJob extends replication.AbstractReplicationJob {
1314
private connectionFactory: ConnectionManagerFactory;
14-
private connectionManager: PgManager;
15+
private readonly connectionManager: PgManager;
1516

1617
constructor(options: WalStreamReplicationJobOptions) {
1718
super(options);
@@ -24,15 +25,14 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
2425
}
2526

2627
async cleanUp(): Promise<void> {
27-
this.logger.info(`Cleaning up replication slot: ${this.slot_name}`);
28-
28+
const connectionManager = this.connectionFactory.create({
29+
idleTimeout: 30_000,
30+
maxSize: 1
31+
});
2932
try {
30-
await this.connectionManager.pool.query({
31-
statement: 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1',
32-
params: [{ type: 'varchar', value: this.slot_name }]
33-
});
33+
await cleanUpReplicationSlot(this.slotName, connectionManager.pool);
3434
} finally {
35-
await this.connectionManager.end();
35+
await connectionManager.end();
3636
}
3737
}
3838

@@ -56,7 +56,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
5656
}
5757
}
5858

59-
get slot_name() {
59+
get slotName() {
6060
return this.options.storage.slot_name;
6161
}
6262

@@ -67,14 +67,14 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
6767
// Fatal exception
6868
container.reporter.captureException(e, {
6969
metadata: {
70-
replication_slot: this.slot_name
70+
replication_slot: this.slotName
7171
}
7272
});
73-
this.logger.error(`Replication failed on ${this.slot_name}`, e);
73+
this.logger.error(`Replication failed on ${this.slotName}`, e);
7474

7575
if (e instanceof MissingReplicationSlotError) {
7676
// This stops replication on this slot, and creates a new slot
77-
await this.options.storage.factory.slotRemoved(this.slot_name);
77+
await this.options.storage.factory.slotRemoved(this.slotName);
7878
}
7979
} finally {
8080
this.abortController.abort();
@@ -108,7 +108,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
108108
const stream = new WalStream({
109109
abort_signal: this.abortController.signal,
110110
storage: this.options.storage,
111-
connections: this.connectionManager
111+
connections: connectionManager
112112
});
113113
await stream.replicate();
114114
} catch (e) {
@@ -142,7 +142,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
142142
// Report the error if relevant, before retrying
143143
container.reporter.captureException(e, {
144144
metadata: {
145-
replication_slot: this.slot_name
145+
replication_slot: this.slotName
146146
}
147147
});
148148
// This sets the retry delay

modules/module-postgres/src/replication/WalStreamReplicator.ts

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import { AbstractReplicatorOptions, replication } from '@powersync/service-core';
1+
import { storage, replication } from '@powersync/service-core';
22
import { WalStreamReplicationJob } from './WalStreamReplicationJob.js';
33
import { ConnectionManagerFactory } from './ConnectionManagerFactory.js';
4+
import { cleanUpReplicationSlot } from './replication-utils.js';
45

5-
export interface WalStreamReplicatorOptions extends AbstractReplicatorOptions {
6+
export interface WalStreamReplicatorOptions extends replication.AbstractReplicatorOptions {
67
connectionFactory: ConnectionManagerFactory;
78
}
89

@@ -23,6 +24,19 @@ export class WalStreamReplicator extends replication.AbstractReplicator<WalStrea
2324
});
2425
}
2526

27+
async cleanUp(syncRulesStorage: storage.SyncRulesBucketStorage): Promise<void> {
28+
const connectionManager = this.connectionFactory.create({
29+
idleTimeout: 30_000,
30+
maxSize: 1
31+
});
32+
try {
33+
// TODO: Slot_name will likely have to come from a different source in the future
34+
await cleanUpReplicationSlot(syncRulesStorage.slot_name, connectionManager.pool);
35+
} finally {
36+
await connectionManager.end();
37+
}
38+
}
39+
2640
async stop(): Promise<void> {
2741
await super.stop();
2842
await this.connectionFactory.shutdown();

modules/module-postgres/src/replication/replication-utils.ts

+10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
77
import * as service_types from '@powersync/service-types';
88
import * as pg_utils from '../utils/pgwire_utils.js';
99
import * as util from '../utils/pgwire_utils.js';
10+
import { logger } from '@powersync/lib-services-framework';
1011

1112
export interface ReplicaIdentityResult {
1213
replicationColumns: storage.ColumnDescriptor[];
@@ -317,3 +318,12 @@ export async function getDebugTableInfo(options: GetDebugTableInfoOptions): Prom
317318
) as service_types.ReplicationError[]
318319
};
319320
}
321+
322+
export async function cleanUpReplicationSlot(slotName: string, db: pgwire.PgClient): Promise<void> {
323+
logger.info(`Cleaning up Postgres replication slot: ${slotName}...`);
324+
325+
await db.query({
326+
statement: 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1',
327+
params: [{ type: 'varchar', value: slotName }]
328+
});
329+
}

packages/service-core/src/api/diagnostics.ts

+10-18
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { DEFAULT_TAG, SourceTableInterface, SqlSyncRules } from '@powersync/serv
33
import { SyncRulesStatus, TableInfo } from '@powersync/service-types';
44

55
import * as storage from '../storage/storage-index.js';
6-
import * as system from '../system/system-index.js';
6+
import { RouteAPI } from './RouteAPI.js';
77

88
export interface DiagnosticsOptions {
99
/**
@@ -27,7 +27,8 @@ export interface DiagnosticsOptions {
2727
export const DEFAULT_DATASOURCE_ID = 'default';
2828

2929
export async function getSyncRulesStatus(
30-
serviceContext: system.ServiceContext,
30+
bucketStorage: storage.BucketStorageFactory,
31+
apiHandler: RouteAPI,
3132
sync_rules: storage.PersistedSyncRulesContent | null,
3233
options: DiagnosticsOptions
3334
): Promise<SyncRulesStatus | undefined> {
@@ -52,24 +53,15 @@ export async function getSyncRulesStatus(
5253
};
5354
}
5455

55-
const {
56-
storage: { activeBucketStorage }
57-
} = serviceContext;
58-
const api = serviceContext.routerEngine.getAPI();
59-
60-
const systemStorage = live_status ? activeBucketStorage.getInstance(persisted) : undefined;
56+
const systemStorage = live_status ? bucketStorage.getInstance(persisted) : undefined;
6157
const status = await systemStorage?.getStatus();
6258
let replication_lag_bytes: number | undefined = undefined;
6359

6460
let tables_flat: TableInfo[] = [];
6561

6662
if (check_connection) {
67-
if (!api) {
68-
throw new Error('No connection configured');
69-
}
70-
7163
const source_table_patterns = rules.getSourceTables();
72-
const resolved_tables = await api.getDebugTablesInfo(source_table_patterns, rules);
64+
const resolved_tables = await apiHandler.getDebugTablesInfo(source_table_patterns, rules);
7365
tables_flat = resolved_tables.flatMap((info) => {
7466
if (info.table) {
7567
return [info.table];
@@ -82,7 +74,7 @@ export async function getSyncRulesStatus(
8274

8375
if (systemStorage) {
8476
try {
85-
replication_lag_bytes = await api.getReplicationLag(systemStorage.slot_name);
77+
replication_lag_bytes = await apiHandler.getReplicationLag(systemStorage.slot_name);
8678
} catch (e) {
8779
// Ignore
8880
logger.warn(`Unable to get replication lag`, e);
@@ -136,15 +128,15 @@ export async function getSyncRulesStatus(
136128
})
137129
);
138130

139-
const sourceConfig = await api?.getSourceConfig();
140-
const tag = sourceConfig?.tag ?? DEFAULT_TAG;
131+
const sourceConfig = await apiHandler.getSourceConfig();
132+
const tag = sourceConfig.tag ?? DEFAULT_TAG;
141133

142134
return {
143135
content: include_content ? sync_rules.sync_rules_content : undefined,
144136
connections: [
145137
{
146-
id: sourceConfig?.id ?? DEFAULT_DATASOURCE_ID,
147-
tag: sourceConfig?.tag ?? DEFAULT_TAG,
138+
id: sourceConfig.id ?? DEFAULT_DATASOURCE_ID,
139+
tag: sourceConfig.tag ?? DEFAULT_TAG,
148140
slot_name: sync_rules.slot_name,
149141
initial_replication_done: status?.snapshot_done ?? false,
150142
// TODO: Rename?

packages/service-core/src/api/schema.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { internal_routes } from '@powersync/service-types';
22

33
import * as api from '../api/api-index.js';
44

5-
export async function getConnectionsSchema(api: api.RouteAPI | null): Promise<internal_routes.GetSchemaResponse> {
5+
export async function getConnectionsSchema(api: api.RouteAPI): Promise<internal_routes.GetSchemaResponse> {
66
if (!api) {
77
return {
88
connections: []

packages/service-core/src/entry/commands/teardown-action.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export function registerTearDownAction(program: Command) {
1212

1313
return teardownCommand
1414
.argument('[ack]', 'Type `TEARDOWN` to confirm teardown should occur')
15-
.description('Terminate all replicating sync rules, deleting the replication slots')
15+
.description('Terminate all replicating sync rules, clear remote configuration and remove all data')
1616
.action(async (ack, options) => {
1717
if (ack !== 'TEARDOWN') {
1818
throw new Error('TEARDOWN was not acknowledged.');

packages/service-core/src/modules/AbstractModule.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import { ServiceContextContainer } from '../system/ServiceContext.js';
22
import { logger } from '@powersync/lib-services-framework';
33
import winston from 'winston';
4+
import { PersistedSyncRulesContent } from '../storage/BucketStorage.js';
5+
6+
export interface TearDownOptions {
7+
/**
8+
* If required, tear down any configuration/state for the specific sync rules
9+
*/
10+
syncRules?: PersistedSyncRulesContent[];
11+
}
412

513
export interface AbstractModuleOptions {
614
name: string;
@@ -19,9 +27,9 @@ export abstract class AbstractModule {
1927
public abstract initialize(context: ServiceContextContainer): Promise<void>;
2028

2129
/**
22-
* Terminate and clean up any resources managed by the module right away
30+
* Permanently clean up and dispose of any configuration or state for this module.
2331
*/
24-
public abstract teardown(): Promise<void>;
32+
public abstract teardown(options: TearDownOptions): Promise<void>;
2533

2634
public get name() {
2735
return this.options.name;

packages/service-core/src/modules/ModuleManager.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { logger } from '@powersync/lib-services-framework';
22
import * as system from '../system/system-index.js';
3-
import { AbstractModule } from './AbstractModule.js';
3+
import { AbstractModule, TearDownOptions } from './AbstractModule.js';
44
/**
5-
* The module manager is responsible for managing the lifecycle of all modules in the system.
5+
* The module manager keeps track of activated modules
66
*/
77
export class ModuleManager {
88
private readonly modules: Map<string, AbstractModule> = new Map();
@@ -26,9 +26,9 @@ export class ModuleManager {
2626
logger.info(`Successfully Initialized modules.`);
2727
}
2828

29-
async tearDown() {
29+
async tearDown(options: TearDownOptions) {
3030
for (const module of this.modules.values()) {
31-
await module.teardown();
31+
await module.teardown(options);
3232
}
3333
}
3434
}

0 commit comments

Comments
 (0)