-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkafkaproducer.js
74 lines (59 loc) · 1.82 KB
/
kafkaproducer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
const logger = require('./util/logger');
const Kafka = require('node-rdkafka');
const config = require('config');
//log(Kafka.librdkafkaVersion);
const host = require('./util/host');
// Build the Kafka client id from whatever the host helper reports for this server.
const SERVER_HOST = host();
const client_id = `nodeutil-${SERVER_HOST}`;
logger.info(`CLIENT ID = ${client_id}`);

// The broker list is read from the `kafka_host` configuration key.
const KAFKA_HOST = config.get('kafka_host');
logger.info(`KAFKA BROKER = ${KAFKA_HOST}`);

//TODO: This may need to be configurable
// NOTE(review): a batch size of 1, a 50 ms buffering window and Nagle disabled
// look like a low-latency tuning -- confirm against the librdkafka configuration docs.
const producerOptions = {
  'client.id': client_id,
  'metadata.broker.list': KAFKA_HOST,
  'batch.num.messages': '1',
  'queue.buffering.max.ms': '50',
  'socket.blocking.max.ms': '50',
  'socket.nagle.disable': 'true',
};

// Single producer instance shared by everything in this module.
const producer = new Kafka.HighLevelProducer(producerOptions);
// Log every error event emitted by the producer so that client-level failures
// are visible instead of being silently discarded by an empty handler.
producer.on('event.error', (err) => {
  logger.error('Error from producer');
  logger.error(err);
});
/**
 * Class to send to Kafka message queue.
 *
 * All instances share the single module-level `producer`.
 */
class MQProducer {
  /**
   * Starts connecting the shared producer and notifies the caller when it is ready.
   *
   * @param {Function} indicateReady - invoked with no arguments whenever the
   *   producer emits its `ready` event.
   */
  initializeConnection(indicateReady) {
    // Subscribe before connecting so the `ready` event cannot be missed.
    producer.on('ready', () => {
      // Call the function to indicate we are ready
      indicateReady();
    });

    // Pass a callback so a failed connection attempt is logged rather than
    // going unnoticed (in which case `indicateReady` would simply never fire).
    producer.connect({}, (err) => {
      if (err) {
        logger.error('Failed to connect to KAFKA');
        logger.error(err);
      }
    });
  }

  /**
   * This is a simple method to send a message to the PHP queue.
   *
   * Failures never propagate to the caller: both synchronous errors (for
   * example a payload that `Buffer.from` rejects) and errors reported through
   * the delivery callback are logged and otherwise ignored.
   *
   * @param messageToSend - payload to publish; it is converted with `Buffer.from`.
   */
  publishToMQ(messageToSend) {
    try {
      //Send it to the PHP queue
      producer.produce(
        'as_parallel_bg_queue_low_priority', // The topic
        null, // The partition - Let it auto select
        Buffer.from(messageToSend), // The message to send
        null, // No keys
        Date.now(), // Message timestamp
        (err) => {
          // Delivery callback: only failures are of interest here.
          if (err) {
            logger.error(err);
          }
        }
      );
    } catch (err) {
      logger.error("Failed to publish to KAFKA");
      // Keep the underlying cause; the generic message alone is not diagnosable.
      logger.error(err);
    }
  }
}
module.exports = MQProducer;