Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #671]🚀Support pull message consume-4 #675

Merged
merged 3 commits into from
Jun 22, 2024
Merged

[ISSUE #671]🚀Support pull message consume-4 #675

merged 3 commits into from
Jun 22, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Jun 22, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #671

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced the StoreStatsService struct for enhanced store statistics tracking.
    • Added new methods for DefaultPullMessageResultHandler for handling pull message results and message hooks.
    • Included new fields in BrokerConfig for slave_read_enable and commercial_base_count.
  • Improvements

    • More flexible handling of message context fields by changing from String to Option<String>.
    • Enhanced SelectMappedBufferResult with a new get_buffer method.
    • Improved memory management and message retrieval logic in DefaultMessageStore.
  • Bug Fixes

    • Adjusted PullMessageResponseHeader field types for consistency.
    • Corrective changes in truncate_dirty_logic_files for better resource locking.
  • Refactor

    • Reduced module visibility to crate level for better encapsulation and security.
    • Updated field types in message_store_config for better performance and consistency.
  • Performance

    • Added new functions in queue modules to improve iteration efficiency.

@mxsm
Copy link
Owner Author

mxsm commented Jun 22, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Contributor

coderabbitai bot commented Jun 22, 2024

Warning

Review failed

The pull request is closed.

Walkthrough

Recent changes to the rocketmq-broker and rocketmq-store modules include converting modules to crate visibility, updating data structures to use Option<String>, adding new modules and methods, and reworking the PullMessageProcessor logic. Enhancements in rocketmq-common and rocketmq-remoting introduce new configuration fields, response codes, and message handling functionalities, along with dependency updates in rocketmq-store.

Changes

Files/Modules Change Summary
rocketmq-broker/src/lib.rs Changed module visibility to pub(crate), added new module hook_utils.
rocketmq-broker/src/mqtrace/consume_message_context.rs Changed certain String fields to Option<String> in ConsumeMessageContext.
rocketmq-broker/src/processor.rs Added module default_pull_message_result_handler.
rocketmq-broker/src/processor/default_pull_message_result_handler.rs Added a new struct DefaultPullMessageResultHandler and implemented a trait PullMessageResultHandler.
rocketmq-broker/src/processor/pull_message_processor.rs Removed fields and imports related to ConsumeMessageHook, added MAX_PULL_MSG_SIZE, adjusted method signature for handle_pull_message_request.
rocketmq-common/src/common/broker/broker_config.rs Updated BrokerConfig struct with new fields slave_read_enable and commercial_base_count.
rocketmq-common/src/common/mix_all.rs Added new function is_sys_consumer_group_for_no_cold_read_limit, and test cases.
rocketmq-remoting/src/code/response_code.rs Added several new variants to the ResponseCode enum.
rocketmq-remoting/src/protocol/header/pull_message_response_header.rs Changed type of suggest_which_broker_id field from Option<i64> to Option<u64>.
rocketmq-remoting/src/protocol/remoting_command.rs Added new methods set_command_custom_header_ref, set_code_ref, set_remark_ref, and get_ext_fields.
rocketmq-store/Cargo.toml Added dependencies sysinfo and once_cell.
rocketmq-store/src/base.rs Added new module store_stats_service.
rocketmq-store/src/base/get_message_result.rs Commented out message_buffer_list. Adjusted method signatures and added add_message method in GetMessageResult.
rocketmq-store/src/base/select_result.rs Removed byte_buffer field in SelectMappedBufferResult, added method get_buffer().
rocketmq-store/src/base/store_stats_service.rs Introduced StoreStatsService struct and implementation block.
rocketmq-store/src/config/message_store_config.rs Changed various fields from usize to u64 and i32, added max_filter_message_size field.
rocketmq-store/src/lib.rs Added sync_unsafe_cell feature and included new module utils.
rocketmq-store/src/log_file.rs Added constant MAX_PULL_MSG_SIZE, adjusted getMessage method parameter message_filter.
rocketmq-store/src/log_file/commit_log.rs Added public function get_message(offset, size), minor formatting updates.
rocketmq-store/src/log_file/mapped_file/default_impl.rs Changed mmapped_file field to use SyncUnsafeCell, added get_mapped_file and get_mapped_file_mut methods in DefaultMappedFile.
rocketmq-store/src/message_store/default_message_store.rs Added new methods and constants, changed DefaultMessageStore struct, modified impl MessageStore logic.
rocketmq-store/src/queue.rs Added ConsumeQueueExt import, added methods get_valid_tags_code_as_long, is_tags_code_valid, iterate_from, and iterate_from_inner for ConsumeQueueTrait
rocketmq-store/src/queue/batch_consume_queue.rs BatchConsumeQueue now implements iterate_from and iterate_from_inner methods.
rocketmq-store/src/queue/local_file_consume_queue_store.rs Modified truncate_dirty_logic_files function, adding as_ref() before invoking lock().
rocketmq-store/src/queue/single_consume_queue.rs Added iterate_from and iterate_from_inner methods to ConsumeQueue implementation.

Assessment against linked issues

Objective Addressed Explanation
Support pull message consume functionality (#671)

In code, we play and weave,
Changing modules, watching them breathe.
From Option<String> to buffer tweaks,
New hooks and handlers in message peaks.
Dependencies grow, the system refines,
With every commit, it simply shines. ⏳✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mxsm
Copy link
Owner Author

mxsm commented Jun 22, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 14

Outside diff range and nitpick comments (1)
rocketmq-store/src/log_file/mapped_file/default_impl.rs (1)

602-603: Document the use of unsafe in get_mapped_file.

While the method correctly uses unsafe to return an immutable reference, it's important to document why unsafe is necessary and ensure it is used safely.

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 4e6d7d0 and b7e093b.

Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
Files selected for processing (28)
  • rocketmq-broker/src/lib.rs (1 hunks)
  • rocketmq-broker/src/mqtrace/consume_message_context.rs (1 hunks)
  • rocketmq-broker/src/processor.rs (1 hunks)
  • rocketmq-broker/src/processor/default_pull_message_result_handler.rs (1 hunks)
  • rocketmq-broker/src/processor/pull_message_processor.rs (6 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-common/src/common/mix_all.rs (1 hunks)
  • rocketmq-remoting/src/code/response_code.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/remoting_command.rs (3 hunks)
  • rocketmq-store/Cargo.toml (1 hunks)
  • rocketmq-store/src/base.rs (1 hunks)
  • rocketmq-store/src/base/get_message_result.rs (8 hunks)
  • rocketmq-store/src/base/select_result.rs (1 hunks)
  • rocketmq-store/src/base/store_stats_service.rs (1 hunks)
  • rocketmq-store/src/config/message_store_config.rs (5 hunks)
  • rocketmq-store/src/lib.rs (2 hunks)
  • rocketmq-store/src/log_file.rs (2 hunks)
  • rocketmq-store/src/log_file/commit_log.rs (3 hunks)
  • rocketmq-store/src/log_file/mapped_file/default_impl.rs (8 hunks)
  • rocketmq-store/src/message_store/default_message_store.rs (6 hunks)
  • rocketmq-store/src/queue.rs (3 hunks)
  • rocketmq-store/src/queue/batch_consume_queue.rs (1 hunks)
  • rocketmq-store/src/queue/local_file_consume_queue_store.rs (1 hunks)
  • rocketmq-store/src/queue/single_consume_queue.rs (1 hunks)
  • rocketmq-store/src/stats/broker_stats_manager.rs (1 hunks)
  • rocketmq-store/src/utils.rs (1 hunks)
  • rocketmq-store/src/utils/store_util.rs (1 hunks)
Files skipped from review due to trivial changes (4)
  • rocketmq-store/Cargo.toml
  • rocketmq-store/src/base.rs
  • rocketmq-store/src/base/get_message_result.rs
  • rocketmq-store/src/utils.rs
Additional comments not posted (28)
rocketmq-store/src/base/store_stats_service.rs (1)

17-19: Placeholder Struct StoreStatsService

The StoreStatsService struct is currently a placeholder without any fields or methods. If this is intended for future expansion, consider adding a comment or documentation to clarify its intended use.

rocketmq-store/src/lib.rs (2)

37-37: New Module utils

The introduction of the utils module should be accompanied by documentation or comments explaining its purpose and the types of utilities it contains. This helps maintain clarity and context within the codebase.


19-19: Introduction of sync_unsafe_cell Feature

The #![feature(sync_unsafe_cell)] has been introduced. Ensure that this unstable feature is necessary for the current implementation, as it ties the codebase to a specific Rust version which might affect portability and future upgrades.

rocketmq-broker/src/lib.rs (1)

25-42: Module Visibility Changes in rocketmq-broker

Changing the visibility of multiple modules from pub to pub(crate) restricts their accessibility to within the crate. Ensure that this change aligns with the architectural goals and does not inadvertently restrict necessary access for other parts of the application or for library users.

rocketmq-broker/src/mqtrace/consume_message_context.rs (1)

37-45: Good change to optional fields.

Changing account_auth_type, account_owner_parent, account_owner_self, and commercial_owner to Option<String> improves the handling of absent values, enhancing robustness and flexibility.

rocketmq-store/src/log_file.rs (2)

35-35: Good addition of MAX_PULL_MSG_SIZE constant.

Defining MAX_PULL_MSG_SIZE as a constant improves code readability and maintainability by avoiding magic numbers in the codebase.


100-100: Good update to get_message method signature.

Allowing the message_filter parameter to be optional increases the method's flexibility, enabling it to be used in contexts where no filtering is required.

rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (1)

31-31: Appropriate update to suggest_which_broker_id.

Changing suggest_which_broker_id from i64 to u64 and making it optional is suitable as broker IDs are typically non-negative and might not be present in all contexts.

rocketmq-common/src/common/mix_all.rs (2)

81-98: Ensure comprehensive testing for new function is_sys_consumer_group_for_no_cold_read_limit.

The function is_sys_consumer_group_for_no_cold_read_limit has been added to identify specific consumer groups. It is crucial to ensure that all edge cases are covered by the tests, particularly for consumer groups that might closely resemble the specified ones but are not exactly the same.


110-151: Review test coverage for new function.

The tests for is_sys_consumer_group_for_no_cold_read_limit appear to cover a range of expected consumer groups. Ensure that these tests are comprehensive and consider adding scenarios for partial matches and case sensitivity if not already covered.

rocketmq-store/src/queue.rs (2)

93-103: Approved: Added validation and retrieval methods for tags code in CqUnit

The new methods get_valid_tags_code_as_long and is_tags_code_valid provide a clean, encapsulated way to handle tag code validation and retrieval, which enhances the maintainability and readability of the code.


377-383: Approved: Added iteration methods to ConsumeQueueTrait

The introduction of iterate_from and iterate_from_inner methods allows for more flexible and efficient iteration over queue elements starting from a given index. This is a valuable addition for managing large data sets efficiently.

rocketmq-broker/src/processor/default_pull_message_result_handler.rs (1)

Line range hint 44-287: Approved: Implementation of DefaultPullMessageResultHandler

The DefaultPullMessageResultHandler is well-implemented with methods to handle pull message results effectively. The use of smart pointers like Arc ensures thread safety, and the modular approach in handling different response scenarios enhances the maintainability of the code.

rocketmq-remoting/src/protocol/remoting_command.rs (3)

216-237: Approved: Added method to set command custom header by reference

The new method set_command_custom_header_ref enhances performance by allowing headers to be set by reference, avoiding unnecessary data duplication. This is particularly beneficial in high-throughput scenarios where performance is critical.


244-246: Approved: Added method to set code by reference

The method set_code_ref provides a more efficient way to set the command code by reference, aligning with Rust's focus on performance and safety.


276-278: Approved: Added method to set remark by reference

The set_remark_ref method allows setting the remark by reference, which is consistent with the other methods added for reference-based setting. This approach reduces memory overhead and aligns with best practices in Rust for handling mutable data efficiently.

rocketmq-broker/src/processor.rs (3)

68-68: Addition of default_pull_message_result_handler module.

The introduction of the default_pull_message_result_handler module is crucial for supporting pull message consumption as mentioned in the PR objectives. This module likely contains the logic for handling pull message results, which is a new functionality aimed at enhancing the message pulling mechanism in RocketMQ.

Ensure that the implementation of this module is consistent with existing architectural patterns and that it integrates seamlessly with other components of the system.


Line range hint 239-281: Review of process_request method in BrokerRequestProcessor.

This method handles different types of requests by delegating them to the appropriate processors. The handling of RequestCode::PullMessage and RequestCode::LitePullMessage has been specifically mentioned, indicating that these request types are now being processed by the pull_message_processor.

It's important to ensure that this delegation is done efficiently and that the pull_message_processor is properly equipped to handle these new types of requests, especially since they are related to the newly supported pull message functionality. Consider adding performance metrics or logs to monitor how these new request types are being processed.


Line range hint 283-397: Review of SendMessageProcessorInner and its methods.

The SendMessageProcessorInner struct contains several methods that are crucial for the message sending process, such as execute_send_message_hook_before and execute_send_message_hook_after. These methods utilize hooks to execute custom logic before and after sending a message, which can be critical for ensuring message integrity and tracking.

It's important to verify that these hooks are being executed in the correct order and that they do not introduce latency or errors into the message sending process. Additionally, the method build_msg_context builds the message context for sending, which should be carefully reviewed to ensure that all necessary information is correctly included and that no sensitive information is leaked.

rocketmq-store/src/config/message_store_config.rs (3)

101-104: Updated field types to u64 for better capacity handling.

The changes to use u64 instead of usize for these fields are appropriate, given they likely represent byte sizes or counts that could exceed the capacity of usize on 32-bit systems. This is a good practice for ensuring that the software can handle larger datasets as it scales.


206-206: New configuration field added: max_filter_message_size.

This new field has been added to configure the maximum size of a filter message. Ensure that this new configuration option is documented clearly in the user manual or help documentation to avoid confusion about its purpose and usage.


285-288: Default values for message transfer limits and filter message size set in Default implementation.

The default values for max_transfer_bytes_on_message_in_memory, max_transfer_count_on_message_in_memory, max_transfer_bytes_on_message_in_disk, and max_transfer_count_on_message_in_disk have been set along with the max_filter_message_size. It is crucial to verify that these defaults are sensible based on typical usage scenarios and system capabilities.

Also applies to: 390-390

rocketmq-store/src/queue/local_file_consume_queue_store.rs (1)

507-507: Refinement in locking mechanism with .as_ref().

The addition of .as_ref() before lock() is typically used to handle Option types safely, ensuring that there is a value before attempting to acquire a lock. This change likely improves the robustness of the code by preventing potential panics from trying to lock a None value. It's a good practice to handle optional values safely.

rocketmq-broker/src/processor/pull_message_processor.rs (1)

96-96: Verify the impact of removing _broker_allow_flow_ctr_suspend parameter.

The removal of _broker_allow_flow_ctr_suspend simplifies the method but ensure it doesn't affect the functionality of message pulling, especially under high load or slow network conditions.

Verification successful

Verify the impact of removing _broker_allow_flow_ctr_suspend parameter.

The _broker_allow_flow_ctr_suspend parameter is not used anywhere else in the codebase, and its removal does not affect any conditional checks or logic within the process_request_inner method.

  • The rg and ast-grep commands did not find any occurrences of _broker_allow_flow_ctr_suspend outside the method signature.
  • Manual inspection of pull_message_processor.rs confirmed that the parameter is not used within the method body.

Thus, the removal of this parameter simplifies the method without impacting its functionality.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check usage of the removed parameter in other parts of the codebase.

# Test: Search for the parameter usage. Expect: No occurrences outside this method.
rg '_broker_allow_flow_ctr_suspend'

Length of output: 35



Script:

#!/bin/bash
# Description: Check for the usage of `_broker_allow_flow_ctr_suspend` within `pull_message_processor.rs`.

# Test: Search for the parameter usage within the file. Expect: Occurrences only within the method, and no critical logic dependent on it.
rg '_broker_allow_flow_ctr_suspend' rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 91



Script:

#!/bin/bash
# Description: Extract the full implementation of the `process_request_inner` method from `pull_message_processor.rs`.

# Test: Ensure there are no dependencies on `_broker_allow_flow_ctr_suspend`.
ast-grep --lang rust --pattern 'fn process_request_inner($_) { $$$ }' rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 127



Script:

#!/bin/bash
# Description: List all functions in `pull_message_processor.rs` to confirm the method name and presence.

# Test: Ensure the correct method name and locate its implementation.
ast-grep --lang rust --pattern 'fn $_($_) { $$$ }' rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 108



Script:

#!/bin/bash
# Description: Display the content of `pull_message_processor.rs` to manually inspect the methods and usage of `_broker_allow_flow_ctr_suspend`.

# Test: Ensure the correct method name and locate its implementation.
cat rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 22131

rocketmq-store/src/message_store/default_message_store.rs (4)

442-450: Performance consideration in estimate_in_mem_by_commit_offset.

The calculation in estimate_in_mem_by_commit_offset might be a performance bottleneck due to its usage of floating-point operations. Consider optimizing this part if it's called frequently.
[PERFORMANCE]


74-74: Verify the usage of the constant MAX_PULL_MSG_SIZE.

Ensure that MAX_PULL_MSG_SIZE is used appropriately throughout the project to maintain consistency in handling message sizes.

Verification successful

MAX_PULL_MSG_SIZE is used appropriately in the codebase.

The constant MAX_PULL_MSG_SIZE is utilized correctly in the following locations:

  • rocketmq-store/src/message_store/default_message_store.rs

    • It is used to cap the max_pull_size to ensure it does not exceed the defined maximum pull message size.
    • This usage is consistent with the purpose of maintaining a limit on the message size that can be pulled.
  • rocketmq-broker/src/processor/pull_message_processor.rs

    • It is passed as a parameter to a function, ensuring that the maximum pull message size is respected during message processing.
    • This ensures that the message size constraint is enforced during the pull message processing.

These usages indicate that MAX_PULL_MSG_SIZE is employed to maintain consistency in handling message sizes across the project.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage of MAX_PULL_MSG_SIZE across the project.
rg "MAX_PULL_MSG_SIZE" --type rust

Length of output: 639



Script:

#!/bin/bash
# Extract the relevant code snippets where MAX_PULL_MSG_SIZE is used in rocketmq-store/src/message_store/default_message_store.rs
rg -A 5 -B 5 "MAX_PULL_MSG_SIZE" rocketmq-store/src/message_store/default_message_store.rs

# Extract the relevant code snippets where MAX_PULL_MSG_SIZE is used in rocketmq-broker/src/processor/pull_message_processor.rs
rg -A 5 -B 5 "MAX_PULL_MSG_SIZE" rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 2443


707-919: Complex logic in get_message needs thorough testing.

The get_message function is complex and handles many edge cases. Ensure thorough unit and integration tests cover all paths to prevent regressions and ensure reliability.


431-439: Logic verification in next_offset_correction.

Verify the logic in next_offset_correction to ensure it behaves correctly under all configurations of broker_role and offset_check_in_slave. This is critical for maintaining data consistency and correct behavior in cluster setups.

Verification successful

Logic verification in next_offset_correction.

The function next_offset_correction correctly updates next_offset based on the configurations of broker_role and offset_check_in_slave. The multiple usages in the codebase suggest it handles different scenarios, and the configurations cover all possible values.

  • The function updates next_offset based on whether broker_role is not Slave or offset_check_in_slave is true.
  • The configurations broker_role and offset_check_in_slave are defined and used appropriately in the codebase.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check for the usage of `next_offset_correction` to ensure it handles all edge cases.

rg "next_offset_correction" --type rust

Length of output: 981



Script:

#!/bin/bash
# Retrieve the definition and all usages of `next_offset_correction` in `rocketmq-store/src/message_store/default_message_store.rs`.
rg "next_offset_correction" rocketmq-store/src/message_store/default_message_store.rs

# Retrieve the definitions and usages of `broker_role` and `offset_check_in_slave` in the codebase.
rg "broker_role" --type rust
rg "offset_check_in_slave" --type rust

Length of output: 2444

Comment on lines +272 to +274
pub fn get_message(&self, offset: i64, size: i32) -> Option<SelectMappedBufferResult> {
None
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method get_message always returns None.

The implementation of get_message in the CommitLog struct always returns None, which means it does not perform its intended function of retrieving messages. This could be a placeholder or an incomplete implementation.

Consider implementing the logic to retrieve messages based on the provided offset and size, or if this method is not needed, consider removing it to clean up the codebase.

Comment on lines +467 to +474

pub fn record_disk_fall_behind_size(
&self,
group: &str,
topic: &str,
queue_id: i32,
fall_behind: i64,
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement the record_disk_fall_behind_size method.

The method record_disk_fall_behind_size is currently empty. This might be a placeholder for future implementation.

Please implement this method or provide appropriate documentation if it is intended to remain unimplemented for now.

Comment on lines +778 to +780
fn iterate_from(&self, start_index: i64) -> Option<Box<dyn Iterator<Item = CqUnit>>> {
todo!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement iterator methods iterate_from and iterate_from_inner.

Both methods iterate_from and iterate_from_inner are currently placeholders with todo!(). This indicates they are not yet implemented.

Please implement these methods to support iterating over consume queue units. If these methods are not planned to be implemented soon, consider adding documentation explaining their intended usage and future implementation plans.

Also applies to: 782-788

Comment on lines +33 to +35
use rocketmq_common::common::attribute::cleanup_policy::CleanupPolicy;
use rocketmq_common::common::message::message_batch::MessageExtBatch;
use rocketmq_common::common::mix_all::is_sys_consumer_group_for_no_cold_read_limit;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor suggestion for imports: Combine common imports.

You can combine the imports from rocketmq_common::common to clean up the code and improve readability.

use rocketmq_common::common::{
    attribute::cleanup_policy::CleanupPolicy, 
    message::{message_batch::MessageExtBatch, mix_all::is_sys_consumer_group_for_no_cold_read_limit}
};

@@ -75,6 +80,7 @@
use crate::store_path_config_helper::get_abort_file;
use crate::store_path_config_helper::get_store_checkpoint;
use crate::store_path_config_helper::get_store_path_consume_queue;
use crate::utils::store_util::TOTAL_PHYSICAL_MEMORY_SIZE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use of global constant for memory size needs clarification.

The usage of TOTAL_PHYSICAL_MEMORY_SIZE as a global mutable state might lead to race conditions or unpredictable behavior in a multi-threaded environment. Consider encapsulating this within appropriate data structures with thread-safe access patterns.

Comment on lines +22 to +31
pub static TOTAL_PHYSICAL_MEMORY_SIZE: Lazy<u64> =
Lazy::new(StoreUtil::get_total_physical_memory_size);

impl StoreUtil {
pub fn get_total_physical_memory_size() -> u64 {
let mut sys = System::new_all();
sys.refresh_all();
let physical_total = sys.total_memory();
physical_total * 1024 // Convert from kilobytes to bytes
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory Size Calculation in StoreUtil

The implementation for calculating total physical memory size is straightforward and uses the sysinfo crate. However, consider handling potential errors that might arise from the sysinfo API, such as failures in fetching system information.

-        let physical_total = sys.total_memory();
+        let physical_total = sys.total_memory().unwrap_or_default();
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub static TOTAL_PHYSICAL_MEMORY_SIZE: Lazy<u64> =
Lazy::new(StoreUtil::get_total_physical_memory_size);
impl StoreUtil {
pub fn get_total_physical_memory_size() -> u64 {
let mut sys = System::new_all();
sys.refresh_all();
let physical_total = sys.total_memory();
physical_total * 1024 // Convert from kilobytes to bytes
}
pub static TOTAL_PHYSICAL_MEMORY_SIZE: Lazy<u64> =
Lazy::new(StoreUtil::get_total_physical_memory_size);
impl StoreUtil {
pub fn get_total_physical_memory_size() -> u64 {
let mut sys = System::new_all();
sys.refresh_all();
let physical_total = sys.total_memory().unwrap_or_default();
physical_total * 1024 // Convert from kilobytes to bytes
}

Comment on lines +47 to +51
Success = 0,
SystemError = 1,
SystemBusy = 2,
RequestCodeNotSupported = 3,
TransactionFailed = 4,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Ensure Documentation for Response Codes

The response codes Success, SystemError, SystemBusy, RequestCodeNotSupported, and TransactionFailed are unique and do not overlap with existing codes. However, there are no documentation comments explaining the usage of each response code.

  • Consider adding documentation comments for each response code to clarify their intended use.
Analysis chain

Validate and document new response codes.

Ensure that the newly added response codes such as Success, SystemError, SystemBusy, RequestCodeNotSupported, and TransactionFailed are unique and do not overlap with existing codes. Additionally, consider adding documentation comments for each to explain when they should be used.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the uniqueness and documentation of new response codes.

# Test: Search for the response codes in the project to ensure no overlaps.
rg --type rust $'enum ResponseCode'

Length of output: 104



Script:

#!/bin/bash
# List all variants of the ResponseCode enum to check for overlaps.
ast-grep --lang rust --pattern 'enum ResponseCode { $$$ }' | grep -oP '(?<=ResponseCode\s*{)[^}]+' | tr ',' '\n' | sort | uniq -d

# Search for documentation comments related to ResponseCode.
ast-grep --lang rust --pattern '/// $_' --pattern 'enum ResponseCode { $$$ }' -A 1

Length of output: 428



Script:

#!/bin/bash
# List all variants of the ResponseCode enum to check for overlaps.
ast-grep --lang rust --pattern 'enum ResponseCode { $$$ }' | awk -F'[{}]' '{print $2}' | tr ',' '\n' | sort | uniq -d

# Search for documentation comments related to ResponseCode.
ast-grep --lang rust --pattern '/// $_' -A 1 | grep -B 1 'enum ResponseCode'

Length of output: 196

Comment on lines +152 to +153
pub slave_read_enable: bool,
pub commercial_base_count: i32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Document and validate new configuration options.

The new configuration options slave_read_enable and commercial_base_count have been added to BrokerConfig. However, they lack proper documentation and validation within the codebase. Ensure these fields are well-documented to explain their purpose and usage, and add appropriate validation logic to integrate them smoothly with existing configurations.

  • Documentation: Add comments explaining the purpose and expected values for slave_read_enable and commercial_base_count.
  • Validation: Implement validation logic to check the correctness of these fields where necessary.
Analysis chain

Document and validate new configuration options.

The new configuration options slave_read_enable and commercial_base_count have been added to BrokerConfig. Ensure that these fields are well-documented in the code and validate their usage across the system to ensure they integrate smoothly with existing configurations.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage and documentation of new configuration options.

# Test: Search for the usage of `slave_read_enable` and `commercial_base_count` in the project.
rg --type rust $'slave_read_enable|commercial_base_count'

Length of output: 882



Script:

#!/bin/bash
# Description: Verify the documentation of new configuration options.

# Test: Search for documentation comments for `slave_read_enable` and `commercial_base_count` in the project.
rg --type rust -A 5 'slave_read_enable|commercial_base_count' | grep -E '///|//!|/**|/*!'

Length of output: 2965



Script:

#!/bin/bash
# Description: Verify the validation of new configuration options.

# Test: Search for validation logic for `slave_read_enable` and `commercial_base_count` in the project.
rg --type rust 'validate|check|assert' -A 5 -B 5 'slave_read_enable|commercial_base_count'

Length of output: 172



Script:

#!/bin/bash
# Description: Verify the validation of new configuration options.

# Test: Search for validation logic for `slave_read_enable` and `commercial_base_count` in the project.
rg --type rust -A 5 -B 5 'validate|check|assert' | rg 'slave_read_enable|commercial_base_count'

Length of output: 97

Comment on lines +305 to +307
fn iterate_from(&self, start_index: i64) -> Option<Box<dyn Iterator<Item = CqUnit>>> {
todo!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement or document the method iterate_from.

The method iterate_from is currently not implemented and contains a todo!() placeholder. Ensure that this method is either implemented or properly documented with a TODO comment explaining what needs to be done before it can be completed.

Would you like me to help implement this method or should I open a GitHub issue to track this task?

Comment on lines +309 to +315
fn iterate_from_inner(
&self,
start_index: i64,
count: i32,
) -> Option<Box<dyn Iterator<Item = CqUnit>>> {
todo!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement or document the method iterate_from_inner.

Like iterate_from, the method iterate_from_inner is also marked with todo!() and is not implemented. Consider adding a TODO comment if it is to be implemented later or provide an implementation to fulfill its intended functionality.

Would you like assistance in implementing this method, or should I open a GitHub issue to track this task?

@mxsm
Copy link
Owner Author

mxsm commented Jun 22, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
approved PR has approved auto merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Support pull message consume-4
2 participants