Skip to content

Commit

Permalink
[ISSUE #2392]💫Implement QueueWithTime thread sync safety🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 24, 2025
1 parent acdec58 commit 856bff8
Showing 1 changed file with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
let queue = self.commit_offsets.get(lock_key);
if let Some(queue) = queue {
let queue = queue.value();
let queue = queue.get_queue();
let queue = queue.get().lock();

Check warning on line 210 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L210

Added line #L210 was not covered by tests
if queue.is_empty() {
return -1;
}
Expand Down Expand Up @@ -449,7 +449,8 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
let mut count = 0;
for entry in self.commit_offsets.iter() {
let queue = entry.value();
count += queue.get_queue().len();
let guard = queue.get().lock();
count += guard.len();

Check warning on line 453 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L452-L453

Added lines #L452 - L453 were not covered by tests
}
count
}
Expand Down Expand Up @@ -548,11 +549,12 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
}

fn put_offset_queue(&mut self, point_wrapper: PopCheckPointWrapper) -> bool {
let mut queue = self
let queue = self

Check warning on line 552 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L552

Added line #L552 was not covered by tests
.commit_offsets
.entry(point_wrapper.lock_key.clone())
.or_insert(QueueWithTime::new());
queue.get_queue_mut().push_back(point_wrapper);
let mut guard = queue.get().lock();
guard.push_back(point_wrapper);

Check warning on line 557 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L556-L557

Added lines #L556 - L557 were not covered by tests
true
}

Expand Down Expand Up @@ -713,14 +715,14 @@ fn is_ck_done_for_finish(point_wrapper: &PopCheckPointWrapper) -> bool {
}

pub struct QueueWithTime<T> {
queue: VecDeque<T>,
queue: Arc<parking_lot::Mutex<VecDeque<T>>>,
time: u64,
}

impl<T> QueueWithTime<T> {
pub fn new() -> Self {
Self {
queue: VecDeque::new(),
queue: Arc::new(parking_lot::Mutex::new(VecDeque::new())),
time: get_current_millis(),
}
}
Expand All @@ -733,13 +735,9 @@ impl<T> QueueWithTime<T> {
self.time
}

pub fn get_queue(&self) -> &VecDeque<T> {
pub fn get(&self) -> &Arc<parking_lot::Mutex<VecDeque<T>>> {
&self.queue
}

pub fn get_queue_mut(&mut self) -> &mut VecDeque<T> {
&mut self.queue
}
}

pub struct PopCheckPointWrapper {
Expand Down Expand Up @@ -956,7 +954,8 @@ mod tests {
#[test]
fn queue_with_time_initializes_correctly() {
let queue_with_time: QueueWithTime<i32> = QueueWithTime::new();
assert!(queue_with_time.get_queue().is_empty());
let guard = queue_with_time.get().lock();
assert!(guard.is_empty());
assert!(queue_with_time.get_time() > 0);
}

Expand All @@ -969,8 +968,9 @@ mod tests {

#[test]
fn get_queue_mut_returns_mutable_reference() {
let mut queue_with_time: QueueWithTime<i32> = QueueWithTime::new();
queue_with_time.get_queue_mut().push_back(1);
assert_eq!(queue_with_time.get_queue().len(), 1);
let queue_with_time: QueueWithTime<i32> = QueueWithTime::new();
let mut guard = queue_with_time.get().lock();
guard.push_back(1);
assert_eq!(guard.len(), 1);
}
}

0 comments on commit 856bff8

Please # to comment.