-
Notifications
You must be signed in to change notification settings - Fork 3
/
kafclient.js
97 lines (82 loc) · 2.01 KB
/
kafclient.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
'use strict'
const req = require('@tpp/req')
/* port on localhost that the get/put requests in this file are sent to */
const PORT = 7749
/* way/
 * Fetch a batch of messages from the given log, hand any messages to
 * the processor, then ask the scheduler how long to wait before the
 * next fetch. Returns a control structure - set `stop` to true on it
 * to stop fetching (the scheduler can also end the loop by returning
 * a falsy delay).
 *
 * log       - name of the log, placed as-is in the request path
 * processor - called as processor(msgs, from) for every non-empty
 *             batch; `from` is the message number the next fetch
 *             will start at
 * scheduler - called as scheduler(err, end, from) after every fetch
 *             and returns the delay in milliseconds before the next
 *             fetch (falsy = do not fetch again). `end` is true when
 *             the batch was empty. On failure only `err` is passed.
 * from      - message number to start from (defaults to 1)
 */
function get(log, processor, scheduler, from) {
    let ctrl = { stop:false }
    from = from || 1
    let p = `http://localhost:${PORT}/get/${log}?from=`

    get_1()

    return ctrl

    /* way/
     * request the next batch and pass the outcome to the scheduler
     */
    function get_1() {
        if(ctrl.stop) return
        let u = p + from
        req.get(u, (err, resp) => {
            if(ctrl.stop) return

            /* a failed request can arrive without any response, so
             * the header is only consulted when a response exists */
            let last = last_msg_sent_1(resp)
            if(last !== null) from = last + 1

            if(err) return schedule_1(err)

            let msgs
            if(resp) {
                if(!Array.isArray(resp.body)) {
                    return schedule_1({ err: "bad response", resp })
                }
                msgs = resp.body
            }

            let end = !(msgs && msgs.length)
            if(!end) processor(msgs, from)
            return schedule_1(null, end, from)
        })
    }

    /* way/
     * return the number found in the "x-kafjs-lastmsgsent" response
     * header or null when there is no response, no such header, or
     * the header is not a number
     */
    function last_msg_sent_1(resp) {
        if(!resp || typeof resp.headers !== "function") return null
        let hdrs = resp.headers()
        let v = hdrs && hdrs["x-kafjs-lastmsgsent"]
        if(!v) return null
        v = parseInt(v, 10)
        return isNaN(v) ? null : v
    }

    /* way/
     * ask the scheduler for the delay before the next fetch and set
     * up the timer when it gives one
     */
    function schedule_1(err, end, from) {
        let tm = scheduler(err, end, from)
        if(tm) setTimeout(get_1, tm)
    }
}
/* understand/
 * Messages are delivered strictly in the order they were queued: only
 * the message at the head of the queue is ever in flight, and it stays
 * at the head (and is retried) until the server accepts it.
 */
let PENDING = []
let sending

/* way/
 * If nothing is in flight and the queue is not empty, POST the head of
 * the queue. On success drop it, invoke its callback (when it has one)
 * and move straight on to the next message; on failure log the error
 * and try again in two seconds.
 */
function sendPending() {
    if(sending) return
    if(!PENDING.length) return
    sending = true

    const head = PENDING[0]
    const options = {
        method: "POST",
        url: `http://localhost:${PORT}/put/${head.log}`,
        data: head.msg,
        headers: { "Content-Type": "application/json" },
    }

    req.send(options, (err, resp) => {
        sending = false

        if(!err) {
            PENDING.shift()
            if(head.cb) head.cb()
            sendPending()
            return
        }

        console.error(err)
        setTimeout(sendPending, 2 * 1000)
    })
}
/* way/
 * Serialize the message to JSON, append it (with its target log and
 * completion callback) to the end of the put queue, then nudge the
 * sender so it starts working if it is idle.
 */
function put(msg, log, cb) {
    const entry = { log, msg: JSON.stringify(msg), cb }
    PENDING.push(entry)
    sendPending()
}
/* public interface: the port the requests are sent to, plus the put
 * (queue a message for a log) and get (poll a log) entry points */
module.exports = {
PORT,
put,
get,
}