Skip to content

Commit

Permalink
[ISSUE #2400]🤡Optimize PopMessageProcessor pop_msg_from_queue method🧑…
Browse files Browse the repository at this point in the history
…‍💻 (#2401)
  • Loading branch information
mxsm authored Jan 25, 2025
1 parent b1b31ac commit d186560
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,6 @@ where
offset,
request_header.max_msg_nums as i32
- get_message_result.message_mapped_list().len() as i32,
// 1024 * 1024,
message_filter.clone(),
)
.await;
Expand Down Expand Up @@ -992,6 +991,7 @@ where
.get_max_offset_in_queue(topic, queue_id)
- atomic_offset.load(Ordering::Acquire)
+ atomic_rest_num.load(Ordering::Acquire);
self.queue_lock_manager().unlock_with_key(lock_key).await;
return num;
}
Some(value) => value,
Expand Down Expand Up @@ -1037,6 +1037,7 @@ where
.broker_name
.as_str(),
) {
self.queue_lock_manager().unlock_with_key(lock_key).await;
return atomic_rest_num.load(Ordering::Acquire)
+ result_inner.message_count() as i64;
}
Expand All @@ -1053,25 +1054,28 @@ where
queue_id,
result_inner.message_queue_offset().clone(),
);
} else if (result_inner.status().is_some()
&& (result_inner.status().unwrap() == GetMessageStatus::NoMatchedMessage
|| result_inner.status().unwrap() == GetMessageStatus::OffsetFoundNull
|| result_inner.status().unwrap() == GetMessageStatus::MessageWasRemoving
|| result_inner.status().unwrap() == GetMessageStatus::NoMatchedLogicQueue))
&& result_inner.next_begin_offset() > -1
{
if is_order {
self.broker_runtime_inner
.consumer_offset_manager()
.commit_offset(
channel.remote_address().to_string().into(),
&request_header.consumer_group,
topic,
queue_id,
result_inner.next_begin_offset(),
);
} else {
unimplemented!("PopMessageProcessor pop_msg_from_queue")
} else if let Some(status) = result_inner.status() {
if matches!(
status,
GetMessageStatus::NoMatchedMessage
| GetMessageStatus::OffsetFoundNull
| GetMessageStatus::MessageWasRemoving
| GetMessageStatus::NoMatchedLogicQueue
) && result_inner.next_begin_offset() > -1
{
if is_order {
self.broker_runtime_inner
.consumer_offset_manager()
.commit_offset(
channel.remote_address().to_string().into(),
&request_header.consumer_group,
topic,
queue_id,
result_inner.next_begin_offset(),
);
} else {
unimplemented!("PopMessageProcessor pop_msg_from_queue")
}
}
}

Expand Down

0 comments on commit d186560

Please # to comment.