Skip to content

Commit

Permalink
[ISSUE #1979]♻️PopMessageProcessor supports process_request handle-5🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 31, 2024
1 parent dcff411 commit db8071a
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ where
let pop_time = get_current_millis();

let message_filter = message_filter.map(Arc::new);
let mut rest_num = 0;
if need_retry && !request_header.order.unwrap_or(false) {
if need_retry_v1 {
rest_num = if need_retry_v1 {
let retry_topic = CheetahString::from_string(KeyBuilder::build_pop_retry_topic_v1(
&request_header.topic,
&request_header.consumer_group,
Expand All @@ -454,7 +455,7 @@ where
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,
)
.await
} else {
Expand All @@ -476,12 +477,13 @@ where
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,
)
.await
};
}
let mut rest_num = if request_header.queue_id < 0 {
rest_num = if request_header.queue_id < 0 {
// read all queue
self.pop_msg_from_topic(
&topic_config,
false,
Expand All @@ -495,7 +497,7 @@ where
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,
)
.await
} else {
Expand All @@ -512,10 +514,11 @@ where
&mut msg_offset_info,
&mut order_count_info,
randomq,
0,
rest_num,
)
.await
};
// if not full , fetch retry again
if !need_retry
&& get_message_result.message_mapped_list().len() < request_header.max_msg_nums as usize
&& !request_header.order.unwrap_or(false)
Expand Down Expand Up @@ -567,10 +570,10 @@ where
};
}
let mut final_response = RemotingCommand::create_response_command();
if get_message_result.message_mapped_list().is_empty() {
if !get_message_result.message_mapped_list().is_empty() {
get_message_result.set_status(Some(GetMessageStatus::Found));
if rest_num > 0 {
//
// all queue pop can not notify specified queue pop, and vice versa
self.pop_long_polling_service.notify_message_arriving(
&request_header.topic,
request_header.queue_id,
Expand Down Expand Up @@ -607,10 +610,9 @@ where
PollingResult::PollingFull => {
final_response.set_code_ref(ResponseCode::PollingFull);
}
PollingResult::PollingTimeout => {
_ => {
final_response.set_code_ref(ResponseCode::PollingTimeout);
}
PollingResult::NotPolling => {}
}
get_message_result.set_status(Some(GetMessageStatus::NoMessageInQueue));
}
Expand Down

0 comments on commit db8071a

Please # to comment.