diff --git a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs index 7abb4e1a..f7b20d08 100644 --- a/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs +++ b/rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use cheetah_string::CheetahString; use rocketmq_common::common::broker::broker_config::BrokerConfig; use rocketmq_common::common::config_manager::ConfigManager; +use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; use rocketmq_common::TimeUtils::get_current_millis; use serde::Deserialize; use serde::Serialize; @@ -44,24 +45,19 @@ pub(crate) struct ConsumerOrderInfoManager { //Fully implemented will be removed #[allow(unused_variables)] impl ConfigManager for ConsumerOrderInfoManager { - fn decode0(&mut self, key: &[u8], body: &[u8]) { - todo!() - } - - fn stop(&mut self) -> bool { - todo!() - } - fn config_file_path(&self) -> String { get_consumer_order_info_path(self.broker_config.store_path_root_dir.as_str()) } - fn encode(&mut self) -> String { - todo!() - } - fn encode_pretty(&self, pretty_format: bool) -> String { - "".to_string() + self.auto_clean(); + let wrapper = self.consumer_order_info_wrapper.lock(); + match pretty_format { + true => SerdeJsonUtils::to_json_pretty(&wrapper.table) + .expect("Failed to serialize consumer order info wrapper"), + false => serde_json::to_string(&wrapper.table) + .expect("Failed to serialize consumer order info wrapper"), + } } fn decode(&self, json_string: &str) { @@ -86,6 +82,8 @@ impl ConfigManager for ConsumerOrderInfoManager { } impl ConsumerOrderInfoManager { + fn auto_clean(&self) {} + pub fn update_next_visible_time( &self, topic: &CheetahString,