Skip to content

Commit

Permalink
[ISSUE #1698]🚀EscapeBridge supports asyncPutMessage (#1713)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 11, 2024
1 parent 5eb115a commit 809de18
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions rocketmq-broker/src/failover/escape_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,51 @@ where
Some(value) => value,
}
}

pub async fn async_put_message(
&mut self,
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
self.message_store.put_message(message_ext).await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
{
message_ext.set_wait_store_msg_ok(false);
let topic_publish_info = self
.topic_route_info_manager
.try_to_find_topic_publish_info(message_ext.get_topic())
.await;
if topic_publish_info.is_none() {
return PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable);
}
let topic_publish_info = topic_publish_info.unwrap();
let mq_selected = topic_publish_info.select_one_message_queue();
if mq_selected.is_none() {
return PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable);
}
let message_queue = mq_selected.unwrap();
message_ext.message_ext_inner.queue_id = message_queue.get_queue_id();
let broker_name_to_send = message_queue.get_broker_name();
let broker_addr_to_send = self
.topic_route_info_manager
.find_broker_address_in_publish(Some(broker_name_to_send));
let producer_group = self.get_producer_group(&message_ext);
let result = self
.broker_outer_api
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
broker_name_to_send,
message_ext.message_ext_inner,
producer_group,
SEND_TIMEOUT,
)
.await;
transform_send_result2put_result(result.ok())
} else {
PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable)
}
}
}

#[inline]
Expand Down

0 comments on commit 809de18

Please # to comment.