From 106f67758813b6be6256201135c2b5921e6eb578 Mon Sep 17 00:00:00 2001 From: mxsm Date: Fri, 6 Dec 2024 16:28:33 +0800 Subject: [PATCH] [ISSUE #1603]dd #[inline] to PopProcessQueue method and optimize code --- .../consumer_impl/pop_process_queue.rs | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/rocketmq-client/src/consumer/consumer_impl/pop_process_queue.rs b/rocketmq-client/src/consumer/consumer_impl/pop_process_queue.rs index 7a04df9c..3a27c0a7 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pop_process_queue.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pop_process_queue.rs @@ -43,6 +43,7 @@ impl Default for PopProcessQueue { } impl PopProcessQueue { + #[inline] pub(crate) fn new() -> Self { PopProcessQueue { last_pop_timestamp: get_current_millis(), @@ -51,45 +52,54 @@ impl PopProcessQueue { } } + #[inline] pub(crate) fn get_last_pop_timestamp(&self) -> u64 { self.last_pop_timestamp } + #[inline] pub(crate) fn set_last_pop_timestamp(&mut self, last_pop_timestamp: u64) { self.last_pop_timestamp = last_pop_timestamp; } - pub(crate) fn inc_found_msg(&self, count: isize) { - self.wait_ack_counter - .fetch_add(count as usize, Ordering::Relaxed); + #[inline] + pub(crate) fn inc_found_msg(&self, count: usize) { + self.wait_ack_counter.fetch_add(count, Ordering::AcqRel); } + #[inline] pub(crate) fn ack(&self) -> usize { - self.wait_ack_counter.fetch_sub(1, Ordering::Relaxed) + self.wait_ack_counter.fetch_sub(1, Ordering::AcqRel) } - pub(crate) fn dec_found_msg(&self, count: isize) { - self.wait_ack_counter - .fetch_add(-count as usize, Ordering::Relaxed); + + #[inline] + pub(crate) fn dec_found_msg(&self, count: usize) { + self.wait_ack_counter.fetch_sub(count, Ordering::AcqRel); } + #[inline] pub(crate) fn get_wai_ack_msg_count(&self) -> usize { - self.wait_ack_counter.load(Ordering::Relaxed) + self.wait_ack_counter.load(Ordering::Acquire) } + #[inline] pub(crate) fn is_dropped(&self) -> bool { - self.dropped.load(Ordering::Relaxed) + self.dropped.load(Ordering::Acquire) } + #[inline] pub(crate) fn set_dropped(&self, dropped: bool) { - self.dropped.store(dropped, Ordering::Relaxed); + self.dropped.store(dropped, Ordering::Release); } + #[inline] pub(crate) fn fill_pop_process_queue_info(&self, info: &mut PopProcessQueueInfo) { info.set_wait_ack_count(self.get_wai_ack_msg_count() as i32); info.set_droped(self.is_dropped()); info.set_last_pop_timestamp(self.get_last_pop_timestamp()); } + #[inline] pub(crate) fn is_pull_expired(&self) -> bool { let current_time = get_current_millis(); current_time.saturating_sub(self.last_pop_timestamp) > *PULL_MAX_IDLE_TIME @@ -97,6 +107,7 @@ impl PopProcessQueue { } impl Display for PopProcessQueue { + #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f,