diff --git a/rocketmq-broker/src/processor/pop_message_processor.rs b/rocketmq-broker/src/processor/pop_message_processor.rs index 35b7efd5..2ea51fbc 100644 --- a/rocketmq-broker/src/processor/pop_message_processor.rs +++ b/rocketmq-broker/src/processor/pop_message_processor.rs @@ -928,7 +928,6 @@ where offset, request_header.max_msg_nums as i32 - get_message_result.message_mapped_list().len() as i32, - // 1024 * 1024, message_filter.clone(), ) .await; @@ -992,6 +991,7 @@ where .get_max_offset_in_queue(topic, queue_id) - atomic_offset.load(Ordering::Acquire) + atomic_rest_num.load(Ordering::Acquire); + self.queue_lock_manager().unlock_with_key(lock_key).await; return num; } Some(value) => value, @@ -1037,6 +1037,7 @@ where .broker_name .as_str(), ) { + self.queue_lock_manager().unlock_with_key(lock_key).await; return atomic_rest_num.load(Ordering::Acquire) + result_inner.message_count() as i64; } @@ -1053,25 +1054,28 @@ where queue_id, result_inner.message_queue_offset().clone(), ); - } else if (result_inner.status().is_some() - && (result_inner.status().unwrap() == GetMessageStatus::NoMatchedMessage - || result_inner.status().unwrap() == GetMessageStatus::OffsetFoundNull - || result_inner.status().unwrap() == GetMessageStatus::MessageWasRemoving - || result_inner.status().unwrap() == GetMessageStatus::NoMatchedLogicQueue)) - && result_inner.next_begin_offset() > -1 - { - if is_order { - self.broker_runtime_inner - .consumer_offset_manager() - .commit_offset( - channel.remote_address().to_string().into(), - &request_header.consumer_group, - topic, - queue_id, - result_inner.next_begin_offset(), - ); - } else { - unimplemented!("PopMessageProcessor pop_msg_from_queue") + } else if let Some(status) = result_inner.status() { + if matches!( + status, + GetMessageStatus::NoMatchedMessage + | GetMessageStatus::OffsetFoundNull + | GetMessageStatus::MessageWasRemoving + | GetMessageStatus::NoMatchedLogicQueue + ) && result_inner.next_begin_offset() > -1 + { + if is_order { + self.broker_runtime_inner + .consumer_offset_manager() + .commit_offset( + channel.remote_address().to_string().into(), + &request_header.consumer_group, + topic, + queue_id, + result_inner.next_begin_offset(), + ); + } else { + unimplemented!("PopMessageProcessor pop_msg_from_queue") + } } }