diff --git a/cosmic-core/server/src/main/java/com/cloud/mom/kafka/KafkaEventBus.java b/cosmic-core/server/src/main/java/com/cloud/mom/kafka/KafkaEventBus.java index 3b75a88414..4b4eee6786 100644 --- a/cosmic-core/server/src/main/java/com/cloud/mom/kafka/KafkaEventBus.java +++ b/cosmic-core/server/src/main/java/com/cloud/mom/kafka/KafkaEventBus.java @@ -29,7 +29,8 @@ public class KafkaEventBus extends ManagerBase implements EventBus { @Inject public VMInstanceDao _vmDao; - private String defaultTopic = "cosmic"; + private static final String DEFAULT_TOPIC = "cosmic"; + private String _topic = DEFAULT_TOPIC; private Producer _producer; private static final Logger s_logger = LoggerFactory.getLogger(KafkaEventBus.class); @Override @@ -40,13 +41,18 @@ public boolean configure(String name, Map params) throws Configu try { final FileInputStream is = new FileInputStream(PropertiesUtil.findConfigFile("kafka.producer.properties")); props.load(is); + + _topic = (String) props.remove("topic"); + if (_topic == null) { + _topic = DEFAULT_TOPIC; + } + is.close(); } catch (Exception e) { // Fallback to default properties props.setProperty("bootstrap.servers", "192.168.22.1:9092"); props.setProperty("acks", "all"); props.setProperty("retries", "1"); - props.setProperty("topic", "cosmic"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); } @@ -75,13 +81,7 @@ public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws Ev @Override public void publish(Event event) throws EventBusException { - - String topic = getDefaultTopic(); - if (event.getTopic() != null) { - topic = event.getTopic(); - } - - ProducerRecord record = new ProducerRecord(topic, event.getResourceUUID(), event.getDescription()); + ProducerRecord record = new ProducerRecord(_topic, event.getResourceUUID(), event.getDescription()); _producer.send(record); } @@ -99,8 +99,4 @@ public boolean start() { public boolean stop() { return true; } - - public String getDefaultTopic() { - return defaultTopic; - } } \ No newline at end of file