-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[ISSUE #966]🚀Support client consumer message-3🚀 #968
Conversation
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
WalkthroughThis pull request introduces significant enhancements to the RocketMQ client, focusing on the consumer module. Key changes include the implementation of a new concurrent message listener, improved error handling with updated error types, and the introduction of a rebalance strategy module. Additionally, the consumer configuration has been modified for better management, and several methods have been updated to support synchronous operations instead of asynchronous ones. Overall, these changes aim to improve the functionality and maintainability of the consumer implementation. Changes
Sequence Diagram(s)sequenceDiagram
participant Consumer
participant MessageQueue
participant Listener
Consumer->>MessageQueue: Subscribe to topic
MessageQueue-->>Consumer: Acknowledge subscription
Consumer->>Listener: Register message listener
Listener->>MessageQueue: Fetch messages
MessageQueue-->>Listener: Deliver messages
Listener->>Consumer: Process messages concurrently
Assessment against linked issues
Suggested labelsapproved, auto merge Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Outside diff range, codebase verification and nitpick comments (3)
rocketmq-client/src/error.rs (1)
23-41
: Update references to renamed error types.The verification script found occurrences of old error names in the codebase, indicating that not all references have been updated. Please update the following instances to reflect the new error type names:
MQClientError::MQClientException
inrocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs
MQClientError::RemotingTooMuchRequestException
inrocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs
Ensure all references to the renamed error types are updated to maintain consistency and avoid potential runtime errors.
Analysis chain
Approved renaming and addition of error types.
Renaming error variants for consistency and clarity, and adding
IllegalArgumentError
, enhances the error handling framework. Ensure that all references to these error types in the codebase are updated to reflect the new names.Run the following script to verify the usage of renamed error types:
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that all references to error types are updated. # Test: Search for old error names. Expect: No occurrences. rg --type rust -A 5 $'MQClientException|RemotingTooMuchRequestException|MQBrokerException|RequestTimeoutException|OffsetNotFoundException|RemotingException'Length of output: 2799
rocketmq-client/src/producer/default_mq_producer.rs (1)
485-485
: Incomplete Update: Instances ofMQClientException
still exist.The transition to the new error type
MQClientErr
is incomplete. Please update the following occurrences ofMQClientException
to ensure consistent error handling across the codebase:
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs
Analysis chain
Approved: Error handling updated to use new error type.
The change from
MQClientException
toMQClientErr
in the error handling of thebatch
function aligns with the PR's objective to enhance error clarity. Ensure that all references to the old error type have been updated across the entire codebase.Run the following script to verify the usage of the new error type:
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify all references to the old error type have been updated. # Test: Search for the old error type. Expect: No occurrences. rg --type rust "MQClientException"Length of output: 416
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (1)
245-245
: Incomplete Refactoring: Old error types still present in the codebase.The refactoring to replace old error types with new ones is incomplete. The following occurrences of old error types were found and need to be updated:
MQClientException
andRemotingTooMuchRequestException
inrocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs
MQClientException
inrocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs
Please update these instances to use the new error types
MQClientErr
,RequestTimeoutError
, andRemotingTooMuchRequestError
for consistency.Analysis chain
Approved: Error handling updated to use new error types.
The changes in error handling in the
sync_send_with_message_queue_timeout
function align with the PR's objective to enhance error clarity. Ensure that all references to the old error types have been updated across the entire codebase.Run the following script to verify the usage of the new error types:
Also applies to: 256-256, 319-319, 399-399, 415-415, 422-422
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify all references to the old error types have been updated. # Test: Search for the old error types. Expect: No occurrences. rg --type rust "MQClientException|RemotingTooMuchRequestException|RequestTimeoutException"Length of output: 624
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (19)
- rocketmq-client/examples/quickstart/consumer.rs (2 hunks)
- rocketmq-client/src/base/validators.rs (11 hunks)
- rocketmq-client/src/consumer.rs (1 hunks)
- rocketmq-client/src/consumer/allocate_message_queue_strategy.rs (1 hunks)
- rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (28 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (4 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (4 hunks)
- rocketmq-client/src/consumer/default_mq_push_consumer.rs (9 hunks)
- rocketmq-client/src/consumer/default_mq_push_consumer_builder.rs (5 hunks)
- rocketmq-client/src/consumer/mq_push_consumer.rs (1 hunks)
- rocketmq-client/src/consumer/rebalance_strategy.rs (1 hunks)
- rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs (1 hunks)
- rocketmq-client/src/error.rs (1 hunks)
- rocketmq-client/src/factory/mq_client_instance.rs (2 hunks)
- rocketmq-client/src/implementation/mq_admin_impl.rs (2 hunks)
- rocketmq-client/src/implementation/mq_client_api_impl.rs (5 hunks)
- rocketmq-client/src/producer/default_mq_producer.rs (2 hunks)
- rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (35 hunks)
- rocketmq-client/src/producer/request_future_holder.rs (2 hunks)
Files skipped from review due to trivial changes (4)
- rocketmq-client/src/consumer.rs
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
- rocketmq-client/src/implementation/mq_client_api_impl.rs
- rocketmq-client/src/producer/request_future_holder.rs
Additional comments not posted (11)
rocketmq-client/src/consumer/allocate_message_queue_strategy.rs (1)
30-30
: Approved change toget_name
return type.The change to return a
&'static str
fromget_name
enhances safety by ensuring the string's lifetime matches the program's. However, verify that all implementations of this trait return a static string.Run the following script to verify the implementations of
get_name
:Verification successful
Verified: All Implementations of
get_name
Return a Static StringThe implementation of the
get_name
method found inrocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs
returns a&'static str
, which is correct. This confirms that the change to the return type enhances safety by ensuring the string's lifetime matches the program's. No other implementations were found, indicating compliance across the codebase.
rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs
: Lines 61-63Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that all implementations of `get_name` return a static string. # Test: Search for implementations of `get_name`. Expect: Only static strings returned. ast-grep --lang rust --pattern $'trait AllocateMessageQueueStrategy { $$$ get_name($_) { $$$ } $$$ }'Length of output: 620
Script:
#!/bin/bash # Description: Find all implementations of the `get_name` method in Rust files and check their return types. # Search for implementations of `get_name` in Rust files. ast-grep --lang rust --pattern 'fn get_name(&self) -> &'static str { $$$ }'Length of output: 93
Script:
#!/bin/bash # Description: Find all implementations of the `get_name` method in Rust files and check their return types. # Search for implementations of `get_name` in Rust files. ast-grep --lang rust --pattern $'fn get_name(&self) -> &\'static str { $$$ }'Length of output: 402
rocketmq-client/src/consumer/rebalance_strategy.rs (1)
27-56
: Approved implementation ofcheck
function.The function
check
is well-implemented, usingIllegalArgumentError
effectively for error handling and efficiently checking membership withHashSet
. Ensure that the error messages provided are clear and informative.rocketmq-client/src/consumer/mq_push_consumer.rs (1)
92-92
: Verify the rationale and impact of changingsubscribe
to synchronous.The method
subscribe
has been changed from asynchronous to synchronous. This could have significant implications for the system's performance and responsiveness. It is crucial to verify that this change aligns with the overall system architecture and does not adversely affect the consumer's ability to process messages efficiently.Please provide the rationale behind this change and assess its impact on the system's performance and scalability.
rocketmq-client/src/base/validators.rs (1)
26-26
: Approve the standardization of error handling.The replacement of
MQClientException
withMQClientErr
across various validation methods helps standardize error handling within the project. This change should maintain or enhance the clarity and utility of error messages provided to the users. It is important to ensure thatMQClientErr
includes all necessary information for effective error resolution and does not omit any critical details that were previously available withMQClientException
.Also applies to: 38-38, 42-42, 49-49, 67-67, 77-77, 85-85, 92-92, 104-104, 120-120, 124-124, 134-134, 149-149, 159-159, 170-170, 182-182
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1)
42-44
: Approve the enhancements inRebalancePushImpl
.The introduction of
ArcRefCellWrapper
andWeakCellWrapper
in theRebalancePushImpl
struct, along with the modifications to various methods, enhances the management of shared mutable state and references. These changes are likely to improve the flexibility and robustness of the rebalancing functionality. It is important to ensure that these changes are thoroughly tested, especially in concurrent environments, to avoid any potential issues with memory management or thread safety.Also applies to: 48-56, 62-67, 70-70, 74-74, 84-85, 93-94, 97-109, 164-166, 170-170
rocketmq-client/src/consumer/default_mq_push_consumer_builder.rs (1)
36-36
: Approved: Addition oftopic_sub_expression
field and its initialization.The new field
topic_sub_expression
inDefaultMQPushConsumerBuilder
is correctly initialized in the constructor. This change supports enhanced subscription management capabilities.Also applies to: 76-76
rocketmq-client/src/consumer/default_mq_push_consumer.rs (2)
59-60
: Approved: Addition oftopic
andsub_expression
fields and their initialization.The new fields
topic
andsub_expression
inConsumerConfig
are correctly initialized in the constructor with default empty strings. This change supports enhanced subscription management capabilities.Also applies to: 377-378
585-605
: Approved with caution: Refactoredsubscribe
method to use synchronous threading model.The refactoring of the
subscribe
method to use a synchronous threading model simplifies the control flow but introduces complexity in thread management. Ensure thorough testing and monitoring to detect any potential issues with race conditions or deadlocks that could arise from this new model.Consider adding comprehensive tests to verify the new threading model:
#[test] fn test_subscribe_thread_safety() { // Implement test to check for race conditions or deadlocks }rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (2)
76-76
: Approved: Addition ofclient_instance
field and its initialization.The new field
client_instance
inDefaultMQPushConsumerImpl
is correctly initialized in the constructor withNone
. This change supports enhanced client instance management capabilities.Also applies to: 81-81, 111-111, 124-124
143-149
: Approved: Addition ofset_default_mqpush_consumer_impl
method.The method
set_default_mqpush_consumer_impl
enhances flexibility in dependency management by allowing a weak reference to be set for the consumer implementation. This approach is beneficial for managing the lifecycle of consumer instances and avoiding memory leaks.rocketmq-client/src/factory/mq_client_instance.rs (1)
241-241
: Updated Error Handling:The error handling in the
start
method has been updated to useMQClientErr
instead ofMQClientException
. This change is consistent with the PR's objectives to improve error handling clarity.Run the following script to verify the integration of the new error type with other components:
Verification successful
Successful Integration of
MQClientErr
:The
MQClientErr
error type has been successfully integrated across the codebase, replacing the previousMQClientException
. This change is consistently applied in various modules, ensuring improved error handling clarity.
- Locations:
rocketmq-client/src/error.rs
: Definition ofMQClientErr
.- Various modules including
producer
,consumer
,base
,implementation
, andfactory
.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the integration of `MQClientErr` with other components. # Test: Search for the usage of `MQClientErr`. Expect: Consistent usage across the project. rg --type rust -A 5 $'MQClientErr'Length of output: 64180
pub struct AllocateMessageQueueAveragely; | ||
|
||
impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragely { | ||
fn allocate( | ||
&self, | ||
consumer_group: &str, | ||
current_cid: &str, | ||
mq_all: &[MessageQueue], | ||
cid_all: &[String], | ||
) -> Result<Vec<MessageQueue>> { | ||
let mut result = Vec::new(); | ||
if !check(consumer_group, current_cid, mq_all, cid_all)? { | ||
return Ok(result); | ||
} | ||
|
||
let index = cid_all | ||
.iter() | ||
.position(|cid| cid == current_cid) | ||
.unwrap_or(0); | ||
let mod_val = mq_all.len() % cid_all.len(); | ||
let average_size = if mq_all.len() <= cid_all.len() { | ||
1 | ||
} else if mod_val > 0 && index < mod_val { | ||
mq_all.len() / cid_all.len() + 1 | ||
} else { | ||
mq_all.len() / cid_all.len() | ||
}; | ||
let start_index = if mod_val > 0 && index < mod_val { | ||
index * average_size | ||
} else { | ||
index * average_size + mod_val | ||
}; | ||
let range = average_size.min(mq_all.len() - start_index); | ||
for i in 0..range { | ||
result.push(mq_all[start_index + i].clone()); | ||
} | ||
Ok(result) | ||
} | ||
fn get_name(&self) -> &'static str { | ||
"AVG" | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comprehensive Review of New Rebalance Strategy Implementation
The new file introduces a struct AllocateMessageQueueAveragely
and implements the AllocateMessageQueueStrategy
trait for it. Here are some observations and suggestions:
-
Correctness and Logic:
- The
allocate
function correctly checks if the consumer is eligible to receive message queues before proceeding with the allocation logic. - The use of
unwrap_or(0)
at line 41 is potentially risky ifcurrent_cid
is not found incid_all
. Consider handling this case more gracefully to avoid unexpected behavior.
- The
-
Performance:
- The calculation of
index
,mod_val
, andaverage_size
is efficient and handles different scenarios of queue sizes and consumer counts.
- The calculation of
-
Error Handling:
- Proper use of
Result
and early returns on error conditions are good practices that are followed here.
- Proper use of
-
Readability and Maintainability:
- The code is generally well-structured and readable. Comments explaining the logic of queue allocation could enhance maintainability, especially for new contributors.
-
Best Practices:
- Implementing
get_name
to return a static string is a good practice for identifying the strategy. However, consider making the strategy name more descriptive than "AVG".
- Implementing
Overall, the implementation is solid, but adding more descriptive comments and handling potential edge cases more robustly would be beneficial.
pub struct MyMessageListener; | ||
|
||
impl MessageListenerConcurrently for MyMessageListener { | ||
fn consume_message( | ||
&self, | ||
msgs: Vec<MessageExt>, | ||
_context: ConsumeConcurrentlyContext, | ||
) -> Result<ConsumeConcurrentlyStatus> { | ||
for msg in msgs { | ||
println!("Receive message: {:?}", msg); | ||
} | ||
Ok(()) | ||
});*/ | ||
consumer.start().await?; | ||
|
||
Ok(()) | ||
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review of New Message Listener Implementation
The implementation of MyMessageListener
in the example consumer file introduces concurrent message handling capabilities:
-
Correctness and Logic:
- The
consume_message
method processes each message and logs its content. This is a simple and effective demonstration of message processing. - The method returns a
ConsumeConcurrentlyStatus::ConsumeSuccess
which is appropriate for this example.
- The
-
Best Practices:
- Using
println!
for logging in examples is acceptable, but for more robust applications, integrating a logging framework would be advisable.
- Using
-
Error Handling:
- The method handles the basic case well. However, adding error handling for potential issues during message processing could make the example more comprehensive.
Overall, the example is well-implemented for demonstration purposes but could benefit from more detailed error handling and the use of a logging framework for more serious applications.
@@ -83,7 +83,7 @@ impl MQAdminImpl { | |||
)); | |||
} | |||
} | |||
Err(MQClientException( | |||
Err(MQClientErr( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review of Error Handling Update in MQAdminImpl
The update to use MQClientErr
for error handling in the fetch_publish_message_queues
method is a significant change:
-
Correctness and Logic:
- The new error type
MQClientErr
is used consistently to handle errors related to message queue retrieval. This standardizes error handling and improves the clarity of error messages.
- The new error type
-
Best Practices:
- The use of a formatted error message that includes the topic name helps in diagnosing issues more effectively.
-
Potential Improvements:
- Consider adding more specific error codes or categories to
MQClientErr
to further enhance error diagnostics and handling.
- Consider adding more specific error codes or categories to
Overall, the change is a positive step towards better error management in the RocketMQ client. Further refinement of the error categorization could enhance its effectiveness.
pub fn subscribe( | ||
mut self, | ||
topic: impl Into<String>, | ||
sub_expression: impl Into<String>, | ||
) -> Self { | ||
self.topic_sub_expression.0 = Some(topic.into()); | ||
self.topic_sub_expression.1 = Some(sub_expression.into()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved: Refactored subscribe
method with enhanced clarity.
The refactoring of the subscribe
method to directly assign topic
and sub_expression
to topic_sub_expression
simplifies the subscription setup process. However, consider adding error handling or validation to ensure that neither topic
nor sub_expression
are empty strings before assignment, which could prevent runtime issues.
Consider adding validation for topic
and sub_expression
:
if topic.into().is_empty() || sub_expression.into().is_empty() {
return Err("Topic and subscription expression cannot be empty");
}
pub async fn subscribe(&mut self, topic: &str, sub_expression: &str) -> Result<()> { | ||
let subscription_data = FilterAPI::build_subscription_data(topic, sub_expression); | ||
if let Err(e) = subscription_data { | ||
return Err(MQClientError::MQClientErr( | ||
-1, | ||
format!("buildSubscriptionData exception, {}", e), | ||
)); | ||
} | ||
let subscription_data = subscription_data.unwrap(); | ||
self.rebalance_impl | ||
.put_subscription_data(topic, subscription_data) | ||
.await; | ||
if let Some(ref mut client_instance) = self.client_instance { | ||
client_instance | ||
.send_heartbeat_to_all_broker_with_lock() | ||
.await; | ||
} | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved: Addition of subscribe
method with encapsulated subscription logic.
The new subscribe
method in DefaultMQPushConsumerImpl
simplifies the subscription process by encapsulating the logic for building subscription data and sending heartbeats to brokers. Consider adding error handling for the subscription data building process to ensure robustness in case of invalid parameters.
Consider adding error handling for building subscription data:
if let Err(e) = subscription_data {
return Err(MQClientError::MQClientErr(
-1,
format!("Error building subscription data: {}", e),
));
}
Which Issue(s) This PR Fixes(Closes)
Fixes #966
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Refactor
Style