-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[ISSUE #1809]🍻Implement MQClientAPIImpl#process_pop_response method logic #1810
Conversation
WalkthroughThis pull request introduces a new Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1810 +/- ##
==========================================
- Coverage 28.42% 28.30% -0.12%
==========================================
Files 472 472
Lines 65341 65613 +272
==========================================
Hits 18571 18571
- Misses 46770 47042 +272 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
1686-1743
: Refactor complex function for improved readabilityThe
build_queue_offset_sorted_map
function contains complex logic and nested conditions that make it difficult to read and maintain. Consider refactoring the function by extracting smaller helper functions and adding comments to explain non-trivial logic.Example refactor:
+fn is_lmq_and_dispatch_needed(message_ext: &MessageExt, topic: &str) -> bool { + mix_all::is_lmq(Some(topic)) && + message_ext.reconsume_times() == 0 && + !message_ext.get_property(&CheetahString::from_static_str( + MessageConst::PROPERTY_INNER_MULTI_DISPATCH, + )).unwrap_or_default().is_empty() +} + fn build_queue_offset_sorted_map( topic: &str, msg_found_list: &[MessageExt], ) -> HashMap<String, Vec<u64>> { let mut sort_map: HashMap<String, Vec<u64>> = HashMap::with_capacity(16); for message_ext in msg_found_list { - let dispatch = message_ext - .get_property(&CheetahString::from_static_str( - MessageConst::PROPERTY_INNER_MULTI_DISPATCH, - )) - .unwrap_or_default(); - if mix_all::is_lmq(Some(topic)) - && message_ext.reconsume_times() == 0 - && !dispatch.is_empty() - { + if is_lmq_and_dispatch_needed(message_ext, topic) { // LMQ processing logic... } else { // Non-LMQ processing logic... } } sort_map }Adding comments and extracting conditions into functions improves clarity.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-client/src/consumer/pop_result.rs
(1 hunks)rocketmq-client/src/implementation/mq_client_api_impl.rs
(5 hunks)rocketmq-remoting/src/protocol/header/pop_message_response_header.rs
(1 hunks)
🔇 Additional comments (2)
rocketmq-client/src/consumer/pop_result.rs (1)
Line range hint 23-37
: Implementation of PopResult
struct looks good
The PopResult
struct is properly defined with the necessary fields and derives Default
and Clone
, which are appropriate for this use case. The Display
implementation provides a clear and formatted string representation of the struct.
rocketmq-remoting/src/protocol/header/pop_message_response_header.rs (1)
28-40
: Type changes from signed to unsigned integers
The fields pop_time
, invisible_time
, rest_num
, and revive_qid
have been changed from signed to unsigned integer types (i64
to u64
and i32
to u32
). Ensure that all usages of these fields handle the unsigned types correctly and that negative values are not expected in these contexts.
Confirm that negative values are not possible for these fields and update any related code accordingly.
.unwrap_or_default(); | ||
if msg_queue_offset as i64 != offset { | ||
warn!( | ||
"Queue offset[{}] of msg is strange, not equal to the stored \ | ||
in msg, {:?}", | ||
msg_queue_offset, message | ||
); | ||
} | ||
let extra_info = ExtraInfoUtil::build_extra_info( | ||
message.queue_offset(), | ||
response_header.pop_time as i64, | ||
response_header.invisible_time as i64, | ||
response_header.revive_qid as i32, | ||
message.get_topic(), | ||
broker_name, | ||
msg_queue_offset as i32, | ||
); | ||
message.put_property( | ||
CheetahString::from_static_str(MessageConst::PROPERTY_POP_CK), | ||
CheetahString::from_string(extra_info), | ||
); | ||
(queue_offset_key, queue_id_key) | ||
} else { | ||
let queue_id_key = ExtraInfoUtil::get_start_offset_info_map_key( | ||
message.get_topic(), | ||
message.queue_id() as i64, | ||
); | ||
let queue_offset_key = ExtraInfoUtil::get_queue_offset_map_key( | ||
message.get_topic(), | ||
message.queue_id() as i64, | ||
message.queue_offset(), | ||
); | ||
let queue_offset = message.queue_offset(); | ||
let index = sort_map | ||
.get(&queue_id_key) | ||
.unwrap() | ||
.iter() | ||
.position(|&q| q == queue_offset as u64) | ||
.unwrap_or_default(); | ||
|
||
let msg_queue_offset = sort_map | ||
.get(&queue_offset_key) | ||
.unwrap() | ||
.get(index) | ||
.cloned() | ||
.unwrap_or_default(); | ||
if msg_queue_offset as i64 != queue_offset { | ||
warn!( | ||
"Queue offset[{}] of msg is strange, not equal to the stored \ | ||
in msg, {:?}", | ||
msg_queue_offset, message | ||
); | ||
} | ||
let extra_info = ExtraInfoUtil::build_extra_info( | ||
message.queue_offset(), | ||
response_header.pop_time as i64, | ||
response_header.invisible_time as i64, | ||
response_header.revive_qid as i32, | ||
message.get_topic(), | ||
broker_name, | ||
msg_queue_offset as i32, | ||
); | ||
message.put_property( | ||
CheetahString::from_static_str(MessageConst::PROPERTY_POP_CK), | ||
CheetahString::from_string(extra_info), | ||
); | ||
(queue_offset_key, queue_id_key) | ||
}; | ||
if is_order && !order_count_info.is_empty() { | ||
let mut count = order_count_info.get(&queue_offset_key); | ||
if count.is_none() { | ||
count = order_count_info.get(&queue_id_key); | ||
} | ||
if let Some(ct) = count { | ||
message.set_reconsume_times(*ct); | ||
} | ||
} | ||
} | ||
} | ||
message.put_property( | ||
CheetahString::from_static_str(MessageConst::PROPERTY_FIRST_POP_TIME), | ||
CheetahString::from(response_header.pop_time.to_string()), | ||
); | ||
message.broker_name = broker_name.clone(); | ||
message.set_topic( | ||
NamespaceUtil::without_namespace_with_namespace( | ||
topic.as_str(), | ||
self.client_config | ||
.namespace | ||
.clone() | ||
.unwrap_or_default() | ||
.as_str(), | ||
) | ||
.into(), | ||
) | ||
} | ||
Ok(pop_result) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential panics due to unchecked unwrap()
calls
In the process_pop_response
function, multiple unwrap()
calls are used on Option
values without prior checks, which can lead to panics if the Option
is None
. Specifically:
- Line 1455~:
response.get_body_mut().unwrap()
- Line 1478~:
response.decode_command_custom_header::<PopMessageResponseHeader>().unwrap()
- Lines 1489~ and 1495~: Unwrapping
Option
fields inresponse_header
without checking.
To enhance the robustness of the code and prevent potential panics, it's recommended to handle Option
values safely using pattern matching or the ?
operator.
Apply this diff to handle potential None
values safely:
- let messages = MessageDecoder::decodes_batch(
- response.get_body_mut().unwrap(),
- self.client_config.decode_read_body,
- self.client_config.decode_decompress_body,
- );
+ let body = response.get_body_mut().ok_or_else(|| {
+ RemotingError("Response body is missing")
+ })?;
+ let messages = MessageDecoder::decodes_batch(
+ body,
+ self.client_config.decode_read_body,
+ self.client_config.decode_decompress_body,
+ );
...
- let response_header = response
- .decode_command_custom_header::<PopMessageResponseHeader>()
- .unwrap();
+ let response_header = response
+ .decode_command_custom_header::<PopMessageResponseHeader>()
+ .map_err(|_| RemotingError("Failed to decode PopMessageResponseHeader"))?;
...
- let start_offset_info = ExtraInfoUtil::parse_start_offset_info(
- response_header.start_offset_info.as_ref().unwrap_or(&CheetahString::from_slice("")),
- ).map_err(RemotingError)?;
+ let start_offset_str = response_header.start_offset_info.as_deref().unwrap_or("");
+ let start_offset_info = ExtraInfoUtil::parse_start_offset_info(start_offset_str)
+ .map_err(RemotingError)?;
Repeat similar safe handling for other instances where unwrap()
is called.
Committable suggestion skipped: line range outside the PR's diff.
Which Issue(s) This PR Fixes(Closes)
Fixes #1809
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
PopResult
struct for enhanced message retrieval results.process_pop_response
method to handle various response scenarios and constructPopResult
objects.Bug Fixes
PopMessageResponseHeader
for improved data handling.Tests
Display
implementation ofPopResult
.