|
| 1 | +var config = require('./config'); |
| 2 | +var fs = require("fs"); |
| 3 | + |
| 4 | +var pathToLog; |
| 5 | +var historyTopics = []; |
| 6 | + |
| 7 | +function log(x) { |
| 8 | + console.log("<History> "+x); |
| 9 | +} |
| 10 | + |
| 11 | +// ============================================================================= |
| 12 | +function getLogFileForDate(timespec, date) { |
| 13 | + if (!pathToLog) return; |
| 14 | + return pathToLog+"/"+timespec+"-"+date.getFullYear()+"-"+date.getMonth()+"-"+date.getDate(); |
| 15 | +} |
| 16 | + |
| 17 | +function logWrite(timespec, topic, data) { |
| 18 | + if (!pathToLog) return; |
| 19 | + log(" [LOG]"+timespec+" "+topic+" "+data); |
| 20 | + var file = getLogFileForDate(timespec, new Date()); |
| 21 | + fs.appendFileSync(file, Date.now()+" "+topic+" "+data+"\n"); |
| 22 | +}; |
| 23 | + |
| 24 | +function logReadTopic(interval, from, to, topic, callback) { |
| 25 | + var time = from.getTime(); |
| 26 | + var toTime = to.getTime(); |
| 27 | + if (from.getFullYear()<2018 || |
| 28 | + toTime > Date.now()+1000*60*60*24) return; // invalid date range |
| 29 | + var files = []; |
| 30 | + while (time <= toTime) { |
| 31 | + var file = getLogFileForDate(interval, new Date(time)); |
| 32 | + if (fs.existsSync(file)) files.push(file); |
| 33 | + time += 1000*60*60*24; // one day |
| 34 | + } |
| 35 | + function readFiles(result, callback) { |
| 36 | + if (!files.length) return callback(result); |
| 37 | + var file = files.shift(); // take first file off |
| 38 | + const rl = require("readline").createInterface({ |
| 39 | + input: fs.createReadStream(file), |
| 40 | + crlfDelay: Infinity |
| 41 | + }); |
| 42 | + rl.on('line', (line) => { |
| 43 | + var topicIdx = line.indexOf(" "); |
| 44 | + var dataIdx = line.indexOf(" ", topicIdx+1); |
| 45 | + var lTopic = line.substring(topicIdx+1, dataIdx); |
| 46 | + if (lTopic == topic) { |
| 47 | + result.times.push(parseInt(line.substr(0,topicIdx))); |
| 48 | + result.data.push(line.substr(dataIdx+1)); |
| 49 | + } |
| 50 | + }); |
| 51 | + rl.on('close', (line) => { |
| 52 | + readFiles(result, callback); |
| 53 | + }); |
| 54 | + } |
| 55 | + readFiles({ |
| 56 | + interval : interval, |
| 57 | + from : from.getTime(), |
| 58 | + to : to.getTime(), |
| 59 | + topic : topic, |
| 60 | + times : [], |
| 61 | + data : [], |
| 62 | + }, function(result) { |
| 63 | + callback(result); |
| 64 | + }); |
| 65 | +}; |
| 66 | + |
| 67 | +// ============================================================================= |
| 68 | +function onMQTTMessage(topic, message) { |
| 69 | + var msg = message.toString(); |
| 70 | + if (topic.indexOf(" ")>=0) return; // ignore topics with spaces |
| 71 | + if (topic.startsWith(config.history_path)) { |
| 72 | + handleCommand(topic, msg); |
| 73 | + } else { |
| 74 | + handleData(topic, msg); |
| 75 | + } |
| 76 | +} |
| 77 | + |
| 78 | +function handleData(topic, message) { |
| 79 | + var data = parseFloat(message); |
| 80 | + if (!isNaN(data)) { |
| 81 | + //log("MQTT>"+topic+" => "+data); |
| 82 | + if (topic in historyTopics) { |
| 83 | + historyTopics[topic].pushNumber(data); |
| 84 | + } else { |
| 85 | + historyTopics[topic] = new HistoryTopic(topic); |
| 86 | + historyTopics[topic].pushNumber(data); |
| 87 | + } |
| 88 | + } |
| 89 | +} |
| 90 | + |
| 91 | +function handleCommand(topic, message) { |
| 92 | + var cmdRequest = config.history_path+"request/"; |
| 93 | + //log("MQTT Command>"+topic+" => "+JSON.stringify(message)); |
| 94 | + if (topic.startsWith(cmdRequest)) { |
| 95 | + /* |
| 96 | + interval : "minute", |
| 97 | + //use age : 1, // hour |
| 98 | + //or from : "1 July 2018", to : "5 July 2018" (or anything that works in new Date(...)) |
| 99 | + topic : "/ble/advertise/..." |
| 100 | + */ |
| 101 | + var json; |
| 102 | + try { |
| 103 | + json = JSON.parse(message); |
| 104 | + } catch (e) { |
| 105 | + log("MQTT "+cmdRequest+" malformed JSON "+JSON.stringify(message)); |
| 106 | + return; |
| 107 | + } |
| 108 | + var tag = topic.substr(cmdRequest.length); |
| 109 | + log("REQUEST "+tag+" "+JSON.stringify(json)); |
| 110 | + // TODO: Validate request |
| 111 | + if (!json.topic) { |
| 112 | + log("MQTT "+cmdRequest+" no topic"); |
| 113 | + return; |
| 114 | + } |
| 115 | + if (!(json.interval in config.history_times)) { |
| 116 | + log("MQTT "+cmdRequest+" invalid interval"); |
| 117 | + return; |
| 118 | + } |
| 119 | + var dFrom,dTo; |
| 120 | + if (json.from) |
| 121 | + dFrom = new Date(json.from); |
| 122 | + if (json.age) |
| 123 | + dFrom = new Date(Date.now() - parseFloat(json.age*1000*60*60)); |
| 124 | + if (json.to) |
| 125 | + dTo = new Date(json.to); |
| 126 | + else |
| 127 | + dTo = new Date(); |
| 128 | + if (!dFrom || isNaN(dFrom.getTime()) || |
| 129 | + !dTo || isNaN(dTo.getTime())) { |
| 130 | + log("MQTT "+cmdRequest+" invalid from/to or age"); |
| 131 | + return; |
| 132 | + } |
| 133 | + logReadTopic(json.interval, dFrom, dTo, json.topic, function(data) { |
| 134 | + log("RESPONSE "+tag+" ("+data.data.length+" items)"); |
| 135 | + require('./mqttclient.js').send(config.history_path+"response/"+tag, JSON.stringify(data)); |
| 136 | + }); |
| 137 | + } |
| 138 | +} |
| 139 | + |
| 140 | +// ============================================================================= |
| 141 | +function HistoryTopic(topic) { |
| 142 | + log("New History Topic for "+topic); |
| 143 | + this.topic = topic; |
| 144 | + this.times = {}; |
| 145 | + for (var i in config.history_times) |
| 146 | + this.times[i] = { num : 0, sum : 0, time : 0 }; |
| 147 | +} |
| 148 | + |
| 149 | +HistoryTopic.prototype.pushNumber = function(n) { |
| 150 | + //log.write("all",this.topic,n); |
| 151 | + for (var i in config.history_times) { |
| 152 | + this.times[i].num++; |
| 153 | + this.times[i].sum+=n; |
| 154 | + } |
| 155 | +}; |
| 156 | + |
| 157 | +HistoryTopic.prototype.tick = function(time) { |
| 158 | + for (var period in config.history_times) { |
| 159 | + var t = this.times[period]; |
| 160 | + t.time += time; |
| 161 | + if (t.time > config.history_times[period]) { |
| 162 | + if (t.num) { |
| 163 | + var avr = t.sum / t.num; |
| 164 | + logWrite(period, this.topic,avr); |
| 165 | + require('./mqttclient.js').send(config.history_path+period+this.topic, ""+avr); |
| 166 | + } |
| 167 | + this.times[period] = { num : 0, sum : 0, time : 0 }; |
| 168 | + } |
| 169 | + } |
| 170 | +}; |
| 171 | + |
| 172 | +// ============================================================================= |
| 173 | +exports.init = function() { |
| 174 | + var mqtt = require("./mqttclient.js").client; |
| 175 | + // Link in to messages |
| 176 | + mqtt.on('connect', function () { |
| 177 | + // Subscribe to any BLE data |
| 178 | + mqtt.subscribe("/ble/#"); |
| 179 | + // Subscribe to history requests |
| 180 | + mqtt.subscribe(config.history_path+"#"); |
| 181 | + }); |
| 182 | + mqtt.on('message', onMQTTMessage); |
| 183 | + // Check all history topics and write to log if needed |
| 184 | + // TODO: could be just do this when we receive our data? |
| 185 | + setInterval(function() { |
| 186 | + Object.keys(historyTopics).forEach(function(el) { |
| 187 | + historyTopics[el].tick(1000); |
| 188 | + }); |
| 189 | + }, 1000); |
| 190 | + // Handle log dir serving |
| 191 | + pathToLog = require('path').resolve(__dirname, "../log"); |
| 192 | + if (require("fs").existsSync(pathToLog)) { |
| 193 | + log("log directory found at "+pathToLog+". Enabling logging."); |
| 194 | + } else { |
| 195 | + log("log directory not found. Not logging."); |
| 196 | + pathToLog = undefined; |
| 197 | + } |
| 198 | +} |
0 commit comments