Skip to content

Commit

Permalink
[ISSUE #2304]🏗️Add RemotingCommand encodeHeader method for Zero Copy🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 17, 2025
1 parent 773606f commit dde9ea5
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions rocketmq-remoting/src/protocol/remoting_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,43 @@ impl RemotingCommand {
})
}

pub fn encode_header(&self) -> Option<Bytes> {
if let Some(body) = &self.body {
self.encode_header_with_body_length(body.len())

Check warning on line 389 in rocketmq-remoting/src/protocol/remoting_command.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/remoting_command.rs#L387-L389

Added lines #L387 - L389 were not covered by tests
} else {
self.encode_header_with_body_length(0)

Check warning on line 391 in rocketmq-remoting/src/protocol/remoting_command.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/remoting_command.rs#L391

Added line #L391 was not covered by tests
}
}
pub fn encode_header_with_body_length(&self, body_length: usize) -> Option<Bytes> {
//for zero copy
// 1> header length size
let mut length = 4;

// 2> header data length
let header_data = self.header_encode().unwrap();

length += header_data.len();

// 3> body data length
length += body_length;

let mut result = BytesMut::with_capacity(4 + length - body_length);

// length
result.put_i32(length as i32);

// header length
result.put_i32(mark_protocol_type(
header_data.len() as i32,
self.serialize_type,
));

// header data
result.put(header_data);

Some(result.freeze())
}

Check warning on line 422 in rocketmq-remoting/src/protocol/remoting_command.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/remoting_command.rs#L393-L422

Added lines #L393 - L422 were not covered by tests

pub fn make_custom_header_to_net(&mut self) {
if let Some(header) = &self.command_custom_header {
let option = header.to_map();
Expand Down Expand Up @@ -746,6 +783,10 @@ pub fn parse_header_length(size: i32) -> usize {
(size & 0xFFFFFF) as usize
}

pub fn mark_protocol_type(source: i32, serialize_type: SerializeType) -> i32 {
((serialize_type.get_code() as i32) << 24) | (source & 0x00FFFFFF)
}

Check warning on line 788 in rocketmq-remoting/src/protocol/remoting_command.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/remoting_command.rs#L786-L788

Added lines #L786 - L788 were not covered by tests

pub fn parse_serialize_type(size: i32) -> crate::Result<SerializeType> {
let code = (size >> 24) as u8;
match SerializeType::value_of(code) {
Expand Down

0 comments on commit dde9ea5

Please # to comment.