Skip to content

Commit

Permalink
[ISSUE #1775]🤡Simply MQClientInstance#query_assignment method code an…
Browse files Browse the repository at this point in the history
…d add doc for method🔥 (#1776)
  • Loading branch information
mxsm authored Dec 14, 2024
1 parent 0a21a03 commit 3abfb47
Showing 1 changed file with 38 additions and 13 deletions.
51 changes: 38 additions & 13 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,25 @@ impl MQClientInstance {
false
}

/// Queries the assignment for a given topic.
///
/// This function attempts to find the broker address for the specified topic. If the broker
/// address is not found, it updates the topic route information from the name server and
/// retries. If the broker address is found, it queries the assignment from the broker.
///
/// # Arguments
///
/// * `topic` - A reference to a `CheetahString` representing the topic to query.
/// * `consumer_group` - A reference to a `CheetahString` representing the consumer group.
/// * `strategy_name` - A reference to a `CheetahString` representing the allocation strategy
/// name.
/// * `message_model` - The message model to use for the query.
/// * `timeout` - The timeout duration for the query.
///
/// # Returns
///
/// A `Result` containing an `Option` with a `HashSet` of `MessageQueueAssignment` if the query
/// is successful, or an error if it fails.
pub async fn query_assignment(
&mut self,
topic: &CheetahString,
Expand All @@ -1159,27 +1178,33 @@ impl MQClientInstance {
message_model: MessageModel,
timeout: u64,
) -> Result<Option<HashSet<MessageQueueAssignment>>> {
// Try to find broker address
let mut broker_addr = self.find_broker_addr_by_topic(topic).await;

// If not found, update and retry
if broker_addr.is_none() {
self.update_topic_route_info_from_name_server_topic(topic)
.await;
broker_addr = self.find_broker_addr_by_topic(topic).await;
}
if let Some(broker_addr) = broker_addr {
let client_id = self.client_id.clone();
self.mq_client_api_impl
.as_mut()
.unwrap()
.query_assignment(
&broker_addr,
topic.clone(),
consumer_group.clone(),
strategy_name.clone(),
client_id,
message_model,
timeout,
)
.await
match self.mq_client_api_impl.as_mut() {
Some(api_impl) => {
api_impl
.query_assignment(
&broker_addr,
topic.clone(),
consumer_group.clone(),
strategy_name.clone(),
client_id,
message_model,
timeout,
)
.await
}
None => mq_client_err!("mq_client_api_impl is None"),
}
} else {
Ok(None)
}
Expand Down

0 comments on commit 3abfb47

Please # to comment.