Skip to content

Commit 5865a71

Browse files
committed
feat: Periodicaly check if domain name changed and close connections to old database.
wip: periodic check for domain change wip: periodic checks wip: close sockets on instance closed
1 parent c6f575f commit 5865a71

File tree

8 files changed

+203
-25
lines changed

8 files changed

+203
-25
lines changed

.github/workflows/tests.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ jobs:
154154
MYSQL_DB:${{ vars.GOOGLE_CLOUD_PROJECT }}/MYSQL_DB
155155
POSTGRES_CONNECTION_NAME:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_CONNECTION_NAME
156156
POSTGRES_USER:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_USER
157-
POSTGRES_USER_IAM_NODE:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_USER_IAM_NODE
157+
POSTGRES_IAM_USER:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_USER_IAM_NODE
158158
POSTGRES_PASS:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_PASS
159159
POSTGRES_DB:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_DB
160160
POSTGRES_CAS_CONNECTION_NAME:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_CAS_CONNECTION_NAME
@@ -183,7 +183,7 @@ jobs:
183183
MYSQL_DB: "${{ steps.secrets.outputs.MYSQL_DB }}"
184184
POSTGRES_CONNECTION_NAME: "${{ steps.secrets.outputs.POSTGRES_CONNECTION_NAME }}"
185185
POSTGRES_USER: "${{ steps.secrets.outputs.POSTGRES_USER }}"
186-
POSTGRES_USER_IAM_NODE: "${{ steps.secrets.outputs.POSTGRES_USER_IAM_NODE }}"
186+
POSTGRES_IAM_USER: "${{ steps.secrets.outputs.POSTGRES_IAM_USER }}"
187187
POSTGRES_PASS: "${{ steps.secrets.outputs.POSTGRES_PASS }}"
188188
POSTGRES_DB: "${{ steps.secrets.outputs.POSTGRES_DB }}"
189189
POSTGRES_CAS_CONNECTION_NAME: "${{ steps.secrets.outputs.POSTGRES_CAS_CONNECTION_NAME }}"
@@ -279,7 +279,7 @@ jobs:
279279
MYSQL_DB:${{ vars.GOOGLE_CLOUD_PROJECT }}/MYSQL_DB
280280
POSTGRES_CONNECTION_NAME:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_CONNECTION_NAME
281281
POSTGRES_USER:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_USER
282-
POSTGRES_USER_IAM_NODE:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_USER_IAM_NODE
282+
POSTGRES_IAM_USER:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_USER_IAM_NODE
283283
POSTGRES_PASS:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_PASS
284284
POSTGRES_DB:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_DB
285285
POSTGRES_CAS_CONNECTION_NAME:${{ vars.GOOGLE_CLOUD_PROJECT }}/POSTGRES_CAS_CONNECTION_NAME
@@ -302,7 +302,7 @@ jobs:
302302
MYSQL_DB: "${{ steps.secrets.outputs.MYSQL_DB }}"
303303
POSTGRES_CONNECTION_NAME: "${{ steps.secrets.outputs.POSTGRES_CONNECTION_NAME }}"
304304
POSTGRES_USER: "${{ steps.secrets.outputs.POSTGRES_USER }}"
305-
POSTGRES_IAM_USER: "${{ steps.secrets.outputs.POSTGRES_USER_IAM_NODE }}"
305+
POSTGRES_IAM_USER: "${{ steps.secrets.outputs.POSTGRES_IAM_USER }}"
306306
POSTGRES_PASS: "${{ steps.secrets.outputs.POSTGRES_PASS }}"
307307
POSTGRES_DB: "${{ steps.secrets.outputs.POSTGRES_DB }}"
308308
POSTGRES_CAS_CONNECTION_NAME: "${{ steps.secrets.outputs.POSTGRES_CAS_CONNECTION_NAME }}"

src/cloud-sql-instance.ts

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,26 @@
1414

1515
import {IpAddressTypes, selectIpAddress} from './ip-addresses';
1616
import {InstanceConnectionInfo} from './instance-connection-info';
17-
import {resolveInstanceName} from './parse-instance-connection-name';
17+
import {
18+
isSameInstance,
19+
resolveInstanceName,
20+
} from './parse-instance-connection-name';
1821
import {InstanceMetadata} from './sqladmin-fetcher';
1922
import {generateKeys} from './crypto';
2023
import {RSAKeys} from './rsa-keys';
2124
import {SslCert} from './ssl-cert';
2225
import {getRefreshInterval, isExpirationTimeValid} from './time';
2326
import {AuthTypes} from './auth-types';
27+
import {CloudSQLConnectorError} from './errors';
28+
29+
// Private types that describe exactly the methods
30+
// needed from tls.Socket to be able to close
31+
// sockets when the DNS Name changes.
32+
type EventFn = () => void;
33+
type ClosableSocket = {
34+
destroy: (error?: Error) => void;
35+
once: (name: string, handler: EventFn) => void;
36+
};
2437

2538
interface Fetcher {
2639
getInstanceMetadata({
@@ -42,6 +55,7 @@ interface CloudSQLInstanceOptions {
4255
ipType: IpAddressTypes;
4356
limitRateInterval?: number;
4457
sqlAdminFetcher: Fetcher;
58+
checkDomainInterval?: number;
4559
}
4660

4761
interface RefreshResult {
@@ -74,9 +88,13 @@ export class CloudSQLInstance {
7488
// The ongoing refresh promise is referenced by the `next` property
7589
private next?: Promise<RefreshResult>;
7690
private scheduledRefreshID?: ReturnType<typeof setTimeout> | null = undefined;
91+
private checkDomainID?: ReturnType<typeof setInterval> | null = undefined;
7792
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
7893
private throttle?: any;
7994
private closed = false;
95+
private checkDomainInterval: number;
96+
private sockets = new Set<ClosableSocket>();
97+
8098
public readonly instanceInfo: InstanceConnectionInfo;
8199
public ephemeralCert?: SslCert;
82100
public host?: string;
@@ -98,6 +116,7 @@ export class CloudSQLInstance {
98116
this.ipType = options.ipType || IpAddressTypes.PUBLIC;
99117
this.limitRateInterval = options.limitRateInterval || 30 * 1000; // 30 seconds
100118
this.sqlAdminFetcher = options.sqlAdminFetcher;
119+
this.checkDomainInterval = options.checkDomainInterval || 30 * 1000;
101120
}
102121

103122
// p-throttle library has to be initialized in an async scope in order to
@@ -152,6 +171,14 @@ export class CloudSQLInstance {
152171
this.next = undefined;
153172
return Promise.reject('closed');
154173
}
174+
if (this?.instanceInfo?.domainName && !this.checkDomainID) {
175+
this.checkDomainID = setInterval(
176+
() => {
177+
this.checkDomainChanged();
178+
},
179+
this.checkDomainInterval || 30 * 1000
180+
);
181+
}
155182

156183
const currentRefreshId = this.scheduledRefreshID;
157184

@@ -296,8 +323,8 @@ export class CloudSQLInstance {
296323
// If refresh has not yet started, then cancel the setTimeout
297324
if (this.scheduledRefreshID) {
298325
clearTimeout(this.scheduledRefreshID);
326+
this.scheduledRefreshID = null;
299327
}
300-
this.scheduledRefreshID = null;
301328
}
302329

303330
// Mark this instance as having an active connection. This is important to
@@ -312,9 +339,48 @@ export class CloudSQLInstance {
312339
close(): void {
313340
this.closed = true;
314341
this.cancelRefresh();
342+
if (this.checkDomainID) {
343+
clearInterval(this.checkDomainID);
344+
this.checkDomainID = null;
345+
}
346+
for (const socket of this.sockets) {
347+
socket.destroy(
348+
new CloudSQLConnectorError({
349+
code: 'ERRCLOSED',
350+
message: 'The connector was closed.',
351+
})
352+
);
353+
}
315354
}
316355

317356
isClosed(): boolean {
318357
return this.closed;
319358
}
359+
async checkDomainChanged() {
360+
if (!this.instanceInfo.domainName) {
361+
return;
362+
}
363+
364+
const newInfo = await resolveInstanceName(
365+
undefined,
366+
this.instanceInfo.domainName
367+
);
368+
if (!isSameInstance(this.instanceInfo, newInfo)) {
369+
// Domain name changed. Close and remove, then create a new map entry.
370+
this.close();
371+
}
372+
}
373+
addSocket(socket: ClosableSocket) {
374+
if (!this.instanceInfo.domainName) {
375+
// This was not connected by domain name. Ignore all sockets.
376+
return;
377+
}
378+
379+
// Add the socket to the list
380+
this.sockets.add(socket);
381+
// When the socket is closed, remove it.
382+
socket.once('closed', () => {
383+
this.sockets.delete(socket);
384+
});
385+
}
320386
}

src/connector.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export declare interface ConnectionOptions {
4444
ipType?: IpAddressTypes;
4545
instanceConnectionName: string;
4646
domainName?: string;
47+
checkDomainInterval?: number;
4748
limitRateInterval?: number;
4849
}
4950

@@ -129,6 +130,7 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
129130
const entry = this.get(key);
130131
if (entry) {
131132
if (entry.isResolved()) {
133+
await entry.instance?.checkDomainChanged();
132134
if (!entry.instance?.isClosed()) {
133135
// The instance is open and the domain has not changed.
134136
// use the cached instance.
@@ -154,6 +156,7 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
154156
ipType: opts.ipType || IpAddressTypes.PUBLIC,
155157
limitRateInterval: opts.limitRateInterval || 30 * 1000, // 30 sec
156158
sqlAdminFetcher: this.sqlAdminFetcher,
159+
checkDomainInterval: opts.checkDomainInterval,
157160
});
158161
this.set(key, new CacheEntry(promise));
159162

@@ -257,6 +260,9 @@ export class Connector {
257260
tlsSocket.once('secureConnect', async () => {
258261
cloudSqlInstance.setEstablishedConnection();
259262
});
263+
264+
cloudSqlInstance.addSocket(tlsSocket);
265+
260266
return tlsSocket;
261267
}
262268

system-test/pg-connect.cjs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ const {Client} = pg;
2020
t.test('open connection and retrieves standard pg tables', async t => {
2121
const connector = new Connector();
2222
const clientOpts = await connector.getOptions({
23-
instanceConnectionName: String(process.env.POSTGRES_CONNECTION_NAME),
23+
instanceConnectionName: process.env.POSTGRES_CONNECTION_NAME,
2424
});
2525
const client = new Client({
2626
...clientOpts,
27-
user: String(process.env.POSTGRES_USER),
28-
password: String(process.env.POSTGRES_PASS),
29-
database: String(process.env.POSTGRES_DB),
27+
user: process.env.POSTGRES_USER,
28+
password: process.env.POSTGRES_PASS,
29+
database: process.env.POSTGRES_DB,
3030
});
3131
t.after(async () => {
3232
try {
@@ -48,14 +48,14 @@ t.test('open connection and retrieves standard pg tables', async t => {
4848
t.test('open IAM connection and retrieves standard pg tables', async t => {
4949
const connector = new Connector();
5050
const clientOpts = await connector.getOptions({
51-
instanceConnectionName: String(process.env.POSTGRES_CONNECTION_NAME),
52-
ipType: "PUBLIC",
53-
authType: "IAM",
51+
instanceConnectionName: process.env.POSTGRES_CONNECTION_NAME,
52+
ipType: 'PUBLIC',
53+
authType: 'IAM',
5454
});
5555
const client = new Client({
5656
...clientOpts,
57-
user: String(process.env.POSTGRES_USER_IAM_NODE),
58-
database: String(process.env.POSTGRES_DB),
57+
user: process.env.POSTGRES_IAM_USER,
58+
database: process.env.POSTGRES_DB,
5959
});
6060
t.after(async () => {
6161
try {
@@ -82,9 +82,9 @@ t.test(
8282
});
8383
const client = new Client({
8484
...clientOpts,
85-
user: String(process.env.POSTGRES_USER),
86-
password: String(process.env.POSTGRES_CAS_PASS),
87-
database: String(process.env.POSTGRES_DB),
85+
user: process.env.POSTGRES_USER,
86+
password: process.env.POSTGRES_CAS_PASS,
87+
database: process.env.POSTGRES_DB,
8888
});
8989
t.after(async () => {
9090
try {

system-test/pg-connect.mjs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ t.test('open IAM connection and retrieves standard pg tables', async t => {
5050
const connector = new Connector();
5151
const clientOpts = await connector.getOptions({
5252
instanceConnectionName: String(process.env.POSTGRES_CONNECTION_NAME),
53-
ipType: "PUBLIC",
54-
authType: "IAM",
53+
ipType: 'PUBLIC',
54+
authType: 'IAM',
5555
});
5656
const client = new Client({
5757
...clientOpts,
58-
user: String(process.env.POSTGRES_USER_IAM_NODE),
58+
user: String(process.env.POSTGRES_IAM_USER),
5959
database: String(process.env.POSTGRES_DB),
6060
});
6161
t.after(async () => {

system-test/pg-connect.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ t.test('open IAM connection and retrieves standard pg tables', async t => {
5858
});
5959
const client = new Client({
6060
...clientOpts,
61-
user: String(process.env.POSTGRES_USER_IAM_NODE),
61+
user: String(process.env.POSTGRES_IAM_USER),
6262
database: String(process.env.POSTGRES_DB),
6363
});
6464
t.after(async () => {

test/cloud-sql-instance.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ t.test('CloudSQLInstance', async t => {
6767
instanceConnectionName: 'my-project:us-east1:my-instance',
6868
sqlAdminFetcher: fetcher,
6969
});
70+
t.after(() => instance.close());
7071

7172
t.same(
7273
instance.ephemeralCert.cert,
@@ -115,6 +116,7 @@ t.test('CloudSQLInstance', async t => {
115116
limitRateInterval: 50,
116117
},
117118
});
119+
t.after(() => instance.close());
118120

119121
await t.rejects(
120122
instance.refresh(),
@@ -135,6 +137,7 @@ t.test('CloudSQLInstance', async t => {
135137
limitRateInterval: 50,
136138
},
137139
});
140+
t.after(() => instance.close());
138141
instance.refresh = () => {
139142
if (refreshCount === 2) {
140143
const end = Date.now();
@@ -177,6 +180,7 @@ t.test('CloudSQLInstance', async t => {
177180
limitRateInterval: 50,
178181
},
179182
});
183+
t.after(() => instance.close());
180184
await (() =>
181185
new Promise((res): void => {
182186
let refreshCount = 0;
@@ -233,6 +237,7 @@ t.test('CloudSQLInstance', async t => {
233237
limitRateInterval: 50,
234238
},
235239
});
240+
t.after(() => instance.close());
236241
await (() =>
237242
new Promise((res): void => {
238243
let refreshCount = 0;
@@ -263,6 +268,7 @@ t.test('CloudSQLInstance', async t => {
263268
limitRateInterval: 50,
264269
},
265270
});
271+
t.after(() => instance.close());
266272

267273
await instance.refresh();
268274

@@ -301,6 +307,7 @@ t.test('CloudSQLInstance', async t => {
301307
limitRateInterval: 50,
302308
},
303309
});
310+
t.after(() => instance.close());
304311

305312
let cancelRefreshCalled = false;
306313
let refreshCalled = false;
@@ -338,6 +345,7 @@ t.test('CloudSQLInstance', async t => {
338345
sqlAdminFetcher: fetcher,
339346
},
340347
});
348+
t.after(() => instance.close());
341349

342350
const start = Date.now();
343351
// starts regular refresh cycle
@@ -377,6 +385,7 @@ t.test('CloudSQLInstance', async t => {
377385
sqlAdminFetcher: fetcher,
378386
},
379387
});
388+
t.after(() => instance.close());
380389
const start = Date.now();
381390
// starts out refresh logic
382391
let refreshCount = 1;
@@ -424,6 +433,7 @@ t.test('CloudSQLInstance', async t => {
424433
limitRateInterval: 50,
425434
},
426435
});
436+
t.after(() => instance.close());
427437

428438
// starts a new refresh cycle but do not await on it
429439
instance.refresh();
@@ -451,6 +461,7 @@ t.test('CloudSQLInstance', async t => {
451461
limitRateInterval: 50,
452462
},
453463
});
464+
t.after(() => instance.close());
454465

455466
// simulates an ongoing instance, already has data
456467
await instance.refresh();
@@ -487,6 +498,7 @@ t.test('CloudSQLInstance', async t => {
487498
limitRateInterval: 50,
488499
},
489500
});
501+
t.after(() => instance.close());
490502

491503
await instance.refresh();
492504
instance.setEstablishedConnection();
@@ -522,6 +534,7 @@ t.test('CloudSQLInstance', async t => {
522534
limitRateInterval: 50,
523535
},
524536
});
537+
t.after(() => instance.close());
525538

526539
await instance.refresh();
527540
instance.setEstablishedConnection();
@@ -589,6 +602,7 @@ t.test('CloudSQLInstance', async t => {
589602
limitRateInterval: 0,
590603
},
591604
});
605+
t.after(() => instance.close());
592606
await (() =>
593607
new Promise((res): void => {
594608
let refreshCount = 0;

0 commit comments

Comments
 (0)