Skip to content

Commit

Permalink
[ISSUE #2390]🤡Implement PopMessageProcessor is_pop_should_stop method🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 24, 2025
1 parent 144edb2 commit cebb22f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
8 changes: 7 additions & 1 deletion rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,13 @@ where
group: &CheetahString,
queue_id: i32,
) -> bool {
unimplemented!("PopMessageProcessor is_pop_should_stop")
let broker_config = self.broker_runtime_inner.broker_config();
broker_config.enable_pop_message_threshold
&& self
.broker_runtime_inner
.pop_inflight_message_counter()
.get_group_pop_in_flight_message_num(topic, group, queue_id)
> broker_config.pop_inflight_message_threshold

Check warning on line 1173 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1167-L1173

Added lines #L1167 - L1173 were not covered by tests
}

fn get_pop_offset(
Expand Down
4 changes: 4 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ pub struct BrokerConfig {
pub pop_polling_map_size: usize,
pub max_pop_polling_size: u64,
pub pop_polling_size: usize,
pub enable_pop_message_threshold: bool,
pub pop_inflight_message_threshold: i64,
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -311,6 +313,8 @@ impl Default for BrokerConfig {
pop_polling_map_size: 100000,
max_pop_polling_size: 100000,
pop_polling_size: 1024,
enable_pop_message_threshold: false,
pop_inflight_message_threshold: 10000,
}
}
}
Expand Down

0 comments on commit cebb22f

Please # to comment.