Skip to content

Commit

Permalink
[ISSUE #1737]💥Implement QueueLockManager function🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 13, 2024
1 parent eddc45d commit a747372
Showing 1 changed file with 147 additions and 0 deletions.
147 changes: 147 additions & 0 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::common::pop_ack_constants::PopAckConstants;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::code::request_code::RequestCode;
Expand All @@ -27,6 +30,8 @@ use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerCon
use rocketmq_store::pop::ack_msg::AckMsg;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
use tokio::sync::Mutex;
use tracing::info;

#[derive(Default)]
pub struct PopMessageProcessor {}
Expand Down Expand Up @@ -142,6 +147,87 @@ impl TimedLock {
}
}

pub struct QueueLockManager {
expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
}

impl QueueLockManager {
pub fn new() -> Self {
QueueLockManager {
expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(4096))),
}
}

pub fn build_lock_key(
topic: &CheetahString,
consumer_group: &CheetahString,
queue_id: i32,
) -> String {
format!(
"{}{}{}{}{}",
topic,
PopAckConstants::SPLIT,
consumer_group,
PopAckConstants::SPLIT,
queue_id
)
}

pub async fn try_lock(
&self,
topic: &CheetahString,
consumer_group: &CheetahString,
queue_id: i32,
) -> bool {
let key = Self::build_lock_key(topic, consumer_group, queue_id);
self.try_lock_with_key(CheetahString::from_string(key))
.await
}

pub async fn try_lock_with_key(&self, key: CheetahString) -> bool {
let mut cache = self.expired_local_cache.lock().await;
let lock = cache.entry(key).or_insert(TimedLock::new());
lock.try_lock()
}

pub async fn unlock(
&self,
topic: &CheetahString,
consumer_group: &CheetahString,
queue_id: i32,
) {
let key = Self::build_lock_key(topic, consumer_group, queue_id);
self.unlock_with_key(CheetahString::from_string(key)).await;
}

pub async fn unlock_with_key(&self, key: CheetahString) {
let cache = self.expired_local_cache.lock().await;
if let Some(lock) = cache.get(&key) {
lock.unlock();
}
}

pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
let mut cache = self.expired_local_cache.lock().await;
let mut count = 0;
cache.retain(|_, lock| {
count += 1;
get_current_millis() - lock.get_lock_time() <= used_expire_millis
});
count
}

pub fn start(self: Arc<Self>) {
tokio::spawn(async move {

Check warning on line 221 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L220-L221

Added lines #L220 - L221 were not covered by tests
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
let count = self.clean_unused_locks(60000).await;
info!("QueueLockSize={}", count);

Check warning on line 225 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L223-L225

Added lines #L223 - L225 were not covered by tests
}
});
}

Check warning on line 228 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L227-L228

Added lines #L227 - L228 were not covered by tests
}

#[cfg(test)]
mod tests {
use cheetah_string::CheetahString;
Expand Down Expand Up @@ -241,4 +327,65 @@ mod tests {
let lock_time = lock.get_lock_time();
assert!(lock_time >= initial_time);
}

#[tokio::test]
async fn new_queue_lock_manager_has_empty_cache() {
let manager = QueueLockManager::new();
let cache = manager.expired_local_cache.lock().await;
assert!(cache.is_empty());
}

#[tokio::test]
async fn build_lock_key_formats_correctly() {
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
let key = QueueLockManager::build_lock_key(&topic, &consumer_group, queue_id);
let expected = "test_topic@test_group@1";
assert_eq!(key, expected);
}

#[tokio::test]
async fn try_lock_locks_successfully1() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
assert!(manager.try_lock(&topic, &consumer_group, queue_id).await);
}

#[tokio::test]
async fn try_lock_fails_when_already_locked1() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
manager.try_lock(&topic, &consumer_group, queue_id).await;
assert!(!manager.try_lock(&topic, &consumer_group, queue_id).await);
}

#[tokio::test]
async fn unlock_unlocks_successfully1() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
manager.try_lock(&topic, &consumer_group, queue_id).await;
manager.unlock(&topic, &consumer_group, queue_id).await;
assert!(manager.try_lock(&topic, &consumer_group, queue_id).await);
}

#[tokio::test]
async fn clean_unused_locks_removes_expired_locks() {
let manager = QueueLockManager::new();
let topic = CheetahString::from_static_str("test_topic");
let consumer_group = CheetahString::from_static_str("test_group");
let queue_id = 1;
manager.try_lock(&topic, &consumer_group, queue_id).await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let removed_count = manager.clean_unused_locks(5).await;
assert_eq!(removed_count, 1);
let removed_count = manager.clean_unused_locks(15).await;
assert_eq!(removed_count, 0);
}
}

0 comments on commit a747372

Please # to comment.