Skip to content

Commit

Permalink
Merge pull request #923 from MissionCriticalCloud/set-kafka-topic
Browse files Browse the repository at this point in the history
Correctly set Kafka topic
  • Loading branch information
ddegoede authored Feb 18, 2021
2 parents e4fbe5e + bad1722 commit 2864ff3
Showing 1 changed file with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public class KafkaEventBus extends ManagerBase implements EventBus {
@Inject
public VMInstanceDao _vmDao;

private String defaultTopic = "cosmic";
private static final String DEFAULT_TOPIC = "cosmic";
private String _topic = DEFAULT_TOPIC;
private Producer<String,String> _producer;
private static final Logger s_logger = LoggerFactory.getLogger(KafkaEventBus.class);
@Override
Expand All @@ -40,13 +41,18 @@ public boolean configure(String name, Map<String, Object> params) throws Configu
try {
final FileInputStream is = new FileInputStream(PropertiesUtil.findConfigFile("kafka.producer.properties"));
props.load(is);

_topic = (String) props.remove("topic");
if (_topic == null) {
_topic = DEFAULT_TOPIC;
}

is.close();
} catch (Exception e) {
// Fallback to default properties
props.setProperty("bootstrap.servers", "192.168.22.1:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "1");
props.setProperty("topic", "cosmic");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
Expand Down Expand Up @@ -75,13 +81,7 @@ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws Ev

@Override
public void publish(Event event) throws EventBusException {

String topic = getDefaultTopic();
if (event.getTopic() != null) {
topic = event.getTopic();
}

ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, event.getResourceUUID(), event.getDescription());
ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription());
_producer.send(record);
}

Expand All @@ -99,8 +99,4 @@ public boolean start() {
public boolean stop() {
return true;
}

public String getDefaultTopic() {
return defaultTopic;
}
}

0 comments on commit 2864ff3

Please # to comment.