From 0a21a037abc1cb7c1c6ac99f86a7d8bf2faf8589 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 14 Dec 2024 17:06:44 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1773]=F0=9F=A4=A1Optimize=20Rebalance?= =?UTF-8?q?Impl#try=5Fquery=5Fassignment=20method=20logic=20and=20simply?= =?UTF-8?q?=20code=F0=9F=94=A5=20(#1774)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../re_balance/rebalance_impl.rs | 41 +++++++++++++------ 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index 5dbafa7e..e52cc467 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -116,8 +116,8 @@ where let topics = sub_table.keys().cloned().collect::>(); drop(sub_table); for topic in &topics { - //try_query_assignment unimplemented if !self.client_rebalance(topic) && self.try_query_assignment(topic).await { + //pop consumer if !self.get_rebalance_result_from_broker(topic, is_order).await { balanced = false; } @@ -138,6 +138,23 @@ where } } + /// Attempts to query the assignment for a given topic. + /// + /// This function checks if the topic is already present in the client or broker rebalance + /// tables. If not, it queries the assignment from the broker using the allocation strategy. + /// + /// # Arguments + /// + /// * `topic` - A reference to a `CheetahString` representing the topic to query. + /// + /// # Returns + /// + /// A `bool` indicating whether the assignment query was successful. + /// + /// # Errors + /// + /// This function logs errors if the allocation strategy is not set or if the query assignment + /// fails. async fn try_query_assignment(&mut self, topic: &CheetahString) -> bool { let topic_client_rebalance = self.topic_client_rebalance.read().await; if topic_client_rebalance.contains_key(topic) { @@ -152,11 +169,11 @@ where let strategy_name = if let Some(strategy) = &self.allocate_message_queue_strategy { CheetahString::from_static_str(strategy.get_name()) } else { - CheetahString::from_static_str("unknown") + error!("tryQueryAssignment error, allocateMessageQueueStrategy is None."); + return false; }; - let mut retry_times = 0; - for _ in 0..TIMEOUT_CHECK_TIMES { - retry_times += 1; + + for retry_times in 1..=TIMEOUT_CHECK_TIMES { match self .client_instance .as_mut() @@ -166,7 +183,7 @@ where self.consumer_group.as_ref().unwrap(), &strategy_name, self.message_model.unwrap(), - QUERY_ASSIGNMENT_TIMEOUT, + QUERY_ASSIGNMENT_TIMEOUT / (TIMEOUT_CHECK_TIMES as u64) * (retry_times as u64), ) .await { @@ -186,13 +203,11 @@ where }, } } - - if retry_times >= TIMEOUT_CHECK_TIMES { - let mut topic_client_rebalance = self.topic_client_rebalance.write().await; - topic_client_rebalance.insert(topic.clone(), topic.clone()); - return false; - } - true + self.topic_client_rebalance + .write() + .await + .insert(topic.clone(), topic.clone()); + false } async fn truncate_message_queue_not_my_topic(&self) {