From d186560d59a3bf157bdfd63754485544b4f9a806 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 25 Jan 2025 08:34:05 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#2400]=F0=9F=A4=A1Optimize=20PopMessag?= =?UTF-8?q?eProcessor=20pop=5Fmsg=5Ffrom=5Fqueue=20method=F0=9F=A7=91?= =?UTF-8?q?=E2=80=8D=F0=9F=92=BB=20(#2401)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/processor/pop_message_processor.rs | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/rocketmq-broker/src/processor/pop_message_processor.rs b/rocketmq-broker/src/processor/pop_message_processor.rs index 35b7efd5..2ea51fbc 100644 --- a/rocketmq-broker/src/processor/pop_message_processor.rs +++ b/rocketmq-broker/src/processor/pop_message_processor.rs @@ -928,7 +928,6 @@ where offset, request_header.max_msg_nums as i32 - get_message_result.message_mapped_list().len() as i32, - // 1024 * 1024, message_filter.clone(), ) .await; @@ -992,6 +991,7 @@ where .get_max_offset_in_queue(topic, queue_id) - atomic_offset.load(Ordering::Acquire) + atomic_rest_num.load(Ordering::Acquire); + self.queue_lock_manager().unlock_with_key(lock_key).await; return num; } Some(value) => value, @@ -1037,6 +1037,7 @@ where .broker_name .as_str(), ) { + self.queue_lock_manager().unlock_with_key(lock_key).await; return atomic_rest_num.load(Ordering::Acquire) + result_inner.message_count() as i64; } @@ -1053,25 +1054,28 @@ where queue_id, result_inner.message_queue_offset().clone(), ); - } else if (result_inner.status().is_some() - && (result_inner.status().unwrap() == GetMessageStatus::NoMatchedMessage - || result_inner.status().unwrap() == GetMessageStatus::OffsetFoundNull - || result_inner.status().unwrap() == GetMessageStatus::MessageWasRemoving - || result_inner.status().unwrap() == GetMessageStatus::NoMatchedLogicQueue)) - && result_inner.next_begin_offset() > -1 - { - if is_order { - self.broker_runtime_inner - .consumer_offset_manager() - .commit_offset( - channel.remote_address().to_string().into(), - &request_header.consumer_group, - topic, - queue_id, - result_inner.next_begin_offset(), - ); - } else { - unimplemented!("PopMessageProcessor pop_msg_from_queue") + } else if let Some(status) = result_inner.status() { + if matches!( + status, + GetMessageStatus::NoMatchedMessage + | GetMessageStatus::OffsetFoundNull + | GetMessageStatus::MessageWasRemoving + | GetMessageStatus::NoMatchedLogicQueue + ) && result_inner.next_begin_offset() > -1 + { + if is_order { + self.broker_runtime_inner + .consumer_offset_manager() + .commit_offset( + channel.remote_address().to_string().into(), + &request_header.consumer_group, + topic, + queue_id, + result_inner.next_begin_offset(), + ); + } else { + unimplemented!("PopMessageProcessor pop_msg_from_queue") + } } }