From 337d8ff76fd329029913591a8f238bf7b8e4fb36 Mon Sep 17 00:00:00 2001 From: mxsm Date: Fri, 3 Jan 2025 06:01:15 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#2047]=F0=9F=A7=90Implement=20PopMessa?= =?UTF-8?q?geProcessor#buildCkMsg=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 1 + .../src/processor/pop_message_processor.rs | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index ac50f75a..d24e6188 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -571,6 +571,7 @@ impl BrokerRuntime { self.subscription_group_manager.clone(), self.consumer_filter_manager.clone(), self.pop_inflight_message_counter.clone(), + self.store_host, )); let ack_message_processor = ArcMut::new(AckMessageProcessor::new( self.topic_config_manager.clone(), diff --git a/rocketmq-broker/src/processor/pop_message_processor.rs b/rocketmq-broker/src/processor/pop_message_processor.rs index e296d3b2..23a7ee97 100644 --- a/rocketmq-broker/src/processor/pop_message_processor.rs +++ b/rocketmq-broker/src/processor/pop_message_processor.rs @@ -17,6 +17,7 @@ #![allow(unused_variables)] use std::collections::HashMap; +use std::net::SocketAddr; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicU64; @@ -36,6 +37,9 @@ use rocketmq_common::common::filter::expression_type::ExpressionType; use rocketmq_common::common::key_builder::KeyBuilder; use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE; use rocketmq_common::common::message::message_decoder; +use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner; +use rocketmq_common::common::message::MessageConst; +use rocketmq_common::common::message::MessageTrait; use rocketmq_common::common::mix_all; use rocketmq_common::common::pop_ack_constants::PopAckConstants; use rocketmq_common::common::FAQUrl; @@ -51,6 +55,7 @@ use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; +use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; use rocketmq_rust::ArcMut; use rocketmq_store::base::get_message_result::GetMessageResult; @@ -97,6 +102,7 @@ pub struct PopMessageProcessor { pop_inflight_message_counter: Arc, queue_lock_manager: QueueLockManager, revive_topic: CheetahString, + store_host: SocketAddr, } impl PopMessageProcessor { @@ -111,6 +117,7 @@ impl PopMessageProcessor { subscription_group_manager: Arc>, consumer_filter_manager: Arc, pop_inflight_message_counter: Arc, + store_host: SocketAddr, ) -> Self { let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic( broker_config.broker_identity.broker_cluster_name.as_str(), @@ -136,6 +143,7 @@ impl PopMessageProcessor { pop_inflight_message_counter, queue_lock_manager, revive_topic, + store_host, } } } @@ -1177,6 +1185,24 @@ impl PopMessageProcessor { pub fn pop_buffer_merge_service_mut(&mut self) -> &mut ArcMut { &mut self.pop_buffer_merge_service } + + pub fn build_ck_msg(&self, ck: &PopCheckPoint, revive_qid: i32) -> MessageExtBrokerInner { + let mut msg = MessageExtBrokerInner::default(); + msg.set_topic(self.revive_topic.clone()); + msg.set_body(Bytes::from(ck.to_json().unwrap())); + msg.message_ext_inner.queue_id = revive_qid; + msg.set_tags(CheetahString::from_static_str(PopAckConstants::CK_TAG)); + msg.message_ext_inner.born_timestamp = get_current_millis() as i64; + msg.message_ext_inner.born_host = self.store_host; + msg.message_ext_inner.store_host = self.store_host; + msg.set_delay_time_ms((ck.get_revive_time() - PopAckConstants::ACK_TIME_INTERVAL) as u64); + msg.put_property( + CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), + CheetahString::from_string(PopMessageProcessor::::gen_ck_unique_id(ck)), + ); + msg.properties_string = message_decoder::message_properties_to_string(msg.get_properties()); + msg + } } struct TimedLock {