Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #658]🚀Support pull message consume #659

Merged
merged 3 commits into from
Jun 18, 2024
Merged

[ISSUE #658]🚀Support pull message consume #659

merged 3 commits into from
Jun 18, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Jun 18, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #658

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced new struct PullMessageResponseHeader for handling message pulling with fast serialization capabilities.
    • Added support for dynamic message consumption configuration with lite_pull_message_enable option.
  • Improvements

    • Enhanced ConsumeMessageHook with additional trait constraints for better concurrency support.
    • Extended PullMessageProcessor with new fields and methods to improve message processing.
    • Updated method signatures and implementations for better performance and flexibility in message handling.
  • Refactor

    • Refactored SendMessageResponseHeader to use static method calls for improved code clarity.
    • Promoted filter module to be publicly accessible.
  • Documentation

    • Added detailed documentation for new modules and updated existing ones to reflect changes.

Copy link
Contributor

coderabbitai bot commented Jun 18, 2024

Warning

Rate limit exceeded

@mxsm has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 40 minutes and 8 seconds before requesting another review.

How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

Commits

Files that changed from the base of the PR and between 1d22d77 and 13e2d97.

Walkthrough

The recent changes enhance the RocketMQ project by supporting pull message consumption and improving the messaging protocol. New structs and headers for pull message requests and responses were introduced, along with protocol updates. Configurations and handlers in the broker and processor layers were adjusted to accommodate these changes. Overall, these enhancements aim to streamline message consumption and improve broker-messaging functionalities.

Changes

File/Directory Change Summary
rocketmq-broker/src/mqtrace/consume_message_hook.rs Added Sync, Send, and 'static traits to ConsumeMessageHook.
rocketmq-broker/src/processor.rs Added pull_message_result_handler module and request handling for PullMessage and LitePullMessage.
.../processor/pull_message_processor.rs Updated PullMessageProcessor struct, added fields, default implementation, and additional methods.
rocketmq-common/src/common/broker/broker_config.rs Added lite_pull_message_enable field with default true in BrokerConfig.
rocketmq-remoting/src/protocol.rs Modified write_if_not_null in FastCodesHeader to remove &mut self parameter.
rocketmq-remoting/src/protocol/header.rs Added pull_message_request_header and pull_message_response_header modules.
rocketmq-remoting/src/protocol/header/.../send_message_response_header.rs Updated method calls to use static write_if_not_null.
.../header/namesrv/topic_operation_header.rs Included Default in TopicRequestHeader derive attributes.
.../header/pull_message_request_header.rs Introduced PullMessageRequestHeader with implementations for serialization and encoding/decoding.
.../header/pull_message_response_header.rs Introduced PullMessageResponseHeader with similar implementations and new fields/constants.
rocketmq-remoting/src/protocol/remoting_command.rs Added helper functions including set_opaque_mut and decode_command_custom_header_fast.
rocketmq-store/src/lib.rs Promoted filter module to be publicly accessible.
rocketmq/src/lib.rs Added a new macro is_trait_implemented! to check for trait implementation.

Sequence Diagram(s)

Silent Pass

Assessment against linked issues

Objective Addressed Explanation
Support for pull message consume (#658)

Poem

Through fields of code, where bytes do roam,
The RocketMQ sends messages home.
With traits of Sync, and Send, and static,
Our broker's gears turn, never erratic.
Headers pull and process swift,
In RocketMQ, our spirits lift!
🌟🚀🐇


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mxsm
Copy link
Owner Author

mxsm commented Jun 18, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link

codecov bot commented Jun 18, 2024

Codecov Report

Attention: Patch coverage is 0% with 279 lines in your changes missing coverage. Please review.

Project coverage is 25.57%. Comparing base (f90187e) to head (2f6a790).
Report is 1 commits behind head on main.

Current head 2f6a790 differs from pull request most recent head 13e2d97

Please upload reports for the commit 13e2d97 to get more accurate results.

Files Patch % Lines
...src/protocol/header/pull_message_request_header.rs 0.00% 166 Missing ⚠️
...rc/protocol/header/pull_message_response_header.rs 0.00% 47 Missing ⚠️
...tmq-broker/src/processor/pull_message_processor.rs 0.00% 43 Missing ⚠️
rocketmq-remoting/src/protocol/remoting_command.rs 0.00% 12 Missing ⚠️
...e_operation_header/send_message_response_header.rs 0.00% 5 Missing ⚠️
rocketmq-broker/src/processor.rs 0.00% 3 Missing ⚠️
rocketmq-common/src/common/broker/broker_config.rs 0.00% 1 Missing ⚠️
rocketmq-remoting/src/protocol.rs 0.00% 1 Missing ⚠️
.../protocol/header/namesrv/topic_operation_header.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #659      +/-   ##
==========================================
- Coverage   25.98%   25.57%   -0.42%     
==========================================
  Files         234      236       +2     
  Lines       17079    17348     +269     
==========================================
- Hits         4438     4436       -2     
- Misses      12641    12912     +271     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between f90187e and 2f6a790.

Files selected for processing (14)
  • rocketmq-broker/src/mqtrace/consume_message_hook.rs (1 hunks)
  • rocketmq-broker/src/processor.rs (2 hunks)
  • rocketmq-broker/src/processor/pull_message_processor.rs (1 hunks)
  • rocketmq-broker/src/processor/pull_message_result_handler.rs (1 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-remoting/src/protocol.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/message_operation_header/send_message_response_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/namesrv/topic_operation_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/remoting_command.rs (3 hunks)
  • rocketmq-store/src/lib.rs (1 hunks)
  • rocketmq/src/lib.rs (1 hunks)
Files skipped from review due to trivial changes (1)
  • rocketmq-remoting/src/protocol/header.rs
Additional comments not posted (18)
rocketmq-store/src/lib.rs (1)

23-23: The filter module is now publicly accessible, which could be beneficial for extensibility. Ensure that all public interfaces are properly documented and consider restricting access to internal functionalities that should not be exposed.

rocketmq/src/lib.rs (1)

23-42: Introduced a utility macro is_trait_implemented! to check trait implementations at compile-time. This is a robust addition for type-safe checks across the codebase.

rocketmq-broker/src/mqtrace/consume_message_hook.rs (1)

22-22: Enhancing the ConsumeMessageHook trait with Sync, Send, and 'static ensures thread safety, crucial for the concurrent processing of messages in RocketMQ.

rocketmq-broker/src/processor/pull_message_result_handler.rs (1)

25-38: Introduction of PullMessageResultHandler trait aligns with the PR's objectives to support pull message consumption. The method handle is well-designed to process various aspects of message handling, ensuring flexibility and robustness in message processing.

rocketmq-broker/src/processor/pull_message_processor.rs (3)

32-36: The addition of consume_message_hook_vec, pull_message_result_handler, and broker_config fields in PullMessageProcessor struct ensures that the processor can handle hooks, results, and configurations effectively. This change supports the expansion of functionality for pull message processing.


45-53: The process_request method is now asynchronous and properly delegates to process_request_inner. This change enhances the function's ability to handle asynchronous operations and improves code readability by separating concerns.


55-91: The process_request_inner method includes several new checks and configurations, particularly for permissions and feature flags like lite_pull_message_enable. Ensure thorough testing for these conditional paths to verify the correct behavior under different configurations.

rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (2)

28-37: The PullMessageResponseHeader struct is well-defined with optional fields that represent different aspects of message processing. This structure will facilitate detailed response management in pull message operations.


39-47: Constants defined for field names in PullMessageResponseHeader enhance maintainability by avoiding hard-coded strings throughout the code. This is a good practice to prevent errors related to typos in string literals.

rocketmq-remoting/src/protocol/header/namesrv/topic_operation_header.rs (1)

140-140: The addition of the Default derive attribute to TopicRequestHeader is a sensible enhancement, facilitating easier instantiation of this struct with default values.

rocketmq-common/src/common/broker/broker_config.rs (1)

145-145: Ensure proper default value handling for lite_pull_message_enable.

The addition of the lite_pull_message_enable field with a default value of true is noted. This change aligns with the objective of supporting pull message consumption. Ensure that all configurations and conditional logic that depend on this new setting are updated accordingly.

rocketmq-remoting/src/protocol/remoting_command.rs (2)

235-237: Added mutable setter for opaque.

The addition of set_opaque_mut provides a mutable setter for opaque, enhancing flexibility in modifying this field within the same object context. This is a useful addition for scenarios where opaque might need to be updated after the initial creation of the RemotingCommand instance.


368-380: Optimized method for decoding command headers.

The new method decode_command_custom_header_fast leverages trait bounds to ensure that the type T implements FromMap, Default, and FastCodesHeader, which potentially optimizes the decoding process by using the decode_fast method provided by the FastCodesHeader trait. This is particularly useful for performance-critical paths where decoding happens frequently.

rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (3)

29-48: Introduction of PullMessageRequestHeader struct.

The PullMessageRequestHeader struct is well-defined with fields that are essential for the pull message functionality such as consumer_group, topic, queue_id, etc. The use of Option for fields like subscription, expression_type, and max_msg_bytes provides flexibility, allowing these fields to be optionally included based on the context of the request.


67-122: Implementation of CommandCustomHeader for PullMessageRequestHeader.

The implementation of CommandCustomHeader for PullMessageRequestHeader is comprehensive, covering all relevant fields and handling optional fields appropriately. This allows for a flexible and dynamic construction of command headers based on the actual data provided at runtime.


166-222: Fast encoding method for PullMessageRequestHeader.

The encode_fast method efficiently serializes the PullMessageRequestHeader to a BytesMut buffer. This method is crucial for performance optimization in scenarios where rapid serialization is required. The method handles optional fields gracefully, ensuring that only present values are written.

Fast decoding method for PullMessageRequestHeader.

The decode_fast method provides an efficient way to populate the PullMessageRequestHeader fields from a HashMap. This method is essential for scenarios where headers need to be quickly reconstructed from their serialized forms. The method's implementation ensures robust error handling and defaults management, which is crucial for maintaining data integrity during the decoding process.

rocketmq-broker/src/processor.rs (2)

74-74: Introduction of pull_message_result_handler module.

The addition of the pull_message_result_handler module is a critical part of enhancing the broker's capability to handle pull message results effectively. This module is likely to contain logic that deals with the aggregation, filtering, and delivery of messages fetched through the pull mechanism.


146-150: Handling for PullMessage and LitePullMessage requests.

The update to handle PullMessage and LitePullMessage requests in the BrokerRequestProcessor is a significant enhancement aligning with the PR's objective to support pull message consumption. This ensures that pull requests are routed to the appropriate processor, facilitating efficient message fetching based on consumer demand.

@@ -365,7 +365,7 @@
}

pub trait FastCodesHeader {
fn write_if_not_null(&mut self, out: &mut bytes::BytesMut, key: &str, value: &str) {
fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor the write_if_not_null method to improve clarity.

-    fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
-        if !value.is_empty() {
-            RocketMQSerializable::write_str(out, true, key);
-            RocketMQSerializable::write_str(out, false, key);
-        }
+    fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
+        if !value.is_empty() {
+            RocketMQSerializable::write_str(out, true, key);
+            RocketMQSerializable::write_str(out, true, value);
+        }

The current implementation seems to have a typo or logic error where the key is written twice instead of writing the key-value pair. This change corrects that mistake by ensuring that both the key and the value are correctly serialized into the output.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
fn write_if_not_null(out: &mut bytes::BytesMut, key: &str, value: &str) {
if !value.is_empty() {
RocketMQSerializable::write_str(out, true, key);
RocketMQSerializable::write_str(out, true, value);
}

Comment on lines +38 to +39
impl Default for PullMessageProcessor {
fn default() -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default implementation for PullMessageProcessor uses todo!(), which panics at runtime. This should be replaced with an actual implementation to prevent runtime crashes.

-        todo!()
+        Self {
+            consume_message_hook_vec: Arc::new(Vec::new()),
+            pull_message_result_handler: Arc::new(Default::default()),
+            broker_config: Arc::new(Default::default()),
+        }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
impl Default for PullMessageProcessor {
fn default() -> Self {
impl Default for PullMessageProcessor {
fn default() -> Self {
Self {
consume_message_hook_vec: Arc::new(Vec::new()),
pull_message_result_handler: Arc::new(Default::default()),
broker_config: Arc::new(Default::default()),
}

Comment on lines +96 to +104
Self::write_if_not_null(out, "msgId", self.msg_id.to_string().as_str());
Self::write_if_not_null(out, "queueId", self.queue_id.to_string().as_str());
Self::write_if_not_null(out, "queueOffset", self.queue_offset.to_string().as_str());
Self::write_if_not_null(
out,
"transactionId",
self.transaction_id.clone().as_deref().unwrap(),
);
self.write_if_not_null(
Self::write_if_not_null(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The write_if_not_null method is used to serialize fields only if they are not null. However, calling unwrap() directly on Option types (transactionId, batchUniqId) can cause a panic if they are None. It's safer to handle these cases to avoid runtime errors.

-        self.transaction_id.clone().as_deref().unwrap(),
-        self.batch_uniq_id.clone().as_deref().unwrap(),
+        self.transaction_id.clone().as_deref().unwrap_or(""),
+        self.batch_uniq_id.clone().as_deref().unwrap_or(""),
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Self::write_if_not_null(out, "msgId", self.msg_id.to_string().as_str());
Self::write_if_not_null(out, "queueId", self.queue_id.to_string().as_str());
Self::write_if_not_null(out, "queueOffset", self.queue_offset.to_string().as_str());
Self::write_if_not_null(
out,
"transactionId",
self.transaction_id.clone().as_deref().unwrap(),
);
self.write_if_not_null(
Self::write_if_not_null(
Self::write_if_not_null(out, "msgId", self.msg_id.to_string().as_str());
Self::write_if_not_null(out, "queueId", self.queue_id.to_string().as_str());
Self::write_if_not_null(out, "queueOffset", self.queue_offset.to_string().as_str());
Self::write_if_not_null(
out,
"transactionId",
self.transaction_id.clone().as_deref().unwrap_or(""),
);
Self::write_if_not_null(
out,
"batchUniqId",
self.batch_uniq_id.clone().as_deref().unwrap_or(""),
);
Self::write_if_not_null(

…ader.rs

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@mxsm
Copy link
Owner Author

mxsm commented Jun 18, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 2f6a790 and 1d22d77.

Files selected for processing (1)
  • rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • rocketmq-remoting/src/protocol/header/pull_message_response_header.rs

@mxsm
Copy link
Owner Author

mxsm commented Jun 18, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
approved PR has approved auto merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Support pull message consume
2 participants