From ae75e2c16cb5ebe2a367ebede5021926028d6dd7 Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 19 Dec 2024 03:53:17 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1848]=F0=9F=94=A5Implement=20AckMessa?= =?UTF-8?q?geProcessor#append=5Fack=20method-2=20=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 2 + .../src/processor/ack_message_processor.rs | 124 ++++++++++++------ .../src/processor/pop_message_processor.rs | 17 +-- .../pop_buffer_merge_service.rs | 4 +- 4 files changed, 100 insertions(+), 47 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index f404f139..403d3015 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -548,6 +548,8 @@ impl BrokerRuntime { let ack_message_processor = ArcMut::new(AckMessageProcessor::new( self.topic_config_manager.clone(), self.message_store.as_ref().unwrap().clone(), + self.escape_bridge.clone(), + self.broker_config.clone(), )); BrokerRequestProcessor { send_message_processor: ArcMut::new(send_message_processor), diff --git a/rocketmq-broker/src/processor/ack_message_processor.rs b/rocketmq-broker/src/processor/ack_message_processor.rs index 3b57d3f0..db9fc8cf 100644 --- a/rocketmq-broker/src/processor/ack_message_processor.rs +++ b/rocketmq-broker/src/processor/ack_message_processor.rs @@ -16,9 +16,14 @@ */ #![allow(unused_variables)] +use std::net::SocketAddr; +use std::sync::Arc; + use bytes::Bytes; use cheetah_string::CheetahString; +use rocketmq_common::common::broker::broker_config::BrokerConfig; use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE; +use rocketmq_common::common::message::message_decoder; use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner; use rocketmq_common::common::message::MessageConst; use rocketmq_common::common::message::MessageTrait; @@ -37,12 +42,16 @@ use rocketmq_remoting::protocol::RemotingDeserializable; use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; use rocketmq_rust::ArcMut; +use rocketmq_store::base::message_status_enum::PutMessageStatus; use rocketmq_store::log_file::MessageStore; use rocketmq_store::pop::ack_msg::AckMsg; use rocketmq_store::pop::batch_ack_msg::BatchAckMsg; +use rocketmq_store::pop::AckMessage; +use tracing::error; use crate::broker_error::BrokerError::BrokerCommonError; use crate::broker_error::BrokerError::BrokerRemotingError; +use crate::failover::escape_bridge::EscapeBridge; use crate::processor::pop_message_processor::PopMessageProcessor; use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService; use crate::topic::manager::topic_config_manager::TopicConfigManager; @@ -51,6 +60,8 @@ pub struct AckMessageProcessor { topic_config_manager: TopicConfigManager, message_store: ArcMut, pop_buffer_merge_service: ArcMut, + escape_bridge: ArcMut>, + store_host: SocketAddr, } impl AckMessageProcessor @@ -60,12 +71,19 @@ where pub fn new( topic_config_manager: TopicConfigManager, message_store: ArcMut, + escape_bridge: ArcMut>, + broker_config: Arc, ) -> AckMessageProcessor { + let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) + .parse::() + .unwrap(); AckMessageProcessor { topic_config_manager, message_store, /* need to implement PopBufferMergeService */ pop_buffer_merge_service: ArcMut::new(PopBufferMergeService), + escape_bridge, + store_host, } } @@ -77,8 +95,10 @@ where request: RemotingCommand, ) -> crate::Result> { match request_code { - RequestCode::AckMessage => self.process_ack(channel, ctx, request, true), - RequestCode::BatchAckMessage => self.process_batch_ack(channel, ctx, request, true), + RequestCode::AckMessage => self.process_ack(channel, ctx, request, true).await, + RequestCode::BatchAckMessage => { + self.process_batch_ack(channel, ctx, request, true).await + } _ => Ok(Some( RemotingCommand::create_response_command_with_code_remark( ResponseCode::MessageIllegal, @@ -96,7 +116,7 @@ impl AckMessageProcessor where MS: MessageStore, { - fn process_ack( + async fn process_ack( &mut self, channel: Channel, _ctx: ConnectionHandlerContext, @@ -159,11 +179,12 @@ where )); } let mut response = RemotingCommand::create_response_command(); - self.append_ack(Some(request_header), &mut response, None, &channel, None); + self.append_ack(Some(request_header), &mut response, None, &channel, None) + .await; Ok(Some(response)) } - fn process_batch_ack( + async fn process_batch_ack( &mut self, _channel: Channel, _ctx: ConnectionHandlerContext, @@ -185,12 +206,13 @@ where let mut response = RemotingCommand::create_response_command(); let broker_name = &req_body.broker_name; for ack in req_body.acks { - self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name)); + self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name)) + .await; } Ok(Some(response)) } - fn append_ack( + async fn append_ack( &mut self, request_header: Option, response: &mut RemotingCommand, @@ -198,7 +220,6 @@ where channel: &Channel, broker_name: Option<&CheetahString>, ) { - let is_batch_ack = request_header.is_none(); //handle single ack let ( consume_group, @@ -252,7 +273,7 @@ where pop_time, invisible_time, ack_count, - ack, + Box::new(ack) as Box, CheetahString::from(broker_name), ) } else { @@ -280,18 +301,15 @@ where return; } - let batch_ack_msg = BatchAckMsg::default(); - //need to ack orderly - /* let bit_set = &batch_ack.bit_set.0; - let mut i = bit_set.iter().next(); - while let Some(bit) = i { - let x = bit.; - if bit.deref() == u32::MAX { + let mut batch_ack_msg = BatchAckMsg::default(); + + let bit_set = &batch_ack.bit_set.0; + for i in bit_set.iter_ones() { + if i == usize::MAX { break; } - let offset = start_offset + bit as u64; + let offset = batch_ack.start_offset + i as i64; if offset < min_offset || offset > max_offset { - i = bit_set.iter().next(); continue; } if r_qid == POP_ORDER_REVIVE_QUEUE { @@ -308,13 +326,15 @@ where } else { batch_ack_msg.ack_offset_list.push(offset); } - i = bit_set.iter().next(); - }*/ + } + if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() { + return; + } if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() { return; } let ack_count = batch_ack_msg.ack_offset_list.len(); - let ack = batch_ack_msg.ack_msg; + //let ack = batch_ack_msg.ack_msg; ( consume_group, topic, @@ -325,7 +345,7 @@ where pop_time, invisible_time, ack_count, - ack, + Box::new(batch_ack_msg) as Box, broker_name.unwrap().clone(), ) }; @@ -333,40 +353,70 @@ where //this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount); //this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup,topic, // ackCount); - ack_msg.consumer_group = consume_group; - ack_msg.topic = topic.clone(); - ack_msg.queue_id = qid; - ack_msg.start_offset = start_offset; - ack_msg.ack_offset = ack_offset; - ack_msg.pop_time = pop_time; - ack_msg.broker_name = broker_name; - if self.pop_buffer_merge_service.add_ack(r_qid, &ack_msg) { + ack_msg.set_consumer_group(consume_group); + ack_msg.set_topic(topic.clone()); + ack_msg.set_queue_id(qid); + ack_msg.set_start_offset(start_offset); + ack_msg.set_ack_offset(ack_offset); + ack_msg.set_pop_time(pop_time); + ack_msg.set_broker_name(broker_name); + if self + .pop_buffer_merge_service + .add_ack(r_qid, ack_msg.as_ref()) + { return; } let mut inner = MessageExtBrokerInner::default(); inner.set_topic(topic); - inner.set_body(Bytes::from(ack_msg.encode().unwrap())); inner.message_ext_inner.queue_id = qid; - if is_batch_ack { - /*inner.set_tags(CheetahString::from_static_str( + if let Some(batch_ack) = ack_msg.as_any().downcast_ref::() { + inner.set_body(Bytes::from(batch_ack.encode().unwrap())); + inner.set_tags(CheetahString::from_static_str( PopAckConstants::BATCH_ACK_TAG, )); inner.put_property( CheetahString::from_static_str( MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ), - CheetahString::from(PopMessageProcessor::gen_batch_ack_unique_id()), - );*/ - } else { + CheetahString::from(PopMessageProcessor::gen_batch_ack_unique_id(batch_ack)), + ); + } else if let Some(ack_msg) = ack_msg.as_any().downcast_ref::() { + inner.set_body(Bytes::from(ack_msg.encode().unwrap())); inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG)); inner.put_property( CheetahString::from_static_str( MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ), - CheetahString::from(PopMessageProcessor::gen_ack_unique_id(&ack_msg)), + CheetahString::from(PopMessageProcessor::gen_ack_unique_id( + ack_msg as &dyn AckMessage, + )), ); } inner.message_ext_inner.born_timestamp = get_current_millis() as i64; + inner.message_ext_inner.store_host = self.store_host; + inner.set_delay_time_ms((pop_time + invisible_time) as u64); + inner.put_property( + CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), + CheetahString::from(PopMessageProcessor::gen_ack_unique_id(ack_msg.as_ref())), + ); + inner.properties_string = + message_decoder::message_properties_to_string(inner.get_properties()); + let put_message_result = self + .escape_bridge + .put_message_to_specific_queue(inner) + .await; + match put_message_result.put_message_status() { + PutMessageStatus::PutOk + | PutMessageStatus::FlushDiskTimeout + | PutMessageStatus::FlushSlaveTimeout + | PutMessageStatus::SlaveNotAvailable => {} + _ => { + error!( + "put ack msg error:{:?}", + put_message_result.put_message_status() + ); + } + } } fn ack_orderly( diff --git a/rocketmq-broker/src/processor/pop_message_processor.rs b/rocketmq-broker/src/processor/pop_message_processor.rs index 01e92746..109b2ce2 100644 --- a/rocketmq-broker/src/processor/pop_message_processor.rs +++ b/rocketmq-broker/src/processor/pop_message_processor.rs @@ -27,9 +27,9 @@ use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::net::channel::Channel; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; -use rocketmq_store::pop::ack_msg::AckMsg; use rocketmq_store::pop::batch_ack_msg::BatchAckMsg; use rocketmq_store::pop::pop_check_point::PopCheckPoint; +use rocketmq_store::pop::AckMessage; use tokio::sync::Mutex; use tracing::info; @@ -53,20 +53,20 @@ impl PopMessageProcessor { } impl PopMessageProcessor { - pub fn gen_ack_unique_id(ack_msg: &AckMsg) -> String { + pub fn gen_ack_unique_id(ack_msg: &dyn AckMessage) -> String { format!( "{}{}{}{}{}{}{}{}{}{}{}{}{}", - ack_msg.topic, + ack_msg.topic(), PopAckConstants::SPLIT, - ack_msg.queue_id, + ack_msg.queue_id(), PopAckConstants::SPLIT, - ack_msg.ack_offset, + ack_msg.ack_offset(), PopAckConstants::SPLIT, - ack_msg.consumer_group, + ack_msg.consumer_group(), PopAckConstants::SPLIT, - ack_msg.pop_time, + ack_msg.pop_time(), PopAckConstants::SPLIT, - ack_msg.broker_name, + ack_msg.broker_name(), PopAckConstants::SPLIT, PopAckConstants::ACK_TAG ) @@ -232,6 +232,7 @@ impl QueueLockManager { #[cfg(test)] mod tests { use cheetah_string::CheetahString; + use rocketmq_store::pop::ack_msg::AckMsg; use super::*; diff --git a/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs b/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs index 9b19b817..f01a58a5 100644 --- a/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs +++ b/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs @@ -15,12 +15,12 @@ * limitations under the License. */ -use rocketmq_store::pop::ack_msg::AckMsg; +use rocketmq_store::pop::AckMessage; pub(crate) struct PopBufferMergeService; impl PopBufferMergeService { - pub fn add_ack(&mut self, _revive_qid: i32, _ack_msg: &AckMsg) -> bool { + pub fn add_ack(&mut self, _revive_qid: i32, _ack_msg: &dyn AckMessage) -> bool { unimplemented!("Not implemented yet"); } }