Skip to content

Commit

Permalink
[ISSUE #1037]Optimize DefaultMQPushConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Oct 6, 2024
1 parent e07dbc4 commit 63b1a4c
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 86 deletions.
1 change: 0 additions & 1 deletion rocketmq-client/src/base/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Validators {
),
));
}

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl DefaultMQPushConsumerImpl {
self.consumer_config.unit_mode
);
*self.service_state = ServiceState::StartFailed;
// check all config
self.check_config()?;
//copy_subscription is can be removed
self.copy_subscription().await?;
if self.consumer_config.message_model() == MessageModel::Clustering {
self.client_config.change_instance_name_to_pid();
Expand Down Expand Up @@ -474,20 +476,6 @@ impl DefaultMQPushConsumerImpl {
));
}

/* if !util_all::parse_date(self.consumer_config.consume_timestamp.as_str(), DATE_FORMAT)
.is_ok()
{
return Err(MQClientError::MQClientException(
-1,
format!(
"consumeTimestamp is invalid, the valid format is {}, but received {}{}",
DATE_FORMAT,
self.consumer_config.consume_timestamp,
FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
),
));
}*/

if self
.consumer_config
.allocate_message_queue_strategy
Expand All @@ -502,16 +490,6 @@ impl DefaultMQPushConsumerImpl {
));
}

/* if self.consumer_config.subscription.is_empty() {
return Err(MQClientError::MQClientErr(
-1,
format!(
"subscription is null{}",
FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
),
));
}*/

if self.consumer_config.message_listener.is_none() {
return Err(MQClientError::MQClientErr(
-1,
Expand All @@ -528,14 +506,14 @@ impl DefaultMQPushConsumerImpl {
.as_ref()
.unwrap()
.message_listener_orderly
.is_some()
.is_none()
&& self
.consumer_config
.message_listener
.as_ref()
.unwrap()
.message_listener_concurrently
.is_some()
.is_none()
{
return Err(MQClientError::MQClientErr(
-1,
Expand Down Expand Up @@ -701,7 +679,18 @@ impl DefaultMQPushConsumerImpl {
async fn copy_subscription(&mut self) -> Result<()> {
let sub = self.consumer_config.subscription();
if !sub.is_empty() {
unimplemented!()
for (topic, sub_expression) in sub.as_ref() {
let subscription_data = FilterAPI::build_subscription_data(topic, sub_expression)
.map_err(|e| {
MQClientError::MQClientErr(
-1,
format!("buildSubscriptionData exception, {}", e),
)
})?;
self.rebalance_impl
.put_subscription_data(topic, subscription_data)
.await;
}
}
if self.message_listener.is_none() {
self.message_listener = self.consumer_config.message_listener.clone();
Expand Down
25 changes: 24 additions & 1 deletion rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl PullAPIWrapper {
}
msg_vec = inner_msg_vec;
}

// filter message
let mut msg_list_filter_again =
if !subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode {
let mut msg_vec_again = Vec::with_capacity(msg_vec.len());
Expand Down Expand Up @@ -219,6 +219,29 @@ impl PullAPIWrapper {
}
}

/// Pulls messages from the broker asynchronously.
///
/// # Arguments
///
/// * `mq` - A reference to the `MessageQueue` from which to pull messages.
/// * `sub_expression` - The subscription expression.
/// * `expression_type` - The type of the subscription expression.
/// * `sub_version` - The version of the subscription.
/// * `offset` - The offset from which to start pulling messages.
/// * `max_nums` - The maximum number of messages to pull.
/// * `max_size_in_bytes` - The maximum size of messages to pull in bytes.
/// * `sys_flag` - The system flag for the pull request.
/// * `commit_offset` - The commit offset.
/// * `broker_suspend_max_time_millis` - The maximum time in milliseconds for which the broker
/// can suspend the pull request.
/// * `timeout_millis` - The timeout for the pull request in milliseconds.
/// * `communication_mode` - The communication mode (e.g., sync, async).
/// * `pull_callback` - The callback to execute when the pull request completes.
///
/// # Returns
///
/// A `Result` containing an `Option` with the `PullResultExt` if successful, or an
/// `MQClientError` if an error occurs.
pub async fn pull_kernel_impl<PCB>(
&mut self,
mq: &MessageQueue,
Expand Down
93 changes: 43 additions & 50 deletions rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,79 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_common::ArcRefCellWrapper;
use tracing::info;
use tracing::warn;

use crate::consumer::consumer_impl::message_request::MessageRequest;
use crate::consumer::consumer_impl::pop_request::PopRequest;
use crate::consumer::consumer_impl::pull_request::PullRequest;
use crate::factory::mq_client_instance::MQClientInstance;

#[derive(Clone)]
pub struct PullMessageService {
pop_tx: Option<tokio::sync::mpsc::Sender<PopRequest>>,
pull_tx: Option<tokio::sync::mpsc::Sender<PullRequest>>,
tx: Option<tokio::sync::mpsc::Sender<Box<dyn MessageRequest + Send + 'static>>>,

Check warning on line 29 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L29

Added line #L29 was not covered by tests
}

impl PullMessageService {
pub fn new() -> Self {
PullMessageService {
pop_tx: None,
pull_tx: None,
}
PullMessageService { tx: None }

Check warning on line 34 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L34

Added line #L34 was not covered by tests
}
pub async fn start(&mut self, instance: ArcRefCellWrapper<MQClientInstance>) {
let (pop_tx, mut pop_rx) = tokio::sync::mpsc::channel(1024 * 4);
let (pull_tx, mut pull_rx) = tokio::sync::mpsc::channel(1024 * 4);
self.pop_tx = Some(pop_tx);
self.pull_tx = Some(pull_tx);
//let instance_wrapper = ArcRefCellWrapper::new(instance);
let instance_wrapper_clone = instance.clone();
tokio::spawn(async move {
info!(
">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PopRequest] \
started<<<<<<<<<<<<<<<<<<<<<<<<<<<<"
);
while let Some(request) = pop_rx.recv().await {
if let Some(mut consumer) =
instance.select_consumer(request.get_consumer_group()).await
{
consumer.pop_message(request).await;
} else {
warn!(
"No matched consumer for the PopRequest {}, drop it",
request
)
}
}
});
pub async fn start(&mut self, mut instance: ArcRefCellWrapper<MQClientInstance>) {

Check warning on line 36 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L36

Added line #L36 was not covered by tests
let (tx, mut rx) =
tokio::sync::mpsc::channel::<Box<dyn MessageRequest + Send + 'static>>(1024 * 4);
self.tx = Some(tx);
tokio::spawn(async move {
info!(
">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PullRequest] \
started<<<<<<<<<<<<<<<<<<<<<<<<<<<<"
);
while let Some(request) = pull_rx.recv().await {
if let Some(mut consumer) = instance_wrapper_clone
.select_consumer(request.get_consumer_group())
.await
{
consumer.pull_message(request).await;
info!(">>>>>>>>>>>>>>>>>>>>>>>PullMessageService started<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
while let Some(request) = rx.recv().await {
if request.get_message_request_mode() == MessageRequestMode::Pull {
let pull_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
PullMessageService::pull_message(pull_request, instance.as_mut()).await;
} else {
warn!(
"No matched consumer for the PullRequest {},drop it",
request
)
let pop_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
PullMessageService::pop_message(pop_request, instance.as_mut()).await;
}
}
});
}

async fn pull_message(request: PullRequest, instance: &mut MQClientInstance) {

Check warning on line 56 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L56

Added line #L56 was not covered by tests
if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
consumer.pull_message(request).await;
} else {
warn!(
"No matched consumer for the PullRequest {},drop it",
request
)
}
}

Check warning on line 65 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L65

Added line #L65 was not covered by tests

async fn pop_message(request: PopRequest, instance: &mut MQClientInstance) {

Check warning on line 67 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L67

Added line #L67 was not covered by tests
if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
consumer.pop_message(request).await;
} else {
warn!(
"No matched consumer for the PopRequest {}, drop it",
request
)
}
}

Check warning on line 76 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L76

Added line #L76 was not covered by tests

pub fn execute_pull_request_later(&self, pull_request: PullRequest, time_delay: u64) {
let this = self.clone();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await;
if let Err(e) = this.pull_tx.as_ref().unwrap().send(pull_request).await {
if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pull_request)).await {

Check warning on line 82 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L82

Added line #L82 was not covered by tests
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
});
}

pub async fn execute_pull_request_immediately(&self, pull_request: PullRequest) {
if let Err(e) = self.pull_tx.as_ref().unwrap().send(pull_request).await {
if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pull_request)).await {
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
}
Expand All @@ -102,14 +95,14 @@ impl PullMessageService {
let this = self.clone();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await;
if let Err(e) = this.pop_tx.as_ref().unwrap().send(pop_request).await {
if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pop_request)).await {

Check warning on line 98 in rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs#L98

Added line #L98 was not covered by tests
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
});
}

pub async fn execute_pop_pull_request_immediately(&self, pop_request: PopRequest) {
if let Err(e) = self.pop_tx.as_ref().unwrap().send(pop_request).await {
if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pop_request)).await {
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ where
let mut balanced = true;
let sub_table = self.subscription_inner.read().await;
if !sub_table.is_empty() {
let topics = sub_table
.keys()
.map(|item| item.to_string())
.collect::<HashSet<String>>();
let topics = sub_table.keys().cloned().collect::<HashSet<String>>();
drop(sub_table);
for topic in &topics {
//try_query_assignment unimplemented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl RebalancePushImpl {
self.rebalance_impl_inner.client_instance = Some(client_instance);
}

#[inline]
pub async fn put_subscription_data(
&mut self,
topic: &str,
Expand Down
6 changes: 5 additions & 1 deletion rocketmq-client/src/consumer/default_mq_push_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct ConsumerConfig {
pub(crate) consume_from_where: ConsumeFromWhere,
pub(crate) consume_timestamp: Option<String>,
pub(crate) allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
//this field will be removed in a certain version after April 5, 2020
pub(crate) subscription: ArcRefCellWrapper<HashMap<String, String>>,
pub(crate) message_listener: Option<ArcRefCellWrapper<MessageListener>>,
pub(crate) message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
Expand Down Expand Up @@ -243,6 +244,10 @@ impl ConsumerConfig {
self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
}

/**
* This method will be removed in a certain version after April 5, 2020, so please do not
* use this method.
*/
pub fn set_subscription(&mut self, subscription: ArcRefCellWrapper<HashMap<String, String>>) {
self.subscription = subscription;
}
Expand Down Expand Up @@ -414,7 +419,6 @@ impl Default for ConsumerConfig {
}
}

#[derive(Clone)]
pub struct DefaultMQPushConsumer {
client_config: ClientConfig,
consumer_config: ArcRefCellWrapper<ConsumerConfig>,
Expand Down
12 changes: 10 additions & 2 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,16 @@ impl MQClientInstance {
if self.client_config.namesrv_addr.is_none() {
self.mq_client_api_impl
.as_mut()
.unwrap()
.expect("mq_client_api_impl is None")
.fetch_name_server_addr()
.await;
}
// Start request-response channel
self.mq_client_api_impl.as_mut().unwrap().start().await;
self.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.start()
.await;
// Start various schedule tasks
self.start_scheduled_task(this.clone());
// Start pull service
Expand Down Expand Up @@ -337,6 +341,7 @@ impl MQClientInstance {

fn start_scheduled_task(&mut self, this: ArcRefCellWrapper<Self>) {
if self.client_config.namesrv_addr.is_none() {
// Fetch name server address
let mut mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
self.instance_runtime.get_handle().spawn(async move {
info!("ScheduledTask fetchNameServerAddr started");
Expand All @@ -352,6 +357,7 @@ impl MQClientInstance {
});
}

// Update topic route info from name server
let mut client_instance = this.clone();
let poll_name_server_interval = self.client_config.poll_name_server_interval;
self.instance_runtime.get_handle().spawn(async move {
Expand All @@ -370,6 +376,7 @@ impl MQClientInstance {
}
});

// Clean offline broker and send heartbeat to all broker
let mut client_instance = this.clone();
let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
self.instance_runtime.get_handle().spawn(async move {
Expand All @@ -389,6 +396,7 @@ impl MQClientInstance {
}
});

// Persist all consumer offset
let mut client_instance = this;
let persist_consumer_offset_interval =
self.client_config.persist_consumer_offset_interval as u64;
Expand Down
Loading

0 comments on commit 63b1a4c

Please # to comment.