-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstartup.js
91 lines (82 loc) · 2.44 KB
/
startup.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
const
promisify = require('es6-promisify'),
rabbit = require('amqplib'),
config = require('./config.json'),
mongoose = require('mongoose'),
log = require('./logger'),
loadClassifier = promisify(require('natural').BayesClassifier.load);
// Point mongoose at the global (native) Promise implementation.
mongoose.Promise = Promise;
/**
 * Connect to the database.
 *
 * This connection only needs to be established once upon process creation
 * and then it can be used in subsequent modules along the execution chain.
 *
 * The promise produced by `mongoose.connect` is returned directly (instead of
 * being wrapped in a hand-built promise) so that a failed connection rejects
 * and the caller can observe the failure and try again.
 *
 * @returns {Promise} settles with the outcome of `mongoose.connect(config.db)`
 */
const databasePromise = () => mongoose.connect(config.db);
/**
 * Read the serialized classifier from `classifier.json`.
 *
 * Delegates to the promisified `BayesClassifier.load`; `null` is handed over
 * as the second argument (presumably the stemmer slot - TODO confirm against
 * the `natural` documentation).
 *
 * @returns {Promise} whatever the promisified loader settles with
 */
const classifierPromise = () => {
  const classifierFile = 'classifier.json';
  return loadClassifier(classifierFile, null);
};
/**
 * Open a connection to the rabbitmq service configured in
 * `config.rabbit.queue`.
 *
 * Each subscriber will then open channels and assert specific queues on it.
 *
 * @returns {Promise} the promise returned by `rabbit.connect`
 */
const queueConnectionPromise = () => {
  const target = config.rabbit.queue;
  return rabbit.connect(target);
};
/**
 * Create the exchange.
 *
 * Opens a dedicated connection, asserts the topic exchange named by
 * `config.rabbit.exchange_name` on a temporary channel, then closes the
 * channel and the connection again.
 *
 * The promise chain is returned directly so that every failure (connect,
 * channel creation, exchange assertion, close) rejects the result instead of
 * leaving it pending forever.
 *
 * @returns {Promise} fulfils once the exchange is asserted and the temporary
 *   connection is closed; rejects with the underlying error otherwise
 */
const exchangePromise = () =>
  rabbit.connect(config.rabbit.queue).then(conn => {
    const rethrow = err => () => { throw err; };

    return conn.createChannel()
      .then(channel =>
        channel.assertExchange(config.rabbit.exchange_name, 'topic')
          .then(() => channel.close())
      )
      .then(
        () => conn.close(),
        // Do not leak the connection when setup fails: close it (best
        // effort) and then surface the original error to the caller.
        err => conn.close().then(rethrow(err), rethrow(err))
      );
  });
/**
 * Package the startup results into the object handed to the executing
 * process.
 *
 * Only two slots of `results` are read: index 0 becomes `classifier` and
 * index 3 becomes `queueConnection`.
 *
 * @param {Array} results - positional startup results
 * @returns {Promise<{classifier: *, queueConnection: *}>} already-resolved promise
 */
const createResultObject = results => {
  const classifier = results[0];
  const queueConnection = results[3];
  return Promise.resolve({ classifier, queueConnection });
};
module.exports = new Promise((resolve, reject) => {
retry(queueConnectionPromise, 'connect to rabbit at ' + config.rabbit.queue, 10, 15000).then(conn => {
const promises = [
retry(classifierPromise, 'load classifier'),
retry(databasePromise, 'connect to mongoose'),
retry(exchangePromise, 'create rabbit exchange')
];
Promise.all(promises)
.then(results => {
results.push(conn);
return createResultObject(results)
})
.then(resolve)
.catch(reject);
});
});
/**
 * Retry a promise-returning operation until it succeeds or the retry budget
 * is exhausted.
 *
 * The operation is invoked once immediately. After each failure, if retries
 * remain, the function waits `interval` milliseconds and tries again with one
 * fewer retry left. When no retries remain the returned promise REJECTS.
 *
 * @param {function(): Promise} promise - factory invoked for every attempt
 * @param {string} message - description of the operation, used in the error
 * @param {number} [attempts=5] - number of retries allowed after the first try
 * @param {number} [interval=500] - delay between attempts in milliseconds
 * @returns {Promise} fulfils with the first successful result; rejects with
 *   `Error('Max retries reached for <message>')` once retries are exhausted
 */
function retry(promise, message, attempts = 5, interval = 500) {
  // Calling the factory inside an executor turns a synchronous throw into a
  // rejection, so it is handled like any other failed attempt.
  return new Promise(resolve => resolve(promise())).catch(err => {
    if (attempts <= 0) {
      const error = new Error('Max retries reached for ' + message);
      // Keep the last underlying failure available for diagnostics.
      error.cause = err;
      throw error;
    }

    return new Promise(resolve => setTimeout(resolve, interval))
      .then(() => retry(promise, message, attempts - 1, interval));
  });
}