Skip to content

Commit

Permalink
[ISSUE #2548]🚀 DefaultMQAdminExt add create instance method🚧 (#2549)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Feb 14, 2025
1 parent d300197 commit 35f159e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 4 deletions.
89 changes: 87 additions & 2 deletions rocketmq-tools/src/admin/default_mq_admin_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
#![allow(dead_code)]
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use cheetah_string::CheetahString;
use rocketmq_client_rust::base::client_config::ClientConfig;
use rocketmq_common::common::base::plain_access_config::PlainAccessConfig;
use rocketmq_common::common::config::TopicConfig;
use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable;
use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
Expand All @@ -39,23 +42,105 @@ use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
use rocketmq_remoting::runtime::RPCHook;
use rocketmq_rust::ArcMut;

use crate::admin::common::admin_tool_result::AdminToolResult;
use crate::admin::default_mq_admin_ext_impl::DefaultMQAdminExtImpl;
use crate::admin::mq_admin_ext_async::MQAdminExt;

const ADMIN_EXT_GROUP: &str = "admin_ext_group";

pub struct DefaultMQAdminExt {
client_config: ArcMut<ClientConfig>,
admin_ext_group: CheetahString,
create_topic_key: CheetahString,
timeout_millis: u64,
timeout_millis: Duration,
default_mqadmin_ext_impl: DefaultMQAdminExtImpl,
}

impl DefaultMQAdminExt {
pub fn new() -> Self {
unimplemented!("DefaultMQAdminExt::new")
Self {
client_config: Default::default(),
default_mqadmin_ext_impl: DefaultMQAdminExtImpl::new(None, Duration::from_millis(5000)),
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
create_topic_key: CheetahString::from_static_str(
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
),
timeout_millis: Duration::from_millis(5000),
}
}

pub fn with_timeout(timeout_millis: Duration) -> Self {
Self {
client_config: Default::default(),
default_mqadmin_ext_impl: DefaultMQAdminExtImpl::new(None, timeout_millis),
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
create_topic_key: CheetahString::from_static_str(
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
),
timeout_millis,
}
}

pub fn with_rpc_hook(rpc_hook: impl RPCHook) -> Self {
let rpc_hook_inner: Box<dyn RPCHook> = Box::new(rpc_hook);
Self {
client_config: Default::default(),
default_mqadmin_ext_impl: DefaultMQAdminExtImpl::new(
Some(Arc::new(rpc_hook_inner)),
Duration::from_millis(5000),
),
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
create_topic_key: CheetahString::from_static_str(
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
),
timeout_millis: Duration::from_millis(5000),
}
}

pub fn with_rpc_hook_and_timeout(rpc_hook: impl RPCHook, timeout_millis: Duration) -> Self {
let rpc_hook_inner: Box<dyn RPCHook> = Box::new(rpc_hook);
Self {
client_config: Default::default(),
default_mqadmin_ext_impl: DefaultMQAdminExtImpl::new(
Some(Arc::new(rpc_hook_inner)),
timeout_millis,
),
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
create_topic_key: CheetahString::from_static_str(
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
),
timeout_millis,
}
}

pub fn with_admin_ext_group(admin_ext_group: impl Into<CheetahString>) -> Self {
Self {
client_config: Default::default(),
default_mqadmin_ext_impl: DefaultMQAdminExtImpl::new(None, Duration::from_millis(5000)),
admin_ext_group: admin_ext_group.into(),
create_topic_key: CheetahString::from_static_str(
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
),
timeout_millis: Duration::from_millis(5000),
}
}

pub fn with_admin_ext_group_and_timeout(
admin_ext_group: impl Into<CheetahString>,
timeout_millis: Duration,
) -> Self {
Self {
client_config: Default::default(),
default_mqadmin_ext_impl: DefaultMQAdminExtImpl::new(None, timeout_millis),
admin_ext_group: admin_ext_group.into(),
create_topic_key: CheetahString::from_static_str(
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
),
timeout_millis,
}
}
}

Expand Down
19 changes: 17 additions & 2 deletions rocketmq-tools/src/admin/default_mq_admin_ext_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use cheetah_string::CheetahString;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -72,15 +73,29 @@ lazy_static! {
}

const SOCKS_PROXY_JSON: &str = "socksProxyJson";

const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
pub struct DefaultMQAdminExtImpl {
service_state: ServiceState,
client_instance: Option<ArcMut<MQClientInstance>>,
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
timeout_millis: u64,
timeout_millis: Duration,
kv_namespace_to_delete_list: Vec<CheetahString>,
}

impl DefaultMQAdminExtImpl {
pub fn new(rpc_hook: Option<Arc<Box<dyn RPCHook>>>, timeout_millis: Duration) -> Self {
DefaultMQAdminExtImpl {
service_state: ServiceState::CreateJust,
client_instance: None,
rpc_hook,
timeout_millis,
kv_namespace_to_delete_list: vec![CheetahString::from_static_str(
NAMESPACE_ORDER_TOPIC_CONFIG,
)],
}
}
}

#[allow(unused_variables)]
#[allow(unused_mut)]
#[cfg(feature = "async")]
Expand Down

0 comments on commit 35f159e

Please # to comment.