From cc274b05fc9e2541236628d16f8afed56fb1607c Mon Sep 17 00:00:00 2001 From: Sainath Singineedi <44405294+sainad2222@users.noreply.github.com> Date: Sat, 7 Dec 2024 21:49:16 +0530 Subject: [PATCH] Allocate message queue by config (#1651) --- .../src/consumer/rebalance_strategy.rs | 1 + .../allocate_message_queue_by_config.rs | 103 ++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_by_config.rs diff --git a/rocketmq-client/src/consumer/rebalance_strategy.rs b/rocketmq-client/src/consumer/rebalance_strategy.rs index e2333dba..28412dcf 100644 --- a/rocketmq-client/src/consumer/rebalance_strategy.rs +++ b/rocketmq-client/src/consumer/rebalance_strategy.rs @@ -16,6 +16,7 @@ */ pub mod allocate_message_queue_averagely; pub mod allocate_message_queue_averagely_by_circle; +pub mod allocate_message_queue_by_config; pub mod allocate_message_queue_by_machine_room; pub mod allocate_message_queue_by_machine_room_nearby; diff --git a/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_by_config.rs b/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_by_config.rs new file mode 100644 index 00000000..2f4b91a7 --- /dev/null +++ b/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_by_config.rs @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use cheetah_string::CheetahString; +use rocketmq_common::common::message::message_queue::MessageQueue; + +use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy; + +pub struct AllocateMessageQueueByConfig { + message_queue_list: Vec, +} + +impl AllocateMessageQueueByConfig { + #[inline] + pub fn new(message_queue_list: Vec) -> Self { + Self { message_queue_list } + } +} + +impl AllocateMessageQueueStrategy for AllocateMessageQueueByConfig { + fn allocate( + &self, + consumer_group: &CheetahString, + current_cid: &CheetahString, + mq_all: &[MessageQueue], + cid_all: &[CheetahString], + ) -> crate::Result> { + Ok(self.message_queue_list.clone()) + } + + #[inline] + fn get_name(&self) -> &'static str { + "CONFIG" + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use cheetah_string::CheetahString; + use rocketmq_common::common::message::message_queue::MessageQueue; + + use super::*; + + #[test] + fn test_allocate_message_queue_by_config() { + let consumer_group = CheetahString::from("test_group"); + let current_cid = CheetahString::from("CID_PREFIX1"); + let mq_all = create_message_queue_list(4); + let cid_all = create_consumer_id_list(2); + let strategy = AllocateMessageQueueByConfig::new(mq_all.clone()); + + let mut consumer_allocate_queue = HashMap::new(); + for consumer_id in &cid_all { + let queues = strategy + .allocate(&consumer_group, &consumer_id, &mq_all, &cid_all) + .unwrap(); + let queue_ids: Vec = queues.into_iter().map(|mq| mq.get_queue_id()).collect(); + consumer_allocate_queue.insert(consumer_id.clone(), queue_ids); + } + + assert_eq!( + consumer_allocate_queue + .get("CID_PREFIX0") + .unwrap() + .as_slice(), + &[0, 1, 2, 3] + ); + assert_eq!( + consumer_allocate_queue + .get("CID_PREFIX1") + .unwrap() + .as_slice(), + &[0, 1, 2, 3] + ); + } + + fn create_consumer_id_list(size: usize) -> Vec { + (0..size) + .map(|i| format!("CID_PREFIX{}", i).into()) + .collect() + } + + fn create_message_queue_list(size: usize) -> Vec { + (0..size) + .map(|i| MessageQueue::from_parts("topic", "broker", i as i32)) + .collect() + } +}