-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[ISSUE #729]⚡️Optimize pull message #730
Conversation
WalkthroughThe changes optimize RocketMQ's message-pulling process. A new boolean flag Changes
Sequence DiagramsequenceDiagram
participant Client
participant BrokerRuntime
participant DefaultMessageStore
participant ReputMessageService
Note over Client: Enhanced Pull Message Flow
Client->>BrokerRuntime: Request messages
BrokerRuntime->>DefaultMessageStore: Fetch messages (with notify_message_arrive_in_batch)
DefaultMessageStore-->>ReputMessageService: Notify message arrival
ReputMessageService-->>DefaultMessageStore: Batch process based on notify_message_arrive_in_batch
DefaultMessageStore->>BrokerRuntime: Return messages
BrokerRuntime->>Client: Deliver messages
Assessment against linked issues
Poem
Tip AI model upgrade
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (3)
- rocketmq-broker/src/broker_runtime.rs (1 hunks)
- rocketmq-common/src/common/mix_all.rs (2 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (10 hunks)
Additional comments not posted (14)
rocketmq-common/src/common/mix_all.rs (4)
104-109
: Functionis_lmq
looks good.The function correctly checks if the provided metadata starts with the LMQ prefix. The use of
Option
is appropriate for handling possibleNone
values.
178-182
: Testreturns_true_for_lmq_prefixed_metadata
looks good.The test correctly verifies that the function returns
true
for metadata starting with the LMQ prefix.
184-188
: Testreturns_false_for_non_lmq_prefixed_metadata
looks good.The test correctly verifies that the function returns
false
for metadata not starting with the LMQ prefix.
190-193
: Testreturns_false_for_none_metadata
looks good.The test correctly verifies that the function returns
false
forNone
metadata.rocketmq-broker/src/broker_runtime.rs (1)
298-298
: Ensure the new boolean argument is correctly utilized.The new boolean argument
false
has been added to theDefaultMessageStore::new
call. Ensure that this argument is correctly utilized within theDefaultMessageStore
implementation.rocketmq-store/src/message_store/default_message_store.rs (9)
35-38
: Imports are appropriate.The new imports are relevant and necessary for the added functionality.
116-116
: New fieldnotify_message_arrive_in_batch
added toDefaultMessageStore
.This field will likely be used to control batch notifications for message arrival.
144-144
: Clone implementation updated to include new field.The
Clone
implementation correctly includes the newnotify_message_arrive_in_batch
field.
155-155
: Constructor updated to initialize new field.The
notify_message_arrive_in_batch
field is correctly initialized in the constructor.
585-586
:start
method updated to pass new field.The
notify_message_arrive_in_batch
field is correctly passed to theReputMessageService
.
1037-1038
: New fieldnotify_message_arrive_in_batch
added toReputMessageService
.This field will likely be used to control batch notifications for message arrival.
1037-1038
:start
method updated to pass new field.The
notify_message_arrive_in_batch
field is correctly passed to theReputMessageServiceInner
.Also applies to: 1045-1046
1106-1107
: New fieldnotify_message_arrive_in_batch
added toReputMessageServiceInner
.This field will likely be used to control batch notifications for message arrival.
1218-1221
:do_reput
method updated to use new field.The
notify_message_arrive_in_batch
field is correctly used to control message arrival notifications.
fn notify_message_arrive4multi_queue(&self, dispatch_request: &mut DispatchRequest) { | ||
let prop = dispatch_request.properties_map.as_ref(); | ||
if prop.is_none() || dispatch_request.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { | ||
return; | ||
} | ||
let prop = prop.unwrap(); | ||
let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH); | ||
let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET); | ||
if multi_dispatch_queue.is_none() | ||
|| multi_queue_offset.is_none() | ||
|| multi_dispatch_queue.as_ref().unwrap().is_empty() | ||
|| multi_queue_offset.as_ref().unwrap().is_empty() | ||
{ | ||
return; | ||
} | ||
let queues: Vec<&str> = multi_dispatch_queue | ||
.unwrap() | ||
.split(MULTI_DISPATCH_QUEUE_SPLITTER) | ||
.collect(); | ||
let queue_offsets: Vec<&str> = multi_queue_offset | ||
.unwrap() | ||
.split(MULTI_DISPATCH_QUEUE_SPLITTER) | ||
.collect(); | ||
if queues.len() != queue_offsets.len() { | ||
return; | ||
} | ||
for i in 0..queues.len() { | ||
let queue_name = queues[i]; | ||
let queue_offset: i64 = queue_offsets[i].parse().unwrap(); | ||
let mut queue_id = dispatch_request.queue_id; | ||
if self.message_store_config.enable_lmq && is_lmq(Some(queue_name)) { | ||
queue_id = 0; | ||
} | ||
self.message_store | ||
.message_arriving_listener | ||
.as_ref() | ||
.unwrap() | ||
.arriving( | ||
queue_name, | ||
queue_id, | ||
queue_offset + 1, | ||
Some(dispatch_request.tags_code), | ||
dispatch_request.store_timestamp, | ||
dispatch_request.bit_map.clone(), | ||
dispatch_request.properties_map.as_ref(), | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle unwrap calls safely in notify_message_arrive4multi_queue
.
The unwrap calls can potentially panic if the values are not present. Consider using safe handling methods like if let
or match
.
- let prop = prop.unwrap();
- let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH);
- let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+ if let Some(prop) = prop {
+ let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH);
+ let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
fn notify_message_arrive4multi_queue(&self, dispatch_request: &mut DispatchRequest) { | |
let prop = dispatch_request.properties_map.as_ref(); | |
if prop.is_none() || dispatch_request.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { | |
return; | |
} | |
let prop = prop.unwrap(); | |
let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH); | |
let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET); | |
if multi_dispatch_queue.is_none() | |
|| multi_queue_offset.is_none() | |
|| multi_dispatch_queue.as_ref().unwrap().is_empty() | |
|| multi_queue_offset.as_ref().unwrap().is_empty() | |
{ | |
return; | |
} | |
let queues: Vec<&str> = multi_dispatch_queue | |
.unwrap() | |
.split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
.collect(); | |
let queue_offsets: Vec<&str> = multi_queue_offset | |
.unwrap() | |
.split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
.collect(); | |
if queues.len() != queue_offsets.len() { | |
return; | |
} | |
for i in 0..queues.len() { | |
let queue_name = queues[i]; | |
let queue_offset: i64 = queue_offsets[i].parse().unwrap(); | |
let mut queue_id = dispatch_request.queue_id; | |
if self.message_store_config.enable_lmq && is_lmq(Some(queue_name)) { | |
queue_id = 0; | |
} | |
self.message_store | |
.message_arriving_listener | |
.as_ref() | |
.unwrap() | |
.arriving( | |
queue_name, | |
queue_id, | |
queue_offset + 1, | |
Some(dispatch_request.tags_code), | |
dispatch_request.store_timestamp, | |
dispatch_request.bit_map.clone(), | |
dispatch_request.properties_map.as_ref(), | |
); | |
} | |
} | |
fn notify_message_arrive4multi_queue(&self, dispatch_request: &mut DispatchRequest) { | |
let prop = dispatch_request.properties_map.as_ref(); | |
if prop.is_none() || dispatch_request.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { | |
return; | |
} | |
if let Some(prop) = prop { | |
let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH); | |
let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET); | |
if multi_dispatch_queue.is_none() | |
|| multi_queue_offset.is_none() | |
|| multi_dispatch_queue.as_ref().unwrap().is_empty() | |
|| multi_queue_offset.as_ref().unwrap().is_empty() | |
{ | |
return; | |
} | |
let queues: Vec<&str> = multi_dispatch_queue | |
.unwrap() | |
.split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
.collect(); | |
let queue_offsets: Vec<&str> = multi_queue_offset | |
.unwrap() | |
.split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
.collect(); | |
if queues.len() != queue_offsets.len() { | |
return; | |
} | |
for i in 0..queues.len() { | |
let queue_name = queues[i]; | |
let queue_offset: i64 = queue_offsets[i].parse().unwrap(); | |
let mut queue_id = dispatch_request.queue_id; | |
if self.message_store_config.enable_lmq && is_lmq(Some(queue_name)) { | |
queue_id = 0; | |
} | |
self.message_store | |
.message_arriving_listener | |
.as_ref() | |
.unwrap() | |
.arriving( | |
queue_name, | |
queue_id, | |
queue_offset + 1, | |
Some(dispatch_request.tags_code), | |
dispatch_request.store_timestamp, | |
dispatch_request.bit_map.clone(), | |
dispatch_request.properties_map.as_ref(), | |
); | |
} | |
} | |
} |
Which Issue(s) This PR Fixes(Closes)
Fixes #729
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
notify_message_arrive_in_batch
to enhance message arrival notifications in batch mode.Improvements
DefaultMessageStore
to accommodate batch message notifications.ReputMessageService
to support the new message notification feature.Bug Fixes