From e3fa01193bf98a0c59b78b2f5d745bd8771154f2 Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 23 Jan 2025 06:42:12 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#2378]=F0=9F=94=A5Implement=20Rocketmq?= =?UTF-8?q?DefaultClient=20shutdown=20method=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/clients/rocketmq_default_impl.rs | 65 +++++++++++++------ 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs index 185a1c8d..98302905 100644 --- a/rocketmq-remoting/src/clients/rocketmq_default_impl.rs +++ b/rocketmq-remoting/src/clients/rocketmq_default_impl.rs @@ -25,6 +25,7 @@ use rand::Rng; use rocketmq_runtime::RocketMQRuntime; use rocketmq_rust::ArcMut; use rocketmq_rust::WeakArcMut; +use tokio::runtime::Handle; use tokio::sync::Mutex; use tokio::time; use tracing::debug; @@ -56,7 +57,7 @@ pub struct RocketmqDefaultClient { namesrv_addr_choosed: ArcMut>, available_namesrv_addr_set: ArcMut>, namesrv_index: Arc, - client_runtime: Arc, + client_runtime: Option, processor: PR, tx: Option>, } @@ -77,7 +78,7 @@ impl RocketmqDefaultClient { namesrv_addr_choosed: ArcMut::new(Default::default()), available_namesrv_addr_set: ArcMut::new(Default::default()), namesrv_index: Arc::new(AtomicI32::new(init_value_index())), - client_runtime: Arc::new(RocketMQRuntime::new_multi(10, "client-thread")), + client_runtime: Some(RocketMQRuntime::new_multi(10, "client-thread")), processor, tx, } @@ -241,17 +242,33 @@ impl RemotingService for Rocketmq async fn start(&self, this: WeakArcMut) { if let Some(client) = this.upgrade() { let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64; - self.client_runtime.get_handle().spawn(async move { - loop { - client.scan_available_name_srv().await; - time::sleep(Duration::from_millis(connect_timeout_millis)).await; - } - }); + self.client_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + loop { + client.scan_available_name_srv().await; + time::sleep(Duration::from_millis(connect_timeout_millis)).await; + } + }); } } fn shutdown(&mut self) { - todo!() + if let Some(rt) = self.client_runtime.take() { + rt.shutdown(); + } + let connection_tables = self.connection_tables.clone(); + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + connection_tables.lock().await.clear(); + }); + }); + self.namesrv_addr_list.clear(); + self.available_namesrv_addr_set.clear(); + + info!(">>>>>>>>>>>>>>>RemotingClient shutdown success<<<<<<<<<<<<<<<<<"); } fn register_rpc_hook(&mut self, hook: Arc>) { @@ -337,6 +354,8 @@ impl RemotingClient for RocketmqD Some(mut client) => { match self .client_runtime + .as_ref() + .unwrap() .get_handle() .spawn(async move { time::timeout(Duration::from_millis(timeout_millis), async move { @@ -371,18 +390,22 @@ impl RemotingClient for RocketmqD error!("get client failed"); } Some(mut client) => { - self.client_runtime.get_handle().spawn(async move { - match time::timeout(Duration::from_millis(timeout_millis), async move { - let mut request = request; - request.mark_oneway_rpc_ref(); - client.send(request).await - }) - .await - { - Ok(_) => Ok(()), - Err(err) => Err(RemotingError::RemoteError(err.to_string())), - } - }); + self.client_runtime + .as_ref() + .unwrap() + .get_handle() + .spawn(async move { + match time::timeout(Duration::from_millis(timeout_millis), async move { + let mut request = request; + request.mark_oneway_rpc_ref(); + client.send(request).await + }) + .await + { + Ok(_) => Ok(()), + Err(err) => Err(RemotingError::RemoteError(err.to_string())), + } + }); } } }