-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathserver.js
70 lines (57 loc) · 1.97 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/local/bin/node
var io = require('socket.io').listen(7979),
fs = require('fs'),
zmq = require('zmq'),
StatsD = require ('node-statsd').StatsD;
var Throughput = require('./app/throughput');
io.configure(function() {
//io.set('transports', ['websocket']);
io.set('log level', 2); // info
if (process.argv[3]) {
console.log("restricting origin: "+process.argv[3]);
io.set("origins", process.argv[3]);
}
});
var stats = new StatsD(process.argv[2], 8125);
var queue = zmq.createSocket('pull');
var throughput = new Throughput(2000);
var handled = {};
var handledArray = [];
throughput.setStats(stats, 'server');
queue.bind('tcp://*:5556', function(err) {
if (err) throw err;
console.log('bound ZMQ pull server');
queue.on('message', function(data) {
throughput.measure(data);
// @todo ideally we wouldn't parse the inbound data *just* to get the tweet ID. Perhaps
// the processor daemons can put a delemited message on instead?
var tweet = {};
try {
tweet = JSON.parse(data);
} catch (e) {
console.log("could not parse tweet");
stats.increment('nodeflakes.server.parse_error');
return;
}
if (handled[tweet.id] != null) {
console.log("ignoring duplicate tweet ["+tweet.id+"]");
stats.increment('nodeflakes.server.duplicate_tweet');
return;
}
handled[tweet.id] = tweet;
handledArray.push(tweet.id);
if (handledArray.length > 20) {
var id = handledArray.shift();
delete handled[id];
}
stats.increment('nodeflakes.server.tweet');
io.sockets.emit('tweet', data.toString('utf8'));
});
});
io.sockets.on('connection', function(socket) {
// any ack?
stats.increment('nodeflakes.server.connect');
socket.on('disconnect', function() {
stats.increment('nodeflakes.server.disconnect');
});
});