diff --git a/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs b/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs index fe45865a..4a40d4e9 100644 --- a/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs +++ b/rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs @@ -207,7 +207,7 @@ impl PopBufferMergeService { let queue = self.commit_offsets.get(lock_key); if let Some(queue) = queue { let queue = queue.value(); - let queue = queue.get_queue(); + let queue = queue.get().lock(); if queue.is_empty() { return -1; } @@ -449,7 +449,8 @@ impl PopBufferMergeService { let mut count = 0; for entry in self.commit_offsets.iter() { let queue = entry.value(); - count += queue.get_queue().len(); + let guard = queue.get().lock(); + count += guard.len(); } count } @@ -548,11 +549,12 @@ impl PopBufferMergeService { } fn put_offset_queue(&mut self, point_wrapper: PopCheckPointWrapper) -> bool { - let mut queue = self + let queue = self .commit_offsets .entry(point_wrapper.lock_key.clone()) .or_insert(QueueWithTime::new()); - queue.get_queue_mut().push_back(point_wrapper); + let mut guard = queue.get().lock(); + guard.push_back(point_wrapper); true } @@ -713,14 +715,14 @@ fn is_ck_done_for_finish(point_wrapper: &PopCheckPointWrapper) -> bool { } pub struct QueueWithTime { - queue: VecDeque, + queue: Arc>>, time: u64, } impl QueueWithTime { pub fn new() -> Self { Self { - queue: VecDeque::new(), + queue: Arc::new(parking_lot::Mutex::new(VecDeque::new())), time: get_current_millis(), } } @@ -733,13 +735,9 @@ impl QueueWithTime { self.time } - pub fn get_queue(&self) -> &VecDeque { + pub fn get(&self) -> &Arc>> { &self.queue } - - pub fn get_queue_mut(&mut self) -> &mut VecDeque { - &mut self.queue - } } pub struct PopCheckPointWrapper { @@ -956,7 +954,8 @@ mod tests { #[test] fn queue_with_time_initializes_correctly() { let queue_with_time: QueueWithTime = QueueWithTime::new(); - assert!(queue_with_time.get_queue().is_empty()); + let guard = queue_with_time.get().lock(); + assert!(guard.is_empty()); assert!(queue_with_time.get_time() > 0); } @@ -969,8 +968,9 @@ mod tests { #[test] fn get_queue_mut_returns_mutable_reference() { - let mut queue_with_time: QueueWithTime = QueueWithTime::new(); - queue_with_time.get_queue_mut().push_back(1); - assert_eq!(queue_with_time.get_queue().len(), 1); + let queue_with_time: QueueWithTime = QueueWithTime::new(); + let mut guard = queue_with_time.get().lock(); + guard.push_back(1); + assert_eq!(guard.len(), 1); } }