From c34e43de3cd04a78aaf667579a3fdbee1a5b49da Mon Sep 17 00:00:00 2001 From: mxsm Date: Wed, 27 Nov 2024 23:55:50 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1386]=F0=9F=9A=80Add=20MQAdminExt=20f?= =?UTF-8?q?or=20rocketmq-tools=20crate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 8 + rocketmq-tools/Cargo.toml | 14 + rocketmq-tools/src/admin.rs | 2 + rocketmq-tools/src/admin/mq_admin_ext.rs | 663 +++++++++++++++++ .../src/admin/mq_admin_ext_async.rs | 681 ++++++++++++++++++ rocketmq-tools/src/lib.rs | 5 + rocketmq-tools/src/tools_error.rs | 35 + 7 files changed, 1408 insertions(+) create mode 100644 rocketmq-tools/src/admin/mq_admin_ext.rs create mode 100644 rocketmq-tools/src/admin/mq_admin_ext_async.rs create mode 100644 rocketmq-tools/src/tools_error.rs diff --git a/Cargo.lock b/Cargo.lock index 9effef9d..c4f58047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2126,6 +2126,14 @@ dependencies = [ [[package]] name = "rocketmq-tools" version = "0.4.0" +dependencies = [ + "cheetah-string", + "rocketmq-common", + "rocketmq-remoting", + "rocketmq-rust", + "thiserror", + "trait-variant", +] [[package]] name = "ron" diff --git a/rocketmq-tools/Cargo.toml b/rocketmq-tools/Cargo.toml index 827efe54..5cdc0081 100644 --- a/rocketmq-tools/Cargo.toml +++ b/rocketmq-tools/Cargo.toml @@ -12,3 +12,17 @@ readme = "README.md" description = "Rust implementation of Apache RocketMQ tools" [dependencies] +rocketmq-common = { workspace = true } +rocketmq-remoting = { workspace = true } +rocketmq-rust = { workspace = true } + + +cheetah-string = { workspace = true } +thiserror = { workspace = true } + +trait-variant.workspace = true + +[features] +default = ["async"] +async = [] +sync = [] \ No newline at end of file diff --git a/rocketmq-tools/src/admin.rs b/rocketmq-tools/src/admin.rs index a0d345b3..0c0b4cc3 100644 --- a/rocketmq-tools/src/admin.rs +++ b/rocketmq-tools/src/admin.rs @@ -15,3 +15,5 @@ * limitations under the License. */ pub(crate) mod common; +pub(crate) mod mq_admin_ext; +pub(crate) mod mq_admin_ext_async; diff --git a/rocketmq-tools/src/admin/mq_admin_ext.rs b/rocketmq-tools/src/admin/mq_admin_ext.rs new file mode 100644 index 00000000..0a3f1505 --- /dev/null +++ b/rocketmq-tools/src/admin/mq_admin_ext.rs @@ -0,0 +1,663 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#![allow(unused_imports)] +use std::collections::HashMap; +use std::collections::HashSet; + +use cheetah_string::CheetahString; +use rocketmq_common::common::base::plain_access_config::PlainAccessConfig; +use rocketmq_common::common::config::TopicConfig; +use rocketmq_common::common::message::message_enum::MessageRequestMode; +use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats; +use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable; +use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo; +use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; +use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection; +use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo; +use rocketmq_remoting::protocol::body::group_list::GroupList; +use rocketmq_remoting::protocol::body::kv_table::KVTable; +use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection; +use rocketmq_remoting::protocol::body::topic::topic_list::TopicList; +use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper; +use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; +use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; +use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail; +use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig; + +use crate::admin::common::admin_tool_result::AdminToolResult; +use crate::Result; + +#[cfg(feature = "sync")] +#[allow(dead_code)] +pub trait MQAdminExt { + fn start(&self) -> Result<()>; + fn shutdown(&self); + fn add_broker_to_container( + &self, + broker_container_addr: CheetahString, + broker_config: CheetahString, + ) -> Result<()>; + + fn remove_broker_from_container( + &self, + broker_container_addr: CheetahString, + cluster_name: CheetahString, + broker_name: CheetahString, + broker_id: u64, + ) -> Result<()>; + + fn update_broker_config( + &self, + broker_addr: CheetahString, + properties: HashMap, + ) -> Result<()>; + + fn get_broker_config( + &self, + broker_addr: CheetahString, + ) -> Result>; + + fn create_and_update_topic_config( + &self, + addr: CheetahString, + config: TopicConfig, + ) -> Result<()>; + + fn create_and_update_topic_config_list( + &self, + addr: CheetahString, + topic_config_list: Vec, + ) -> Result<()>; + + fn create_and_update_plain_access_config( + &self, + addr: CheetahString, + config: PlainAccessConfig, + ) -> Result<()>; + + fn delete_plain_access_config( + &self, + addr: CheetahString, + access_key: CheetahString, + ) -> Result<()>; + + fn update_global_white_addr_config( + &self, + addr: CheetahString, + global_white_addrs: CheetahString, + acl_file_full_path: Option, + ) -> Result<()>; + + fn examine_broker_cluster_acl_version_info(&self, addr: CheetahString) + -> Result; + + fn create_and_update_subscription_group_config( + &self, + addr: CheetahString, + config: SubscriptionGroupConfig, + ) -> Result<()>; + + fn create_and_update_subscription_group_config_list( + &self, + broker_addr: CheetahString, + configs: Vec, + ) -> Result<()>; + + fn examine_subscription_group_config( + &self, + addr: CheetahString, + group: CheetahString, + ) -> Result; + + fn examine_topic_stats( + &self, + topic: CheetahString, + broker_addr: Option, + ) -> Result; + + fn examine_topic_stats_concurrent( + &self, + topic: CheetahString, + ) -> AdminToolResult; + + fn fetch_all_topic_list(&self) -> Result; + + fn fetch_topics_by_cluster(&self, cluster_name: CheetahString) -> Result; + + fn fetch_broker_runtime_stats(&self, broker_addr: CheetahString) -> Result; + + fn examine_consume_stats( + &self, + consumer_group: CheetahString, + topic: Option, + cluster_name: Option, + broker_addr: Option, + timeout_millis: Option, + ) -> Result; + + /*fn check_rocksdb_cq_write_progress( + &self, + broker_addr: CheetahString, + topic: CheetahString, + ) -> Result;*/ + + fn examine_broker_cluster_info(&self) -> Result; + + fn examine_topic_route_info(&self, topic: CheetahString) -> Result; + + fn examine_consumer_connection_info( + &self, + consumer_group: CheetahString, + broker_addr: Option, + ) -> Result; + + fn examine_producer_connection_info( + &self, + producer_group: CheetahString, + topic: CheetahString, + ) -> Result; + + /* fn get_all_producer_info( + &self, + broker_addr: CheetahString, + ) -> Result;*/ + + fn get_name_server_address_list(&self) -> Vec; + + fn wipe_write_perm_of_broker( + &self, + namesrv_addr: CheetahString, + broker_name: CheetahString, + ) -> Result; + + fn add_write_perm_of_broker( + &self, + namesrv_addr: CheetahString, + broker_name: CheetahString, + ) -> Result; + + fn put_kv_config(&self, namespace: CheetahString, key: CheetahString, value: CheetahString); + + fn get_kv_config(&self, namespace: CheetahString, key: CheetahString) -> Result; + + fn get_kv_list_by_namespace(&self, namespace: CheetahString) -> Result; + + fn delete_topic(&self, topic_name: CheetahString, cluster_name: CheetahString) -> Result<()>; + + fn delete_topic_in_broker( + &self, + addrs: HashSet, + topic: CheetahString, + ) -> Result<()>; + + /*fn delete_topic_in_broker_concurrent( + &self, + addrs: HashSet, + topic: CheetahString, + ) -> AdminToolResult;*/ + + fn delete_topic_in_name_server( + &self, + addrs: HashSet, + cluster_name: Option, + topic: CheetahString, + ) -> Result<()>; + + fn delete_subscription_group( + &self, + addr: CheetahString, + group_name: CheetahString, + remove_offset: Option, + ) -> Result<()>; + + fn create_and_update_kv_config( + &self, + namespace: CheetahString, + key: CheetahString, + value: CheetahString, + ) -> Result<()>; + + fn delete_kv_config(&self, namespace: CheetahString, key: CheetahString) -> Result<()>; + + /*fn reset_offset_by_timestamp_old( + &self, + consumer_group: CheetahString, + topic: CheetahString, + timestamp: u64, + force: bool, + ) -> Result>;*/ + + fn reset_offset_by_timestamp( + &self, + cluster_name: Option, + topic: CheetahString, + group: CheetahString, + timestamp: u64, + is_force: bool, + ) -> Result>; + + fn reset_offset_new( + &self, + consumer_group: CheetahString, + topic: CheetahString, + timestamp: u64, + ) -> Result<()>; + + /*fn reset_offset_new_concurrent( + &self, + group: CheetahString, + topic: CheetahString, + timestamp: u64, + ) -> AdminToolResult;*/ + + fn get_consume_status( + &self, + topic: CheetahString, + group: CheetahString, + client_addr: CheetahString, + ) -> Result>>; + + fn create_or_update_order_conf( + &self, + key: CheetahString, + value: CheetahString, + is_cluster: bool, + ) -> Result<()>; + + fn query_topic_consume_by_who(&self, topic: CheetahString) -> Result; + + fn query_topics_by_consumer(&self, group: CheetahString) -> Result; + + fn query_topics_by_consumer_concurrent( + &self, + group: CheetahString, + ) -> AdminToolResult; + + fn query_subscription( + &self, + group: CheetahString, + topic: CheetahString, + ) -> Result; + + /*fn query_consume_time_span( + &self, + topic: CheetahString, + group: CheetahString, + ) -> Result>; + + fn query_consume_time_span_concurrent( + &self, + topic: CheetahString, + group: CheetahString, + ) -> AdminToolResult>;*/ + + fn clean_expired_consumer_queue( + &self, + cluster: Option, + addr: Option, + ) -> Result; + + fn delete_expired_commit_log( + &self, + cluster: Option, + addr: Option, + ) -> Result; + + fn clean_unused_topic( + &self, + cluster: Option, + addr: Option, + ) -> Result; + + fn get_consumer_running_info( + &self, + consumer_group: CheetahString, + client_id: CheetahString, + jstack: bool, + metrics: Option, + ) -> Result; + + fn consume_message_directly( + &self, + consumer_group: CheetahString, + client_id: CheetahString, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + fn consume_message_directly_ext( + &self, + cluster_name: CheetahString, + consumer_group: CheetahString, + client_id: CheetahString, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + /*fn message_track_detail( + &self, + msg: MessageExt, + ) -> Result>; + + fn message_track_detail_concurrent( + &self, + msg: MessageExt, + ) -> AdminToolResult>;*/ + + fn clone_group_offset( + &self, + src_group: CheetahString, + dest_group: CheetahString, + topic: CheetahString, + is_offline: bool, + ) -> Result<()>; + + /*fn view_broker_stats_data( + &self, + broker_addr: CheetahString, + stats_name: CheetahString, + stats_key: CheetahString, + ) -> Result;*/ + + fn get_cluster_list(&self, topic: String) -> Result>; + + /*fn fetch_consume_stats_in_broker( + &self, + broker_addr: CheetahString, + is_order: bool, + timeout_millis: u64, + ) -> Result;*/ + + fn get_topic_cluster_list(&self, topic: String) -> Result>; + + /*fn get_all_subscription_group( + &self, + broker_addr: CheetahString, + timeout_millis: u64, + ) -> Result;*/ + + /*fn get_user_subscription_group( + &self, + broker_addr: CheetahString, + timeout_millis: u64, + ) -> Result;*/ + + fn get_all_topic_config( + &self, + broker_addr: CheetahString, + timeout_millis: u64, + ) -> Result; + + fn get_user_topic_config( + &self, + broker_addr: CheetahString, + special_topic: bool, + timeout_millis: u64, + ) -> Result; + + fn update_consume_offset( + &self, + broker_addr: CheetahString, + consume_group: CheetahString, + mq: MessageQueue, + offset: u64, + ) -> Result<()>; + + fn update_name_server_config( + &self, + properties: HashMap, + name_servers: Vec, + ) -> Result<()>; + + fn get_name_server_config( + &self, + name_servers: Vec, + ) -> Result>>; + + /*fn query_consume_queue( + &self, + broker_addr: CheetahString, + topic: CheetahString, + queue_id: i32, + index: u64, + count: i32, + consumer_group: CheetahString, + ) -> Result;*/ + + fn resume_check_half_message( + &self, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + fn set_message_request_mode( + &self, + broker_addr: CheetahString, + topic: CheetahString, + consumer_group: CheetahString, + mode: MessageRequestMode, + pop_work_group_size: i32, + timeout_millis: u64, + ) -> Result<()>; + + fn reset_offset_by_queue_id( + &self, + broker_addr: CheetahString, + consumer_group: CheetahString, + topic_name: CheetahString, + queue_id: i32, + reset_offset: u64, + ) -> Result<()>; + + fn examine_topic_config( + &self, + addr: CheetahString, + topic: CheetahString, + ) -> Result; + + fn create_static_topic( + &self, + addr: CheetahString, + default_topic: CheetahString, + topic_config: TopicConfig, + mapping_detail: TopicQueueMappingDetail, + force: bool, + ) -> Result<()>; + + /*fn update_and_get_group_read_forbidden( + &self, + broker_addr: CheetahString, + group_name: CheetahString, + topic_name: CheetahString, + readable: Option, + ) -> Result; + + fn query_message( + &self, + cluster_name: CheetahString, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + fn get_broker_ha_status(&self, broker_addr: CheetahString) -> Result; + + fn get_in_sync_state_data( + &self, + controller_address: CheetahString, + brokers: Vec, + ) -> Result; + + fn get_broker_epoch_cache( + &self, + broker_addr: CheetahString, + ) -> Result; + + fn get_controller_meta_data( + &self, + controller_addr: CheetahString, + ) -> Result;*/ + + fn reset_master_flush_offset( + &self, + broker_addr: CheetahString, + master_flush_offset: u64, + ) -> Result<()>; + + fn get_controller_config( + &self, + controller_servers: Vec, + ) -> Result>>; + + fn update_controller_config( + &self, + properties: HashMap, + controllers: Vec, + ) -> Result<()>; + + /*fn elect_master( + &self, + controller_addr: CheetahString, + cluster_name: CheetahString, + broker_name: CheetahString, + broker_id: Option, + ) -> Result<(ElectMasterResponseHeader, BrokerMemberGroup)>;*/ + + fn clean_controller_broker_data( + &self, + controller_addr: CheetahString, + cluster_name: CheetahString, + broker_name: CheetahString, + broker_controller_ids_to_clean: Option, + is_clean_living_broker: bool, + ) -> Result<()>; + + fn update_cold_data_flow_ctr_group_config( + &self, + broker_addr: CheetahString, + properties: HashMap, + ) -> Result<()>; + + fn remove_cold_data_flow_ctr_group_config( + &self, + broker_addr: CheetahString, + consumer_group: CheetahString, + ) -> Result<()>; + + fn get_cold_data_flow_ctr_info(&self, broker_addr: CheetahString) -> Result; + + fn set_commit_log_read_ahead_mode( + &self, + broker_addr: CheetahString, + mode: CheetahString, + ) -> Result; + + fn create_user( + &self, + broker_addr: CheetahString, + username: CheetahString, + password: CheetahString, + user_type: CheetahString, + ) -> Result<()>; + + /*fn create_user_with_info( + &self, + broker_addr: CheetahString, + user_info: UserInfo, + ) -> Result<()>;*/ + + fn update_user( + &self, + broker_addr: CheetahString, + username: CheetahString, + password: CheetahString, + user_type: CheetahString, + user_status: CheetahString, + ) -> Result<()>; + + /* fn update_user_with_info( + &self, + broker_addr: CheetahString, + user_info: UserInfo, + ) -> Result<()>;*/ + + fn delete_user(&self, broker_addr: CheetahString, username: CheetahString) -> Result<()>; + + /*fn get_user( + &self, + broker_addr: CheetahString, + username: CheetahString, + ) -> Result;*/ + + /* fn list_users( + &self, + broker_addr: CheetahString, + filter: CheetahString, + ) -> Result>;*/ + + fn create_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + resources: Vec, + actions: Vec, + source_ips: Vec, + decision: CheetahString, + ) -> Result<()>; + + /*fn create_acl_with_info( + &self, + broker_addr: CheetahString, + acl_info: AclInfo, + ) -> Result<()>;*/ + + fn update_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + resources: Vec, + actions: Vec, + source_ips: Vec, + decision: CheetahString, + ) -> Result<()>; + + /*fn update_acl_with_info( + &self, + broker_addr: CheetahString, + acl_info: AclInfo, + ) -> Result<()>;*/ + + fn delete_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + resource: CheetahString, + ) -> Result<()>; + + /*fn get_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + ) -> Result;*/ + + /*fn list_acl( + &self, + broker_addr: CheetahString, + subject_filter: CheetahString, + resource_filter: CheetahString, + ) -> Result>;*/ +} diff --git a/rocketmq-tools/src/admin/mq_admin_ext_async.rs b/rocketmq-tools/src/admin/mq_admin_ext_async.rs new file mode 100644 index 00000000..5d9d9b48 --- /dev/null +++ b/rocketmq-tools/src/admin/mq_admin_ext_async.rs @@ -0,0 +1,681 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; +use std::collections::HashSet; + +use cheetah_string::CheetahString; +use rocketmq_common::common::base::plain_access_config::PlainAccessConfig; +use rocketmq_common::common::config::TopicConfig; +use rocketmq_common::common::message::message_enum::MessageRequestMode; +use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats; +use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable; +use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo; +use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; +use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection; +use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo; +use rocketmq_remoting::protocol::body::group_list::GroupList; +use rocketmq_remoting::protocol::body::kv_table::KVTable; +use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection; +use rocketmq_remoting::protocol::body::topic::topic_list::TopicList; +use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper; +use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; +use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData; +use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail; +use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig; + +use crate::admin::common::admin_tool_result::AdminToolResult; +use crate::Result; + +#[cfg(feature = "async")] +#[allow(dead_code)] +#[trait_variant::make(MQAdminExt: Send)] +pub trait MQAdminExtLocal: Sync { + async fn start(&self) -> Result<()>; + async fn shutdown(&self); + async fn add_broker_to_container( + &self, + broker_container_addr: CheetahString, + broker_config: CheetahString, + ) -> Result<()>; + + async fn remove_broker_from_container( + &self, + broker_container_addr: CheetahString, + cluster_name: CheetahString, + broker_name: CheetahString, + broker_id: u64, + ) -> Result<()>; + + async fn update_broker_config( + &self, + broker_addr: CheetahString, + properties: HashMap, + ) -> Result<()>; + + async fn get_broker_config( + &self, + broker_addr: CheetahString, + ) -> Result>; + + async fn create_and_update_topic_config( + &self, + addr: CheetahString, + config: TopicConfig, + ) -> Result<()>; + + async fn create_and_update_topic_config_list( + &self, + addr: CheetahString, + topic_config_list: Vec, + ) -> Result<()>; + + async fn create_and_update_plain_access_config( + &self, + addr: CheetahString, + config: PlainAccessConfig, + ) -> Result<()>; + + async fn delete_plain_access_config( + &self, + addr: CheetahString, + access_key: CheetahString, + ) -> Result<()>; + + async fn update_global_white_addr_config( + &self, + addr: CheetahString, + global_white_addrs: CheetahString, + acl_file_full_path: Option, + ) -> Result<()>; + + async fn examine_broker_cluster_acl_version_info( + &self, + addr: CheetahString, + ) -> Result; + + async fn create_and_update_subscription_group_config( + &self, + addr: CheetahString, + config: SubscriptionGroupConfig, + ) -> Result<()>; + + async fn create_and_update_subscription_group_config_list( + &self, + broker_addr: CheetahString, + configs: Vec, + ) -> Result<()>; + + async fn examine_subscription_group_config( + &self, + addr: CheetahString, + group: CheetahString, + ) -> Result; + + async fn examine_topic_stats( + &self, + topic: CheetahString, + broker_addr: Option, + ) -> Result; + + async fn examine_topic_stats_concurrent( + &self, + topic: CheetahString, + ) -> AdminToolResult; + + async fn fetch_all_topic_list(&self) -> Result; + + async fn fetch_topics_by_cluster(&self, cluster_name: CheetahString) -> Result; + + async fn fetch_broker_runtime_stats(&self, broker_addr: CheetahString) -> Result; + + async fn examine_consume_stats( + &self, + consumer_group: CheetahString, + topic: Option, + cluster_name: Option, + broker_addr: Option, + timeout_millis: Option, + ) -> Result; + + /*async fn check_rocksdb_cq_write_progress( + &self, + broker_addr: CheetahString, + topic: CheetahString, + ) -> Result;*/ + + async fn examine_broker_cluster_info(&self) -> Result; + + async fn examine_topic_route_info(&self, topic: CheetahString) -> Result; + + async fn examine_consumer_connection_info( + &self, + consumer_group: CheetahString, + broker_addr: Option, + ) -> Result; + + async fn examine_producer_connection_info( + &self, + producer_group: CheetahString, + topic: CheetahString, + ) -> Result; + + /* async fn get_all_producer_info( + &self, + broker_addr: CheetahString, + ) -> Result;*/ + + async fn get_name_server_address_list(&self) -> Vec; + + async fn wipe_write_perm_of_broker( + &self, + namesrv_addr: CheetahString, + broker_name: CheetahString, + ) -> Result; + + async fn add_write_perm_of_broker( + &self, + namesrv_addr: CheetahString, + broker_name: CheetahString, + ) -> Result; + + async fn put_kv_config( + &self, + namespace: CheetahString, + key: CheetahString, + value: CheetahString, + ); + + async fn get_kv_config( + &self, + namespace: CheetahString, + key: CheetahString, + ) -> Result; + + async fn get_kv_list_by_namespace(&self, namespace: CheetahString) -> Result; + + async fn delete_topic( + &self, + topic_name: CheetahString, + cluster_name: CheetahString, + ) -> Result<()>; + + async fn delete_topic_in_broker( + &self, + addrs: HashSet, + topic: CheetahString, + ) -> Result<()>; + + /*async fn delete_topic_in_broker_concurrent( + &self, + addrs: HashSet, + topic: CheetahString, + ) -> AdminToolResult;*/ + + async fn delete_topic_in_name_server( + &self, + addrs: HashSet, + cluster_name: Option, + topic: CheetahString, + ) -> Result<()>; + + async fn delete_subscription_group( + &self, + addr: CheetahString, + group_name: CheetahString, + remove_offset: Option, + ) -> Result<()>; + + async fn create_and_update_kv_config( + &self, + namespace: CheetahString, + key: CheetahString, + value: CheetahString, + ) -> Result<()>; + + async fn delete_kv_config(&self, namespace: CheetahString, key: CheetahString) -> Result<()>; + + /*async fn reset_offset_by_timestamp_old( + &self, + consumer_group: CheetahString, + topic: CheetahString, + timestamp: u64, + force: bool, + ) -> Result>;*/ + + async fn reset_offset_by_timestamp( + &self, + cluster_name: Option, + topic: CheetahString, + group: CheetahString, + timestamp: u64, + is_force: bool, + ) -> Result>; + + async fn reset_offset_new( + &self, + consumer_group: CheetahString, + topic: CheetahString, + timestamp: u64, + ) -> Result<()>; + + /*async fn reset_offset_new_concurrent( + &self, + group: CheetahString, + topic: CheetahString, + timestamp: u64, + ) -> AdminToolResult;*/ + + async fn get_consume_status( + &self, + topic: CheetahString, + group: CheetahString, + client_addr: CheetahString, + ) -> Result>>; + + async fn create_or_update_order_conf( + &self, + key: CheetahString, + value: CheetahString, + is_cluster: bool, + ) -> Result<()>; + + async fn query_topic_consume_by_who(&self, topic: CheetahString) -> Result; + + async fn query_topics_by_consumer(&self, group: CheetahString) -> Result; + + async fn query_topics_by_consumer_concurrent( + &self, + group: CheetahString, + ) -> AdminToolResult; + + async fn query_subscription( + &self, + group: CheetahString, + topic: CheetahString, + ) -> Result; + + /*async fn query_consume_time_span( + &self, + topic: CheetahString, + group: CheetahString, + ) -> Result>; + + async fn query_consume_time_span_concurrent( + &self, + topic: CheetahString, + group: CheetahString, + ) -> AdminToolResult>;*/ + + async fn clean_expired_consumer_queue( + &self, + cluster: Option, + addr: Option, + ) -> Result; + + async fn delete_expired_commit_log( + &self, + cluster: Option, + addr: Option, + ) -> Result; + + async fn clean_unused_topic( + &self, + cluster: Option, + addr: Option, + ) -> Result; + + async fn get_consumer_running_info( + &self, + consumer_group: CheetahString, + client_id: CheetahString, + jstack: bool, + metrics: Option, + ) -> Result; + + async fn consume_message_directly( + &self, + consumer_group: CheetahString, + client_id: CheetahString, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + async fn consume_message_directly_ext( + &self, + cluster_name: CheetahString, + consumer_group: CheetahString, + client_id: CheetahString, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + /*async fn message_track_detail( + &self, + msg: MessageExt, + ) -> Result>; + + async fn message_track_detail_concurrent( + &self, + msg: MessageExt, + ) -> AdminToolResult>;*/ + + async fn clone_group_offset( + &self, + src_group: CheetahString, + dest_group: CheetahString, + topic: CheetahString, + is_offline: bool, + ) -> Result<()>; + + /*async fn view_broker_stats_data( + &self, + broker_addr: CheetahString, + stats_name: CheetahString, + stats_key: CheetahString, + ) -> Result;*/ + + async fn get_cluster_list(&self, topic: String) -> Result>; + + /*async fn fetch_consume_stats_in_broker( + &self, + broker_addr: CheetahString, + is_order: bool, + timeout_millis: u64, + ) -> Result;*/ + + async fn get_topic_cluster_list(&self, topic: String) -> Result>; + + /*async fn get_all_subscription_group( + &self, + broker_addr: CheetahString, + timeout_millis: u64, + ) -> Result;*/ + + /*async fn get_user_subscription_group( + &self, + broker_addr: CheetahString, + timeout_millis: u64, + ) -> Result;*/ + + async fn get_all_topic_config( + &self, + broker_addr: CheetahString, + timeout_millis: u64, + ) -> Result; + + async fn get_user_topic_config( + &self, + broker_addr: CheetahString, + special_topic: bool, + timeout_millis: u64, + ) -> Result; + + async fn update_consume_offset( + &self, + broker_addr: CheetahString, + consume_group: CheetahString, + mq: MessageQueue, + offset: u64, + ) -> Result<()>; + + async fn update_name_server_config( + &self, + properties: HashMap, + name_servers: Vec, + ) -> Result<()>; + + async fn get_name_server_config( + &self, + name_servers: Vec, + ) -> Result>>; + + /*async fn query_consume_queue( + &self, + broker_addr: CheetahString, + topic: CheetahString, + queue_id: i32, + index: u64, + count: i32, + consumer_group: CheetahString, + ) -> Result;*/ + + async fn resume_check_half_message( + &self, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + async fn set_message_request_mode( + &self, + broker_addr: CheetahString, + topic: CheetahString, + consumer_group: CheetahString, + mode: MessageRequestMode, + pop_work_group_size: i32, + timeout_millis: u64, + ) -> Result<()>; + + async fn reset_offset_by_queue_id( + &self, + broker_addr: CheetahString, + consumer_group: CheetahString, + topic_name: CheetahString, + queue_id: i32, + reset_offset: u64, + ) -> Result<()>; + + async fn examine_topic_config( + &self, + addr: CheetahString, + topic: CheetahString, + ) -> Result; + + async fn create_static_topic( + &self, + addr: CheetahString, + default_topic: CheetahString, + topic_config: TopicConfig, + mapping_detail: TopicQueueMappingDetail, + force: bool, + ) -> Result<()>; + + /*async fn update_and_get_group_read_forbidden( + &self, + broker_addr: CheetahString, + group_name: CheetahString, + topic_name: CheetahString, + readable: Option, + ) -> Result; + + async fn query_message( + &self, + cluster_name: CheetahString, + topic: CheetahString, + msg_id: CheetahString, + ) -> Result; + + async fn get_broker_ha_status(&self, broker_addr: CheetahString) -> Result; + + async fn get_in_sync_state_data( + &self, + controller_address: CheetahString, + brokers: Vec, + ) -> Result; + + async fn get_broker_epoch_cache( + &self, + broker_addr: CheetahString, + ) -> Result; + + async fn get_controller_meta_data( + &self, + controller_addr: CheetahString, + ) -> Result;*/ + + async fn reset_master_flush_offset( + &self, + broker_addr: CheetahString, + master_flush_offset: u64, + ) -> Result<()>; + + async fn get_controller_config( + &self, + controller_servers: Vec, + ) -> Result>>; + + async fn update_controller_config( + &self, + properties: HashMap, + controllers: Vec, + ) -> Result<()>; + + /*async fn elect_master( + &self, + controller_addr: CheetahString, + cluster_name: CheetahString, + broker_name: CheetahString, + broker_id: Option, + ) -> Result<(ElectMasterResponseHeader, BrokerMemberGroup)>;*/ + + async fn clean_controller_broker_data( + &self, + controller_addr: CheetahString, + cluster_name: CheetahString, + broker_name: CheetahString, + broker_controller_ids_to_clean: Option, + is_clean_living_broker: bool, + ) -> Result<()>; + + async fn update_cold_data_flow_ctr_group_config( + &self, + broker_addr: CheetahString, + properties: HashMap, + ) -> Result<()>; + + async fn remove_cold_data_flow_ctr_group_config( + &self, + broker_addr: CheetahString, + consumer_group: CheetahString, + ) -> Result<()>; + + async fn get_cold_data_flow_ctr_info( + &self, + broker_addr: CheetahString, + ) -> Result; + + async fn set_commit_log_read_ahead_mode( + &self, + broker_addr: CheetahString, + mode: CheetahString, + ) -> Result; + + async fn create_user( + &self, + broker_addr: CheetahString, + username: CheetahString, + password: CheetahString, + user_type: CheetahString, + ) -> Result<()>; + + /*async fn create_user_with_info( + &self, + broker_addr: CheetahString, + user_info: UserInfo, + ) -> Result<()>;*/ + + async fn update_user( + &self, + broker_addr: CheetahString, + username: CheetahString, + password: CheetahString, + user_type: CheetahString, + user_status: CheetahString, + ) -> Result<()>; + + /* async fn update_user_with_info( + &self, + broker_addr: CheetahString, + user_info: UserInfo, + ) -> Result<()>;*/ + + async fn delete_user(&self, broker_addr: CheetahString, username: CheetahString) -> Result<()>; + + /*async fn get_user( + &self, + broker_addr: CheetahString, + username: CheetahString, + ) -> Result;*/ + + /* async fn list_users( + &self, + broker_addr: CheetahString, + filter: CheetahString, + ) -> Result>;*/ + + async fn create_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + resources: Vec, + actions: Vec, + source_ips: Vec, + decision: CheetahString, + ) -> Result<()>; + + /*async fn create_acl_with_info( + &self, + broker_addr: CheetahString, + acl_info: AclInfo, + ) -> Result<()>;*/ + + async fn update_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + resources: Vec, + actions: Vec, + source_ips: Vec, + decision: CheetahString, + ) -> Result<()>; + + /*async fn update_acl_with_info( + &self, + broker_addr: CheetahString, + acl_info: AclInfo, + ) -> Result<()>;*/ + + async fn delete_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + resource: CheetahString, + ) -> Result<()>; + + /*async fn get_acl( + &self, + broker_addr: CheetahString, + subject: CheetahString, + ) -> Result;*/ + + /*async fn list_acl( + &self, + broker_addr: CheetahString, + subject_filter: CheetahString, + resource_filter: CheetahString, + ) -> Result>;*/ +} diff --git a/rocketmq-tools/src/lib.rs b/rocketmq-tools/src/lib.rs index cf3041a8..ad40f686 100644 --- a/rocketmq-tools/src/lib.rs +++ b/rocketmq-tools/src/lib.rs @@ -14,4 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use crate::tools_error::ToolsError; + mod admin; +mod tools_error; + +pub type Result = std::result::Result; diff --git a/rocketmq-tools/src/tools_error.rs b/rocketmq-tools/src/tools_error.rs new file mode 100644 index 00000000..9c3f2fdb --- /dev/null +++ b/rocketmq-tools/src/tools_error.rs @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use thiserror::Error; +#[allow(clippy::enum_variant_names)] +#[derive(Debug, Error)] +pub enum ToolsError { + #[error("MQ client error occurred.")] + MQClientError, + #[error("MQ broker error occurred.")] + MQBrokerError, + #[error("Remoting timeout.")] + RemotingTimeoutError, + #[error("Remoting send request failed.")] + RemotingSendRequestError, + #[error("Remoting connect failed.")] + RemotingConnectError, + #[error("Unsupported encoding.")] + UnsupportedEncodingError, + #[error("Operation interrupted.")] + InterruptedError, +}