-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathProducer.js
64 lines (48 loc) · 1.58 KB
/
Producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
const Kafka = require('node-rdkafka');
const host = require('./util/host');
class Producer
{
constructor(logger, config) {
this.logger = logger;
this.producer = new Kafka.HighLevelProducer({
'client.id' : 'websocket-' + host(),
'metadata.broker.list': config.get('kafka_host'),
'batch.num.messages': '1',
'queue.buffering.max.ms':'25',
'socket.blocking.max.ms':'25',
'socket.nagle.disable':'true',
'request.required.acks':'1',
});
}
connect() {
return new Promise((resolve, reject) => {
this.producer.connect({}, () => {
reject({message: 'Kafka service is not available, try again...'});
});
this.producer.on('event.error', (err) => {
reject("Producer Error: " + err);
});
// when the connection is ready to produce, resolve
this.producer.on('ready', () => {
resolve();
});
});
}
publish(message) {
//console.log(message);
const errorHandler = (err, offset) => {
if (err) {
// TODO - Retry?
this.logger.error(err);
}
};
this.producer.produce('as_parallel_bg_queue_low_priority', // The topic
null, // The partition - Let it auto select
Buffer.from(message), // The message to send
null, // No keys
Date.now(),
errorHandler
);
}
}
module.exports = Producer;