From 55126f9a1bcce45c9a03c5c0bb60cd40fd3ca606 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 14 Jul 2024 09:00:50 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#777]=F0=9F=94=A5Fix=20Broker=20can=20?= =?UTF-8?q?not=20started=F0=9F=90=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 6 +-- .../default_pull_message_result_handler.rs | 10 ++++- .../processor/pull_message_result_handler.rs | 43 ++++++++++++++++++- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 381cd285..decd8ac0 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::any::Any; use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; @@ -416,10 +415,11 @@ impl BrokerRuntime { self.broker_config.clone(), )); - let pull_message_result_handler = pull_message_result_handler.as_mut() as &mut dyn Any; + let pull_message_result_handler = pull_message_result_handler.as_mut().as_mut(); pull_message_result_handler + .as_any_mut() .downcast_mut::() - .unwrap() + .expect("downcast DefaultPullMessageResultHandler failed") .set_pull_request_hold_service(Some(Arc::new( self.pull_request_hold_service.clone().unwrap(), ))); diff --git a/rocketmq-broker/src/processor/default_pull_message_result_handler.rs b/rocketmq-broker/src/processor/default_pull_message_result_handler.rs index d83896ba..a4228c8e 100644 --- a/rocketmq-broker/src/processor/default_pull_message_result_handler.rs +++ b/rocketmq-broker/src/processor/default_pull_message_result_handler.rs @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +use std::any::Any; use std::net::SocketAddr; use std::sync::Arc; @@ -231,6 +231,14 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler { _ => None, } } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn as_any(&self) -> &dyn Any { + self + } } impl DefaultPullMessageResultHandler { diff --git a/rocketmq-broker/src/processor/pull_message_result_handler.rs b/rocketmq-broker/src/processor/pull_message_result_handler.rs index 5ccbfdbb..151475f8 100644 --- a/rocketmq-broker/src/processor/pull_message_result_handler.rs +++ b/rocketmq-broker/src/processor/pull_message_result_handler.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::any::Any; use rocketmq_remoting::net::channel::Channel; use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader; @@ -25,7 +26,37 @@ use rocketmq_remoting::runtime::server::ConnectionHandlerContext; use rocketmq_store::base::get_message_result::GetMessageResult; use rocketmq_store::filter::MessageFilter; -pub trait PullMessageResultHandler: Sync + Send + 'static { +/// Trait defining the behavior for handling the result of a pull message request. +/// +/// This trait is designed to be implemented by types that handle the result of a pull message +/// request in a RocketMQ broker. It provides a method for processing the result of a message +/// retrieval operation, along with various parameters related to the request and the broker's +/// state. +pub trait PullMessageResultHandler: Sync + Send + Any + 'static { + /// Handles the result of a pull message request. + /// + /// This method processes the result of a message retrieval operation (`get_message_result`), + /// using the provided request information, channel, context, subscription data, and other + /// parameters to generate an appropriate response. + /// + /// # Parameters + /// - `get_message_result`: The result of the message retrieval operation. + /// - `request`: The original remoting command representing the pull message request. + /// - `request_header`: The header of the pull message request, containing request-specific + /// information. + /// - `channel`: The channel through which the request was received. + /// - `ctx`: The connection handler context associated with the request. + /// - `subscription_data`: Subscription data for the consumer making the request. + /// - `subscription_group_config`: Configuration for the subscription group of the consumer. + /// - `broker_allow_suspend`: Flag indicating whether the broker allows suspending the request. + /// - `message_filter`: The message filter to apply to the retrieved messages. + /// - `response`: The initial response remoting command to be potentially modified and returned. + /// - `mapping_context`: Context for topic-queue mapping. + /// - `begin_time_mills`: The timestamp (in milliseconds) when the request began processing. + /// + /// # Returns + /// An optional `RemotingCommand` representing the response to the pull message request. + /// If `None`, it indicates that no response should be sent back to the client. fn handle( &self, get_message_result: GetMessageResult, @@ -41,4 +72,14 @@ pub trait PullMessageResultHandler: Sync + Send + 'static { mapping_context: TopicQueueMappingContext, begin_time_mills: u64, ) -> Option; + + /// Returns a mutable reference to `self` as a trait object of type `Any`. + /// + /// This method is useful for downcasting the trait object to its concrete type. + fn as_any_mut(&mut self) -> &mut dyn Any; + + /// Returns a reference to `self` as a trait object of type `Any`. + /// + /// This method is useful for downcasting the trait object to its concrete type. + fn as_any(&self) -> &dyn Any; }