Skip to content

Commit b2761fb

Browse files
authored
feat: Upgrade Redis 3 to 4 for LiveQuery (#8333)
1 parent 40dd82f commit b2761fb

10 files changed

+136
-40
lines changed

spec/ParseGraphQLServer.spec.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ describe('ParseGraphQLServer', () => {
432432
const expressApp = express();
433433
httpServer = http.createServer(expressApp);
434434
expressApp.use('/parse', parseServer.app);
435-
parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {
435+
parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {
436436
port: 1338,
437437
});
438438
parseGraphQLServer.applyGraphQL(expressApp);

spec/ParseLiveQueryRedis.spec.js

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') {
2+
describe('ParseLiveQuery redis', () => {
3+
afterEach(async () => {
4+
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
5+
client.close();
6+
});
7+
it('can connect', async () => {
8+
await reconfigureServer({
9+
startLiveQueryServer: true,
10+
liveQuery: {
11+
classNames: ['TestObject'],
12+
redisURL: 'redis://localhost:6379',
13+
},
14+
liveQueryServerOptions: {
15+
redisURL: 'redis://localhost:6379',
16+
},
17+
});
18+
const subscription = await new Parse.Query('TestObject').subscribe();
19+
const [object] = await Promise.all([
20+
new Parse.Object('TestObject').save(),
21+
new Promise(resolve =>
22+
subscription.on('create', () => {
23+
resolve();
24+
})
25+
),
26+
]);
27+
await Promise.all([
28+
new Promise(resolve =>
29+
subscription.on('delete', () => {
30+
resolve();
31+
})
32+
),
33+
object.destroy(),
34+
]);
35+
});
36+
37+
it('can call connect twice', async () => {
38+
const server = await reconfigureServer({
39+
startLiveQueryServer: true,
40+
liveQuery: {
41+
classNames: ['TestObject'],
42+
redisURL: 'redis://localhost:6379',
43+
},
44+
liveQueryServerOptions: {
45+
redisURL: 'redis://localhost:6379',
46+
},
47+
});
48+
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
49+
await server.config.liveQueryController.connect();
50+
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
51+
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
52+
await server.liveQueryServer.connect();
53+
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
54+
});
55+
});
56+
}

spec/ParseLiveQueryServer.spec.js

+9-9
Original file line numberDiff line numberDiff line change
@@ -94,29 +94,29 @@ describe('ParseLiveQueryServer', function () {
9494
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
9595
});
9696

97-
it('can be initialized from ParseServer', function () {
97+
it('can be initialized from ParseServer', async () => {
9898
const httpServer = {};
99-
const parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {});
99+
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {});
100100

101101
expect(parseLiveQueryServer.clientId).toBeUndefined();
102102
expect(parseLiveQueryServer.clients.size).toBe(0);
103103
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
104104
});
105105

106-
it('can be initialized from ParseServer without httpServer', function (done) {
107-
const parseLiveQueryServer = ParseServer.createLiveQueryServer(undefined, {
106+
it('can be initialized from ParseServer without httpServer', async () => {
107+
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(undefined, {
108108
port: 22345,
109109
});
110110

111111
expect(parseLiveQueryServer.clientId).toBeUndefined();
112112
expect(parseLiveQueryServer.clients.size).toBe(0);
113113
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
114-
parseLiveQueryServer.server.close(done);
114+
await new Promise(resolve => parseLiveQueryServer.server.close(resolve));
115115
});
116116

117117
describe_only_db('mongo')('initialization', () => {
118-
it('can be initialized through ParseServer without liveQueryServerOptions', function (done) {
119-
const parseServer = ParseServer.start({
118+
it('can be initialized through ParseServer without liveQueryServerOptions', async function (done) {
119+
const parseServer = await ParseServer.start({
120120
appId: 'hello',
121121
masterKey: 'world',
122122
port: 22345,
@@ -137,8 +137,8 @@ describe('ParseLiveQueryServer', function () {
137137
});
138138
});
139139

140-
it('can be initialized through ParseServer with liveQueryServerOptions', function (done) {
141-
const parseServer = ParseServer.start({
140+
it('can be initialized through ParseServer with liveQueryServerOptions', async function (done) {
141+
const parseServer = await ParseServer.start({
142142
appId: 'hello',
143143
masterKey: 'world',
144144
port: 22346,

spec/RedisPubSub.spec.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ describe('RedisPubSub', function () {
1515
});
1616

1717
const redis = require('redis');
18-
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
18+
expect(redis.createClient).toHaveBeenCalledWith({
19+
url: 'redisAddress',
1920
socket_keepalive: true,
2021
no_ready_check: true,
2122
});
@@ -28,7 +29,8 @@ describe('RedisPubSub', function () {
2829
});
2930

3031
const redis = require('redis');
31-
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
32+
expect(redis.createClient).toHaveBeenCalledWith({
33+
url: 'redisAddress',
3234
socket_keepalive: true,
3335
no_ready_check: true,
3436
});

spec/helper.js

+13-11
Original file line numberDiff line numberDiff line change
@@ -173,17 +173,19 @@ const reconfigureServer = (changedConfiguration = {}) => {
173173
port,
174174
});
175175
cache.clear();
176-
parseServer = ParseServer.start(newConfiguration);
177-
parseServer.expressApp.use('/1', err => {
178-
console.error(err);
179-
fail('should not call next');
180-
});
181-
server = parseServer.server;
182-
server.on('connection', connection => {
183-
const key = `${connection.remoteAddress}:${connection.remotePort}`;
184-
openConnections[key] = connection;
185-
connection.on('close', () => {
186-
delete openConnections[key];
176+
ParseServer.start(newConfiguration).then(_parseServer => {
177+
parseServer = _parseServer;
178+
parseServer.expressApp.use('/1', err => {
179+
console.error(err);
180+
fail('should not call next');
181+
});
182+
server = parseServer.server;
183+
server.on('connection', connection => {
184+
const key = `${connection.remoteAddress}:${connection.remotePort}`;
185+
openConnections[key] = connection;
186+
connection.on('close', () => {
187+
delete openConnections[key];
188+
});
187189
});
188190
});
189191
} catch (error) {

src/Adapters/PubSub/RedisPubSub.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ import { createClient } from 'redis';
22

33
function createPublisher({ redisURL, redisOptions = {} }): any {
44
redisOptions.no_ready_check = true;
5-
return createClient(redisURL, redisOptions);
5+
return createClient({ url: redisURL, ...redisOptions });
66
}
77

88
function createSubscriber({ redisURL, redisOptions = {} }): any {
99
redisOptions.no_ready_check = true;
10-
return createClient(redisURL, redisOptions);
10+
return createClient({ url: redisURL, ...redisOptions });
1111
}
1212

1313
const RedisPubSub = {

src/Controllers/LiveQueryController.js

+4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ export class LiveQueryController {
2121
this.liveQueryPublisher = new ParseCloudCodePublisher(config);
2222
}
2323

24+
connect() {
25+
return this.liveQueryPublisher.connect();
26+
}
27+
2428
onAfterSave(
2529
className: string,
2630
currentObject: any,

src/LiveQuery/ParseCloudCodePublisher.js

+9
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@ class ParseCloudCodePublisher {
1111
this.parsePublisher = ParsePubSub.createPublisher(config);
1212
}
1313

14+
async connect() {
15+
if (typeof this.parsePublisher.connect === 'function') {
16+
if (this.parsePublisher.isOpen) {
17+
return;
18+
}
19+
return Promise.resolve(this.parsePublisher.connect());
20+
}
21+
}
22+
1423
onCloudCodeAfterSave(request: any): void {
1524
this._onCloudCodeMessage(Parse.applicationId + 'afterSave', request);
1625
}

src/LiveQuery/ParseLiveQueryServer.js

+24-9
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,25 @@ class ParseLiveQueryServer {
7373
parseWebsocket => this._onConnect(parseWebsocket),
7474
config
7575
);
76-
77-
// Initialize subscriber
7876
this.subscriber = ParsePubSub.createSubscriber(config);
79-
this.subscriber.subscribe(Parse.applicationId + 'afterSave');
80-
this.subscriber.subscribe(Parse.applicationId + 'afterDelete');
81-
this.subscriber.subscribe(Parse.applicationId + 'clearCache');
82-
// Register message handler for subscriber. When publisher get messages, it will publish message
83-
// to the subscribers and the handler will be called.
84-
this.subscriber.on('message', (channel, messageStr) => {
77+
if (!this.subscriber.connect) {
78+
this.connect();
79+
}
80+
}
81+
82+
async connect() {
83+
if (this.subscriber.isOpen) {
84+
return;
85+
}
86+
if (typeof this.subscriber.connect === 'function') {
87+
await Promise.resolve(this.subscriber.connect());
88+
} else {
89+
this.subscriber.isOpen = true;
90+
}
91+
this._createSubscribers();
92+
}
93+
_createSubscribers() {
94+
const messageRecieved = (channel, messageStr) => {
8595
logger.verbose('Subscribe message %j', messageStr);
8696
let message;
8797
try {
@@ -102,7 +112,12 @@ class ParseLiveQueryServer {
102112
} else {
103113
logger.error('Get message %s from unknown channel %j', message, channel);
104114
}
105-
});
115+
};
116+
this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr));
117+
for (const field of ['afterSave', 'afterDelete', 'clearCache']) {
118+
const channel = `${Parse.applicationId}${field}`;
119+
this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr));
120+
}
106121
}
107122

108123
// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.

src/ParseServer.js

+14-6
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,12 @@ class ParseServer {
7777

7878
const allControllers = controllers.getControllers(options);
7979

80-
const { loggerController, databaseController, hooksController } = allControllers;
80+
const {
81+
loggerController,
82+
databaseController,
83+
hooksController,
84+
liveQueryController,
85+
} = allControllers;
8186
this.config = Config.put(Object.assign({}, options, allControllers));
8287

8388
logging.setLogger(loggerController);
@@ -98,6 +103,7 @@ class ParseServer {
98103
) {
99104
startupPromises.push(options.cacheAdapter.connect());
100105
}
106+
startupPromises.push(liveQueryController.connect());
101107
await Promise.all(startupPromises);
102108
if (serverStartComplete) {
103109
serverStartComplete();
@@ -263,7 +269,7 @@ class ParseServer {
263269
* @param {Function} callback called when the server has started
264270
* @returns {ParseServer} the parse server instance
265271
*/
266-
start(options: ParseServerOptions, callback: ?() => void) {
272+
async start(options: ParseServerOptions, callback: ?() => void) {
267273
const app = express();
268274
if (options.middleware) {
269275
let middleware;
@@ -307,7 +313,7 @@ class ParseServer {
307313
this.server = server;
308314

309315
if (options.startLiveQueryServer || options.liveQueryServerOptions) {
310-
this.liveQueryServer = ParseServer.createLiveQueryServer(
316+
this.liveQueryServer = await ParseServer.createLiveQueryServer(
311317
server,
312318
options.liveQueryServerOptions,
313319
options
@@ -338,9 +344,9 @@ class ParseServer {
338344
* @param {Server} httpServer an optional http server to pass
339345
* @param {LiveQueryServerOptions} config options for the liveQueryServer
340346
* @param {ParseServerOptions} options options for the ParseServer
341-
* @returns {ParseLiveQueryServer} the live query server instance
347+
* @returns {Promise<ParseLiveQueryServer>} the live query server instance
342348
*/
343-
static createLiveQueryServer(
349+
static async createLiveQueryServer(
344350
httpServer,
345351
config: LiveQueryServerOptions,
346352
options: ParseServerOptions
@@ -350,7 +356,9 @@ class ParseServer {
350356
httpServer = require('http').createServer(app);
351357
httpServer.listen(config.port);
352358
}
353-
return new ParseLiveQueryServer(httpServer, config, options);
359+
const server = new ParseLiveQueryServer(httpServer, config, options);
360+
await server.connect();
361+
return server;
354362
}
355363

356364
static verifyServerUrl(callback) {

0 commit comments

Comments
 (0)