Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #777]🔥Fix Broker can not started🐛 #778

Merged
merged 1 commit into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -416,10 +415,11 @@ impl BrokerRuntime {
self.broker_config.clone(),
));

let pull_message_result_handler = pull_message_result_handler.as_mut() as &mut dyn Any;
let pull_message_result_handler = pull_message_result_handler.as_mut().as_mut();
pull_message_result_handler
.as_any_mut()
.downcast_mut::<DefaultPullMessageResultHandler>()
.unwrap()
.expect("downcast DefaultPullMessageResultHandler failed")
.set_pull_request_hold_service(Some(Arc::new(
self.pull_request_hold_service.clone().unwrap(),
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::any::Any;
use std::net::SocketAddr;
use std::sync::Arc;

Expand Down Expand Up @@ -231,6 +231,14 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
_ => None,
}
}

fn as_any_mut(&mut self) -> &mut dyn Any {
self
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl DefaultPullMessageResultHandler {
Expand Down
43 changes: 42 additions & 1 deletion rocketmq-broker/src/processor/pull_message_result_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;

use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
Expand All @@ -25,7 +26,37 @@ use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
use rocketmq_store::base::get_message_result::GetMessageResult;
use rocketmq_store::filter::MessageFilter;

pub trait PullMessageResultHandler: Sync + Send + 'static {
/// Trait defining the behavior for handling the result of a pull message request.
///
/// This trait is designed to be implemented by types that handle the result of a pull message
/// request in a RocketMQ broker. It provides a method for processing the result of a message
/// retrieval operation, along with various parameters related to the request and the broker's
/// state.
pub trait PullMessageResultHandler: Sync + Send + Any + 'static {
/// Handles the result of a pull message request.
///
/// This method processes the result of a message retrieval operation (`get_message_result`),
/// using the provided request information, channel, context, subscription data, and other
/// parameters to generate an appropriate response.
///
/// # Parameters
/// - `get_message_result`: The result of the message retrieval operation.
/// - `request`: The original remoting command representing the pull message request.
/// - `request_header`: The header of the pull message request, containing request-specific
/// information.
/// - `channel`: The channel through which the request was received.
/// - `ctx`: The connection handler context associated with the request.
/// - `subscription_data`: Subscription data for the consumer making the request.
/// - `subscription_group_config`: Configuration for the subscription group of the consumer.
/// - `broker_allow_suspend`: Flag indicating whether the broker allows suspending the request.
/// - `message_filter`: The message filter to apply to the retrieved messages.
/// - `response`: The initial response remoting command to be potentially modified and returned.
/// - `mapping_context`: Context for topic-queue mapping.
/// - `begin_time_mills`: The timestamp (in milliseconds) when the request began processing.
///
/// # Returns
/// An optional `RemotingCommand` representing the response to the pull message request.
/// If `None`, it indicates that no response should be sent back to the client.
fn handle(
&self,
get_message_result: GetMessageResult,
Expand All @@ -41,4 +72,14 @@ pub trait PullMessageResultHandler: Sync + Send + 'static {
mapping_context: TopicQueueMappingContext,
begin_time_mills: u64,
) -> Option<RemotingCommand>;

/// Returns a mutable reference to `self` as a trait object of type `Any`.
///
/// This method is useful for downcasting the trait object to its concrete type.
fn as_any_mut(&mut self) -> &mut dyn Any;

/// Returns a reference to `self` as a trait object of type `Any`.
///
/// This method is useful for downcasting the trait object to its concrete type.
fn as_any(&self) -> &dyn Any;
}
Loading