Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #1737]💥Implement QueueLockManager function🚀 #1738

Merged
merged 3 commits into from
Dec 13, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Dec 13, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1737

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced QueueLockManager for improved message processing concurrency control.
    • Added methods for managing locks, including creation, acquisition, and release.
    • Implemented a background task for periodic cleanup of unused locks.
  • Bug Fixes

    • Enhanced resource management by ensuring expired locks are removed efficiently.
  • Tests

    • Added unit tests for QueueLockManager to verify lock functionality and cache behavior.

Copy link
Contributor

coderabbitai bot commented Dec 13, 2024

Walkthrough

The changes introduce a new structure called QueueLockManager in the pop_message_processor.rs file, designed to manage locks for message processing in a concurrent environment. This structure includes methods for creating lock keys, acquiring and releasing locks, and cleaning up unused locks. A background task is initiated to periodically clean expired locks. Additionally, unit tests for the QueueLockManager are added to ensure the correctness of its functionalities.

Changes

File Path Change Summary
rocketmq-broker/src/processor/pop_message_processor.rs Added QueueLockManager struct with methods for lock management and background cleanup task. Also included unit tests for its functionality.

Assessment against linked issues

Objective Addressed Explanation
Implement QueueLockManager function (1737)

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • SpaceXCN
  • TeslaRustor

Poem

In the land of queues where messages flow,
A lock manager's born, to handle the show.
With keys that are clever and locks that are tight,
Concurrency dances, all day and all night.
So hop with delight, for changes are here,
A smoother process, let's all give a cheer! 🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Experiment)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

@rocketmq-rust-robot rocketmq-rust-robot added Difficulty level/Moderate Moderate difficult ISSUE feature🚀 Suggest an idea for this project. labels Dec 13, 2024
@rocketmq-rust-robot rocketmq-rust-robot added this to the v0.4.0 milestone Dec 13, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)

150-152: Consider renaming expired_local_cache to lock_cache for clarity

The field expired_local_cache in QueueLockManager may not clearly represent its purpose. Since it stores all active locks (not just expired ones), consider renaming it to lock_cache or lock_map to improve code readability.


161-174: Optimize build_lock_key to return CheetahString directly

Currently, build_lock_key returns a String, which is then converted to a CheetahString in the try_lock and unlock methods. To avoid unnecessary conversions and improve performance, consider modifying build_lock_key to return a CheetahString directly.

Apply this diff to modify the function:

 pub fn build_lock_key(
     topic: &CheetahString,
     consumer_group: &CheetahString,
     queue_id: i32,
 ) -> CheetahString {
-    format!(
+    CheetahString::from_string(format!(
         "{}{}{}{}{}",
         topic,
         PopAckConstants::SPLIT,
         consumer_group,
         PopAckConstants::SPLIT,
         queue_id
-    )
+    ))
 }

224-225: Update log output to reflect actual lock count after cleanup

After modifying clean_unused_locks to return the number of remaining locks, adjust the log statement to accurately report the current lock cache size.

Apply this diff to update the log:

 let count = self.clean_unused_locks(60000).await;
-info!("QueueLockSize={}", count);
+info!("QueueLockSize after cleanup={}", count);

150-229: Add documentation comments to public methods

Adding documentation comments to the public methods of QueueLockManager will enhance code readability and maintainability, helping other developers understand their purpose and usage.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eddc45d and a747372.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/pop_message_processor.rs (4 hunks)

Comment on lines +188 to +191
let mut cache = self.expired_local_cache.lock().await;
let lock = cache.entry(key).or_insert(TimedLock::new());
lock.try_lock()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a concurrent DashMap to reduce contention

Using a Mutex to guard the entire expired_local_cache may cause contention in high-concurrency scenarios. Consider replacing it with a thread-safe concurrent map like DashMap to improve performance.

Apply these changes to use DashMap:

  1. Add the dependency in Cargo.toml:

    [dependencies]
    dashmap = "5.3"
  2. Update the import statement:

    -use tokio::sync::Mutex;
    +use dashmap::DashMap;
  3. Modify the QueueLockManager struct:

    pub struct QueueLockManager {
    -    expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
    +    expired_local_cache: Arc<DashMap<CheetahString, TimedLock>>,
    }
  4. Update methods to use DashMap APIs without locking:

    -let mut cache = self.expired_local_cache.lock().await;
    -let lock = cache.entry(key).or_insert(TimedLock::new());
    +let lock = self.expired_local_cache.entry(key).or_insert_with(TimedLock::new);

    Adjust other methods similarly to eliminate explicit locking.

Comment on lines 210 to 218
pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
let mut cache = self.expired_local_cache.lock().await;
let mut count = 0;
cache.retain(|_, lock| {
count += 1;
get_current_millis() - lock.get_lock_time() <= used_expire_millis
});
count
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix clean_unused_locks count to reflect remaining locks

In the clean_unused_locks method, the count variable represents the total number of processed locks, not the number of remaining locks after cleanup. This can be misleading in logs and tests. Modify the function to return the number of remaining locks instead.

Apply this diff to fix the issue:

 pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
     let mut cache = self.expired_local_cache.lock().await;
-    let mut count = 0;
     cache.retain(|_, lock| {
-        count += 1;
         get_current_millis() - lock.get_lock_time() <= used_expire_millis
     });
-    count
+    cache.len() as i32
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
let mut cache = self.expired_local_cache.lock().await;
let mut count = 0;
cache.retain(|_, lock| {
count += 1;
get_current_millis() - lock.get_lock_time() <= used_expire_millis
});
count
}
pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
let mut cache = self.expired_local_cache.lock().await;
cache.retain(|_, lock| {
get_current_millis() - lock.get_lock_time() <= used_expire_millis
});
cache.len() as i32
}

Copy link

codecov bot commented Dec 13, 2024

Codecov Report

Attention: Patch coverage is 93.69369% with 7 lines in your changes missing coverage. Please review.

Project coverage is 28.22%. Comparing base (eddc45d) to head (8c28172).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...etmq-broker/src/processor/pop_message_processor.rs 93.69% 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1738      +/-   ##
==========================================
+ Coverage   28.11%   28.22%   +0.11%     
==========================================
  Files         466      466              
  Lines       64277    64388     +111     
==========================================
+ Hits        18071    18175     +104     
- Misses      46206    46213       +7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
rocketmq-broker/src/processor/pop_message_processor.rs (2)

220-228: Make cleanup interval and expiry time configurable

The background task uses hardcoded values for:

  • Cleanup interval: 60 seconds
  • Lock expiry time: 60000 milliseconds

Consider making these values configurable through the QueueLockManager constructor or environment variables.

pub struct QueueLockManager {
    expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
+   cleanup_interval_secs: u64,
+   lock_expiry_millis: u64,
}

impl QueueLockManager {
-   pub fn new() -> Self {
+   pub fn new(cleanup_interval_secs: Option<u64>, lock_expiry_millis: Option<u64>) -> Self {
        QueueLockManager {
            expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(4096))),
+           cleanup_interval_secs: cleanup_interval_secs.unwrap_or(60),
+           lock_expiry_millis: lock_expiry_millis.unwrap_or(60000),
        }
    }

    pub fn start(self: Arc<Self>) {
        tokio::spawn(async move {
            loop {
-               tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-               let count = self.clean_unused_locks(60000).await;
+               tokio::time::sleep(tokio::time::Duration::from_secs(self.cleanup_interval_secs)).await;
+               let count = self.clean_unused_locks(self.lock_expiry_millis).await;
                info!("QueueLockSize={}", count);
            }
        });
    }
}

331-390: Improve test coverage and naming conventions

  1. Test naming:

    • Remove inconsistent "1" suffixes (e.g., try_lock_locks_successfully1).
    • Use descriptive names that indicate the scenario being tested.
  2. Missing test cases:

    • Concurrent access scenarios
    • Edge cases for expired locks
    • Cleanup task behavior

Example of a concurrent test:

#[tokio::test]
async fn concurrent_lock_attempts_are_thread_safe() {
    let manager = Arc::new(QueueLockManager::new());
    let topic = CheetahString::from_static_str("test_topic");
    let consumer_group = CheetahString::from_static_str("test_group");
    let queue_id = 1;
    
    let mut handles = vec![];
    for _ in 0..10 {
        let manager = manager.clone();
        let topic = topic.clone();
        let consumer_group = consumer_group.clone();
        handles.push(tokio::spawn(async move {
            manager.try_lock(&topic, &consumer_group, queue_id).await
        }));
    }
    
    let results = futures::future::join_all(handles).await;
    let successful_locks = results.into_iter().filter(|r| r.unwrap()).count();
    assert_eq!(successful_locks, 1);
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a747372 and 5486d70.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/pop_message_processor.rs (4 hunks)
🔇 Additional comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)

150-158: 🛠️ Refactor suggestion

Consider using DashMap and documenting capacity choice

As mentioned in previous reviews, using DashMap instead of Arc<Mutex<HashMap>> would provide better performance in concurrent scenarios by reducing lock contention. Additionally, please document why 4096 was chosen as the initial capacity.


210-218: ⚠️ Potential issue

Fix count to reflect remaining locks

As mentioned in previous reviews, the count should reflect the number of remaining locks after cleanup, not the number of processed locks.


220-228: 🛠️ Refactor suggestion

Add error handling for background cleanup task

The background task should handle and log any errors that occur during cleanup to prevent silent failures.

pub fn start(self: Arc<Self>) {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-           let count = self.clean_unused_locks(60000).await;
-           info!("QueueLockSize={}", count);
+           match self.clean_unused_locks(60000).await {
+               Ok(count) => info!("QueueLockSize={}", count),
+               Err(e) => error!("Failed to clean unused locks: {}", e),
+           }
        }
    });
}

Likely invalid or redundant comment.


187-191: 🛠️ Refactor suggestion

Optimize mutex guard duration

The mutex guard is held for the entire duration of the function. Consider releasing it earlier by moving the try_lock() call outside the guard:

pub async fn try_lock_with_key(&self, key: CheetahString) -> bool {
-    let mut cache = self.expired_local_cache.lock().await;
-    let lock = cache.entry(key).or_insert(TimedLock::new());
-    lock.try_lock()
+    let lock = {
+        let mut cache = self.expired_local_cache.lock().await;
+        cache.entry(key).or_insert(TimedLock::new()).clone()
+    };
+    lock.try_lock()
}

Likely invalid or redundant comment.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)

156-158: Consider making initial capacity configurable

The fixed capacity of 4096 might not be suitable for all use cases. Consider making it configurable through a constructor parameter.

-    pub fn new() -> Self {
+    pub fn new(initial_capacity: usize) -> Self {
         QueueLockManager {
-            expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(4096))),
+            expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(initial_capacity))),
         }
     }

161-174: Optimize string key generation

The current implementation creates intermediate String allocations. Consider using a pre-allocated buffer or string builder for better performance.

     pub fn build_lock_key(
         topic: &CheetahString,
         consumer_group: &CheetahString,
         queue_id: i32,
-    ) -> String {
-        format!(
-            "{}{}{}{}{}",
-            topic,
-            PopAckConstants::SPLIT,
-            consumer_group,
-            PopAckConstants::SPLIT,
-            queue_id
-        )
+    ) -> CheetahString {
+        let mut buffer = String::with_capacity(
+            topic.len() + consumer_group.len() + 10
+        );
+        buffer.push_str(topic.as_str());
+        buffer.push_str(PopAckConstants::SPLIT);
+        buffer.push_str(consumer_group.as_str());
+        buffer.push_str(PopAckConstants::SPLIT);
+        buffer.push_str(&queue_id.to_string());
+        CheetahString::from(buffer)
     }

217-225: Make cleanup interval configurable

The background task's interval is hardcoded to 60 seconds. Consider making it configurable to allow for different cleanup strategies.

-    pub fn start(self: Arc<Self>) {
+    pub fn start(self: Arc<Self>, cleanup_interval_secs: u64) {
         tokio::spawn(async move {
             loop {
-                tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+                tokio::time::sleep(tokio::time::Duration::from_secs(cleanup_interval_secs)).await;
                 let count = self.clean_unused_locks(60000).await;
                 info!("QueueLockSize={}", count);
             }
         });
     }

345-362: Improve test naming consistency and coverage

  1. Test names have inconsistent suffixes (e.g., try_lock_locks_successfully1).
  2. Missing tests for concurrent access scenarios.

Consider:

  1. Removing numeric suffixes from test names
  2. Adding concurrent access tests:
#[tokio::test]
async fn try_lock_handles_concurrent_access() {
    let manager = Arc::new(QueueLockManager::new());
    let topic = CheetahString::from_static_str("test_topic");
    let consumer_group = CheetahString::from_static_str("test_group");
    let queue_id = 1;
    
    let mut handles = vec![];
    for _ in 0..10 {
        let manager = manager.clone();
        let topic = topic.clone();
        let consumer_group = consumer_group.clone();
        handles.push(tokio::spawn(async move {
            manager.try_lock(&topic, &consumer_group, queue_id).await
        }));
    }
    
    let results = futures::future::join_all(handles).await;
    assert_eq!(results.into_iter().filter(|r| r.unwrap()).count(), 1);
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5486d70 and 8c28172.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/pop_message_processor.rs (4 hunks)
🔇 Additional comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)

17-17: 🛠️ Refactor suggestion

Consider using DashMap for better concurrency

Using a Mutex to guard the entire HashMap may cause contention in high-concurrency scenarios. Consider using dashmap::DashMap instead.

Also applies to: 21-21, 33-33


210-215: ⚠️ Potential issue

Fix lock cleanup count logic

The method returns the total count before cleanup instead of the number of remaining locks.

     pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> usize {
         let mut cache = self.expired_local_cache.lock().await;
-        let count = cache.len();
         cache.retain(|_, lock| get_current_millis() - lock.get_lock_time() <= used_expire_millis);
-        count
+        cache.len()
     }

376-387: ⚠️ Potential issue

Update test assertions for clean_unused_locks

The test assertions need to be updated to match the fixed cleanup count logic.


187-191: Verify lock acquisition patterns

Let's verify that there are no potential deadlocks or race conditions in lock usage patterns across the codebase.

✅ Verification successful

Based on the comprehensive search results, I can now generate the final response:

Lock implementation follows safe patterns

The lock implementation in try_lock_with_key follows safe patterns with:

  • Proper lock acquisition using tokio's async mutex
  • No nested locks that could cause deadlocks
  • Clear unlock paths in corresponding unlock_with_key method
  • Comprehensive test coverage validating lock behavior
  • Timeout mechanisms to prevent indefinite blocking

The codebase shows consistent lock/unlock patterns across different components with proper error handling and timeouts.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all lock acquisitions and releases
echo "Searching for lock acquisition patterns..."
rg -A 5 "try_lock|unlock" --type rust

# Search for potential nested locks
echo "Checking for potential nested locks..."
ast-grep --pattern 'try_lock($_) {
  $$$
  try_lock($_)
  $$$
}'

Length of output: 62867

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
AI review first Ai review pr first approved PR has approved auto merge Difficulty level/Moderate Moderate difficult ISSUE feature🚀 Suggest an idea for this project.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement QueueLockManager function
4 participants