Skip to content

Commit

Permalink
[ISSUE #2410]Implement AckMessageProcessor start method
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 25, 2025
1 parent ec358cb commit 147d061
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion rocketmq-broker/src/processor/ack_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ use crate::broker_error::BrokerError::BrokerCommonError;
use crate::broker_error::BrokerError::BrokerRemotingError;
use crate::broker_runtime::BrokerRuntimeInner;
use crate::processor::pop_message_processor::PopMessageProcessor;
use crate::processor::processor_service::pop_revive_service::PopReviveService;

pub struct AckMessageProcessor<MS> {
pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
revive_topic: CheetahString,
pop_revive_services: Vec<ArcMut<PopReviveService<MS>>>,
}

impl<MS> AckMessageProcessor<MS>
Expand All @@ -66,9 +69,25 @@ where
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
) -> AckMessageProcessor<MS> {
let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
broker_runtime_inner
.broker_config()
.broker_identity
.broker_cluster_name
.as_str(),
));
let mut pop_revive_services = vec![];
for i in 0..broker_runtime_inner.broker_config().revive_queue_num {
let revive_queue_id = POP_ORDER_REVIVE_QUEUE;
let pop_revive_service =
PopReviveService::new(revive_topic.clone(), i as i32, broker_runtime_inner.clone());
pop_revive_services.push(ArcMut::new(pop_revive_service));
}

Check warning on line 85 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L72-L85

Added lines #L72 - L85 were not covered by tests
AckMessageProcessor {
pop_message_processor,
broker_runtime_inner,
revive_topic,
pop_revive_services,

Check warning on line 90 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L89-L90

Added lines #L89 - L90 were not covered by tests
}
}

Expand Down Expand Up @@ -97,7 +116,9 @@ where
}

pub fn start(&mut self) {
warn!("AckMessageProcessor started unimplemented, need to be implemented");
for pop_revive_service in self.pop_revive_services.iter() {
PopReviveService::start(pop_revive_service.clone());
}

Check warning on line 121 in rocketmq-broker/src/processor/ack_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/ack_message_processor.rs#L119-L121

Added lines #L119 - L121 were not covered by tests
}
}

Expand Down

0 comments on commit 147d061

Please # to comment.