Skip to content

Commit

Permalink
[ISSUE #2047]🧐Implement PopMessageProcessor#buildCkMsg🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 3, 2025
1 parent 8ad41fe commit 337d8ff
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
1 change: 1 addition & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ impl BrokerRuntime {
self.subscription_group_manager.clone(),
self.consumer_filter_manager.clone(),
self.pop_inflight_message_counter.clone(),
self.store_host,
));
let ack_message_processor = ArcMut::new(AckMessageProcessor::new(
self.topic_config_manager.clone(),
Expand Down
26 changes: 26 additions & 0 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#![allow(unused_variables)]

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
Expand All @@ -36,6 +37,9 @@ use rocketmq_common::common::filter::expression_type::ExpressionType;
use rocketmq_common::common::key_builder::KeyBuilder;
use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE;
use rocketmq_common::common::message::message_decoder;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::mix_all;
use rocketmq_common::common::pop_ack_constants::PopAckConstants;
use rocketmq_common::common::FAQUrl;
Expand All @@ -51,6 +55,7 @@ use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::get_message_result::GetMessageResult;
Expand Down Expand Up @@ -97,6 +102,7 @@ pub struct PopMessageProcessor<MS> {
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
queue_lock_manager: QueueLockManager,
revive_topic: CheetahString,
store_host: SocketAddr,
}

impl<MS> PopMessageProcessor<MS> {
Expand All @@ -111,6 +117,7 @@ impl<MS> PopMessageProcessor<MS> {
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
consumer_filter_manager: Arc<ConsumerFilterManager>,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
store_host: SocketAddr,
) -> Self {
let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
broker_config.broker_identity.broker_cluster_name.as_str(),
Expand All @@ -136,6 +143,7 @@ impl<MS> PopMessageProcessor<MS> {
pop_inflight_message_counter,
queue_lock_manager,
revive_topic,
store_host,
}
}
}
Expand Down Expand Up @@ -1177,6 +1185,24 @@ impl<MS> PopMessageProcessor<MS> {
pub fn pop_buffer_merge_service_mut(&mut self) -> &mut ArcMut<PopBufferMergeService> {
&mut self.pop_buffer_merge_service
}

pub fn build_ck_msg(&self, ck: &PopCheckPoint, revive_qid: i32) -> MessageExtBrokerInner {
let mut msg = MessageExtBrokerInner::default();
msg.set_topic(self.revive_topic.clone());
msg.set_body(Bytes::from(ck.to_json().unwrap()));
msg.message_ext_inner.queue_id = revive_qid;
msg.set_tags(CheetahString::from_static_str(PopAckConstants::CK_TAG));
msg.message_ext_inner.born_timestamp = get_current_millis() as i64;
msg.message_ext_inner.born_host = self.store_host;
msg.message_ext_inner.store_host = self.store_host;
msg.set_delay_time_ms((ck.get_revive_time() - PopAckConstants::ACK_TIME_INTERVAL) as u64);
msg.put_property(
CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
CheetahString::from_string(PopMessageProcessor::<MS>::gen_ck_unique_id(ck)),
);
msg.properties_string = message_decoder::message_properties_to_string(msg.get_properties());
msg
}
}

struct TimedLock {
Expand Down

0 comments on commit 337d8ff

Please # to comment.