Skip to content

Commit

Permalink
[ISSUE #2329]🤡Complete the PopMessageProcessor process_request proces…
Browse files Browse the repository at this point in the history
…sing logic🧑‍💻
  • Loading branch information
mxsm committed Jan 18, 2025
1 parent ebb18b1 commit e737a84
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 8 deletions.
67 changes: 59 additions & 8 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerCon
use rocketmq_rust::ArcMut;
use rocketmq_store::base::get_message_result::GetMessageResult;
use rocketmq_store::base::message_status_enum::GetMessageStatus;
use rocketmq_store::base::select_result::SelectMappedBufferResult;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::log_file::MessageStore;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
Expand Down Expand Up @@ -766,7 +767,7 @@ where
topic: &CheetahString,
attempt_id: &CheetahString,
is_retry: bool,
get_message_result: ArcMut<GetMessageResult>,
mut get_message_result: ArcMut<GetMessageResult>,

Check warning on line 770 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L770

Added line #L770 was not covered by tests
request_header: &PopMessageRequestHeader,
queue_id: i32,
rest_num: i64,
Expand Down Expand Up @@ -997,8 +998,26 @@ where
queue_id,
result_inner.message_queue_offset().clone(),
);
} else {
unimplemented!()
} else if (result_inner.status().is_none()
|| result_inner.status().unwrap() == GetMessageStatus::NoMatchedMessage
|| result_inner.status().unwrap() == GetMessageStatus::OffsetFoundNull
|| result_inner.status().unwrap() == GetMessageStatus::MessageWasRemoving
|| result_inner.status().unwrap() == GetMessageStatus::NoMatchedLogicQueue)
&& result_inner.next_begin_offset() > -1

Check warning on line 1006 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1001-L1006

Added lines #L1001 - L1006 were not covered by tests
{
if is_order {
self.broker_runtime_inner
.consumer_offset_manager()
.commit_offset(
channel.remote_address().to_string().into(),
&request_header.consumer_group,
topic,
queue_id,
result_inner.next_begin_offset(),
);
} else {
unimplemented!("PopMessageProcessor pop_msg_from_queue")

Check warning on line 1019 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1008-L1019

Added lines #L1008 - L1019 were not covered by tests
}
}

atomic_rest_num.fetch_add(
Expand All @@ -1010,19 +1029,51 @@ where
.broker_config()
.broker_name
.as_str();
for msg in result_inner.message_mapped_list() {
let message_count = result_inner.message_count();
for mut maped_buffer in result_inner.message_mapped_vec() {

Check warning on line 1033 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1032-L1033

Added lines #L1032 - L1033 were not covered by tests
if self
.broker_runtime_inner
.broker_config()
.pop_response_return_actual_retry_topic
|| !is_retry
{
//get_message_result.message_mapped_list().push(msg.clone());
get_message_result.add_message_inner(maped_buffer);

Check warning on line 1040 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1040

Added line #L1040 was not covered by tests
} else {
let mut bytes = msg.get_bytes().unwrap_or_default();
let mut bytes = maped_buffer.get_bytes().unwrap_or_default();

Check warning on line 1042 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1042

Added line #L1042 was not covered by tests
let message_ext_list =
message_decoder::decodes_batch(&mut bytes, true, false);
unimplemented!("PopMessageProcessor pop_msg_from_queue")
maped_buffer.release();
for mut message_ext in message_ext_list {
let ck_info = ExtraInfoUtil::build_extra_info_with_msg_queue_offset(
final_offset,
pop_time as i64,
request_header.invisible_time as i64,
revive_qid,
message_ext.get_topic(),
broker_name,
message_ext.queue_id(),
message_ext.queue_offset(),
);
message_ext
.message
.properties
.entry(CheetahString::from_static_str(
MessageConst::PROPERTY_POP_CK,
))
.or_insert(ck_info.into());

message_ext.set_topic(request_header.topic.clone());
message_ext.set_store_size(0);

let encode = message_decoder::encode(&message_ext, false).unwrap();
let tmp_result = SelectMappedBufferResult {
start_offset: maped_buffer.start_offset,
size: encode.len() as i32,
bytes: Some(encode),
..Default::default()
};
get_message_result.add_message_inner(tmp_result);
}

Check warning on line 1076 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1045-L1076

Added lines #L1045 - L1076 were not covered by tests
}
}
self.broker_runtime_inner
Expand All @@ -1031,7 +1082,7 @@ where
topic,
&request_header.consumer_group,
queue_id,
result_inner.message_count() as i64,
message_count as i64,

Check warning on line 1085 in rocketmq-broker/src/processor/pop_message_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/pop_message_processor.rs#L1085

Added line #L1085 was not covered by tests
);
}
}
Expand Down
14 changes: 14 additions & 0 deletions rocketmq-store/src/base/get_message_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,25 @@ impl GetMessageResult {
self.message_mapped_list.push(maped_buffer);
}

#[inline]
pub fn add_message_inner(&mut self, maped_buffer: SelectMappedBufferResult) {
let slice = maped_buffer.get_buffer();
// self.message_buffer_list.push(Bytes::copy_from_slice(slice));
self.buffer_total_size += maped_buffer.bytes.as_ref().map_or(0, |b| b.len() as i32);
self.message_count += 1;
self.message_mapped_list.push(maped_buffer);
}

Check warning on line 231 in rocketmq-store/src/base/get_message_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/base/get_message_result.rs#L225-L231

Added lines #L225 - L231 were not covered by tests

#[inline]
pub fn message_mapped_list(&self) -> &[SelectMappedBufferResult] {
self.message_mapped_list.as_slice()
}

#[inline]
pub fn message_mapped_vec(self) -> Vec<SelectMappedBufferResult> {
self.message_mapped_list
}

Check warning on line 241 in rocketmq-store/src/base/get_message_result.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/base/get_message_result.rs#L239-L241

Added lines #L239 - L241 were not covered by tests

#[inline]
pub fn message_queue_offset(&self) -> &Vec<u64> {
&self.message_queue_offset
Expand Down

0 comments on commit e737a84

Please # to comment.