Skip to content

Commit

Permalink
[ISSUE #1239]🔥Optimize KVConfigManager code⚡️
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Nov 20, 2024
1 parent 3998417 commit 4a094be
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 109 deletions.
10 changes: 4 additions & 6 deletions rocketmq-namesrv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ pub struct Builder {
}

struct NameServerRuntime {
name_server_config: Arc<NamesrvConfig>,
name_server_config: ArcMut<NamesrvConfig>,
tokio_client_config: Arc<TokioClientConfig>,
server_config: Arc<ServerConfig>,
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
kvconfig_manager: KVConfigManager,
name_server_runtime: Option<RocketMQRuntime>,
remoting_client: ArcMut<RocketmqDefaultClient>,
}
Expand Down Expand Up @@ -159,7 +159,7 @@ impl Builder {
}

pub fn build(self) -> NameServerBootstrap {
let name_server_config = Arc::new(self.name_server_config.unwrap());
let name_server_config = ArcMut::new(self.name_server_config.unwrap_or_default());

Check warning on line 162 in rocketmq-namesrv/src/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/bootstrap.rs#L162

Added line #L162 was not covered by tests
let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread");
let tokio_client_config = Arc::new(TokioClientConfig::default());
let remoting_client = ArcMut::new(RocketmqDefaultClient::new(
Expand All @@ -176,9 +176,7 @@ impl Builder {
name_server_config.clone(),
remoting_client.clone(),
))),
kvconfig_manager: Arc::new(parking_lot::RwLock::new(KVConfigManager::new(
name_server_config,
))),
kvconfig_manager: KVConfigManager::new(name_server_config),

Check warning on line 179 in rocketmq-namesrv/src/bootstrap.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/bootstrap.rs#L179

Added line #L179 was not covered by tests
name_server_runtime: Some(runtime),
remoting_client,
},
Expand Down
198 changes: 132 additions & 66 deletions rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@ use std::collections::HashMap;
use std::sync::Arc;

use cheetah_string::CheetahString;
use parking_lot::RwLock;
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::FileUtils;
use rocketmq_remoting::protocol::body::kv_table::KVTable;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_rust::ArcMut;
use tracing::error;
use tracing::info;

use crate::kvconfig::KVConfigSerializeWrapper;

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct KVConfigManager {
pub(crate) config_table: HashMap<
CheetahString, /* Namespace */
HashMap<CheetahString /* Key */, CheetahString /* Value */>,
pub(crate) config_table: Arc<
RwLock<
HashMap<
CheetahString, /* Namespace */
HashMap<CheetahString /* Key */, CheetahString /* Value */>,
>,
>,
>,

pub(crate) namesrv_config: Arc<NamesrvConfig>,
pub(crate) namesrv_config: ArcMut<NamesrvConfig>,
}

impl KVConfigManager {
Expand All @@ -48,9 +53,9 @@ impl KVConfigManager {
/// # Returns
///
/// A new `KVConfigManager` instance.
pub fn new(namesrv_config: Arc<NamesrvConfig>) -> KVConfigManager {
pub fn new(namesrv_config: ArcMut<NamesrvConfig>) -> KVConfigManager {
KVConfigManager {
config_table: HashMap::new(),
config_table: Arc::new(RwLock::new(HashMap::new())),
namesrv_config,
}
}
Expand All @@ -62,8 +67,8 @@ impl KVConfigManager {
/// A reference to the configuration table.
pub fn get_config_table(
&self,
) -> &HashMap<CheetahString, HashMap<CheetahString, CheetahString>> {
&self.config_table
) -> HashMap<CheetahString, HashMap<CheetahString, CheetahString>> {
self.config_table.read().clone()
}

/// Gets a reference to the Namesrv configuration.
Expand All @@ -83,18 +88,18 @@ impl KVConfigManager {
if let Ok(content) = result {
let wrapper =
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
if let Some(ref config_table) = wrapper.config_table {
for (namespace, config) in config_table {
self.config_table.insert(namespace.clone(), config.clone());
}
if let Some(config_table) = wrapper.config_table {
let mut table = self.config_table.write();
table.extend(config_table);

Check warning on line 93 in rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs#L91-L93

Added lines #L91 - L93 were not covered by tests
info!("load KV config success");
}
}
}

/// Persists the current key-value configurations to a file.
pub fn persist(&mut self) {
let wrapper = KVConfigSerializeWrapper::new_with_config_table(self.config_table.clone());
let wrapper =
KVConfigSerializeWrapper::new_with_config_table(self.config_table.write().clone());
let content = serde_json::to_string(&wrapper).unwrap();

let result = FileUtils::string_to_file(
Expand All @@ -109,96 +114,157 @@ impl KVConfigManager {
/// Adds or updates a key-value configuration.
pub fn put_kv_config(
&mut self,
namespace: impl Into<CheetahString>,
key: impl Into<CheetahString>,
value: impl Into<CheetahString>,
namespace: CheetahString,
key: CheetahString,
value: CheetahString,
) {
let namespace_inner = namespace.into();
if !self.config_table.contains_key(namespace_inner.as_str()) {
self.config_table
.insert(namespace_inner.clone(), HashMap::new());
}

let key = key.into();
let value = value.into();
let pre_value = self
.config_table
.get_mut(namespace_inner.as_str())
.unwrap()
.insert(key.clone(), value.clone());
let mut config_table = self.config_table.write();
let namespace_entry = config_table.entry(namespace.clone()).or_default();
let pre_value = namespace_entry.insert(key.clone(), value.clone());
match pre_value {
None => {
info!(
"putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
namespace_inner, key, value
namespace, key, value
)
}
Some(_) => {
info!(
"putKVConfig update config item, Namespace: {} Key: {} Value: {}",
namespace_inner, key, value
namespace, key, value
)
}
}
drop(config_table);
self.persist();
}

/// Deletes a key-value configuration.
pub fn delete_kv_config(&mut self, namespace: impl Into<String>, key: impl Into<String>) {
let namespace_inner = namespace.into();
if !self.config_table.contains_key(namespace_inner.as_str()) {
pub fn delete_kv_config(&mut self, namespace: &CheetahString, key: &CheetahString) {
let mut config_table = self.config_table.write();
if !config_table.contains_key(namespace) {
return;
}

let key = key.into();
let pre_value = self
.config_table
.get_mut(namespace_inner.as_str())
.unwrap()
.remove(key.as_str());
let pre_value = config_table.get_mut(namespace).unwrap().remove(key);
match pre_value {
None => {}
Some(value) => {
info!(
"deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
namespace_inner, key, value
namespace, key, value
)
}
}
drop(config_table);
self.persist();
}

/// Gets the key-value list for a specific namespace.
pub fn get_kv_list_by_namespace(
&mut self,
namespace: impl Into<CheetahString>,
) -> Option<Vec<u8>> {
let namespace_inner = namespace.into();
match self.config_table.get(namespace_inner.as_str()) {
None => None,
Some(kv_table) => {
let table = KVTable {
table: kv_table.clone(),
};
Some(table.encode())
}
}
pub fn get_kv_list_by_namespace(&self, namespace: &CheetahString) -> Option<Vec<u8>> {
let config_table = self.config_table.read();
config_table.get(namespace).map(|kv_table| {
let table = KVTable {
table: kv_table.clone(),
};
table.encode()
})
}

// Gets the value for a specific key in a namespace.
pub fn get_kvconfig(
&self,
namespace: impl Into<CheetahString>,
key: impl Into<CheetahString>,
namespace: &CheetahString,
key: &CheetahString,
) -> Option<CheetahString> {
match self.config_table.get(namespace.into().as_str()) {
let config_table = self.config_table.read();
match config_table.get(namespace) {
None => None,
Some(kv_table) => {
if let Some(value) = kv_table.get(key.into().as_str()) {
return Some(value.clone());
}
None
}
Some(kv_table) => kv_table.get(key).cloned(),
}
}
}

#[cfg(test)]
mod tests {
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_rust::ArcMut;

use super::*;

fn create_kv_config_manager() -> KVConfigManager {
let namesrv_config = ArcMut::new(NamesrvConfig::default());
KVConfigManager::new(namesrv_config)
}

#[test]
fn new_kv_config_manager_initializes_empty_config_table() {
let manager = create_kv_config_manager();
assert!(manager.get_config_table().is_empty());
}

#[test]
fn put_kv_config_creates_new_entry() {
let mut manager = create_kv_config_manager();
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
let config_table = manager.get_config_table();
assert_eq!(config_table["namespace"]["key"], "value");
}

#[test]
fn put_kv_config_updates_existing_entry() {
let mut manager = create_kv_config_manager();
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
manager.put_kv_config("namespace".into(), "key".into(), "new_value".into());
let config_table = manager.get_config_table();
assert_eq!(config_table["namespace"]["key"], "new_value");
}

#[test]
fn delete_kv_config_removes_entry() {
let mut manager = create_kv_config_manager();
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
manager.delete_kv_config(&"namespace".into(), &"key".into());
let config_table = manager.get_config_table();
assert!(config_table["namespace"].get("key").is_none());
}

#[test]
fn delete_kv_config_does_nothing_if_key_does_not_exist() {
let mut manager = create_kv_config_manager();
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
manager.delete_kv_config(&"namespace".into(), &"non_existent_key".into());
let config_table = manager.get_config_table();
assert_eq!(config_table["namespace"]["key"], "value");
}

#[test]
fn get_kv_list_by_namespace_returns_encoded_list() {
let mut manager = create_kv_config_manager();
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
let kv_list = manager.get_kv_list_by_namespace(&"namespace".into());
assert!(kv_list.is_some());
}

#[test]
fn get_kv_list_by_namespace_returns_none_if_namespace_does_not_exist() {
let manager = create_kv_config_manager();
let kv_list = manager.get_kv_list_by_namespace(&"non_existent_namespace".into());
assert!(kv_list.is_none());
}

#[test]
fn get_kvconfig_returns_value_if_key_exists() {
let mut manager = create_kv_config_manager();
manager.put_kv_config("namespace".into(), "key".into(), "value".into());
let value = manager.get_kvconfig(&"namespace".into(), &"key".into());
assert_eq!(value, Some("value".into()));
}

#[test]
fn get_kvconfig_returns_none_if_key_does_not_exist() {
let manager = create_kv_config_manager();
let value = manager.get_kvconfig(&"namespace".into(), &"non_existent_key".into());
assert!(value.is_none());
}
}
2 changes: 2 additions & 0 deletions rocketmq-namesrv/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::processor::default_request_processor::DefaultRequestProcessor;
mod client_request_processor;
pub mod default_request_processor;

const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";

#[derive(Clone)]
pub struct NameServerRequestProcessor {
pub(crate) client_request_processor: ArcMut<ClientRequestProcessor>,
Expand Down
19 changes: 11 additions & 8 deletions rocketmq-namesrv/src/processor/client_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use cheetah_string::CheetahString;
use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
use rocketmq_common::common::FAQUrl;
use rocketmq_common::TimeUtils;
Expand All @@ -31,24 +32,26 @@ use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequ
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_rust::ArcMut;
use tracing::warn;

use crate::kvconfig::kvconfig_mananger::KVConfigManager;
use crate::processor::NAMESPACE_ORDER_TOPIC_CONFIG;
use crate::route::route_info_manager::RouteInfoManager;

pub struct ClientRequestProcessor {
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
namesrv_config: Arc<NamesrvConfig>,
namesrv_config: ArcMut<NamesrvConfig>,
need_check_namesrv_ready: AtomicBool,
startup_time_millis: u64,
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
kvconfig_manager: KVConfigManager,
}

impl ClientRequestProcessor {
pub fn new(
route_info_manager: Arc<parking_lot::RwLock<RouteInfoManager>>,
namesrv_config: Arc<NamesrvConfig>,
kvconfig_manager: Arc<parking_lot::RwLock<KVConfigManager>>,
namesrv_config: ArcMut<NamesrvConfig>,
kvconfig_manager: KVConfigManager,

Check warning on line 54 in rocketmq-namesrv/src/processor/client_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/client_request_processor.rs#L53-L54

Added lines #L53 - L54 were not covered by tests
) -> Self {
Self {
route_info_manager,
Expand Down Expand Up @@ -94,10 +97,10 @@ impl ClientRequestProcessor {
}
if self.namesrv_config.order_message_enable {
//get kv config
let order_topic_config = self
.kvconfig_manager
.read()
.get_kvconfig("ORDER_TOPIC_CONFIG", request_header.topic.clone());
let order_topic_config = self.kvconfig_manager.get_kvconfig(
&CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
&request_header.topic,
);

Check warning on line 103 in rocketmq-namesrv/src/processor/client_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/client_request_processor.rs#L100-L103

Added lines #L100 - L103 were not covered by tests
topic_route_data.order_topic_conf = order_topic_config;
};
/*let standard_json_only = request_header.accept_standard_json_only.unwrap_or(false);
Expand Down
Loading

0 comments on commit 4a094be

Please # to comment.