[ISSUE #1532]🚀Add support append enabled_append_prop_crc⚡️ #1538
Walkthrough
The changes in this pull request introduce a new dependency to the
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1538 +/- ##
==========================================
- Coverage 24.85% 24.84% -0.02%
==========================================
Files 450 450
Lines 59525 59561 +36
==========================================
+ Hits 14795 14796 +1
- Misses 44730 44765 +35
☔ View full report in Codecov by Sentry.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (7)
rocketmq-store/src/base/append_message_callback.rs (1)
199-201: Optimize buffer handling by avoiding unnecessary allocations
Instead of using pre_encode_buffer.chunk(), which may lead to unnecessary allocations, you can use pre_encode_buffer.as_ref() to obtain a reference to the internal buffer without copying.
Apply this change to improve performance:
-mapped_file.append_message_bytes_no_position_update_ref(pre_encode_buffer.chunk());
+mapped_file.append_message_bytes_no_position_update_ref(pre_encode_buffer.as_ref());
rocketmq-store/src/log_file/commit_log.rs (1)
906-908: Typographical error in log message
The log message at line 906 contains unintended line breaks, which may cause logging issues.
Modify the log message for clarity:
 warn!(
-    "The commitlog files are deleted, and delete the consume queue
-     files"
+    "The commitlog files are deleted, and delete the consume queue files"
 );
rocketmq-store/src/message_encoder/message_ext_encoder.rs (1)
530-531: Avoid redundant copy_to_bytes call
When retrieving the encoder buffer, self.byte_buf.copy_to_bytes(len) consumes the buffer. If byte_buf is needed elsewhere after this call, consider using self.byte_buf.clone().freeze() instead.
-let len = self.byte_buf.len();
-self.byte_buf.copy_to_bytes(len)
+let buffer = self.byte_buf.clone();
+buffer.freeze()
rocketmq-common/src/common/message/message_decoder.rs (2)
719-732: Consider extracting store size calculation
The store size calculation logic is duplicated from the previous encode function.
Consider extracting the calculation into a shared function:
fn calculate_store_size(
    body_len: usize,
    topic_len: usize,
    properties_length: usize,
    born_host_length: usize,
    // Passed in explicitly so the function does not depend on outside state.
    store_host_address_length: usize,
    include_store_fields: bool,
) -> usize {
    let mut size = 4 // TOTALSIZE
        + 4 // MAGICCODE
        + 4 // BODYCRC
        + 4 // QUEUEID
        + 4 // FLAG
        + 8 // QUEUEOFFSET
        + 8 // PHYSICALOFFSET
        + 4 // SYSFLAG
        + 8 // BORNTIMESTAMP
        + born_host_length // BORNHOST
        + 4 // RECONSUMETIMES
        + 8 // Prepared Transaction Offset
        + 4 + body_len // BODY
        + 1 + topic_len // TOPIC
        + 2 + properties_length; // propertiesLength
    if include_store_fields {
        size += 8 // STORETIMESTAMP
            + store_host_address_length; // STOREHOSTADDRESS
    }
    size
}
797-810: Improve CRC32 string formatting efficiency and safety
While the implementation works, it could be improved in terms of efficiency and safety.
Consider these improvements:
- Use format! for cleaner number formatting:
pub fn create_crc32(mut input: &mut [u8], crc32: u32) {
    input.put(MessageConst::PROPERTY_CRC32.as_bytes());
    input.put_u8(NAME_VALUE_SEPARATOR as u8);
    input.put(format!("{:010}", crc32).as_bytes());
    input.put_u8(PROPERTY_SEPARATOR as u8);
}
- Add bounds checking to prevent buffer overflow:
pub fn create_crc32(mut input: &mut [u8], crc32: u32) -> Result<()> {
    let required_len = MessageConst::PROPERTY_CRC32.len() + 12; // 1 separator + 10 digits + 1 separator
    if input.len() < required_len {
        return Err(Error::BufferTooSmall);
    }
    // ... rest of the implementation
    Ok(())
}
rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs (1)
Line range hint 341-365: Reduce code duplication and improve error handling.
The implementations of append_message_no_position_update and append_message_offset_no_position_update are identical. Additionally, the error messages need improvement.
Consider the following improvements:
 fn append_message_no_position_update(&self, data: &[u8], offset: usize, length: usize) -> bool {
     let current_pos = self.wrote_position.load(Ordering::Relaxed) as usize;
     if current_pos + length <= self.file_size as usize {
         let mut mapped_file = &mut self.get_mapped_file_mut()[current_pos..current_pos + length];
         if let Some(data_slice) = data.get(offset..offset + length) {
             if mapped_file.write_all(data_slice).is_ok() {
                 return true;
             } else {
-                error!("append_message_offset_length write_all error");
+                error!("Failed to write data in append_message_no_position_update");
             }
         } else {
-            error!("Invalid data slice");
+            error!("Invalid data slice: offset {} length {} exceeds data length {}",
+                offset, length, data.len());
         }
     }
     false
 }

-fn append_message_offset_no_position_update(
-    &self,
-    data: &[u8],
-    offset: usize,
-    length: usize,
-) -> bool {
-    let current_pos = self.wrote_position.load(Ordering::Relaxed) as usize;
-
-    if current_pos + length <= self.file_size as usize {
-        let mut mapped_file =
-            &mut self.get_mapped_file_mut()[current_pos..current_pos + length];
-
-        if let Some(data_slice) = data.get(offset..offset + length) {
-            if mapped_file.write_all(data_slice).is_ok() {
-                return true;
-            } else {
-                error!("append_message_offset_length write_all error");
-            }
-        } else {
-            error!("Invalid data slice");
-        }
-    }
-    false
-}
+fn append_message_offset_no_position_update(
+    &self,
+    data: &[u8],
+    offset: usize,
+    length: usize,
+) -> bool {
+    self.append_message_no_position_update(data, offset, length)
+}
rocketmq-store/src/log_file/mapped_file.rs (1)
203-204: LGTM! Well-documented trait interface changes.
The changes to use &[u8] instead of Bytes are consistent and well-documented. The new method append_message_bytes_no_position_update_ref provides a more ergonomic API for working with byte slices.
Consider adding a note in the documentation about the performance implications of using &[u8] vs Bytes, particularly for large messages, to help users make informed decisions about which method to use.
Also applies to: 206-216, 233-233
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
rocketmq-common/Cargo.toml (1 hunks)
rocketmq-common/src/common/message/message_decoder.rs (6 hunks)
rocketmq-common/src/common/message/message_ext_broker_inner.rs (4 hunks)
rocketmq-store/src/base/append_message_callback.rs (4 hunks)
rocketmq-store/src/log_file/commit_log.rs (3 hunks)
rocketmq-store/src/log_file/mapped_file.rs (2 hunks)
rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs (1 hunks)
rocketmq-store/src/message_encoder/message_ext_encoder.rs (7 hunks)
🔇 Additional comments (7)
rocketmq-store/src/log_file/commit_log.rs (1)
104-105: Ensure proper memory management with ArcMut
The use of ArcMut<BytesMut> introduces shared mutability. Be cautious when using ArcMut to prevent data races in concurrent contexts. Ensure all accesses are properly synchronized.
Please confirm that all accesses to the ArcMut<BytesMut> buffer are thread-safe and consider using synchronization primitives if necessary.
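To make the synchronization concern concrete, here is a minimal sketch. It does not use the project's ArcMut or BytesMut; it swaps in the standard library's Arc<Mutex<Vec<u8>>> as a stand-in for a shared encode buffer, and append_chunk and concurrent_appends are hypothetical helpers introduced only for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: appends `chunk` to the shared buffer while holding the lock.
fn append_chunk(buffer: &Arc<Mutex<Vec<u8>>>, chunk: &[u8]) {
    let mut guard = buffer.lock().expect("buffer mutex poisoned");
    guard.extend_from_slice(chunk);
}

/// Spawns `writers` threads that each append one 4-byte chunk,
/// then returns the final buffer length.
fn concurrent_appends(writers: usize) -> usize {
    let buffer = Arc::new(Mutex::new(Vec::with_capacity(writers * 4)));
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            let buffer = Arc::clone(&buffer);
            thread::spawn(move || append_chunk(&buffer, &[i as u8; 4]))
        })
        .collect();
    for handle in handles {
        handle.join().expect("writer thread panicked");
    }
    // Bind the length first so the lock guard is released before returning.
    let len = buffer.lock().expect("buffer mutex poisoned").len();
    len
}

fn main() {
    println!("bytes written: {}", concurrent_appends(8));
}
```

Because every write goes through the mutex, no append can be lost or interleaved. Whether a lock is needed in the PR depends on whether the ArcMut<BytesMut> buffer is ever reachable from more than one thread at a time.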
rocketmq-common/Cargo.toml (1)
15-16: Confirm the addition of rocketmq-rust as a dependency
The rocketmq-rust crate has been added as a dependency. Ensure that this addition is necessary and does not introduce any cyclic dependencies within the workspace.
Run the following script to check for cyclic dependencies:
✅ Verification successful
After analyzing the Cargo.toml files, I can generate the final response:
The addition of rocketmq-rust dependency is correct and does not create cyclic dependencies
The workspace structure shows that rocketmq-rust (in the rocketmq directory) is the core crate, and it's appropriately used as a dependency by other workspace members including rocketmq-common. The dependency graph is hierarchical with rocketmq-rust at the base level, and there are no circular dependencies observed in the workspace configuration.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that adding `rocketmq-rust` does not create cyclic dependencies.
# Check for cyclic dependencies using cargo
cargo metadata --format-version=1 --no-deps | jq -r '.packages[].name' | sort | uniq -d
Length of output: 185
Script:
#!/bin/bash
# Let's check the workspace dependencies by examining Cargo.toml files
# First, find all Cargo.toml files
fd Cargo.toml
# Then examine the content of each to understand the dependency structure
echo "=== Content of workspace Cargo.toml files ==="
fd Cargo.toml --exec cat {}
Length of output: 14944
rocketmq-common/src/common/message/message_ext_broker_inner.rs (1)
199-222: Implement Debug trait carefully to avoid exposing sensitive data
The Debug implementation masks encoded_buff content with "****", which is good for not exposing buffer content. However, ensure that this aligns with the project's logging and debugging policies.
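The masking approach can be sketched as follows. InnerMessage is a hypothetical stand-in, not the real MessageExtBrokerInner, and it uses Vec<u8> in place of ArcMut<BytesMut>; only the idea of replacing the buffer contents with "****" comes from the review comment.

```rust
use std::fmt;

/// Hypothetical stand-in for a message type that carries a pre-encoded buffer.
struct InnerMessage {
    topic: String,
    tags_code: i64,
    encoded_buff: Option<Vec<u8>>,
}

impl fmt::Debug for InnerMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Show whether a buffer is present, but never print its bytes.
        let masked = self.encoded_buff.as_ref().map(|_| "****");
        f.debug_struct("InnerMessage")
            .field("topic", &self.topic)
            .field("tags_code", &self.tags_code)
            .field("encoded_buff", &masked)
            .finish()
    }
}

fn main() {
    let msg = InnerMessage {
        topic: "TopicTest".to_string(),
        tags_code: 7,
        encoded_buff: Some(vec![1, 2, 3]),
    };
    println!("{:?}", msg);
}
```

Keeping the Some/None distinction in the output preserves the one piece of information that is usually useful when debugging, namely whether the message was pre-encoded at all.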
rocketmq-store/src/message_encoder/message_ext_encoder.rs (2)
61-61: Initialize buffer capacity appropriately
In the constructor of MessageExtEncoder, ensure that the initial capacity of byte_buf is correctly set to max_message_size to prevent buffer overflows.
548-549: Ensure correct return of byte_buf
In the byte_buf method, returning a clone of self.byte_buf is appropriate. Ensure that this change does not introduce unintended side effects due to shared mutability with ArcMut.
rocketmq-common/src/common/message/message_decoder.rs (1)
409-413: LGTM! Well-documented size calculation
The store size calculation is clear and well-documented with comments explaining each component.
rocketmq-store/src/log_file/mapped_file.rs (1)
Line range hint 341-365: Verify CRC calculation implementation.
The enabled_append_prop_crc feature is used in the append_messages method, but we should verify the CRC calculation implementation in the related files.
✅ Verification successful
Based on the search results, I can now generate the final response as the implementation details are clear from the code:
CRC calculation is properly implemented and the feature is complete.
The CRC calculation implementation is correctly handled across the codebase:
- In MessageExtEncoder, the CRC length is properly reserved based on the config flag
- In DefaultAppendMessageCallback, CRC32 is calculated and stored when enabled:
  - Calculates CRC32 on the message buffer excluding the reserved space
  - Uses proper buffer slicing to store the CRC value
- The feature flag is properly propagated through the call chain from config to the actual implementation
- The CRC property is properly cleaned up in CommitLog before appending new messages
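The compute-then-store flow described in the list above can be sketched with the standard library alone. This is not the project's code: the constant values below (the property key and the two separator bytes) are assumptions for illustration, crc32 is a plain bitwise CRC-32 rather than the project's CRC32Utils, and append_crc_property is a hypothetical name.

```rust
// Assumed values for illustration; the real constants live in the project.
const PROPERTY_CRC32: &[u8] = b"__CRC32#";
const NAME_VALUE_SEPARATOR: u8 = 1;
const PROPERTY_SEPARATOR: u8 = 2;
// key + separator + 10 decimal digits + separator
const CRC32_RESERVED_LEN: usize = PROPERTY_CRC32.len() + 1 + 10 + 1;

/// Bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320).
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            // All ones when the low bit is set, zero otherwise.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Computes the CRC over everything except the reserved tail,
/// then writes `key SEP 10-digit-crc SEP` into that tail.
fn append_crc_property(msg: &mut [u8]) -> Result<u32, String> {
    let total = msg.len();
    let check_size = total
        .checked_sub(CRC32_RESERVED_LEN)
        .ok_or_else(|| format!("{} bytes leave no room for the CRC property", total))?;
    let (payload, reserved) = msg.split_at_mut(check_size);
    let crc = crc32(payload);
    let key_len = PROPERTY_CRC32.len();
    reserved[..key_len].copy_from_slice(PROPERTY_CRC32);
    reserved[key_len] = NAME_VALUE_SEPARATOR;
    // A u32 has at most 10 decimal digits, so the zero-padded text is always 10 bytes.
    reserved[key_len + 1..key_len + 11].copy_from_slice(format!("{:010}", crc).as_bytes());
    reserved[key_len + 11] = PROPERTY_SEPARATOR;
    Ok(crc)
}

fn main() {
    let mut msg = vec![0u8; 9 + CRC32_RESERVED_LEN];
    msg[..9].copy_from_slice(b"123456789");
    let crc = append_crc_property(&mut msg).expect("buffer has reserved space");
    println!("crc = {:#010x}", crc);
}
```

Using split_at_mut gives the payload and the reserved tail as two non-overlapping slices, so the checksum input can never include the bytes that are about to be overwritten.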
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify CRC calculation implementation
# Check for CRC calculation in message encoder
rg -A 5 "enabled_append_prop_crc" .
ast-grep --pattern 'fn do_append_batch($$$) { $$$}'
Length of output: 11216
@@ -26,6 +27,7 @@
use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
use rocketmq_common::utils::message_utils;
use rocketmq_common::CRC32Utils::crc32;
use rocketmq_common::MessageDecoder::create_crc32;
Add missing import for crc32 function
The crc32 function from CRC32Utils is used in the code but not imported. Ensure that the crc32 function is correctly imported to prevent compilation errors.
Add the missing import:
use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
use rocketmq_common::utils::message_utils;
+use rocketmq_common::CRC32Utils::crc32;
use rocketmq_common::MessageDecoder::create_crc32;
Committable suggestion skipped: line range outside the PR's diff.
let crc32 = crc32(&pre_encode_buffer[..check_size as usize]);
create_crc32(
    &mut pre_encode_buffer[check_size as usize..msg_len as usize],
    crc32,
);
Potential buffer overrun in CRC32 computation
In the do_append method, the slice &pre_encode_buffer[..check_size as usize] may lead to a buffer overrun if check_size exceeds the length of pre_encode_buffer. Ensure that check_size is less than or equal to pre_encode_buffer.len() to prevent potential runtime errors.
Consider adding bounds checking before slicing:
let check_size = msg_len - self.crc32_reserved_length;
+if (check_size as usize) > pre_encode_buffer.len() {
+ return AppendMessageResult {
+ status: AppendMessageStatus::UnknownError,
+ wrote_offset,
+ ..Default::default()
+ };
+}
let crc32 = crc32(&pre_encode_buffer[..check_size as usize]);
Committable suggestion skipped: line range outside the PR's diff.
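As an alternative to an explicit length comparison, the same guard can be expressed with checked arithmetic and slice::get, which return None instead of panicking. The sketch below is standalone and hypothetical: checksum_region and AppendStatus are illustrative names, not items from the PR.

```rust
/// Hypothetical status type, standing in for the project's append status.
#[derive(Debug, PartialEq)]
enum AppendStatus {
    Ok,
    UnknownError,
}

/// Returns the prefix covered by the checksum, or None when the
/// lengths are inconsistent (underflow or out-of-range slice).
fn checksum_region(buffer: &[u8], msg_len: usize, reserved_len: usize) -> Option<&[u8]> {
    let check_size = msg_len.checked_sub(reserved_len)?;
    buffer.get(..check_size)
}

/// Maps an invalid region to an error status instead of panicking.
fn append(buffer: &[u8], msg_len: usize, reserved_len: usize) -> AppendStatus {
    match checksum_region(buffer, msg_len, reserved_len) {
        Some(_region) => AppendStatus::Ok,
        None => AppendStatus::UnknownError,
    }
}

fn main() {
    let buffer = [1u8, 2, 3, 4];
    println!("{:?}", append(&buffer, 4, 1));
    println!("{:?}", append(&buffer, 10, 1));
}
```

This form also covers the case where the reserved length is larger than the message length, which a plain comparison against pre_encode_buffer.len() would not catch on its own.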
) -> (Option<PutMessageResult>, ArcMut<BytesMut>) {
    PUT_MESSAGE_THREAD_LOCAL.with(|thread_local| {
Update return type in encode_message_ext function
The return type of encode_message_ext has been changed but not properly updated in all places where the function is used. Ensure that the function signature and its usage are consistent throughout the codebase to prevent type mismatch errors.
Verify and update the function signature:
fn encode_message_ext(
message_ext: &MessageExtBrokerInner,
message_store_config: &Arc<MessageStoreConfig>,
) -> (Option<PutMessageResult>, ArcMut<BytesMut>)
 pub struct MessageExtBrokerInner {
     pub message_ext_inner: MessageExt,
     pub properties_string: CheetahString,
     pub tags_code: i64,
-    pub encoded_buff: Option<bytes::BytesMut>,
+    pub encoded_buff: Option<ArcMut<bytes::BytesMut>>,
Update encoded_buff field type to ensure compatibility
The type of encoded_buff has been changed to Option<ArcMut<bytes::BytesMut>>. Ensure that all usages of encoded_buff are updated accordingly to prevent type mismatch errors.
Verify that the new type aligns with the intended shared mutability and adjust any affected code.
Which Issue(s) This PR Fixes(Closes)
Fixes #1532
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
Improvements
Bug Fixes
These updates enhance the functionality, efficiency, and clarity of message processing within the application.