From 559e382a3c8f95abe13b14a7a7309b10337cd7fc Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 19 Dec 2024 16:34:05 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1870]=E2=9A=A1=EF=B8=8FReplace=20othe?= =?UTF-8?q?r=20structs=20attribute=20store=5Fhost=20with=20BrokerRuntime?= =?UTF-8?q?=20store=5Fhost=20(#1871)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 6 +++++- rocketmq-broker/src/processor/ack_message_processor.rs | 4 +--- rocketmq-broker/src/processor/reply_message_processor.rs | 4 +--- rocketmq-broker/src/processor/send_message_processor.rs | 4 +--- .../src/transaction/queue/transactional_message_bridge.rs | 4 +--- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index d4b59c4f..902802a4 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -473,6 +473,7 @@ impl BrokerRuntime { self.transactional_message_service.as_ref().unwrap().clone(), self.rebalance_lock_manager.clone(), self.broker_stats_manager.clone(), + self.store_host, ); let reply_message_processor = ReplyMessageProcessor::new( self.topic_queue_mapping_manager.clone(), @@ -484,6 +485,7 @@ impl BrokerRuntime { self.broker_stats_manager.clone(), Some(self.producer_manager.clone()), self.transactional_message_service.as_ref().unwrap().clone(), + self.store_host, ); let mut pull_message_result_handler = ArcMut::new(Box::new(DefaultPullMessageResultHandler::new( @@ -566,6 +568,7 @@ impl BrokerRuntime { self.escape_bridge.clone(), self.broker_config.clone(), self.pop_inflight_message_counter.clone(), + self.store_host, )); BrokerRequestProcessor { send_message_processor: ArcMut::new(send_message_processor), @@ -749,7 +752,8 @@ impl BrokerRuntime { self.broker_stats_manager.clone(), self.consumer_offset_manager.clone(), self.broker_config.clone(), - self.topic_config_manager.clone() + self.topic_config_manager.clone(), + self.store_host ); let service = DefaultTransactionalMessageService::new(bridge); self.transactional_message_service = Some(ArcMut::new(service)); diff --git a/rocketmq-broker/src/processor/ack_message_processor.rs b/rocketmq-broker/src/processor/ack_message_processor.rs index 6d75037f..6d3b65fb 100644 --- a/rocketmq-broker/src/processor/ack_message_processor.rs +++ b/rocketmq-broker/src/processor/ack_message_processor.rs @@ -76,10 +76,8 @@ where escape_bridge: ArcMut>, broker_config: Arc, pop_inflight_message_counter: Arc, + store_host: SocketAddr, ) -> AckMessageProcessor { - let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) - .parse::() - .unwrap(); AckMessageProcessor { topic_config_manager, message_store, diff --git a/rocketmq-broker/src/processor/reply_message_processor.rs b/rocketmq-broker/src/processor/reply_message_processor.rs index 1932d5da..aa9c2de6 100644 --- a/rocketmq-broker/src/processor/reply_message_processor.rs +++ b/rocketmq-broker/src/processor/reply_message_processor.rs @@ -73,10 +73,8 @@ where broker_stats_manager: Arc, producer_manager: Option>, transactional_message_service: ArcMut, + store_host: SocketAddr, ) -> Self { - let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) - .parse::() - .unwrap(); Self { inner: Inner { broker_config, diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index 81a87b2a..eb5115c0 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -193,10 +193,8 @@ where transactional_message_service: ArcMut, rebalance_lock_manager: Arc, broker_stats_manager: Arc, + store_host: SocketAddr, ) -> Self { - let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) - .parse::() - .unwrap(); Self { inner: ArcMut::new(Inner { broker_config, diff --git a/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs b/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs index cc2d7cae..76047e32 100644 --- a/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs +++ b/rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs @@ -75,10 +75,8 @@ where consumer_offset_manager: ConsumerOffsetManager, broker_config: Arc, topic_config_manager: TopicConfigManager, + store_host: SocketAddr, ) -> Self { - let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) - .parse::() - .expect("parse store host failed"); Self { op_queue_map: Arc::new(Mutex::new(HashMap::new())), message_store,