Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #1737]💥Implement QueueLockManager function🚀 #1738

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::pop_ack_constants::PopAckConstants;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::code::request_code::RequestCode;
Expand All @@ -27,6 +30,8 @@
use rocketmq_store::pop::ack_msg::AckMsg;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
use tokio::sync::Mutex;
use tracing::info;

#[derive(Default)]
pub struct PopMessageProcessor {}
Expand Down Expand Up @@ -142,6 +147,84 @@
}
}

pub struct QueueLockManager {
expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
}

impl QueueLockManager {
pub fn new() -> Self {
QueueLockManager {
expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(4096))),
}
}

pub fn build_lock_key(
topic: &CheetahString,
consumer_group: &CheetahString,
queue_id: i32,
) -> String {
format!(
"{}{}{}{}{}",
topic,
PopAckConstants::SPLIT,
consumer_group,
PopAckConstants::SPLIT,
queue_id
)
}

pub async fn try_lock(
&self,
topic: &CheetahString,
consumer_group: &CheetahString,
queue_id: i32,
) -> bool {
let key = Self::build_lock_key(topic, consumer_group, queue_id);
self.try_lock_with_key(CheetahString::from_string(key))
.await
}

pub async fn try_lock_with_key(&self, key: CheetahString) -> bool {
let mut cache = self.expired_local_cache.lock().await;
let lock = cache.entry(key).or_insert(TimedLock::new());
lock.try_lock()
}
Comment on lines +188 to +191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a concurrent DashMap to reduce contention

Using a Mutex to guard the entire expired_local_cache may cause contention in high-concurrency scenarios. Consider replacing it with a thread-safe concurrent map like DashMap to improve performance.

Apply these changes to use DashMap:

  1. Add the dependency in Cargo.toml:

    [dependencies]
    dashmap = "5.3"
  2. Update the import statement:

    -use tokio::sync::Mutex;
    +use dashmap::DashMap;
  3. Modify the QueueLockManager struct:

    pub struct QueueLockManager {
    -    expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
    +    expired_local_cache: Arc<DashMap<CheetahString, TimedLock>>,
    }
  4. Update methods to use DashMap APIs without locking:

    -let mut cache = self.expired_local_cache.lock().await;
    -let lock = cache.entry(key).or_insert(TimedLock::new());
    +let lock = self.expired_local_cache.entry(key).or_insert_with(TimedLock::new);

    Adjust other methods similarly to eliminate explicit locking.


pub async fn unlock(
&self,
topic: &CheetahString,
consumer_group: &CheetahString,
queue_id: i32,
) {
let key = Self::build_lock_key(topic, consumer_group, queue_id);
self.unlock_with_key(CheetahString::from_string(key)).await;
}

pub async fn unlock_with_key(&self, key: CheetahString) {
let cache = self.expired_local_cache.lock().await;
if let Some(lock) = cache.get(&key) {
lock.unlock();
}
}

pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> usize {
let mut cache = self.expired_local_cache.lock().await;
let count = cache.len();
cache.retain(|_, lock| get_current_millis() - lock.get_lock_time() <= used_expire_millis);
count
}

pub fn start(self: Arc<Self>) {
tokio::spawn(async move {

Check warning on line 218 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L217-L218

Added lines #L217 - L218 were not covered by tests
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
let count = self.clean_unused_locks(60000).await;
info!("QueueLockSize={}", count);

Check warning on line 222 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L220-L222

Added lines #L220 - L222 were not covered by tests
}
});
}

Check warning on line 225 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L224-L225

Added lines #L224 - L225 were not covered by tests
}

#[cfg(test)]
mod tests {
use cheetah_string::CheetahString;
Expand Down Expand Up @@ -241,4 +324,65 @@
let lock_time = lock.get_lock_time();
assert!(lock_time >= initial_time);
}

#[tokio::test]
async fn new_queue_lock_manager_has_empty_cache() {
let manager = QueueLockManager::new();
let cache = manager.expired_local_cache.lock().await;
assert!(cache.is_empty());
}

#[tokio::test]
async fn build_lock_key_formats_correctly() {
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
let key = QueueLockManager::build_lock_key(&topic, &consumer_group, queue_id);
let expected = "test_topic@test_group@1";
assert_eq!(key, expected);
}

#[tokio::test]
async fn try_lock_locks_successfully1() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
assert!(manager.try_lock(&topic, &consumer_group, queue_id).await);
}

#[tokio::test]
async fn try_lock_fails_when_already_locked1() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
manager.try_lock(&topic, &consumer_group, queue_id).await;
assert!(!manager.try_lock(&topic, &consumer_group, queue_id).await);
}

#[tokio::test]
async fn unlock_unlocks_successfully1() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
manager.try_lock(&topic, &consumer_group, queue_id).await;
manager.unlock(&topic, &consumer_group, queue_id).await;
assert!(manager.try_lock(&topic, &consumer_group, queue_id).await);
}

#[tokio::test]
async fn clean_unused_locks_removes_expired_locks() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
manager.try_lock(&topic, &consumer_group, queue_id).await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let removed_count = manager.clean_unused_locks(5).await;
assert_eq!(removed_count, 1);
let removed_count = manager.clean_unused_locks(15).await;
assert_eq!(removed_count, 0);
}
}
Loading