Skip to content

Commit

Permalink
[ISSUE #1617]🚀Implement ConsumeMessagePopConcurrentlyService#consumeM…
Browse files Browse the repository at this point in the history
…essageDirectly🔥
  • Loading branch information
mxsm committed Dec 7, 2024
1 parent 01c9508 commit db79f4f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@
* limitations under the License.
*/
use std::sync::Arc;
use std::time::Instant;

use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_client_ext::MessageClientExt;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_remoting::protocol::body::cm_result::CMResult;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_rust::ArcMut;
use rocketmq_rust::WeakArcMut;
use tracing::info;

use crate::base::client_config::ClientConfig;
use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait;
use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue;
use crate::consumer::consumer_impl::process_queue::ProcessQueue;
use crate::consumer::default_mq_push_consumer::ConsumerConfig;
use crate::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
use crate::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently;

pub struct ConsumeMessagePopConcurrentlyService {
pub(crate) default_mqpush_consumer_impl: Option<WeakArcMut<DefaultMQPushConsumerImpl>>,
pub(crate) default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
pub(crate) client_config: ArcMut<ClientConfig>,
pub(crate) consumer_config: ArcMut<ConsumerConfig>,
pub(crate) consumer_group: CheetahString,
Expand All @@ -46,9 +50,10 @@ impl ConsumeMessagePopConcurrentlyService {
consumer_config: ArcMut<ConsumerConfig>,
consumer_group: CheetahString,
message_listener: ArcBoxMessageListenerConcurrently,
default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,

Check warning on line 53 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L53

Added line #L53 was not covered by tests
) -> Self {
Self {
default_mqpush_consumer_impl: None,
default_mqpush_consumer_impl,

Check warning on line 56 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L56

Added line #L56 was not covered by tests
client_config,
consumer_config,
consumer_group,
Expand Down Expand Up @@ -87,7 +92,50 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
todo!()
info!("consumeMessageDirectly receive new message: {}", msg);

Check warning on line 95 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L95

Added line #L95 was not covered by tests

let mq = MessageQueue::from_parts(
msg.topic().clone(),
broker_name.unwrap_or_default(),
msg.queue_id(),
);
let mut msgs = vec![ArcMut::new(MessageClientExt::new(msg))];
let context = ConsumeConcurrentlyContext::new(mq);
self.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.mut_from_ref()
.reset_retry_and_namespace(msgs.as_mut_slice(), self.consumer_group.as_str());

let begin_timestamp = Instant::now();

let status = self.message_listener.consume_message(
&msgs
.iter()
.map(|msg| &msg.message_ext_inner)
.collect::<Vec<&MessageExt>>(),
&context,
);
let mut result = ConsumeMessageDirectlyResult::default();
result.set_order(false);
result.set_auto_commit(true);
match status {
Ok(status) => match status {
ConsumeConcurrentlyStatus::ConsumeSuccess => {
result.set_consume_result(CMResult::CRSuccess);
}
ConsumeConcurrentlyStatus::ReconsumeLater => {
result.set_consume_result(CMResult::CRLater);
}

Check warning on line 129 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L97-L129

Added lines #L97 - L129 were not covered by tests
},
Err(e) => {
result.set_consume_result(CMResult::CRThrowException);
result.set_remark(CheetahString::from_string(e.to_string()))

Check warning on line 133 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L131-L133

Added lines #L131 - L133 were not covered by tests
}
}
result.set_spent_time_mills(begin_timestamp.elapsed().as_millis() as u64);
info!("consumeMessageDirectly Result: {}", result);
result

Check warning on line 138 in rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs#L136-L138

Added lines #L136 - L138 were not covered by tests
}

async fn submit_consume_request(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl DefaultMQPushConsumerImpl {
self.consumer_config.clone(),
self.consumer_config.consumer_group.clone(),
listener.expect("listener is None"),
self.default_mqpush_consumer_impl.clone(),

Check warning on line 291 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L291

Added line #L291 was not covered by tests
));

self.consume_message_pop_service =
Expand Down

0 comments on commit db79f4f

Please # to comment.