Skip to content

Commit

Permalink
[ISSUE #1848]🔥Implement AckMessageProcessor#append_ack method-2 🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 19, 2024
1 parent 1314f5d commit ae75e2c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 47 deletions.
2 changes: 2 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ impl BrokerRuntime {
let ack_message_processor = ArcMut::new(AckMessageProcessor::new(
self.topic_config_manager.clone(),
self.message_store.as_ref().unwrap().clone(),
self.escape_bridge.clone(),
self.broker_config.clone(),

Check warning on line 552 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L551-L552

Added lines #L551 - L552 were not covered by tests
));
BrokerRequestProcessor {
send_message_processor: ArcMut::new(send_message_processor),
Expand Down
124 changes: 87 additions & 37 deletions rocketmq-broker/src/processor/ack_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
*/
#![allow(unused_variables)]

use std::net::SocketAddr;
use std::sync::Arc;

use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE;
use rocketmq_common::common::message::message_decoder;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
Expand All @@ -37,12 +42,16 @@ use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_status_enum::PutMessageStatus;
use rocketmq_store::log_file::MessageStore;
use rocketmq_store::pop::ack_msg::AckMsg;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
use rocketmq_store::pop::AckMessage;
use tracing::error;

use crate::broker_error::BrokerError::BrokerCommonError;
use crate::broker_error::BrokerError::BrokerRemotingError;
use crate::failover::escape_bridge::EscapeBridge;
use crate::processor::pop_message_processor::PopMessageProcessor;
use crate::processor::processor_service::pop_buffer_merge_service::PopBufferMergeService;
use crate::topic::manager::topic_config_manager::TopicConfigManager;
Expand All @@ -51,6 +60,8 @@ pub struct AckMessageProcessor<MS> {
topic_config_manager: TopicConfigManager,
message_store: ArcMut<MS>,
pop_buffer_merge_service: ArcMut<PopBufferMergeService>,
escape_bridge: ArcMut<EscapeBridge<MS>>,
store_host: SocketAddr,
}

impl<MS> AckMessageProcessor<MS>
Expand All @@ -60,12 +71,19 @@ where
pub fn new(
topic_config_manager: TopicConfigManager,
message_store: ArcMut<MS>,
escape_bridge: ArcMut<EscapeBridge<MS>>,
broker_config: Arc<BrokerConfig>,

Check warning on line 75 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L74-L75

Added lines #L74 - L75 were not covered by tests
) -> AckMessageProcessor<MS> {
let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
.parse::<SocketAddr>()
.unwrap();

Check warning on line 79 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L77-L79

Added lines #L77 - L79 were not covered by tests
AckMessageProcessor {
topic_config_manager,
message_store,
/* need to implement PopBufferMergeService */
pop_buffer_merge_service: ArcMut::new(PopBufferMergeService),
escape_bridge,
store_host,

Check warning on line 86 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L85-L86

Added lines #L85 - L86 were not covered by tests
}
}

Expand All @@ -77,8 +95,10 @@ where
request: RemotingCommand,
) -> crate::Result<Option<RemotingCommand>> {
match request_code {
RequestCode::AckMessage => self.process_ack(channel, ctx, request, true),
RequestCode::BatchAckMessage => self.process_batch_ack(channel, ctx, request, true),
RequestCode::AckMessage => self.process_ack(channel, ctx, request, true).await,

Check warning on line 98 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L98

Added line #L98 was not covered by tests
RequestCode::BatchAckMessage => {
self.process_batch_ack(channel, ctx, request, true).await

Check warning on line 100 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L100

Added line #L100 was not covered by tests
}
_ => Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::MessageIllegal,
Expand All @@ -96,7 +116,7 @@ impl<MS> AckMessageProcessor<MS>
where
MS: MessageStore,
{
fn process_ack(
async fn process_ack(

Check warning on line 119 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L119

Added line #L119 was not covered by tests
&mut self,
channel: Channel,
_ctx: ConnectionHandlerContext,
Expand Down Expand Up @@ -159,11 +179,12 @@ where
));
}
let mut response = RemotingCommand::create_response_command();
self.append_ack(Some(request_header), &mut response, None, &channel, None);
self.append_ack(Some(request_header), &mut response, None, &channel, None)
.await;

Check warning on line 183 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L182-L183

Added lines #L182 - L183 were not covered by tests
Ok(Some(response))
}

fn process_batch_ack(
async fn process_batch_ack(

Check warning on line 187 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L187

Added line #L187 was not covered by tests
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
Expand All @@ -185,20 +206,20 @@ where
let mut response = RemotingCommand::create_response_command();
let broker_name = &req_body.broker_name;
for ack in req_body.acks {
self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name));
self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name))
.await;

Check warning on line 210 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L209-L210

Added lines #L209 - L210 were not covered by tests
}
Ok(Some(response))
}

fn append_ack(
async fn append_ack(

Check warning on line 215 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L215

Added line #L215 was not covered by tests
&mut self,
request_header: Option<AckMessageRequestHeader>,
response: &mut RemotingCommand,
batch_ack: Option<BatchAck>,
channel: &Channel,
broker_name: Option<&CheetahString>,
) {
let is_batch_ack = request_header.is_none();
//handle single ack
let (
consume_group,
Expand Down Expand Up @@ -252,7 +273,7 @@ where
pop_time,
invisible_time,
ack_count,
ack,
Box::new(ack) as Box<dyn AckMessage + Send>,

Check warning on line 276 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L276

Added line #L276 was not covered by tests
CheetahString::from(broker_name),
)
} else {
Expand Down Expand Up @@ -280,18 +301,15 @@ where
return;
}

let batch_ack_msg = BatchAckMsg::default();
//need to ack orderly
/* let bit_set = &batch_ack.bit_set.0;
let mut i = bit_set.iter().next();
while let Some(bit) = i {
let x = bit.;
if bit.deref() == u32::MAX {
let mut batch_ack_msg = BatchAckMsg::default();

let bit_set = &batch_ack.bit_set.0;
for i in bit_set.iter_ones() {
if i == usize::MAX {

Check warning on line 308 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L304-L308

Added lines #L304 - L308 were not covered by tests
break;
}
let offset = start_offset + bit as u64;
let offset = batch_ack.start_offset + i as i64;

Check warning on line 311 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L311

Added line #L311 was not covered by tests
if offset < min_offset || offset > max_offset {
i = bit_set.iter().next();
continue;
}
if r_qid == POP_ORDER_REVIVE_QUEUE {
Expand All @@ -308,13 +326,15 @@ where
} else {
batch_ack_msg.ack_offset_list.push(offset);
}
i = bit_set.iter().next();
}*/
}
if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() {
return;
}

Check warning on line 332 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L330-L332

Added lines #L330 - L332 were not covered by tests
if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() {
return;
}
let ack_count = batch_ack_msg.ack_offset_list.len();
let ack = batch_ack_msg.ack_msg;
//let ack = batch_ack_msg.ack_msg;

Check warning on line 337 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L337

Added line #L337 was not covered by tests
(
consume_group,
topic,
Expand All @@ -325,48 +345,78 @@ where
pop_time,
invisible_time,
ack_count,
ack,
Box::new(batch_ack_msg) as Box<dyn AckMessage + Send>,

Check warning on line 348 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L348

Added line #L348 was not covered by tests
broker_name.unwrap().clone(),
)
};

//this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
//this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup,topic,
// ackCount);
ack_msg.consumer_group = consume_group;
ack_msg.topic = topic.clone();
ack_msg.queue_id = qid;
ack_msg.start_offset = start_offset;
ack_msg.ack_offset = ack_offset;
ack_msg.pop_time = pop_time;
ack_msg.broker_name = broker_name;
if self.pop_buffer_merge_service.add_ack(r_qid, &ack_msg) {
ack_msg.set_consumer_group(consume_group);
ack_msg.set_topic(topic.clone());
ack_msg.set_queue_id(qid);
ack_msg.set_start_offset(start_offset);
ack_msg.set_ack_offset(ack_offset);
ack_msg.set_pop_time(pop_time);
ack_msg.set_broker_name(broker_name);
if self
.pop_buffer_merge_service
.add_ack(r_qid, ack_msg.as_ref())

Check warning on line 365 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L356-L365

Added lines #L356 - L365 were not covered by tests
{
return;
}
let mut inner = MessageExtBrokerInner::default();
inner.set_topic(topic);
inner.set_body(Bytes::from(ack_msg.encode().unwrap()));
inner.message_ext_inner.queue_id = qid;
if is_batch_ack {
/*inner.set_tags(CheetahString::from_static_str(
if let Some(batch_ack) = ack_msg.as_any().downcast_ref::<BatchAckMsg>() {
inner.set_body(Bytes::from(batch_ack.encode().unwrap()));
inner.set_tags(CheetahString::from_static_str(

Check warning on line 374 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L372-L374

Added lines #L372 - L374 were not covered by tests
PopAckConstants::BATCH_ACK_TAG,
));
inner.put_property(
CheetahString::from_static_str(
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
),
CheetahString::from(PopMessageProcessor::gen_batch_ack_unique_id()),
);*/
} else {
CheetahString::from(PopMessageProcessor::gen_batch_ack_unique_id(batch_ack)),
);
} else if let Some(ack_msg) = ack_msg.as_any().downcast_ref::<AckMsg>() {
inner.set_body(Bytes::from(ack_msg.encode().unwrap()));

Check warning on line 384 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L381-L384

Added lines #L381 - L384 were not covered by tests
inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
inner.put_property(
CheetahString::from_static_str(
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
),
CheetahString::from(PopMessageProcessor::gen_ack_unique_id(&ack_msg)),
CheetahString::from(PopMessageProcessor::gen_ack_unique_id(
ack_msg as &dyn AckMessage,
)),

Check warning on line 392 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L390-L392

Added lines #L390 - L392 were not covered by tests
);
}
inner.message_ext_inner.born_timestamp = get_current_millis() as i64;
inner.message_ext_inner.store_host = self.store_host;
inner.set_delay_time_ms((pop_time + invisible_time) as u64);
inner.put_property(
CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
CheetahString::from(PopMessageProcessor::gen_ack_unique_id(ack_msg.as_ref())),
);
inner.properties_string =
message_decoder::message_properties_to_string(inner.get_properties());
let put_message_result = self
.escape_bridge
.put_message_to_specific_queue(inner)
.await;
match put_message_result.put_message_status() {

Check warning on line 408 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L396-L408

Added lines #L396 - L408 were not covered by tests
PutMessageStatus::PutOk
| PutMessageStatus::FlushDiskTimeout
| PutMessageStatus::FlushSlaveTimeout
| PutMessageStatus::SlaveNotAvailable => {}

Check warning on line 412 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L412

Added line #L412 was not covered by tests
_ => {
error!(
"put ack msg error:{:?}",
put_message_result.put_message_status()

Check warning on line 416 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L414-L416

Added lines #L414 - L416 were not covered by tests
);
}
}
}

fn ack_orderly(
Expand Down
17 changes: 9 additions & 8 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_store::pop::ack_msg::AckMsg;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
use rocketmq_store::pop::AckMessage;
use tokio::sync::Mutex;
use tracing::info;

Expand All @@ -53,20 +53,20 @@ impl PopMessageProcessor {
}

impl PopMessageProcessor {
pub fn gen_ack_unique_id(ack_msg: &AckMsg) -> String {
pub fn gen_ack_unique_id(ack_msg: &dyn AckMessage) -> String {
format!(
"{}{}{}{}{}{}{}{}{}{}{}{}{}",
ack_msg.topic,
ack_msg.topic(),
PopAckConstants::SPLIT,
ack_msg.queue_id,
ack_msg.queue_id(),
PopAckConstants::SPLIT,
ack_msg.ack_offset,
ack_msg.ack_offset(),
PopAckConstants::SPLIT,
ack_msg.consumer_group,
ack_msg.consumer_group(),
PopAckConstants::SPLIT,
ack_msg.pop_time,
ack_msg.pop_time(),
PopAckConstants::SPLIT,
ack_msg.broker_name,
ack_msg.broker_name(),
PopAckConstants::SPLIT,
PopAckConstants::ACK_TAG
)
Expand Down Expand Up @@ -232,6 +232,7 @@ impl QueueLockManager {
#[cfg(test)]
mod tests {
use cheetah_string::CheetahString;
use rocketmq_store::pop::ack_msg::AckMsg;

use super::*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

use rocketmq_store::pop::ack_msg::AckMsg;
use rocketmq_store::pop::AckMessage;

pub(crate) struct PopBufferMergeService;

impl PopBufferMergeService {
pub fn add_ack(&mut self, _revive_qid: i32, _ack_msg: &AckMsg) -> bool {
pub fn add_ack(&mut self, _revive_qid: i32, _ack_msg: &dyn AckMessage) -> bool {

Check warning on line 23 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L23

Added line #L23 was not covered by tests
unimplemented!("Not implemented yet");
}
}

0 comments on commit ae75e2c

Please # to comment.