diff --git a/rocketmq-broker/src/failover/escape_bridge.rs b/rocketmq-broker/src/failover/escape_bridge.rs index 5f403638..f09689be 100644 --- a/rocketmq-broker/src/failover/escape_bridge.rs +++ b/rocketmq-broker/src/failover/escape_bridge.rs @@ -216,6 +216,51 @@ where Some(value) => value, } } + + pub async fn async_put_message( + &mut self, + mut message_ext: MessageExtBrokerInner, + ) -> PutMessageResult { + if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID { + self.message_store.put_message(message_ext).await + } else if self.broker_config.enable_slave_acting_master + && self.broker_config.enable_remote_escape + { + message_ext.set_wait_store_msg_ok(false); + let topic_publish_info = self + .topic_route_info_manager + .try_to_find_topic_publish_info(message_ext.get_topic()) + .await; + if topic_publish_info.is_none() { + return PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable); + } + let topic_publish_info = topic_publish_info.unwrap(); + let mq_selected = topic_publish_info.select_one_message_queue(); + if mq_selected.is_none() { + return PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable); + } + let message_queue = mq_selected.unwrap(); + message_ext.message_ext_inner.queue_id = message_queue.get_queue_id(); + let broker_name_to_send = message_queue.get_broker_name(); + let broker_addr_to_send = self + .topic_route_info_manager + .find_broker_address_in_publish(Some(broker_name_to_send)); + let producer_group = self.get_producer_group(&message_ext); + let result = self + .broker_outer_api + .send_message_to_specific_broker( + broker_addr_to_send.as_ref().unwrap(), + broker_name_to_send, + message_ext.message_ext_inner, + producer_group, + SEND_TIMEOUT, + ) + .await; + transform_send_result2put_result(result.ok()) + } else { + PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable) + } + } } #[inline]