From 0b9ba5a0aaf1fc41eb6cb799082e3523d488ccc7 Mon Sep 17 00:00:00 2001 From: mxsm Date: Fri, 3 Jan 2025 22:47:42 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#2066]=F0=9F=92=ABAdd=20ConsumeReviveO?= =?UTF-8?q?bj=20struct=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../processor_service/pop_revive_service.rs | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs b/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs index 9410c3f2..61d27583 100644 --- a/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs +++ b/rocketmq-broker/src/processor/processor_service/pop_revive_service.rs @@ -14,4 +14,98 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashMap; + +use cheetah_string::CheetahString; +use rocketmq_store::pop::pop_check_point::PopCheckPoint; + pub struct PopReviveService; + +struct ConsumeReviveObj { + map: HashMap, + sort_list: Option>, + old_offset: i64, + end_time: i64, + new_offset: i64, +} + +impl ConsumeReviveObj { + fn new() -> Self { + Self { + map: HashMap::new(), + sort_list: None, + old_offset: 0, + end_time: 0, + new_offset: 0, + } + } + + fn gen_sort_list(&mut self) -> &Vec { + if self.sort_list.is_none() { + let mut list: Vec = self.map.values().cloned().collect(); + list.sort_by_key(|ck| ck.revive_offset); + self.sort_list = Some(list); + } + self.sort_list.as_ref().unwrap() + } +} + +#[cfg(test)] +mod tests { + + use cheetah_string::CheetahString; + use rocketmq_store::pop::pop_check_point::PopCheckPoint; + + use super::*; + + #[test] + fn new_initializes_correctly() { + let obj = ConsumeReviveObj::new(); + assert!(obj.map.is_empty()); + assert!(obj.sort_list.is_none()); + assert_eq!(obj.old_offset, 0); + assert_eq!(obj.end_time, 0); + assert_eq!(obj.new_offset, 0); + } + + #[test] + fn gen_sort_list_creates_sorted_list() { + let mut obj = ConsumeReviveObj::new(); + let ck1 = PopCheckPoint { + revive_offset: 10, + ..Default::default() + }; + let ck2 = PopCheckPoint { + revive_offset: 5, + ..Default::default() + }; + obj.map.insert(CheetahString::from("key1"), ck1.clone()); + obj.map.insert(CheetahString::from("key2"), ck2.clone()); + + let sorted_list = obj.gen_sort_list(); + assert_eq!(sorted_list.len(), 2); + assert_eq!(sorted_list[0].revive_offset, 5); + assert_eq!(sorted_list[1].revive_offset, 10); + } + + #[test] + fn gen_sort_list_returns_existing_list_if_already_sorted() { + let mut obj = ConsumeReviveObj::new(); + let ck1 = PopCheckPoint { + revive_offset: 10, + ..Default::default() + }; + let ck2 = PopCheckPoint { + revive_offset: 5, + ..Default::default() + }; + obj.map.insert(CheetahString::from("key1"), ck1.clone()); + obj.map.insert(CheetahString::from("key2"), ck2.clone()); + + let _ = obj.gen_sort_list(); + let sorted_list = obj.gen_sort_list(); + assert_eq!(sorted_list.len(), 2); + assert_eq!(sorted_list[0].revive_offset, 5); + assert_eq!(sorted_list[1].revive_offset, 10); + } +}