Skip to content

Commit

Permalink
[ISSUE #1939]🔖PopMessageProcessor supports process_request handle-4🚀 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 25, 2024
1 parent fd06db8 commit 100dffb
Show file tree
Hide file tree
Showing 13 changed files with 418 additions and 50 deletions.
3 changes: 3 additions & 0 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,15 @@ impl BrokerRuntime {
);
let pop_message_processor = ArcMut::new(PopMessageProcessor::new(
self.consumer_manager.clone(),
Arc::new(self.consumer_offset_manager.clone()),
self.consumer_order_info_manager.clone(),
self.broker_config.clone(),
self.message_store.clone().unwrap(),
self.message_store_config.clone(),
Arc::new(self.topic_config_manager.clone()),
self.subscription_group_manager.clone(),
self.consumer_filter_manager.clone(),
self.pop_inflight_message_counter.clone(),
));
let ack_message_processor = ArcMut::new(AckMessageProcessor::new(
self.topic_config_manager.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl PopLongPollingService {
remoting_command: RemotingCommand,
request_header: PollingHeader,
subscription_data: SubscriptionData,
message_filter: Arc<Option<Box<dyn MessageFilter>>>,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
) -> PollingResult {
unimplemented!("PopLongPollingService::polling")
}
Expand Down
15 changes: 15 additions & 0 deletions rocketmq-broker/src/offset/manager/consumer_order_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,21 @@ impl<MS> ConsumerOrderInfoManager<MS> {
) -> bool {
unimplemented!()
}

pub fn update(
&self,
attempt_id: CheetahString,
is_retry: bool,
topic: &CheetahString,
group: &CheetahString,
queue_id: i32,
pop_time: u64,
invisible_time: u64,
msg_queue_offset_list: Vec<u64>,
order_info_builder: &str,
) -> bool {
unimplemented!()
}
}

#[inline]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
subscription_data: SubscriptionData,
subscription_group_config: SubscriptionGroupConfig,
broker_allow_suspend: bool,
message_filter: Box<dyn MessageFilter>,
message_filter: Arc<Box<dyn MessageFilter>>,
mut response: RemotingCommand,
mut mapping_context: TopicQueueMappingContext,
_begin_time_mills: u64,
Expand Down Expand Up @@ -234,7 +234,7 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
get_current_millis(),
offset,
subscription_data,
Arc::new(message_filter),
message_filter,
);
self.pull_request_hold_service
.as_ref()
Expand Down
Loading

0 comments on commit 100dffb

Please # to comment.