-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcumin.js
141 lines (112 loc) · 4.08 KB
/
cumin.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
var redis = require("redis");
var consolePrefix = "[cumin]";
var promisify = require('util').promisify || function(x) { return x; };
module.exports = function(port, host, options) {
var redisArgs = arguments;
var nonBlockingClient = redis.createClient.apply(redis, redisArgs),
blockingClient;
var redisBlpopTimeout = 1;
var alreadyListening = false,
killSignalReceived = false,
pendingTasks = 0,
killWaitTimeout = 20;
function onKillSignal() {
if(!killSignalReceived) {
killSignalReceived = true;
console.info("\n" + consolePrefix, "Attempting clean shutdown...");
console.info(consolePrefix, "To force shutdown, hit Ctrl+C again.");
console.info(consolePrefix, "Waiting upto", redisBlpopTimeout, "seconds for next chance to kill the redis connection...");
setTimeout(function() {
console.info(consolePrefix, "Forcing kill due to", killWaitTimeout, "seconds timeout.");
process.exit();
}, killWaitTimeout * 1000)
} else {
console.info("\n" + consolePrefix, "Forcing shutdown now.");
setTimeout(process.exit, 500);
}
}
function attemptCleanShutdown() {
console.info(consolePrefix, "Not reconnecting to redis because of kill signal received.");
if(pendingTasks == 0) {
console.info(consolePrefix, "No pending tasks. Exiting now.");
process.exit();
} else {
console.info(consolePrefix, "Waiting for pending tasks to be completed. Pending count:", pendingTasks);
}
}
function continueListening(queueName, handler) {
var promiseMode = (handler.length < 2);
if(killSignalReceived) return attemptCleanShutdown();
blockingClient.blpop(queueName, redisBlpopTimeout, function(err, data) {
if(err) return console.log(err);
if(data) {
var bareQueueName = queueName.slice(("cumin.").length);
nonBlockingClient.hset("cuminmeta." + bareQueueName, "lastDequeued", Date.now());
nonBlockingClient.publish("cumin.dequeued", data[1]);
var queueItem = JSON.parse(data[1]);
var handlerOnComplete = function() {
pendingTasks--;
nonBlockingClient.hset("cuminmeta." + bareQueueName, "completed", Date.now());
nonBlockingClient.publish("cumin.processed", data[1]);
if(killSignalReceived && pendingTasks) {
console.info(consolePrefix, "Waiting for pending tasks to be completed. Pending count:", pendingTasks);
}
if(killSignalReceived && !pendingTasks) {
console.info(consolePrefix, "Pending tasks completed. Shutting down now.");
process.exit();
}
}
pendingTasks++;
if(promiseMode) {
handler(queueItem.data).then(handlerOnComplete);
} else {
handler(queueItem.data, handlerOnComplete);
}
}
process.nextTick(function() {
continueListening(queueName, handler);
});
});
}
return {
enqueue: promisify(function(queueName, queueData, done) {
if(!queueName) {
throw new Error("Queue name must be provided. eg. 'emailQueue'.");
}
var now = Date.now();
var message = JSON.stringify({
byPid: process.pid,
byTitle: process.title,
queueName: queueName,
date: now,
data: queueData,
retryCount: 0
});
nonBlockingClient.sadd("cuminqueues", queueName);
nonBlockingClient.hset("cuminmeta." + queueName, "lastEnqueued", now);
nonBlockingClient.rpush("cumin." + queueName, message);
nonBlockingClient.publish("cumin.enqueued", message, done);
}),
listen: function(queueName, handler) {
if(!queueName) {
throw new Error(consolePrefix, "Queue name must be provided. eg. 'emailQueue'.");
}
if(!handler) {
throw new Error(consolePrefix, "You must provide a hander to .listen.");
}
if(alreadyListening) {
throw new Error(consolePrefix, "You can only .listen once in an app. To listen to another queue, create another app.");
}
if(!blockingClient) {
blockingClient = redis.createClient.apply(redis, redisArgs);
}
process.on("SIGINT", onKillSignal);
process.on("SIGTERM", onKillSignal);
alreadyListening = true;
if(handler.length < 2) {
console.log(consolePrefix, 'Assuming that the handler returns a promise.');
}
continueListening("cumin." + queueName, handler);
}
}
}