Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #1106]Fix When topic not create the client can not consume #1108

Merged
merged 1 commit into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub const TAG: &str = "*";
#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();
//rocketmq_common::log::init_logger();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Don't disable logger initialization in examples.

Commenting out the logger initialization reduces visibility into potential issues, especially for users trying to debug topic-related problems. Examples should demonstrate best practices including proper logging setup.

-    //rocketmq_common::log::init_logger();
+    rocketmq_common::log::init_logger();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
//rocketmq_common::log::init_logger();
rocketmq_common::log::init_logger();


// create a producer builder with default configuration
let builder = DefaultMQPushConsumer::builder();
Expand All @@ -59,6 +59,7 @@ impl MessageListenerConcurrently for MyMessageListener {
) -> Result<ConsumeConcurrentlyStatus> {
for msg in msgs {
info!("Receive message: {:?}", msg);
println!("Receive message: {:?}", msg);
}
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@
}
}

async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {
async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>) {

Check warning on line 1320 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1320

Added line #L1320 was not covered by tests
let sub_table = self.rebalance_impl.get_subscription_inner();
let sub_table_inner = sub_table.read().await;
if sub_table_inner.contains_key(topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,16 @@ where
}
}
MessageModel::Clustering => {
let topic_sub_cloned = self.topic_subscribe_info_table.clone();
let topic_subscribe_info_table_inner = topic_sub_cloned.read().await;
let mq_set = topic_subscribe_info_table_inner.get(topic);
//get consumer id list from broker
let cid_all = self
.client_instance
.as_mut()
.unwrap()
.find_consumer_id_list(topic, self.consumer_group.as_ref().unwrap())
.await;
let topic_sub_cloned = self.topic_subscribe_info_table.clone();
let topic_subscribe_info_table_inner = topic_sub_cloned.read().await;
let mq_set = topic_subscribe_info_table_inner.get(topic);
if mq_set.is_none() && !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) {
if let Some(mut sub_rebalance_impl) =
self.sub_rebalance_impl.as_ref().unwrap().upgrade()
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-client/src/consumer/mq_consumer_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
///
/// * `topic` - A string slice that holds the name of the topic.
/// * `info` - A reference to a `HashSet` containing `MessageQueue` information.
async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>);
async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>);

/// Checks if the subscription information for a given topic needs to be updated asynchronously.
///
Expand Down Expand Up @@ -204,7 +204,7 @@
panic!("default_mqpush_consumer_impl is None");
}

async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {
async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>) {

Check warning on line 207 in rocketmq-client/src/consumer/mq_consumer_inner.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/mq_consumer_inner.rs#L207

Added line #L207 was not covered by tests
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
if let Some(mut default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
return MQConsumerInner::update_topic_subscribe_info(
Expand Down
6 changes: 4 additions & 2 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@
broker_version_table: Arc::new(Default::default()),
send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
});
let instance_clone = instance.clone();
instance.mq_admin_impl.set_client(instance_clone);

Check warning on line 210 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L209-L210

Added lines #L209 - L210 were not covered by tests
let weak_instance = ArcRefCellWrapper::downgrade(&instance);
let (tx, mut rx) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);

Expand Down Expand Up @@ -583,11 +585,11 @@

// Update sub info
{
let mut consumer_table = self.consumer_table.write().await;
let consumer_table = self.consumer_table.read().await;

Check warning on line 588 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L588

Added line #L588 was not covered by tests
if !consumer_table.is_empty() {
let subscribe_info =
topic_route_data2topic_subscribe_info(topic, &topic_route_data);
for (_, value) in consumer_table.iter_mut() {
for (_, value) in consumer_table.iter() {

Check warning on line 592 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L592

Added line #L592 was not covered by tests
value
.update_topic_subscribe_info(topic, &subscribe_info)
.await;
Expand Down
31 changes: 31 additions & 0 deletions rocketmq-client/src/implementation/mq_admin_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,26 @@
use crate::base::client_config::ClientConfig;
use crate::error::MQClientError::MQClientErr;
use crate::factory::mq_client_instance;
use crate::factory::mq_client_instance::MQClientInstance;
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
use crate::Result;

pub struct MQAdminImpl {
timeout_millis: u64,
client: Option<ArcRefCellWrapper<MQClientInstance>>,
}

impl MQAdminImpl {
pub fn new() -> Self {
MQAdminImpl {
timeout_millis: 60000,
client: None,

Check warning on line 37 in rocketmq-client/src/implementation/mq_admin_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_admin_impl.rs#L37

Added line #L37 was not covered by tests
}
}

pub fn set_client(&mut self, client: ArcRefCellWrapper<MQClientInstance>) {
self.client = Some(client);
}

Check warning on line 43 in rocketmq-client/src/implementation/mq_admin_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_admin_impl.rs#L41-L43

Added lines #L41 - L43 were not covered by tests
}

impl MQAdminImpl {
Expand Down Expand Up @@ -93,6 +100,30 @@
}

pub async fn max_offset(&mut self, mq: &MessageQueue) -> Result<i64> {
let client = self.client.as_mut().expect("client is None");
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let mut broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
if broker_addr.is_none() {
client
.update_topic_route_info_from_name_server_topic(mq.get_topic())
.await;
let broker_name = client.get_broker_name_from_message_queue(mq).await;
broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
}
if let Some(ref broker_addr) = broker_addr {
let offset = client
.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.get_max_offset(broker_addr, mq, self.timeout_millis)
.await?;
return Ok(offset);
}

Comment on lines +103 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Handle potential None values without panics

Using .expect() on Option types can lead to panics if the value is None. It's better to handle None cases gracefully by returning an appropriate error.

Apply the following changes to handle None values:

-            let client = self.client.as_mut().expect("client is None");
+            let client = match self.client.as_mut() {
+                Some(client) => client,
+                None => {
+                    return Err(MQClientErr(
+                        -1,
+                        "MQClientInstance is not initialized".to_string(),
+                    ));
+                }
+            };

...

-                    .as_mut()
-                    .expect("mq_client_api_impl is None")
+                    .as_mut().ok_or_else(|| MQClientErr(
+                        -1,
+                        "MQClientAPIImpl is not initialized".to_string(),
+                    ))?

...

-            unimplemented!("max_offset")
+            Err(MQClientErr(
+                -1,
+                "Failed to retrieve broker address; cannot compute max offset".to_string(),
+            ))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let client = self.client.as_mut().expect("client is None");
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let mut broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
if broker_addr.is_none() {
client
.update_topic_route_info_from_name_server_topic(mq.get_topic())
.await;
let broker_name = client.get_broker_name_from_message_queue(mq).await;
broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
}
if let Some(ref broker_addr) = broker_addr {
let offset = client
.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.get_max_offset(broker_addr, mq, self.timeout_millis)
.await?;
return Ok(offset);
}
let client = match self.client.as_mut() {
Some(client) => client,
None => {
return Err(MQClientErr(
-1,
"MQClientInstance is not initialized".to_string(),
));
}
};
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let mut broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
if broker_addr.is_none() {
client
.update_topic_route_info_from_name_server_topic(mq.get_topic())
.await;
let broker_name = client.get_broker_name_from_message_queue(mq).await;
broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
}
if let Some(ref broker_addr) = broker_addr {
let offset = client
.mq_client_api_impl
.as_mut().ok_or_else(|| MQClientErr(
-1,
"MQClientAPIImpl is not initialized".to_string(),
))?
.get_max_offset(broker_addr, mq, self.timeout_millis)
.await?;
return Ok(offset);
}
Err(MQClientErr(
-1,
"Failed to retrieve broker address; cannot compute max offset".to_string(),
))

unimplemented!("max_offset")
}
pub async fn search_offset(&mut self, mq: &MessageQueue, timestamp: u64) -> Result<i64> {
Expand Down
49 changes: 49 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader;
use rocketmq_remoting::protocol::header::end_transaction_request_header::EndTransactionRequestHeader;
use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader;
use rocketmq_remoting::protocol::header::get_max_offset_request_header::GetMaxOffsetRequestHeader;
use rocketmq_remoting::protocol::header::get_max_offset_response_header::GetMaxOffsetResponseHeader;
use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader;
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader;
Expand All @@ -68,6 +70,7 @@
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::remoting::RemotingService;
use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader;
use rocketmq_remoting::rpc::topic_request_header::TopicRequestHeader;
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
use rocketmq_remoting::runtime::RPCHook;
use tracing::error;
Expand Down Expand Up @@ -1147,4 +1150,50 @@
.await;
Ok(())
}

pub async fn get_max_offset(

Check warning on line 1154 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1154

Added line #L1154 was not covered by tests
&mut self,
addr: &str,
message_queue: &MessageQueue,
timeout_millis: u64,
) -> Result<i64> {

Check warning on line 1159 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1159

Added line #L1159 was not covered by tests
let request_header = GetMaxOffsetRequestHeader {
topic: message_queue.get_topic().to_string(),
queue_id: message_queue.get_queue_id(),
committed: false,
topic_request_header: Some(TopicRequestHeader {
rpc_request_header: Some(RpcRequestHeader {
broker_name: Some(message_queue.get_broker_name().to_string()),
..Default::default()
}),
lo: None,
}),
};

let request =
RemotingCommand::create_request_command(RequestCode::GetMaxOffset, request_header);

let response = self
.remoting_client
.invoke_async(
Some(mix_all::broker_vip_channel(
self.client_config.vip_channel_enabled,
addr,
)),
request,
timeout_millis,
)
.await?;
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<GetMaxOffsetResponseHeader>()
.expect("decode error");
return Ok(response_header.offset);
Comment on lines +1188 to +1191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace expect() with proper error handling.

Using expect() can cause panics in production. Consider using map_err() to convert the decode error into a proper MQClientError.

-            let response_header = response
-                .decode_command_custom_header::<GetMaxOffsetResponseHeader>()
-                .expect("decode error");
-            return Ok(response_header.offset);
+            let response_header = response
+                .decode_command_custom_header::<GetMaxOffsetResponseHeader>()
+                .map_err(|e| MQBrokerError(
+                    response.code(),
+                    format!("Failed to decode response header: {}", e),
+                    addr.to_string(),
+                ))?;
+            return Ok(response_header.offset);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let response_header = response
.decode_command_custom_header::<GetMaxOffsetResponseHeader>()
.expect("decode error");
return Ok(response_header.offset);
let response_header = response
.decode_command_custom_header::<GetMaxOffsetResponseHeader>()
.map_err(|e| MQBrokerError(
response.code(),
format!("Failed to decode response header: {}", e),
addr.to_string(),
))?;
return Ok(response_header.offset);

}
Err(MQBrokerError(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),
addr.to_string(),
))
}

Check warning on line 1198 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1198

Added line #L1198 was not covered by tests
}
Loading