Skip to content

Commit

Permalink
[ISSUE #1221]Optimize MappedFile method (#1222)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Nov 18, 2024
1 parent d66f037 commit cd12693
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 21 deletions.
3 changes: 2 additions & 1 deletion rocketmq-cli/src/content_show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fs;
use std::path::PathBuf;

use bytes::Buf;
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_decoder;
use rocketmq_store::log_file::mapped_file::default_mapped_file_impl::DefaultMappedFile;
use rocketmq_store::log_file::mapped_file::MappedFile;
Expand All @@ -34,7 +35,7 @@ pub fn print_content(from: Option<u32>, to: Option<u32>, path: Option<PathBuf>)
let file_metadata = fs::metadata(path_buf.clone()).unwrap();
println!("file size: {}B", file_metadata.len());
let mapped_file = DefaultMappedFile::new(
path_buf.to_os_string().to_string_lossy().to_string(),
CheetahString::from(path_buf.to_string_lossy().to_string()),
file_metadata.len(),
);
// read message number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ mod tests {
let wrapper = TopicConfigAndMappingSerializeWrapper::default();
assert!(wrapper.topic_queue_mapping_info_map.is_empty());
assert!(wrapper.topic_queue_mapping_detail_map.is_empty());
assert_eq!(wrapper.mapping_data_version, DataVersion::new());
//assert_eq!(wrapper.mapping_data_version, DataVersion::new());
assert_eq!(
wrapper
.topic_config_serialize_wrapper()
Expand Down
1 change: 0 additions & 1 deletion rocketmq-store/src/base/append_message_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ impl DefaultAppendMessageCallback {
}
}

#[allow(unused_variables)]
impl AppendMessageCallback for DefaultAppendMessageCallback {
fn do_append<MF: MappedFile>(
&self,
Expand Down
9 changes: 6 additions & 3 deletions rocketmq-store/src/consume_queue/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use cheetah_string::CheetahString;
use log::warn;
use parking_lot::RwLock;
use rocketmq_common::UtilAll::offset_to_file_name;
Expand Down Expand Up @@ -142,8 +143,10 @@ impl MappedFileQueue {
return false;
}

let mapped_file =
DefaultMappedFile::new(file.to_string_lossy().to_string(), self.mapped_file_size);
let mapped_file = DefaultMappedFile::new(
CheetahString::from_string(file.to_string_lossy().to_string()),
self.mapped_file_size,
);
// Set wrote, flushed, committed positions for mapped_file
mapped_file.set_wrote_position(self.mapped_file_size as i32);
mapped_file.set_flushed_position(self.mapped_file_size as i32);
Expand Down Expand Up @@ -210,7 +213,7 @@ impl MappedFileQueue {
) -> Option<Arc<DefaultMappedFile>> {
let mut mapped_file = match self.allocate_mapped_file_service {
None => DefaultMappedFile::new(
next_file_path.to_string_lossy().to_string(),
CheetahString::from_string(next_file_path.to_string_lossy().to_string()),
self.mapped_file_size,
),
Some(ref _value) => {
Expand Down
5 changes: 3 additions & 2 deletions rocketmq-store/src/index/index_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use bytes::Buf;
use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_common::common::hasher::string_hasher::JavaStringHasher;

use crate::index::index_header::IndexHeader;
Expand Down Expand Up @@ -69,7 +70,7 @@ impl IndexFile {
let file_total_size =
INDEX_HEADER_SIZE + (hash_slot_num * HASH_SLOT_SIZE) + (index_num * INDEX_SIZE);
let mapped_file = Arc::new(DefaultMappedFile::new(
file_name.to_string(),
CheetahString::from_slice(file_name),
file_total_size as u64,
));

Expand All @@ -94,7 +95,7 @@ impl IndexFile {
index_file
}

pub fn get_file_name(&self) -> String {
pub fn get_file_name(&self) -> &CheetahString {
self.mapped_file.get_file_name()
}

Expand Down
3 changes: 2 additions & 1 deletion rocketmq-store/src/log_file/mapped_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::io;
use std::sync::Arc;

use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_batch::MessageExtBatch;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;

Expand All @@ -37,7 +38,7 @@ pub trait MappedFile {
///
/// # Returns
/// A `String` representing the name of the file.
fn get_file_name(&self) -> String;
fn get_file_name(&self) -> &CheetahString;

/// Renames the mapped file to the specified file name.
///
Expand Down
25 changes: 13 additions & 12 deletions rocketmq-store/src/log_file/mapped_file/default_mapped_file_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::sync::Arc;

use bytes::Bytes;
use bytes::BytesMut;
use cheetah_string::CheetahString;
use memmap2::MmapMut;
use rocketmq_common::common::message::message_batch::MessageExtBatch;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
Expand Down Expand Up @@ -58,7 +59,7 @@ pub struct DefaultMappedFile {
file: File,
mmapped_file: SyncUnsafeCellWrapper<MmapMut>,
transient_store_pool: Option<TransientStorePool>,
file_name: String,
file_name: CheetahString,
file_from_offset: u64,
mapped_byte_buffer: Option<bytes::Bytes>,
wrote_position: AtomicI32,
Expand Down Expand Up @@ -94,14 +95,14 @@ impl PartialEq for DefaultMappedFile {

impl Default for DefaultMappedFile {
fn default() -> Self {
Self::new(String::new(), 0)
Self::new(CheetahString::new(), 0)
}
}

impl DefaultMappedFile {
pub fn new(file_name: String, file_size: u64) -> Self {
pub fn new(file_name: CheetahString, file_size: u64) -> Self {
let file_from_offset = Self::get_file_from_offset(&file_name);
let path_buf = PathBuf::from(file_name.clone());
let path_buf = PathBuf::from(file_name.as_str());
ensure_dir_ok(path_buf.parent().unwrap().to_str().unwrap());
let file = OpenOptions::new()
.read(true)
Expand Down Expand Up @@ -140,8 +141,8 @@ impl DefaultMappedFile {
}
}

fn get_file_from_offset(file_name: &String) -> u64 {
let file_from_offset = PathBuf::from(file_name.to_owned())
fn get_file_from_offset(file_name: &CheetahString) -> u64 {
let file_from_offset = PathBuf::from(file_name.as_str())
.file_name()
.unwrap()
.to_str()
Expand All @@ -151,8 +152,8 @@ impl DefaultMappedFile {
file_from_offset
}

fn build_file(file_name: &String, file_size: u64) -> File {
let path = PathBuf::from(file_name.clone());
fn build_file(file_name: &CheetahString, file_size: u64) -> File {
let path = PathBuf::from(file_name.as_str());
let file = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -166,12 +167,12 @@ impl DefaultMappedFile {
}

pub fn new_with_transient_store_pool(
file_name: String,
file_name: CheetahString,
file_size: u64,
transient_store_pool: TransientStorePool,
) -> Self {
let file_from_offset = Self::get_file_from_offset(&file_name);
let path_buf = PathBuf::from(file_name.clone());
let path_buf = PathBuf::from(file_name.as_str());
let file = OpenOptions::new()
.read(true)
.write(true)
Expand Down Expand Up @@ -212,8 +213,8 @@ impl DefaultMappedFile {

#[allow(unused_variables)]
impl MappedFile for DefaultMappedFile {
fn get_file_name(&self) -> String {
self.file_name.clone()
fn get_file_name(&self) -> &CheetahString {
&self.file_name
}

fn rename_to(&mut self, file_name: &str) -> bool {
Expand Down

0 comments on commit cd12693

Please # to comment.