Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #1762]🚀Optimize ConsumerOrderInfoManager encode and decode🔥 #1764

Merged
merged 1 commit into from
Dec 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::sync::Arc;
use cheetah_string::CheetahString;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::config_manager::ConfigManager;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::TimeUtils::get_current_millis;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -44,24 +45,19 @@ pub(crate) struct ConsumerOrderInfoManager {
//Fully implemented will be removed
#[allow(unused_variables)]
impl ConfigManager for ConsumerOrderInfoManager {
fn decode0(&mut self, key: &[u8], body: &[u8]) {
todo!()
}

fn stop(&mut self) -> bool {
todo!()
}

fn config_file_path(&self) -> String {
get_consumer_order_info_path(self.broker_config.store_path_root_dir.as_str())
}

fn encode(&mut self) -> String {
todo!()
}

fn encode_pretty(&self, pretty_format: bool) -> String {
"".to_string()
self.auto_clean();
let wrapper = self.consumer_order_info_wrapper.lock();
match pretty_format {
true => SerdeJsonUtils::to_json_pretty(&wrapper.table)
.expect("Failed to serialize consumer order info wrapper"),
false => serde_json::to_string(&wrapper.table)
.expect("Failed to serialize consumer order info wrapper"),
}
Comment on lines +53 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using Result instead of expect for error handling

The current implementation uses expect which could cause panics in production. Consider returning a Result type instead.

-    fn encode_pretty(&self, pretty_format: bool) -> String {
+    fn encode_pretty(&self, pretty_format: bool) -> Result<String, serde_json::Error> {
         self.auto_clean();
         let wrapper = self.consumer_order_info_wrapper.lock();
         match pretty_format {
-            true => SerdeJsonUtils::to_json_pretty(&wrapper.table)
-                .expect("Failed to serialize consumer order info wrapper"),
-            false => serde_json::to_string(&wrapper.table)
-                .expect("Failed to serialize consumer order info wrapper"),
+            true => SerdeJsonUtils::to_json_pretty(&wrapper.table),
+            false => serde_json::to_string(&wrapper.table),
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self.auto_clean();
let wrapper = self.consumer_order_info_wrapper.lock();
match pretty_format {
true => SerdeJsonUtils::to_json_pretty(&wrapper.table)
.expect("Failed to serialize consumer order info wrapper"),
false => serde_json::to_string(&wrapper.table)
.expect("Failed to serialize consumer order info wrapper"),
}
self.auto_clean();
let wrapper = self.consumer_order_info_wrapper.lock();
match pretty_format {
true => SerdeJsonUtils::to_json_pretty(&wrapper.table),
false => serde_json::to_string(&wrapper.table),
}

}

fn decode(&self, json_string: &str) {
Expand All @@ -86,6 +82,8 @@ impl ConfigManager for ConsumerOrderInfoManager {
}

impl ConsumerOrderInfoManager {
fn auto_clean(&self) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Potential issue

Now I have enough context to provide the final response:

Implement auto_clean functionality to prevent memory leaks from stale order information

The auto_clean method should remove entries from ConsumerOrderInfoWrapper.table where OrderInfo.pop_time is older than CLEAN_SPAN_FROM_LAST (24 hours). The data structure shows:

  • A HashMap of topic@group to order information
  • Each OrderInfo contains pop_time and invisible_time
  • CLEAN_SPAN_FROM_LAST is defined as 24 hours in milliseconds

Example implementation should:

  1. Lock the wrapper
  2. Filter out entries where current_time - pop_time > CLEAN_SPAN_FROM_LAST
  3. Update the HashMap with the filtered entries
🔗 Analysis chain

Implement auto_clean functionality to prevent potential memory leaks

The auto_clean method is currently empty but is called during serialization. Based on the CLEAN_SPAN_FROM_LAST constant (24 hours), it appears this method should clean up old consumer order information.

Consider implementing the following:

  1. Remove entries older than CLEAN_SPAN_FROM_LAST
  2. Clean up any orphaned or completed orders

Let's verify the usage of auto_clean in other parts of the codebase:


Let me gather more context about the consumer order info manager to understand the data structure and cleaning requirements.


Let me check the ConsumerOrderInfoWrapper structure and any related data that needs cleaning:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any other implementations or calls to auto_clean
rg "auto_clean|CLEAN_SPAN_FROM_LAST" --type rust

Length of output: 355


Script:

#!/bin/bash
# Get the full implementation of ConsumerOrderInfoManager
ast-grep --pattern 'struct ConsumerOrderInfoManager {
  $$$
}'

# Also search for any tests or related code
rg "ConsumerOrderInfoManager" -A 5 --type rust

Length of output: 5326


Script:

#!/bin/bash
# Get ConsumerOrderInfoWrapper implementation
ast-grep --pattern 'struct ConsumerOrderInfoWrapper {
  $$$
}'

# Search for any timestamp or time-related fields that might need cleaning
rg "ConsumerOrderInfoWrapper" -A 10 --type rust

Length of output: 4351


pub fn update_next_visible_time(
&self,
topic: &CheetahString,
Expand Down
Loading