Skip to content

Commit

Permalink
[ISSUE #2378]🔥Implement RocketmqDefaultClient shutdown method🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 23, 2025
1 parent 438ff1f commit e3fa011
Showing 1 changed file with 44 additions and 21 deletions.
65 changes: 44 additions & 21 deletions rocketmq-remoting/src/clients/rocketmq_default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use rand::Rng;
use rocketmq_runtime::RocketMQRuntime;
use rocketmq_rust::ArcMut;
use rocketmq_rust::WeakArcMut;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::time;
use tracing::debug;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> {
namesrv_addr_choosed: ArcMut<Option<CheetahString>>,
available_namesrv_addr_set: ArcMut<HashSet<CheetahString>>,
namesrv_index: Arc<AtomicI32>,
client_runtime: Arc<RocketMQRuntime>,
client_runtime: Option<RocketMQRuntime>,
processor: PR,
tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
}
Expand All @@ -77,7 +78,7 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
namesrv_addr_choosed: ArcMut::new(Default::default()),
available_namesrv_addr_set: ArcMut::new(Default::default()),
namesrv_index: Arc::new(AtomicI32::new(init_value_index())),
client_runtime: Arc::new(RocketMQRuntime::new_multi(10, "client-thread")),
client_runtime: Some(RocketMQRuntime::new_multi(10, "client-thread")),

Check warning on line 81 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L81

Added line #L81 was not covered by tests
processor,
tx,
}
Expand Down Expand Up @@ -241,17 +242,33 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for Rocketmq
async fn start(&self, this: WeakArcMut<Self>) {
if let Some(client) = this.upgrade() {
let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
self.client_runtime.get_handle().spawn(async move {
loop {
client.scan_available_name_srv().await;
time::sleep(Duration::from_millis(connect_timeout_millis)).await;
}
});
self.client_runtime
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {

Check warning on line 249 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L245-L249

Added lines #L245 - L249 were not covered by tests
loop {
client.scan_available_name_srv().await;
time::sleep(Duration::from_millis(connect_timeout_millis)).await;

Check warning on line 252 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L251-L252

Added lines #L251 - L252 were not covered by tests
}
});

Check warning on line 254 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L254

Added line #L254 was not covered by tests
}
}

fn shutdown(&mut self) {
todo!()
if let Some(rt) = self.client_runtime.take() {
rt.shutdown();
}
let connection_tables = self.connection_tables.clone();
tokio::task::block_in_place(move || {
Handle::current().block_on(async move {
connection_tables.lock().await.clear();
});
});
self.namesrv_addr_list.clear();
self.available_namesrv_addr_set.clear();

info!(">>>>>>>>>>>>>>>RemotingClient shutdown success<<<<<<<<<<<<<<<<<");

Check warning on line 271 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L259-L271

Added lines #L259 - L271 were not covered by tests
}

fn register_rpc_hook(&mut self, hook: Arc<Box<dyn RPCHook>>) {
Expand Down Expand Up @@ -337,6 +354,8 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqD
Some(mut client) => {
match self
.client_runtime
.as_ref()
.unwrap()

Check warning on line 358 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L357-L358

Added lines #L357 - L358 were not covered by tests
.get_handle()
.spawn(async move {
time::timeout(Duration::from_millis(timeout_millis), async move {
Expand Down Expand Up @@ -371,18 +390,22 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqD
error!("get client failed");
}
Some(mut client) => {
self.client_runtime.get_handle().spawn(async move {
match time::timeout(Duration::from_millis(timeout_millis), async move {
let mut request = request;
request.mark_oneway_rpc_ref();
client.send(request).await
})
.await
{
Ok(_) => Ok(()),
Err(err) => Err(RemotingError::RemoteError(err.to_string())),
}
});
self.client_runtime
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
match time::timeout(Duration::from_millis(timeout_millis), async move {
let mut request = request;
request.mark_oneway_rpc_ref();
client.send(request).await
})
.await

Check warning on line 403 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L393-L403

Added lines #L393 - L403 were not covered by tests
{
Ok(_) => Ok(()),
Err(err) => Err(RemotingError::RemoteError(err.to_string())),

Check warning on line 406 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L405-L406

Added lines #L405 - L406 were not covered by tests
}
});

Check warning on line 408 in rocketmq-remoting/src/clients/rocketmq_default_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L408

Added line #L408 was not covered by tests
}
}
}
Expand Down

0 comments on commit e3fa011

Please # to comment.