diff --git a/rocketmq-namesrv/src/bootstrap.rs b/rocketmq-namesrv/src/bootstrap.rs index cbf9a3b0..5c80717a 100644 --- a/rocketmq-namesrv/src/bootstrap.rs +++ b/rocketmq-namesrv/src/bootstrap.rs @@ -49,11 +49,11 @@ pub struct Builder { } struct NameServerRuntime { - name_server_config: Arc, + name_server_config: ArcMut, tokio_client_config: Arc, server_config: Arc, route_info_manager: Arc>, - kvconfig_manager: Arc>, + kvconfig_manager: KVConfigManager, name_server_runtime: Option, remoting_client: ArcMut, } @@ -159,7 +159,7 @@ impl Builder { } pub fn build(self) -> NameServerBootstrap { - let name_server_config = Arc::new(self.name_server_config.unwrap()); + let name_server_config = ArcMut::new(self.name_server_config.unwrap_or_default()); let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread"); let tokio_client_config = Arc::new(TokioClientConfig::default()); let remoting_client = ArcMut::new(RocketmqDefaultClient::new( @@ -176,9 +176,7 @@ impl Builder { name_server_config.clone(), remoting_client.clone(), ))), - kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new( - name_server_config, - ))), + kvconfig_manager: KVConfigManager::new(name_server_config), name_server_runtime: Some(runtime), remoting_client, }, diff --git a/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs b/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs index 7f9dee7c..a3921c18 100644 --- a/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs +++ b/rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs @@ -18,24 +18,29 @@ use std::collections::HashMap; use std::sync::Arc; use cheetah_string::CheetahString; +use parking_lot::RwLock; use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig; use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils; use rocketmq_common::FileUtils; use rocketmq_remoting::protocol::body::kv_table::KVTable; use rocketmq_remoting::protocol::RemotingSerializable; +use rocketmq_rust::ArcMut; use tracing::error; use tracing::info; use crate::kvconfig::KVConfigSerializeWrapper; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct KVConfigManager { - pub(crate) config_table: HashMap< - CheetahString, /* Namespace */ - HashMap, + pub(crate) config_table: Arc< + RwLock< + HashMap< + CheetahString, /* Namespace */ + HashMap, + >, + >, >, - - pub(crate) namesrv_config: Arc, + pub(crate) namesrv_config: ArcMut, } impl KVConfigManager { @@ -48,9 +53,9 @@ impl KVConfigManager { /// # Returns /// /// A new `KVConfigManager` instance. - pub fn new(namesrv_config: Arc) -> KVConfigManager { + pub fn new(namesrv_config: ArcMut) -> KVConfigManager { KVConfigManager { - config_table: HashMap::new(), + config_table: Arc::new(RwLock::new(HashMap::new())), namesrv_config, } } @@ -62,8 +67,8 @@ impl KVConfigManager { /// A reference to the configuration table. pub fn get_config_table( &self, - ) -> &HashMap> { - &self.config_table + ) -> HashMap> { + self.config_table.read().clone() } /// Gets a reference to the Namesrv configuration. @@ -83,10 +88,9 @@ impl KVConfigManager { if let Ok(content) = result { let wrapper = SerdeJsonUtils::decode::(content.as_bytes()).unwrap(); - if let Some(ref config_table) = wrapper.config_table { - for (namespace, config) in config_table { - self.config_table.insert(namespace.clone(), config.clone()); - } + if let Some(config_table) = wrapper.config_table { + let mut table = self.config_table.write(); + table.extend(config_table); info!("load KV config success"); } } @@ -94,7 +98,8 @@ impl KVConfigManager { /// Persists the current key-value configurations to a file. pub fn persist(&mut self) { - let wrapper = KVConfigSerializeWrapper::new_with_config_table(self.config_table.clone()); + let wrapper = + KVConfigSerializeWrapper::new_with_config_table(self.config_table.write().clone()); let content = serde_json::to_string(&wrapper).unwrap(); let result = FileUtils::string_to_file( @@ -109,96 +114,157 @@ impl KVConfigManager { /// Adds or updates a key-value configuration. pub fn put_kv_config( &mut self, - namespace: impl Into, - key: impl Into, - value: impl Into, + namespace: CheetahString, + key: CheetahString, + value: CheetahString, ) { - let namespace_inner = namespace.into(); - if !self.config_table.contains_key(namespace_inner.as_str()) { - self.config_table - .insert(namespace_inner.clone(), HashMap::new()); - } - - let key = key.into(); - let value = value.into(); - let pre_value = self - .config_table - .get_mut(namespace_inner.as_str()) - .unwrap() - .insert(key.clone(), value.clone()); + let mut config_table = self.config_table.write(); + let namespace_entry = config_table.entry(namespace.clone()).or_default(); + let pre_value = namespace_entry.insert(key.clone(), value.clone()); match pre_value { None => { info!( "putKVConfig create new config item, Namespace: {} Key: {} Value: {}", - namespace_inner, key, value + namespace, key, value ) } Some(_) => { info!( "putKVConfig update config item, Namespace: {} Key: {} Value: {}", - namespace_inner, key, value + namespace, key, value ) } } + drop(config_table); self.persist(); } /// Deletes a key-value configuration. - pub fn delete_kv_config(&mut self, namespace: impl Into, key: impl Into) { - let namespace_inner = namespace.into(); - if !self.config_table.contains_key(namespace_inner.as_str()) { + pub fn delete_kv_config(&mut self, namespace: &CheetahString, key: &CheetahString) { + let mut config_table = self.config_table.write(); + if !config_table.contains_key(namespace) { return; } - let key = key.into(); - let pre_value = self - .config_table - .get_mut(namespace_inner.as_str()) - .unwrap() - .remove(key.as_str()); + let pre_value = config_table.get_mut(namespace).unwrap().remove(key); match pre_value { None => {} Some(value) => { info!( "deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", - namespace_inner, key, value + namespace, key, value ) } } + drop(config_table); self.persist(); } /// Gets the key-value list for a specific namespace. - pub fn get_kv_list_by_namespace( - &mut self, - namespace: impl Into, - ) -> Option> { - let namespace_inner = namespace.into(); - match self.config_table.get(namespace_inner.as_str()) { - None => None, - Some(kv_table) => { - let table = KVTable { - table: kv_table.clone(), - }; - Some(table.encode()) - } - } + pub fn get_kv_list_by_namespace(&self, namespace: &CheetahString) -> Option> { + let config_table = self.config_table.read(); + config_table.get(namespace).map(|kv_table| { + let table = KVTable { + table: kv_table.clone(), + }; + table.encode() + }) } // Gets the value for a specific key in a namespace. pub fn get_kvconfig( &self, - namespace: impl Into, - key: impl Into, + namespace: &CheetahString, + key: &CheetahString, ) -> Option { - match self.config_table.get(namespace.into().as_str()) { + let config_table = self.config_table.read(); + match config_table.get(namespace) { None => None, - Some(kv_table) => { - if let Some(value) = kv_table.get(key.into().as_str()) { - return Some(value.clone()); - } - None - } + Some(kv_table) => kv_table.get(key).cloned(), } } } + +#[cfg(test)] +mod tests { + use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig; + use rocketmq_rust::ArcMut; + + use super::*; + + fn create_kv_config_manager() -> KVConfigManager { + let namesrv_config = ArcMut::new(NamesrvConfig::default()); + KVConfigManager::new(namesrv_config) + } + + #[test] + fn new_kv_config_manager_initializes_empty_config_table() { + let manager = create_kv_config_manager(); + assert!(manager.get_config_table().is_empty()); + } + + #[test] + fn put_kv_config_creates_new_entry() { + let mut manager = create_kv_config_manager(); + manager.put_kv_config("namespace".into(), "key".into(), "value".into()); + let config_table = manager.get_config_table(); + assert_eq!(config_table["namespace"]["key"], "value"); + } + + #[test] + fn put_kv_config_updates_existing_entry() { + let mut manager = create_kv_config_manager(); + manager.put_kv_config("namespace".into(), "key".into(), "value".into()); + manager.put_kv_config("namespace".into(), "key".into(), "new_value".into()); + let config_table = manager.get_config_table(); + assert_eq!(config_table["namespace"]["key"], "new_value"); + } + + #[test] + fn delete_kv_config_removes_entry() { + let mut manager = create_kv_config_manager(); + manager.put_kv_config("namespace".into(), "key".into(), "value".into()); + manager.delete_kv_config(&"namespace".into(), &"key".into()); + let config_table = manager.get_config_table(); + assert!(config_table["namespace"].get("key").is_none()); + } + + #[test] + fn delete_kv_config_does_nothing_if_key_does_not_exist() { + let mut manager = create_kv_config_manager(); + manager.put_kv_config("namespace".into(), "key".into(), "value".into()); + manager.delete_kv_config(&"namespace".into(), &"non_existent_key".into()); + let config_table = manager.get_config_table(); + assert_eq!(config_table["namespace"]["key"], "value"); + } + + #[test] + fn get_kv_list_by_namespace_returns_encoded_list() { + let mut manager = create_kv_config_manager(); + manager.put_kv_config("namespace".into(), "key".into(), "value".into()); + let kv_list = manager.get_kv_list_by_namespace(&"namespace".into()); + assert!(kv_list.is_some()); + } + + #[test] + fn get_kv_list_by_namespace_returns_none_if_namespace_does_not_exist() { + let manager = create_kv_config_manager(); + let kv_list = manager.get_kv_list_by_namespace(&"non_existent_namespace".into()); + assert!(kv_list.is_none()); + } + + #[test] + fn get_kvconfig_returns_value_if_key_exists() { + let mut manager = create_kv_config_manager(); + manager.put_kv_config("namespace".into(), "key".into(), "value".into()); + let value = manager.get_kvconfig(&"namespace".into(), &"key".into()); + assert_eq!(value, Some("value".into())); + } + + #[test] + fn get_kvconfig_returns_none_if_key_does_not_exist() { + let manager = create_kv_config_manager(); + let value = manager.get_kvconfig(&"namespace".into(), &"non_existent_key".into()); + assert!(value.is_none()); + } +} diff --git a/rocketmq-namesrv/src/processor.rs b/rocketmq-namesrv/src/processor.rs index b84dd5a1..0be17e69 100644 --- a/rocketmq-namesrv/src/processor.rs +++ b/rocketmq-namesrv/src/processor.rs @@ -30,6 +30,8 @@ use crate::processor::default_request_processor::DefaultRequestProcessor; mod client_request_processor; pub mod default_request_processor; +const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG"; + #[derive(Clone)] pub struct NameServerRequestProcessor { pub(crate) client_request_processor: ArcMut, diff --git a/rocketmq-namesrv/src/processor/client_request_processor.rs b/rocketmq-namesrv/src/processor/client_request_processor.rs index 85b17231..f1ccb26c 100644 --- a/rocketmq-namesrv/src/processor/client_request_processor.rs +++ b/rocketmq-namesrv/src/processor/client_request_processor.rs @@ -20,6 +20,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use cheetah_string::CheetahString; use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig; use rocketmq_common::common::FAQUrl; use rocketmq_common::TimeUtils; @@ -31,24 +32,26 @@ use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequ use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; +use rocketmq_rust::ArcMut; use tracing::warn; use crate::kvconfig::kvconfig_mananger::KVConfigManager; +use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG; use crate::route::route_info_manager::RouteInfoManager; pub struct ClientRequestProcessor { route_info_manager: Arc>, - namesrv_config: Arc, + namesrv_config: ArcMut, need_check_namesrv_ready: AtomicBool, startup_time_millis: u64, - kvconfig_manager: Arc>, + kvconfig_manager: KVConfigManager, } impl ClientRequestProcessor { pub fn new( route_info_manager: Arc>, - namesrv_config: Arc, - kvconfig_manager: Arc>, + namesrv_config: ArcMut, + kvconfig_manager: KVConfigManager, ) -> Self { Self { route_info_manager, @@ -94,10 +97,10 @@ impl ClientRequestProcessor { } if self.namesrv_config.order_message_enable { //get kv config - let order_topic_config = self - .kvconfig_manager - .read() - .get_kvconfig("ORDER_TOPIC_CONFIG", request_header.topic.clone()); + let order_topic_config = self.kvconfig_manager.get_kvconfig( + &CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG), + &request_header.topic, + ); topic_route_data.order_topic_conf = order_topic_config; }; /*let standard_json_only = request_header.accept_standard_json_only.unwrap_or(false); diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index c9007294..39895054 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -56,12 +56,13 @@ use rocketmq_remoting::protocol::RemotingSerializable; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; use tracing::warn; +use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG; use crate::route::route_info_manager::RouteInfoManager; use crate::KVConfigManager; pub struct DefaultRequestProcessor { route_info_manager: Arc>, - kvconfig_manager: Arc>, + kvconfig_manager: KVConfigManager, } impl DefaultRequestProcessor { @@ -111,7 +112,7 @@ impl DefaultRequestProcessor { ///implementation put KV config impl DefaultRequestProcessor { - fn put_kv_config(&self, request: RemotingCommand) -> RemotingCommand { + fn put_kv_config(&mut self, request: RemotingCommand) -> RemotingCommand { let request_header = request .decode_command_custom_header::() .unwrap(); @@ -120,12 +121,13 @@ impl DefaultRequestProcessor { return RemotingCommand::create_response_command_with_code( RemotingSysResponseCode::SystemError, ) - .set_remark("namespace or key is empty"); + .set_remark(CheetahString::from_static_str("namespace or key is empty")); } - self.kvconfig_manager.write().put_kv_config( - request_header.namespace.as_str(), - request_header.key.as_str(), - request_header.value.as_str(), + + self.kvconfig_manager.put_kv_config( + request_header.namespace.clone(), + request_header.key.clone(), + request_header.value.clone(), ); RemotingCommand::create_response_command() } @@ -135,10 +137,9 @@ impl DefaultRequestProcessor { .decode_command_custom_header::() .unwrap(); - let value = self.kvconfig_manager.read().get_kvconfig( - request_header.namespace.as_str(), - request_header.key.as_str(), - ); + let value = self + .kvconfig_manager + .get_kvconfig(&request_header.namespace, &request_header.key); if value.is_some() { return RemotingCommand::create_response_command() @@ -147,20 +148,17 @@ impl DefaultRequestProcessor { RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::SystemError) .set_remark(format!( "No config item, Namespace: {} Key: {}", - request_header.namespace.as_str(), - request_header.key.as_str() + request_header.namespace, request_header.key )) } - fn delete_kv_config(&self, request: RemotingCommand) -> RemotingCommand { + fn delete_kv_config(&mut self, request: RemotingCommand) -> RemotingCommand { let request_header = request .decode_command_custom_header::() .unwrap(); - self.kvconfig_manager.write().delete_kv_config( - request_header.namespace.as_str(), - request_header.key.as_str(), - ); + self.kvconfig_manager + .delete_kv_config(&request_header.namespace, &request_header.key); RemotingCommand::create_response_command() } @@ -203,7 +201,7 @@ impl DefaultRequestProcessor { impl DefaultRequestProcessor { pub fn new( route_info_manager: Arc>, - kvconfig_manager: Arc>, + kvconfig_manager: KVConfigManager, ) -> Self { Self { route_info_manager, @@ -213,7 +211,7 @@ impl DefaultRequestProcessor { } impl DefaultRequestProcessor { fn process_register_broker( - &self, + &mut self, remote_addr: SocketAddr, request: RemotingCommand, ) -> RemotingCommand { @@ -269,14 +267,14 @@ impl DefaultRequestProcessor { } if self .kvconfig_manager - .read() .namesrv_config .return_order_topic_config_to_broker { - if let Some(value) = self - .kvconfig_manager - .write() - .get_kv_list_by_namespace("ORDER_TOPIC_CONFIG") + if let Some(value) = + self.kvconfig_manager + .get_kv_list_by_namespace(&CheetahString::from_static_str( + NAMESPACE_ORDER_TOPIC_CONFIG, + )) { response_command = response_command.set_body(value); } @@ -411,8 +409,7 @@ impl DefaultRequestProcessor { .unwrap(); let value = self .kvconfig_manager - .write() - .get_kv_list_by_namespace(request_header.namespace.as_str()); + .get_kv_list_by_namespace(&request_header.namespace); if let Some(value) = value { return RemotingCommand::create_response_command().set_body(value); } diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index 17f6f039..56f70d0a 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -77,14 +77,14 @@ pub struct RouteInfoManager { pub(crate) broker_live_table: BrokerLiveTable, pub(crate) filter_server_table: FilterServerTable, pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable, - pub(crate) namesrv_config: Arc, + pub(crate) namesrv_config: ArcMut, pub(crate) remoting_client: ArcMut, } #[allow(private_interfaces)] impl RouteInfoManager { pub fn new( - namesrv_config: Arc, + namesrv_config: ArcMut, remoting_client: ArcMut, ) -> Self { RouteInfoManager {