Skip to content

Commit

Permalink
[ISSUE #1885]📝Add code comments for QueryAssignmentProcessor (#1886)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 20, 2024
1 parent 26fbc7c commit 7dd2383
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion rocketmq-broker/src/processor/query_assignment_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl QueryAssignmentProcessor {
channel: Channel,
) -> Option<HashSet<MessageQueue>> {
match message_model {
// handle broadcasting consumer
// handle broadcasting consumer, this mode returns all message queues
MessageModel::Broadcasting => {
let assigned_queue_set = self
.topic_route_info_manager
Expand All @@ -203,6 +203,7 @@ impl QueryAssignmentProcessor {
}
// handle clustering consumer
MessageModel::Clustering => {
// get all message queues for the topic
let mq_set = if mix_all::is_lmq(Some(topic.as_str())) {
let mut set = HashSet::new();
let queue = MessageQueue::from_parts(
Expand Down Expand Up @@ -231,6 +232,7 @@ impl QueryAssignmentProcessor {
if !self.broker_config.server_load_balancer_enable {
return mq_set;
}
// get all consumer ids for the consumer group
let consumer_group_info = self
.consumer_manager
.get_consumer_group_info(consumer_group);
Expand All @@ -248,6 +250,7 @@ impl QueryAssignmentProcessor {
return None;
}
let mut mq_all = mq_set.unwrap().into_iter().collect::<Vec<MessageQueue>>();
// sort message queues and consumer ids
mq_all.sort();
cid_all.sort();

Expand All @@ -263,6 +266,7 @@ impl QueryAssignmentProcessor {
let strategy = strategy.unwrap();
let result =
if set_message_request_mode_request_body.mode == MessageRequestMode::Pop {
// allocate message queues for pop mode
self.allocate_for_pop(
strategy,
consumer_group,
Expand All @@ -272,6 +276,7 @@ impl QueryAssignmentProcessor {
set_message_request_mode_request_body.pop_share_queue_num,
)
} else {
// allocate message queues for pull mode
match strategy.allocate(
consumer_group,
client_id,
Expand Down Expand Up @@ -300,6 +305,9 @@ impl QueryAssignmentProcessor {
pop_share_queue_num: i32,
) -> Result<HashSet<MessageQueue>> {
if pop_share_queue_num <= 0 || pop_share_queue_num >= cid_all.len() as i32 - 1 {
//Each consumer can consume all queues, return all queues. Queue ID -1 means consume
// all queues when consuming in Pop mode
//each client pop all messagequeue
Ok(mq_all
.iter()
.map(|mq| {
Expand All @@ -311,6 +319,8 @@ impl QueryAssignmentProcessor {
})
.collect::<HashSet<MessageQueue>>())
} else if cid_all.len() <= mq_all.len() {
//consumer working in pop mode could share the MessageQueues assigned to
// the N (N = popWorkGroupSize) consumer following it in the cid list
let mut allocate_result = strategy
.allocate(consumer_group, current_cid, mq_all, cid_all)
.unwrap();
Expand Down

0 comments on commit 7dd2383

Please # to comment.