Skip to content

Commit

Permalink
[ISSUE #2333]🤡Implement PopBufferMergeService addCkMock method🧑‍💻 (#2334
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mxsm authored Jan 18, 2025
1 parent 03c2c4a commit 2caa670
Showing 1 changed file with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,41 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
unimplemented!()
}

pub fn add_ck_mock(
&mut self,
group: CheetahString,
topic: CheetahString,
queue_id: i32,
start_offset: u64,
invisible_time: u64,
pop_time: u64,
revive_queue_id: i32,
next_begin_offset: u64,
broker_name: CheetahString,
) {
let ck = PopCheckPoint {
pop_time: pop_time as i64,
invisible_time: invisible_time as i64,
start_offset: start_offset as i64,
topic,
cid: group,
queue_id,
broker_name: Some(broker_name),
..Default::default()
};

let point_wrapper = PopCheckPointWrapper::new_with_offset(
revive_queue_id,
i64::MAX,
Arc::new(ck),
next_begin_offset as i64,
true,
);
point_wrapper.set_ck_stored(true);

self.put_offset_queue(point_wrapper);
}

fn is_ck_done_for_finish(&self, point_wrapper: &PopCheckPointWrapper) -> bool {
let num = point_wrapper.get_ck().num;
let bits = point_wrapper.get_bits().load(Ordering::Relaxed)
Expand All @@ -511,6 +546,15 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
true
}

fn put_offset_queue(&mut self, point_wrapper: PopCheckPointWrapper) -> bool {
let mut queue = self
.commit_offsets
.entry(point_wrapper.lock_key.clone())
.or_insert(QueueWithTime::new());
queue.get_queue_mut().push_back(point_wrapper);
true
}

async fn put_ck_to_store(&self, point_wrapper: &PopCheckPointWrapper, flag: bool) {
if point_wrapper.get_revive_queue_offset() >= 0 {
return;
Expand Down Expand Up @@ -821,7 +865,7 @@ impl PopCheckPointWrapper {
self.next_begin_offset
}

pub fn get_lock_key(&self) -> &str {
pub fn get_lock_key(&self) -> &CheetahString {
&self.lock_key
}

Expand Down

0 comments on commit 2caa670

Please # to comment.