Skip to content

Commit

Permalink
[ISSUE #1773]šŸ¤”Optimize RebalanceImpl#try_query_assignment method logiā€¦
Browse files Browse the repository at this point in the history
ā€¦c and simply codešŸ”„ (#1774)
  • Loading branch information
mxsm authored Dec 14, 2024
1 parent b541d9b commit 0a21a03
Showing 1 changed file with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ where
let topics = sub_table.keys().cloned().collect::<Vec<CheetahString>>();
drop(sub_table);
for topic in &topics {
//try_query_assignment unimplemented
if !self.client_rebalance(topic) && self.try_query_assignment(topic).await {
//pop consumer
if !self.get_rebalance_result_from_broker(topic, is_order).await {
balanced = false;
}
Expand All @@ -138,6 +138,23 @@ where
}
}

/// Attempts to query the assignment for a given topic.
///
/// This function checks if the topic is already present in the client or broker rebalance
/// tables. If not, it queries the assignment from the broker using the allocation strategy.
///
/// # Arguments
///
/// * `topic` - A reference to a `CheetahString` representing the topic to query.
///
/// # Returns
///
/// A `bool` indicating whether the assignment query was successful.
///
/// # Errors
///
/// This function logs errors if the allocation strategy is not set or if the query assignment
/// fails.
async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool {
let topic_client_rebalance = self.topic_client_rebalance.read().await;
if topic_client_rebalance.contains_key(topic) {
Expand All @@ -152,11 +169,11 @@ where
let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy {
CheetahString::from_static_str(strategy.get_name())
} else {
CheetahString::from_static_str("unknown")
error!("tryQueryAssignment error, allocateMessageQueueStrategy is None.");
return false;
};
let mut retry_times = 0;
for _ in 0..TIMEOUT_CHECK_TIMES {
retry_times += 1;

for retry_times in 1..=TIMEOUT_CHECK_TIMES {
match self
.client_instance
.as_mut()
Expand All @@ -166,7 +183,7 @@ where
self.consumer_group.as_ref().unwrap(),
&strategy_name,
self.message_model.unwrap(),
QUERY_ASSIGNMENT_TIMEOUT,
QUERY_ASSIGNMENT_TIMEOUT / (TIMEOUT_CHECK_TIMES as u64) * (retry_times as u64),
)
.await
{
Expand All @@ -186,13 +203,11 @@ where
},
}
}

if retry_times >= TIMEOUT_CHECK_TIMES {
let mut topic_client_rebalance = self.topic_client_rebalance.write().await;
topic_client_rebalance.insert(topic.clone(), topic.clone());
return false;
}
true
self.topic_client_rebalance
.write()
.await
.insert(topic.clone(), topic.clone());
false
}

async fn truncate_message_queue_not_my_topic(&self) {
Expand Down

0 comments on commit 0a21a03

Please # to comment.