Skip to content

Commit

Permalink
[ISSUE #2090]💫Implement PopReviveService#mergeAndRevive⚗️
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 4, 2025
1 parent 259be44 commit 2027b49
Showing 1 changed file with 135 additions and 3 deletions.
138 changes: 135 additions & 3 deletions rocketmq-broker/src/processor/processor_service/pop_revive_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use tracing::info;

use crate::failover::escape_bridge::EscapeBridge;
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
use crate::subscription::manager::subscription_group_manager::SubscriptionGroupManager;
use crate::topic::manager::topic_config_manager::TopicConfigManager;

pub struct PopReviveService<MS> {
Expand All @@ -69,6 +70,7 @@ pub struct PopReviveService<MS> {
escape_bridge: ArcMut<EscapeBridge<MS>>,
message_store: ArcMut<MS>,
should_start_time: Arc<AtomicU64>,
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,
}
impl<MS: MessageStore> PopReviveService<MS> {
pub fn new(
Expand All @@ -81,6 +83,7 @@ impl<MS: MessageStore> PopReviveService<MS> {
escape_bridge: ArcMut<EscapeBridge<MS>>,
message_store: ArcMut<MS>,
should_start_time: Arc<AtomicU64>,
subscription_group_manager: Arc<SubscriptionGroupManager<MS>>,

Check warning on line 86 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L86

Added line #L86 was not covered by tests
) -> Self {
Self {
queue_id,
Expand All @@ -98,6 +101,7 @@ impl<MS: MessageStore> PopReviveService<MS> {
escape_bridge,
message_store,
should_start_time,
subscription_group_manager,

Check warning on line 104 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L104

Added line #L104 was not covered by tests
}
}

Expand Down Expand Up @@ -340,7 +344,10 @@ impl<MS: MessageStore> PopReviveService<MS> {
);
continue;
}
this.merge_and_revive(&mut consume_revive_obj);
if let Err(e) = this.merge_and_revive(&mut consume_revive_obj).await {
error!("reviveQueueId={}, revive error:{}", this.queue_id, e);
continue;
}

Check warning on line 350 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L347-L350

Added lines #L347 - L350 were not covered by tests
let mut delay = 0;
if let Some(ref sort_list) = consume_revive_obj.sort_list {
if !sort_list.is_empty() {
Expand Down Expand Up @@ -586,8 +593,133 @@ impl<MS: MessageStore> PopReviveService<MS> {
unimplemented!()
}

fn merge_and_revive(&self, _consume_revive_obj: &mut ConsumeReviveObj) {
unimplemented!("consume_revive_message")
async fn merge_and_revive(
&mut self,
consume_revive_obj: &mut ConsumeReviveObj,
) -> crate::Result<()> {
let mut new_offset = consume_revive_obj.old_offset;
let end_time = consume_revive_obj.end_time;
let sort_list = consume_revive_obj.gen_sort_list();
info!(
"reviveQueueId={}, ck listSize={}",
self.queue_id,
sort_list.len()

Check warning on line 606 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L596-L606

Added lines #L596 - L606 were not covered by tests
);
if !sort_list.is_empty() {
let last = sort_list.last().unwrap();
info!(
"reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={}; last ck, \
startOffset={}, reviveOffset={}",
self.queue_id,
sort_list[0].start_offset,
sort_list[0].revive_offset,

Check warning on line 615 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L608-L615

Added lines #L608 - L615 were not covered by tests
last.start_offset,
last.revive_offset
);
}

Check warning on line 619 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L619

Added line #L619 was not covered by tests

for pop_check_point in sort_list {
if !self.should_run_pop_revive {
info!(
"slave skip ck process, revive topic={}, reviveQueueId={}",

Check warning on line 624 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L621-L624

Added lines #L621 - L624 were not covered by tests
self.revive_topic, self.queue_id
);
break;
}
if end_time - pop_check_point.get_revive_time()
<= (PopAckConstants::ACK_TIME_INTERVAL + PopAckConstants::SECOND)

Check warning on line 630 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L627-L630

Added lines #L627 - L630 were not covered by tests
{
break;
}

let normal_topic = CheetahString::from_string(KeyBuilder::parse_normal_topic(
pop_check_point.topic.as_str(),
pop_check_point.cid.as_str(),
));
if self
.topic_config_manager
.select_topic_config(&normal_topic)
.is_none()

Check warning on line 642 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L632-L642

Added lines #L632 - L642 were not covered by tests
{
info!(
"reviveQueueId={}, can not get normal topic {}, then continue",

Check warning on line 645 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L644-L645

Added lines #L644 - L645 were not covered by tests
self.queue_id, pop_check_point.topic
);
new_offset = pop_check_point.revive_offset;
continue;
}
if self
.subscription_group_manager
.find_subscription_group_config(&pop_check_point.cid)
.is_none()

Check warning on line 654 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L648-L654

Added lines #L648 - L654 were not covered by tests
{
info!(
"reviveQueueId={}, can not get cid {}, then continue",

Check warning on line 657 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L656-L657

Added lines #L656 - L657 were not covered by tests
self.queue_id, pop_check_point.cid
);
new_offset = pop_check_point.revive_offset;
continue;
}

// may be need to optimize
let mut remove = vec![];
let length = self.inflight_revive_request_map.lock().len();
while length - remove.len() > 3 {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let mut inflight_map = self.inflight_revive_request_map.lock();
let entry = inflight_map.first_entry().unwrap();
let pair = entry.get();
let old_ck = entry.key();
if !pair.1 && (get_current_millis() - pair.0 as u64 > 30 * 1000) {
self.re_put_ck(old_ck, pair);
remove.push(old_ck.clone());
info!(
"stay too long, remove from reviveRequestMap, {}, {:?}, {}, {}",

Check warning on line 677 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L660-L677

Added lines #L660 - L677 were not covered by tests
pop_check_point.topic,
pop_check_point.broker_name,
pop_check_point.queue_id,
pop_check_point.start_offset
);
}

Check warning on line 683 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L683

Added line #L683 was not covered by tests
}
let mut inflight_revive_request_map = self.inflight_revive_request_map.lock();
for ck in remove {
inflight_revive_request_map.remove(&ck);
}

Check warning on line 688 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L685-L688

Added lines #L685 - L688 were not covered by tests
// may be need to optimize
self.revive_msg_from_ck(pop_check_point);

new_offset = pop_check_point.revive_offset;

Check warning on line 692 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L690-L692

Added lines #L690 - L692 were not covered by tests
}
if new_offset > consume_revive_obj.old_offset {
if !self.should_run_pop_revive {
println!(
"slave skip commit, revive topic={}, reviveQueueId={}",
self.revive_topic, self.queue_id
);
return Ok(());
}
self.consumer_offset_manager.commit_offset(
CheetahString::from_static_str(PopAckConstants::LOCAL_HOST),
&CheetahString::from_static_str(PopAckConstants::REVIVE_GROUP),
&self.revive_topic,
self.queue_id,
new_offset,
);
}
self.revive_offset = new_offset;
consume_revive_obj.new_offset = new_offset;
Ok(())
}

Check warning on line 713 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L694-L713

Added lines #L694 - L713 were not covered by tests

fn re_put_ck(&self, _old_ck: &PopCheckPoint, _pair: &(i64, bool)) {
// Implement the logic to re-put checkpoint
unimplemented!()

Check warning on line 717 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L715-L717

Added lines #L715 - L717 were not covered by tests
}

fn revive_msg_from_ck(&self, _pop_check_point: &PopCheckPoint) {
// Implement the logic to revive message from checkpoint
unimplemented!()

Check warning on line 722 in rocketmq-broker/src/processor/processor_service/pop_revive_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_revive_service.rs#L720-L722

Added lines #L720 - L722 were not covered by tests
}
}

Expand Down

0 comments on commit 2027b49

Please # to comment.