diff --git a/rocketmq-client/examples/quickstart/consumer.rs b/rocketmq-client/examples/quickstart/consumer.rs index 3d5e236c..d21cfc67 100644 --- a/rocketmq-client/examples/quickstart/consumer.rs +++ b/rocketmq-client/examples/quickstart/consumer.rs @@ -33,7 +33,7 @@ pub const TAG: &str = "*"; #[rocketmq::main] pub async fn main() -> Result<()> { //init logger - rocketmq_common::log::init_logger(); + //rocketmq_common::log::init_logger(); // create a producer builder with default configuration let builder = DefaultMQPushConsumer::builder(); @@ -59,6 +59,7 @@ impl MessageListenerConcurrently for MyMessageListener { ) -> Result { for msg in msgs { info!("Receive message: {:?}", msg); + println!("Receive message: {:?}", msg); } Ok(ConsumeConcurrentlyStatus::ConsumeSuccess) } diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index 6e25825f..f4e2ef88 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -1317,7 +1317,7 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl { } } - async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet) { + async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet) { let sub_table = self.rebalance_impl.get_subscription_inner(); let sub_table_inner = sub_table.read().await; if sub_table_inner.contains_key(topic) { diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index 3fb13ee0..5d4d3b73 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -327,9 +327,6 @@ where } } MessageModel::Clustering => { - let topic_sub_cloned = self.topic_subscribe_info_table.clone(); - let topic_subscribe_info_table_inner = topic_sub_cloned.read().await; - let mq_set = topic_subscribe_info_table_inner.get(topic); //get consumer id list from broker let cid_all = self .client_instance @@ -337,6 +334,9 @@ where .unwrap() .find_consumer_id_list(topic, self.consumer_group.as_ref().unwrap()) .await; + let topic_sub_cloned = self.topic_subscribe_info_table.clone(); + let topic_subscribe_info_table_inner = topic_sub_cloned.read().await; + let mq_set = topic_subscribe_info_table_inner.get(topic); if mq_set.is_none() && !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { if let Some(mut sub_rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() diff --git a/rocketmq-client/src/consumer/mq_consumer_inner.rs b/rocketmq-client/src/consumer/mq_consumer_inner.rs index 43e3daa3..451dc191 100644 --- a/rocketmq-client/src/consumer/mq_consumer_inner.rs +++ b/rocketmq-client/src/consumer/mq_consumer_inner.rs @@ -67,7 +67,7 @@ pub trait MQConsumerInnerLocal: MQConsumerInnerAny + Sync + 'static { /// /// * `topic` - A string slice that holds the name of the topic. /// * `info` - A reference to a `HashSet` containing `MessageQueue` information. - async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet); + async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet); /// Checks if the subscription information for a given topic needs to be updated asynchronously. /// @@ -204,7 +204,7 @@ impl MQConsumerInner for MQConsumerInnerImpl { panic!("default_mqpush_consumer_impl is None"); } - async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet) { + async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet) { if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl { if let Some(mut default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() { return MQConsumerInner::update_topic_subscribe_info( diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index a9e52a51..c7e4b640 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -206,6 +206,8 @@ impl MQClientInstance { broker_version_table: Arc::new(Default::default()), send_heartbeat_times_total: Arc::new(AtomicI64::new(0)), }); + let instance_clone = instance.clone(); + instance.mq_admin_impl.set_client(instance_clone); let weak_instance = ArcRefCellWrapper::downgrade(&instance); let (tx, mut rx) = tokio::sync::broadcast::channel::(16); @@ -583,11 +585,11 @@ impl MQClientInstance { // Update sub info { - let mut consumer_table = self.consumer_table.write().await; + let consumer_table = self.consumer_table.read().await; if !consumer_table.is_empty() { let subscribe_info = topic_route_data2topic_subscribe_info(topic, &topic_route_data); - for (_, value) in consumer_table.iter_mut() { + for (_, value) in consumer_table.iter() { value .update_topic_subscribe_info(topic, &subscribe_info) .await; diff --git a/rocketmq-client/src/implementation/mq_admin_impl.rs b/rocketmq-client/src/implementation/mq_admin_impl.rs index 50fbad3d..c7e3b632 100644 --- a/rocketmq-client/src/implementation/mq_admin_impl.rs +++ b/rocketmq-client/src/implementation/mq_admin_impl.rs @@ -21,19 +21,26 @@ use rocketmq_remoting::protocol::namespace_util::NamespaceUtil; use crate::base::client_config::ClientConfig; use crate::error::MQClientError::MQClientErr; use crate::factory::mq_client_instance; +use crate::factory::mq_client_instance::MQClientInstance; use crate::implementation::mq_client_api_impl::MQClientAPIImpl; use crate::Result; pub struct MQAdminImpl { timeout_millis: u64, + client: Option>, } impl MQAdminImpl { pub fn new() -> Self { MQAdminImpl { timeout_millis: 60000, + client: None, } } + + pub fn set_client(&mut self, client: ArcRefCellWrapper) { + self.client = Some(client); + } } impl MQAdminImpl { @@ -93,6 +100,30 @@ impl MQAdminImpl { } pub async fn max_offset(&mut self, mq: &MessageQueue) -> Result { + let client = self.client.as_mut().expect("client is None"); + let broker_name = client.get_broker_name_from_message_queue(mq).await; + let mut broker_addr = client + .find_broker_address_in_publish(broker_name.as_str()) + .await; + if broker_addr.is_none() { + client + .update_topic_route_info_from_name_server_topic(mq.get_topic()) + .await; + let broker_name = client.get_broker_name_from_message_queue(mq).await; + broker_addr = client + .find_broker_address_in_publish(broker_name.as_str()) + .await; + } + if let Some(ref broker_addr) = broker_addr { + let offset = client + .mq_client_api_impl + .as_mut() + .expect("mq_client_api_impl is None") + .get_max_offset(broker_addr, mq, self.timeout_millis) + .await?; + return Ok(offset); + } + unimplemented!("max_offset") } pub async fn search_offset(&mut self, mq: &MessageQueue, timestamp: u64) -> Result { diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 76cc434f..8fe9873b 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -47,6 +47,8 @@ use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequ use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader; use rocketmq_remoting::protocol::header::end_transaction_request_header::EndTransactionRequestHeader; use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader; +use rocketmq_remoting::protocol::header::get_max_offset_request_header::GetMaxOffsetRequestHeader; +use rocketmq_remoting::protocol::header::get_max_offset_response_header::GetMaxOffsetResponseHeader; use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader; use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader; use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader; @@ -68,6 +70,7 @@ use rocketmq_remoting::protocol::RemotingDeserializable; use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::remoting::RemotingService; use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader; +use rocketmq_remoting::rpc::topic_request_header::TopicRequestHeader; use rocketmq_remoting::runtime::config::client_config::TokioClientConfig; use rocketmq_remoting::runtime::RPCHook; use tracing::error; @@ -1147,4 +1150,50 @@ impl MQClientAPIImpl { .await; Ok(()) } + + pub async fn get_max_offset( + &mut self, + addr: &str, + message_queue: &MessageQueue, + timeout_millis: u64, + ) -> Result { + let request_header = GetMaxOffsetRequestHeader { + topic: message_queue.get_topic().to_string(), + queue_id: message_queue.get_queue_id(), + committed: false, + topic_request_header: Some(TopicRequestHeader { + rpc_request_header: Some(RpcRequestHeader { + broker_name: Some(message_queue.get_broker_name().to_string()), + ..Default::default() + }), + lo: None, + }), + }; + + let request = + RemotingCommand::create_request_command(RequestCode::GetMaxOffset, request_header); + + let response = self + .remoting_client + .invoke_async( + Some(mix_all::broker_vip_channel( + self.client_config.vip_channel_enabled, + addr, + )), + request, + timeout_millis, + ) + .await?; + if ResponseCode::from(response.code()) == ResponseCode::Success { + let response_header = response + .decode_command_custom_header::() + .expect("decode error"); + return Ok(response_header.offset); + } + Err(MQBrokerError( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string(), + )) + } }