Skip to content

Commit

Permalink
[ISSUE #1779]🤡Optimize RebalanceImpl#update_message_queue_assignment …
Browse files Browse the repository at this point in the history
…method🔥 (#1780)
  • Loading branch information
mxsm authored Dec 14, 2024
1 parent c69c40e commit c6c3bea
Showing 1 changed file with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,23 @@ where
mq_set.eq(&set)
}

/// Updates the message queue assignments for a given topic.
///
/// This function processes the provided message queue assignments, categorizing them into push
/// and pop assignments. It handles the subscription and unsubscription of retry topics
/// based on the assignments. It also removes unnecessary message queues and adds new
/// message queues as needed.
///
/// # Arguments
///
/// * `topic` - A reference to a `CheetahString` representing the topic to update.
/// * `assignments` - A reference to a `HashSet` of `MessageQueueAssignment` representing the
/// new assignments.
/// * `is_order` - A boolean indicating whether the message queues should be ordered.
///
/// # Returns
///
/// A `bool` indicating whether the message queue assignments were changed.
async fn update_message_queue_assignment(
&mut self,
topic: &CheetahString,
Expand All @@ -344,13 +361,12 @@ where
let mut changed = false;
let mut mq2push_assignment = HashMap::new();
let mut mq2pop_assignment = HashMap::new();
//maybe need to optimize
for assignment in assignments {
if let Some(ref mq) = assignment.message_queue {
if MessageRequestMode::Pop == assignment.mode {
mq2pop_assignment.insert(mq.clone(), assignment.clone());
mq2pop_assignment.insert(mq, assignment);
} else {
mq2push_assignment.insert(mq.clone(), assignment.clone());
mq2push_assignment.insert(mq, assignment);
}
}
}
Expand Down Expand Up @@ -490,6 +506,7 @@ where
}

{
// add new message queue
let mut all_mq_locked = true;
let mut pull_request_list = Vec::new();
let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade();
Expand All @@ -500,8 +517,8 @@ where
let process_queue_table_clone = self.process_queue_table.clone();
let mut process_queue_table = process_queue_table_clone.write().await;
for (mq, assignment) in mq2push_assignment {
if !process_queue_table.contains_key(&mq) {
if is_order && !self.lock_with(&mq, process_queue_table.deref()).await {
if !process_queue_table.contains_key(mq) {
if is_order && !self.lock_with(mq, process_queue_table.deref()).await {
warn!(
"doRebalance, {:?}, add a new mq failed, {}, because lock failed",
self.consumer_group,
Expand All @@ -511,10 +528,16 @@ where
continue;
}

sub_rebalance_impl.remove_dirty_offset(&mq).await;
sub_rebalance_impl.remove_dirty_offset(mq).await;
let pq = Arc::new(sub_rebalance_impl.create_process_queue());
pq.set_locked(true);
let next_offset = sub_rebalance_impl.compute_pull_from_where(&mq).await;
let next_offset = sub_rebalance_impl
.compute_pull_from_where_with_exception(mq)
.await;
if next_offset.is_err() {
continue;
}
let next_offset = next_offset.unwrap();
if next_offset >= 0 {
if process_queue_table.insert(mq.clone(), pq.clone()).is_none() {
info!(
Expand Down Expand Up @@ -558,7 +581,7 @@ where
if let Some(rebalance_impl) = self.sub_rebalance_impl.as_ref().unwrap().upgrade() {
let mut pop_request_list = Vec::new();
let mut pop_process_queue_table = self.pop_process_queue_table.write().await;
for (mq, assignment) in &mq2pop_assignment {
for (mq, assignment) in mq2pop_assignment {
if !pop_process_queue_table.contains_key(mq) {
let pq = rebalance_impl.create_pop_process_queue();
let pre = pop_process_queue_table.insert(mq.clone(), Arc::new(pq.clone()));
Expand Down

0 comments on commit c6c3bea

Please # to comment.