-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[ISSUE #1106]Fix When topic not create the client can not consume #1108
Conversation
WalkthroughThe pull request introduces several changes across multiple files in the Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (8)
rocketmq-client/examples/quickstart/consumer.rs (3)
Line range hint
41-44
: Add error handling for topic subscription.Given that this PR addresses issues with non-existent topics, the example should demonstrate proper error handling for topic subscription failures.
let mut consumer = builder .consumer_group(CONSUMER_GROUP.to_string()) .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) .build(); - consumer.subscribe(TOPIC, "*")?; + match consumer.subscribe(TOPIC, "*") { + Ok(_) => info!("Successfully subscribed to topic {}", TOPIC), + Err(e) => { + eprintln!("Failed to subscribe to topic {}: {}", TOPIC, e); + // In a real application, you might want to: + // 1. Implement retry logic + // 2. Check if topic exists + // 3. Create topic if missing (with appropriate permissions) + return Err(e); + } + };
61-62
: Remove redundant println statement.The message is already being logged using the
info!
macro. Adding aprintln!
statement creates duplicate output and doesn't follow logging best practices.info!("Receive message: {:?}", msg); - println!("Receive message: {:?}", msg);
Line range hint
56-64
: Enhance error handling in message consumption.Since this PR addresses consumption issues, the example should demonstrate robust error handling and logging during message consumption.
fn consume_message( &self, msgs: &[&MessageExt], _context: &ConsumeConcurrentlyContext, ) -> Result<ConsumeConcurrentlyStatus> { for msg in msgs { - info!("Receive message: {:?}", msg); - println!("Receive message: {:?}", msg); + match msg.get_topic() { + Ok(topic) if topic.is_empty() => { + warn!("Received message with empty topic: {:?}", msg); + return Ok(ConsumeConcurrentlyStatus::ReconsumeLater); + } + Ok(topic) => { + info!("Processing message from topic {}: {:?}", topic, msg); + } + Err(e) => { + error!("Failed to get topic from message: {}", e); + return Ok(ConsumeConcurrentlyStatus::ReconsumeLater); + } + } } Ok(ConsumeConcurrentlyStatus::ConsumeSuccess) }rocketmq-client/src/consumer/mq_consumer_inner.rs (1)
Line range hint
207-219
: Critical: Fix mutability mismatch in implementation.The implementation incorrectly uses
as_mut()
while the trait specifies an immutable&self
reference. This mismatch could lead to race conditions and undefined behavior in concurrent scenarios.Apply this fix:
async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>) { if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl { - if let Some(mut default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() { + if let Some(default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() { return MQConsumerInner::update_topic_subscribe_info( - default_mqpush_consumer_impl.as_mut(), + default_mqpush_consumer_impl.as_ref(), topic, info, ) .await; } } panic!("default_mqpush_consumer_impl is None"); }This change:
- Removes the
mut
binding as it's not needed- Uses
as_ref()
instead ofas_mut()
to maintain immutability- Aligns with the trait's immutable contract
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (2)
Line range hint
340-349
: Improve error handling for non-existent topics.The code now properly handles non-existent topics by:
- Checking if the topic doesn't exist
- Notifying the rebalance implementation
- Logging a warning message
However, the condition
!topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX)
suggests that retry topics are handled differently. This special case should be documented.Add a comment explaining why retry topics are handled differently:
if mq_set.is_none() && !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { + // Skip message queue change notification for retry topics as they are created dynamically if let Some(mut sub_rebalance_impl) =
Line range hint
350-357
: Consider enhancing error logging for consumer ID list retrieval failure.While the warning message is helpful, it could be more informative by including the reason for the failure.
- warn!( - "doRebalance, {}, {}, get consumer id list failed.", - self.consumer_group.as_ref().unwrap(), - topic - ); + warn!( + "doRebalance, {}, {}, get consumer id list failed. This may occur if the topic doesn't exist or if there are connectivity issues.", + self.consumer_group.as_ref().unwrap(), + topic + );rocketmq-client/src/implementation/mq_client_api_impl.rs (1)
1160-1171
: Consider documenting the committed flag.The
committed
field in the request header is hardcoded tofalse
without explanation. Consider adding a comment explaining the significance of this flag or making it a parameter if it needs to be configurable.rocketmq-client/src/factory/mq_client_instance.rs (1)
Line range hint
588-596
: LGTM! Performance improvement in consumer table access.Changed from write lock to read lock on consumer_table, which should reduce lock contention in concurrent scenarios. The immutable access pattern is correct since topic subscription updates don't require mutable access to the consumer table.
Consider monitoring the following metrics to validate the performance improvement:
- Lock contention rates
- Topic subscription update latency
- Consumer group rebalance times
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (7)
rocketmq-client/examples/quickstart/consumer.rs
(2 hunks)rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs
(1 hunks)rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
(1 hunks)rocketmq-client/src/consumer/mq_consumer_inner.rs
(2 hunks)rocketmq-client/src/factory/mq_client_instance.rs
(2 hunks)rocketmq-client/src/implementation/mq_admin_impl.rs
(2 hunks)rocketmq-client/src/implementation/mq_client_api_impl.rs
(3 hunks)
🔇 Additional comments (12)
rocketmq-client/src/consumer/mq_consumer_inner.rs (1)
70-70
: LGTM! Improved thread safety with immutable self.
The change from &mut self
to &self
enhances concurrent access capabilities, which is particularly beneficial for async operations. This could help resolve race conditions during topic creation and subscription updates.
Let's verify the thread safety implications:
✅ Verification successful
Let me analyze the thread safety implications of changing update_topic_subscribe_info
from &mut self
to &self
.
From the search results:
- The method is part of the
MQConsumerInnerLocal
trait and is used in consumer implementations - The codebase shows extensive use of proper synchronization primitives:
RocketMQTokioMutex
,RocketMQTokioRwLock
for async lockingparking_lot::Mutex
,parking_lot::RwLock
for sync locking- Atomic types for lock-free operations
Let me verify the actual implementation to ensure thread safety:
Let me try a different search pattern to find the implementation:
I can now make a determination based on the implementation details found. The change from &mut self
to &self
is safe because:
-
The actual implementation in
DefaultMQPushConsumerImpl
already uses proper interior mutability withRwLock
:async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>) { let sub_table = self.rebalance_impl.get_subscription_inner(); let sub_table_inner = sub_table.read().await; // ... let mut guard = self.rebalance_impl.rebalance_impl_inner.topic_subscribe_info_table.write().await; guard.insert(topic.to_string(), info.clone()); }
-
The implementation uses proper async locking mechanisms:
- Uses
.read()
for shared access to subscription table - Uses
.write()
for exclusive access to topic info table - All locks are properly acquired and released through RAII guards
- Uses
-
The change aligns with Rust's async patterns by allowing shared references for concurrent operations while maintaining thread safety through interior mutability.
LGTM! The change to &self
is thread-safe and improves concurrency.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any concurrent access patterns or locks that might be affected by this change
rg -A 3 "update_topic_subscribe_info.*?async" --type rust
# Look for any potential deadlock scenarios in concurrent topic updates
rg -B 2 "lock|mutex|rwlock" --type rust
Length of output: 209422
Script:
#!/bin/bash
# Find implementation of update_topic_subscribe_info
ast-grep --pattern 'impl $_ {
$$$
async fn update_topic_subscribe_info(&self, $_, $_) {
$$$
}
$$$
}'
Length of output: 114
Script:
#!/bin/bash
# Search for update_topic_subscribe_info implementation with context
rg -B2 -A10 "update_topic_subscribe_info.*?self.*?topic.*?info" --type rust
Length of output: 4024
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (1)
337-339
: Verify thread safety of topic subscription info access.
The reordering of operations to fetch mq_set
before its usage is logically sound. However, since we're dealing with shared state through RwLock
, we should ensure that holding the read lock for topic_subscribe_info_table
doesn't create potential deadlock scenarios when combined with other locks in the system.
Consider implementing a timeout mechanism for the read lock acquisition to prevent potential deadlocks. This could be achieved using tokio::time::timeout
.
✅ Verification successful
Based on the search results, I can now provide a final response:
The read lock usage on topic_subscribe_info_table is safe and follows proper patterns
The code's handling of topic_subscribe_info_table
shows a safe implementation pattern:
- The lock is properly scoped within the async context
- The read lock is used consistently and briefly for the get operation
- No nested locks or complex lock patterns are observed that could lead to deadlocks
- The table is initialized with reasonable capacity and wrapped in Arc for thread-safe sharing
The reordering of operations in the reviewed code follows the established pattern seen elsewhere in the codebase, where the read lock is acquired, used immediately, and implicitly released when the guard goes out of scope.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other places where topic_subscribe_info_table is locked
ast-grep --pattern 'topic_subscribe_info_table.$$$lock().$$$await'
Length of output: 68
Script:
#!/bin/bash
# Let's try a different pattern to find RwLock usage on topic_subscribe_info_table
ast-grep --pattern 'topic_subscribe_info_table.$$$read().$$$await'
# Also search for write locks
ast-grep --pattern 'topic_subscribe_info_table.$$$write().$$$await'
# Let's also see the broader context of RwLock usage in this file
rg "RwLock" -A 5 -B 5 "rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs"
# And check for any potential deadlock prevention mechanisms
rg "timeout|deadlock" "rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs"
Length of output: 3076
Script:
#!/bin/bash
# Let's search for the actual usage patterns of these RwLocks in the code
rg -A 5 "topic_subscribe_info_table" "rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs"
# Let's also check if there are any other methods that might interact with this lock
rg "async fn.*topic" "rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs"
# And look for any error handling patterns around the lock usage
rg "Error|Result" -A 2 "rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs"
Length of output: 2830
rocketmq-client/src/implementation/mq_client_api_impl.rs (2)
50-51
: LGTM: Imports are properly organized.
The new imports for GetMaxOffset related headers are correctly placed and necessary for the implementation.
1154-1159
: LGTM: Method signature is well-defined.
The async method signature properly captures the required parameters for retrieving the max offset of a message queue.
rocketmq-client/src/factory/mq_client_instance.rs (1)
209-210
: LGTM! Verify circular reference handling.
The circular reference between MQClientInstance and MQAdminImpl is implemented correctly using ArcRefCellWrapper for thread-safe reference management.
Let's verify the circular reference implementation:
✅ Verification successful
Let me gather more information about the circular reference implementation by checking the struct definitions and their fields.
Let me try a different approach to check the struct definitions and their field types.
Circular reference implementation is safe and properly managed
The implementation correctly handles the circular reference:
- MQAdminImpl stores the client reference as
Option<ArcRefCellWrapper<MQClientInstance>>
, allowing for proper initialization and cleanup - The reference is managed through
ArcRefCellWrapper
, which provides thread-safe reference counting - The optional field ensures proper memory management as it can be explicitly set to None when needed
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for proper circular reference handling between MQClientInstance and MQAdminImpl
# Test: Search for Drop trait implementations to ensure proper cleanup
# Check for Drop implementations
ast-grep --pattern 'impl Drop for MQClientInstance'
ast-grep --pattern 'impl Drop for MQAdminImpl'
# Check for weak reference usage to prevent memory leaks
rg -A 5 'ArcRefCellWrapper::downgrade'
Length of output: 10028
Script:
#!/bin/bash
# Check MQClientInstance and MQAdminImpl struct definitions
ast-grep --pattern 'struct MQClientInstance'
ast-grep --pattern 'struct MQAdminImpl'
# Check for weak references in these structs
rg -A 5 'WeakRefCellWrapper<MQClientInstance>'
rg -A 5 'WeakRefCellWrapper<MQAdminImpl>'
Length of output: 181
Script:
#!/bin/bash
# Search for struct definitions with more context
rg -B 2 -A 10 "^pub struct MQClientInstance"
rg -B 2 -A 10 "^pub struct MQAdminImpl"
# Search for client field in MQAdminImpl
rg -B 2 -A 2 "client.*MQClientInstance" rocketmq-client/src/implementation/mq_admin_impl.rs
Length of output: 2724
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (3)
1320-1320
: LGTM: Improved thread safety with immutable self reference.
The change from &mut self
to &self
is appropriate as the method only reads data, allowing for better concurrency through shared references.
Line range hint 1321-1328
: LGTM: Thread-safe implementation with proper locking.
The implementation correctly:
- Uses read-write locks for thread safety
- Only updates when the topic exists in the subscription table
- Properly clones the data to minimize lock duration
Line range hint 1320-1328
: Verify if additional changes are needed for the topic creation issue.
While these changes improve thread safety, they may not fully address the issue where clients cannot consume when a topic hasn't been created. Let's verify the complete fix.
rocketmq-client/src/implementation/mq_admin_impl.rs (4)
24-24
: Addition of MQClientInstance
import is appropriate
The added import of MQClientInstance
is necessary for the new functionality and is correctly specified.
30-30
: Added client
field to MQAdminImpl
The client
field has been appropriately added to store an optional ArcRefCellWrapper<MQClientInstance>
.
37-37
: Ensure client
is properly initialized before use
Initializing client
to None
is acceptable, but since methods like max_offset
rely on client
being initialized, please ensure that client
is set before these methods are called to prevent potential panics.
41-43
: Addition of set_client
method
The set_client
method correctly allows for setting the client
field.
@@ -33,7 +33,7 @@ pub const TAG: &str = "*"; | |||
#[rocketmq::main] | |||
pub async fn main() -> Result<()> { | |||
//init logger | |||
rocketmq_common::log::init_logger(); | |||
//rocketmq_common::log::init_logger(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't disable logger initialization in examples.
Commenting out the logger initialization reduces visibility into potential issues, especially for users trying to debug topic-related problems. Examples should demonstrate best practices including proper logging setup.
- //rocketmq_common::log::init_logger();
+ rocketmq_common::log::init_logger();
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
//rocketmq_common::log::init_logger(); | |
rocketmq_common::log::init_logger(); |
let response_header = response | ||
.decode_command_custom_header::<GetMaxOffsetResponseHeader>() | ||
.expect("decode error"); | ||
return Ok(response_header.offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace expect() with proper error handling.
Using expect()
can cause panics in production. Consider using map_err()
to convert the decode error into a proper MQClientError
.
- let response_header = response
- .decode_command_custom_header::<GetMaxOffsetResponseHeader>()
- .expect("decode error");
- return Ok(response_header.offset);
+ let response_header = response
+ .decode_command_custom_header::<GetMaxOffsetResponseHeader>()
+ .map_err(|e| MQBrokerError(
+ response.code(),
+ format!("Failed to decode response header: {}", e),
+ addr.to_string(),
+ ))?;
+ return Ok(response_header.offset);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let response_header = response | |
.decode_command_custom_header::<GetMaxOffsetResponseHeader>() | |
.expect("decode error"); | |
return Ok(response_header.offset); | |
let response_header = response | |
.decode_command_custom_header::<GetMaxOffsetResponseHeader>() | |
.map_err(|e| MQBrokerError( | |
response.code(), | |
format!("Failed to decode response header: {}", e), | |
addr.to_string(), | |
))?; | |
return Ok(response_header.offset); |
let client = self.client.as_mut().expect("client is None"); | ||
let broker_name = client.get_broker_name_from_message_queue(mq).await; | ||
let mut broker_addr = client | ||
.find_broker_address_in_publish(broker_name.as_str()) | ||
.await; | ||
if broker_addr.is_none() { | ||
client | ||
.update_topic_route_info_from_name_server_topic(mq.get_topic()) | ||
.await; | ||
let broker_name = client.get_broker_name_from_message_queue(mq).await; | ||
broker_addr = client | ||
.find_broker_address_in_publish(broker_name.as_str()) | ||
.await; | ||
} | ||
if let Some(ref broker_addr) = broker_addr { | ||
let offset = client | ||
.mq_client_api_impl | ||
.as_mut() | ||
.expect("mq_client_api_impl is None") | ||
.get_max_offset(broker_addr, mq, self.timeout_millis) | ||
.await?; | ||
return Ok(offset); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle potential None
values without panics
Using .expect()
on Option
types can lead to panics if the value is None
. It's better to handle None
cases gracefully by returning an appropriate error.
Apply the following changes to handle None
values:
- let client = self.client.as_mut().expect("client is None");
+ let client = match self.client.as_mut() {
+ Some(client) => client,
+ None => {
+ return Err(MQClientErr(
+ -1,
+ "MQClientInstance is not initialized".to_string(),
+ ));
+ }
+ };
...
- .as_mut()
- .expect("mq_client_api_impl is None")
+ .as_mut().ok_or_else(|| MQClientErr(
+ -1,
+ "MQClientAPIImpl is not initialized".to_string(),
+ ))?
...
- unimplemented!("max_offset")
+ Err(MQClientErr(
+ -1,
+ "Failed to retrieve broker address; cannot compute max offset".to_string(),
+ ))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let client = self.client.as_mut().expect("client is None"); | |
let broker_name = client.get_broker_name_from_message_queue(mq).await; | |
let mut broker_addr = client | |
.find_broker_address_in_publish(broker_name.as_str()) | |
.await; | |
if broker_addr.is_none() { | |
client | |
.update_topic_route_info_from_name_server_topic(mq.get_topic()) | |
.await; | |
let broker_name = client.get_broker_name_from_message_queue(mq).await; | |
broker_addr = client | |
.find_broker_address_in_publish(broker_name.as_str()) | |
.await; | |
} | |
if let Some(ref broker_addr) = broker_addr { | |
let offset = client | |
.mq_client_api_impl | |
.as_mut() | |
.expect("mq_client_api_impl is None") | |
.get_max_offset(broker_addr, mq, self.timeout_millis) | |
.await?; | |
return Ok(offset); | |
} | |
let client = match self.client.as_mut() { | |
Some(client) => client, | |
None => { | |
return Err(MQClientErr( | |
-1, | |
"MQClientInstance is not initialized".to_string(), | |
)); | |
} | |
}; | |
let broker_name = client.get_broker_name_from_message_queue(mq).await; | |
let mut broker_addr = client | |
.find_broker_address_in_publish(broker_name.as_str()) | |
.await; | |
if broker_addr.is_none() { | |
client | |
.update_topic_route_info_from_name_server_topic(mq.get_topic()) | |
.await; | |
let broker_name = client.get_broker_name_from_message_queue(mq).await; | |
broker_addr = client | |
.find_broker_address_in_publish(broker_name.as_str()) | |
.await; | |
} | |
if let Some(ref broker_addr) = broker_addr { | |
let offset = client | |
.mq_client_api_impl | |
.as_mut().ok_or_else(|| MQClientErr( | |
-1, | |
"MQClientAPIImpl is not initialized".to_string(), | |
))? | |
.get_max_offset(broker_addr, mq, self.timeout_millis) | |
.await?; | |
return Ok(offset); | |
} | |
Err(MQClientErr( | |
-1, | |
"Failed to retrieve broker address; cannot compute max offset".to_string(), | |
)) |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1108 +/- ##
==========================================
- Coverage 19.77% 19.76% -0.01%
==========================================
Files 436 436
Lines 36380 36390 +10
==========================================
Hits 7193 7193
- Misses 29187 29197 +10 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1106
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Refactor