From 617828a9743bda0c73718450843d85da7b6c7553 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 1 Dec 2024 18:16:56 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1484]=E2=99=BB=EF=B8=8FRefactor=20cre?= =?UTF-8?q?ate=20MQBrokerErr=20replace=20with=20client=5Fbroker=5Ferr!=20m?= =?UTF-8?q?acro=F0=9F=94=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/implementation/mq_client_api_impl.rs | 85 +++++++++---------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 0ae51a9a..ebab1c52 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -84,6 +84,7 @@ use tracing::error; use tracing::warn; use crate::base::client_config::ClientConfig; +use crate::client_broker_err; use crate::client_error::MQBrokerErr; use crate::client_error::MQClientError; use crate::client_error::MQClientError::MQClientBrokerError; @@ -683,13 +684,11 @@ impl MQClientAPIImpl { if ResponseCode::from(response.code()) == ResponseCode::Success { return Ok(response.version()); } - Err(MQClientError::MQClientBrokerError( - MQBrokerErr::new_with_broker( - response.code(), - response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ), - )) + client_broker_err!( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string() + ) } pub async fn check_client_in_broker( @@ -777,13 +776,11 @@ impl MQClientAPIImpl { )); } } - Err(MQClientError::MQClientBrokerError( - MQBrokerErr::new_with_broker( - response.code(), - response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ), - )) + client_broker_err!( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string() + ) } pub async fn update_consumer_offset_oneway( @@ -870,13 +867,11 @@ impl MQClientAPIImpl { } _ => {} } - Err(MQClientError::MQClientBrokerError( - MQBrokerErr::new_with_broker( - response.code(), - response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ), - )) + client_broker_err!( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()), + addr.to_string() + ) } pub async fn pull_message( @@ -972,11 +967,11 @@ impl MQClientAPIImpl { ResponseCode::PullRetryImmediately => PullStatus::NoMatchedMsg, ResponseCode::PullOffsetMoved => PullStatus::OffsetIllegal, _ => { - return Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + return client_broker_err!( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ))) + addr.to_string() + ) } }; let response_header = response @@ -1039,11 +1034,11 @@ impl MQClientAPIImpl { if ResponseCode::from(response.code()) == ResponseCode::Success { Ok(()) } else { - Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + client_broker_err!( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ))) + addr.to_string() + ) } } @@ -1070,11 +1065,11 @@ impl MQClientAPIImpl { if ResponseCode::from(response.code()) == ResponseCode::Success { Ok(()) } else { - Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + client_broker_err!( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ))) + addr.to_string() + ) } } @@ -1110,11 +1105,11 @@ impl MQClientAPIImpl { if ResponseCode::from(response.code()) == ResponseCode::Success { Ok(()) } else { - Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + client_broker_err!( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ))) + addr.to_string() + ) } } } @@ -1153,18 +1148,18 @@ impl MQClientAPIImpl { )) }) } else { - Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + client_broker_err!( response.code(), "Response body is empty".to_string(), - addr.to_string(), - ))) + addr.to_string() + ) } } else { - Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + client_broker_err!( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ))) + addr.to_string() + ) } } @@ -1224,11 +1219,11 @@ impl MQClientAPIImpl { .expect("decode error"); return Ok(response_header.offset); } - Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + client_broker_err!( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ))) + addr.to_string() + ) } pub async fn set_message_request_mode( @@ -1310,10 +1305,10 @@ impl MQClientAPIImpl { return Ok(None); } - Err(MQClientBrokerError(MQBrokerErr::new_with_broker( + client_broker_err!( response.code(), response.remark().map_or("".to_string(), |s| s.to_string()), - addr.to_string(), - ))) + addr.to_string() + ) } }