diff --git a/rocketmq-client/src/base/validators.rs b/rocketmq-client/src/base/validators.rs index 038c4601..06900e9b 100644 --- a/rocketmq-client/src/base/validators.rs +++ b/rocketmq-client/src/base/validators.rs @@ -55,7 +55,6 @@ impl Validators { ), )); } - Ok(()) } diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index 9305352c..6e25825f 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -203,7 +203,9 @@ impl DefaultMQPushConsumerImpl { self.consumer_config.unit_mode ); *self.service_state = ServiceState::StartFailed; + // check all config self.check_config()?; + //copy_subscription is can be removed self.copy_subscription().await?; if self.consumer_config.message_model() == MessageModel::Clustering { self.client_config.change_instance_name_to_pid(); @@ -474,20 +476,6 @@ impl DefaultMQPushConsumerImpl { )); } - /* if !util_all::parse_date(self.consumer_config.consume_timestamp.as_str(), DATE_FORMAT) - .is_ok() - { - return Err(MQClientError::MQClientException( - -1, - format!( - "consumeTimestamp is invalid, the valid format is {}, but received {}{}", - DATE_FORMAT, - self.consumer_config.consume_timestamp, - FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - ), - )); - }*/ - if self .consumer_config .allocate_message_queue_strategy @@ -502,16 +490,6 @@ impl DefaultMQPushConsumerImpl { )); } - /* if self.consumer_config.subscription.is_empty() { - return Err(MQClientError::MQClientErr( - -1, - format!( - "subscription is null{}", - FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL) - ), - )); - }*/ - if self.consumer_config.message_listener.is_none() { return Err(MQClientError::MQClientErr( -1, @@ -528,14 +506,14 @@ impl DefaultMQPushConsumerImpl { .as_ref() .unwrap() .message_listener_orderly - .is_some() + .is_none() && self .consumer_config .message_listener .as_ref() .unwrap() .message_listener_concurrently - .is_some() + .is_none() { return Err(MQClientError::MQClientErr( -1, @@ -701,7 +679,18 @@ impl DefaultMQPushConsumerImpl { async fn copy_subscription(&mut self) -> Result<()> { let sub = self.consumer_config.subscription(); if !sub.is_empty() { - unimplemented!() + for (topic, sub_expression) in sub.as_ref() { + let subscription_data = FilterAPI::build_subscription_data(topic, sub_expression) + .map_err(|e| { + MQClientError::MQClientErr( + -1, + format!("buildSubscriptionData exception, {}", e), + ) + })?; + self.rebalance_impl + .put_subscription_data(topic, subscription_data) + .await; + } } if self.message_listener.is_none() { self.message_listener = self.consumer_config.message_listener.clone(); diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs index 5069ff03..73ae23de 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs @@ -142,7 +142,7 @@ impl PullAPIWrapper { } msg_vec = inner_msg_vec; } - + // filter message let mut msg_list_filter_again = if !subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode { let mut msg_vec_again = Vec::with_capacity(msg_vec.len()); @@ -219,6 +219,29 @@ impl PullAPIWrapper { } } + /// Pulls messages from the broker asynchronously. + /// + /// # Arguments + /// + /// * `mq` - A reference to the `MessageQueue` from which to pull messages. + /// * `sub_expression` - The subscription expression. + /// * `expression_type` - The type of the subscription expression. + /// * `sub_version` - The version of the subscription. + /// * `offset` - The offset from which to start pulling messages. + /// * `max_nums` - The maximum number of messages to pull. + /// * `max_size_in_bytes` - The maximum size of messages to pull in bytes. + /// * `sys_flag` - The system flag for the pull request. + /// * `commit_offset` - The commit offset. + /// * `broker_suspend_max_time_millis` - The maximum time in milliseconds for which the broker + /// can suspend the pull request. + /// * `timeout_millis` - The timeout for the pull request in milliseconds. + /// * `communication_mode` - The communication mode (e.g., sync, async). + /// * `pull_callback` - The callback to execute when the pull request completes. + /// + /// # Returns + /// + /// A `Result` containing an `Option` with the `PullResultExt` if successful, or an + /// `MQClientError` if an error occurs. pub async fn pull_kernel_impl( &mut self, mq: &MessageQueue, diff --git a/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs b/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs index 71787ee7..4507ee60 100644 --- a/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs @@ -14,86 +14,79 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +use rocketmq_common::common::message::message_enum::MessageRequestMode; use rocketmq_common::ArcRefCellWrapper; use tracing::info; use tracing::warn; +use crate::consumer::consumer_impl::message_request::MessageRequest; use crate::consumer::consumer_impl::pop_request::PopRequest; use crate::consumer::consumer_impl::pull_request::PullRequest; use crate::factory::mq_client_instance::MQClientInstance; #[derive(Clone)] pub struct PullMessageService { - pop_tx: Option>, - pull_tx: Option>, + tx: Option>>, } impl PullMessageService { pub fn new() -> Self { - PullMessageService { - pop_tx: None, - pull_tx: None, - } + PullMessageService { tx: None } } - pub async fn start(&mut self, instance: ArcRefCellWrapper) { - let (pop_tx, mut pop_rx) = tokio::sync::mpsc::channel(1024 * 4); - let (pull_tx, mut pull_rx) = tokio::sync::mpsc::channel(1024 * 4); - self.pop_tx = Some(pop_tx); - self.pull_tx = Some(pull_tx); - //let instance_wrapper = ArcRefCellWrapper::new(instance); - let instance_wrapper_clone = instance.clone(); - tokio::spawn(async move { - info!( - ">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PopRequest] \ - started<<<<<<<<<<<<<<<<<<<<<<<<<<<<" - ); - while let Some(request) = pop_rx.recv().await { - if let Some(mut consumer) = - instance.select_consumer(request.get_consumer_group()).await - { - consumer.pop_message(request).await; - } else { - warn!( - "No matched consumer for the PopRequest {}, drop it", - request - ) - } - } - }); + pub async fn start(&mut self, mut instance: ArcRefCellWrapper) { + let (tx, mut rx) = + tokio::sync::mpsc::channel::>(1024 * 4); + self.tx = Some(tx); tokio::spawn(async move { - info!( - ">>>>>>>>>>>>>>>>>>>>>>>PullMessageService [PullRequest] \ - started<<<<<<<<<<<<<<<<<<<<<<<<<<<<" - ); - while let Some(request) = pull_rx.recv().await { - if let Some(mut consumer) = instance_wrapper_clone - .select_consumer(request.get_consumer_group()) - .await - { - consumer.pull_message(request).await; + info!(">>>>>>>>>>>>>>>>>>>>>>>PullMessageService started<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); + while let Some(request) = rx.recv().await { + if request.get_message_request_mode() == MessageRequestMode::Pull { + let pull_request = + unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) }; + PullMessageService::pull_message(pull_request, instance.as_mut()).await; } else { - warn!( - "No matched consumer for the PullRequest {},drop it", - request - ) + let pop_request = + unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) }; + PullMessageService::pop_message(pop_request, instance.as_mut()).await; } } }); } + async fn pull_message(request: PullRequest, instance: &mut MQClientInstance) { + if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await { + consumer.pull_message(request).await; + } else { + warn!( + "No matched consumer for the PullRequest {},drop it", + request + ) + } + } + + async fn pop_message(request: PopRequest, instance: &mut MQClientInstance) { + if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await { + consumer.pop_message(request).await; + } else { + warn!( + "No matched consumer for the PopRequest {}, drop it", + request + ) + } + } + pub fn execute_pull_request_later(&self, pull_request: PullRequest, time_delay: u64) { let this = self.clone(); tokio::spawn(async move { tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await; - if let Err(e) = this.pull_tx.as_ref().unwrap().send(pull_request).await { + if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pull_request)).await { warn!("Failed to send pull request to pull_tx, error: {:?}", e); } }); } pub async fn execute_pull_request_immediately(&self, pull_request: PullRequest) { - if let Err(e) = self.pull_tx.as_ref().unwrap().send(pull_request).await { + if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pull_request)).await { warn!("Failed to send pull request to pull_tx, error: {:?}", e); } } @@ -102,14 +95,14 @@ impl PullMessageService { let this = self.clone(); tokio::spawn(async move { tokio::time::sleep(tokio::time::Duration::from_millis(time_delay)).await; - if let Err(e) = this.pop_tx.as_ref().unwrap().send(pop_request).await { + if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pop_request)).await { warn!("Failed to send pull request to pull_tx, error: {:?}", e); } }); } pub async fn execute_pop_pull_request_immediately(&self, pop_request: PopRequest) { - if let Err(e) = self.pop_tx.as_ref().unwrap().send(pop_request).await { + if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pop_request)).await { warn!("Failed to send pull request to pull_tx, error: {:?}", e); } } diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index ceed2504..3fb13ee0 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -94,10 +94,7 @@ where let mut balanced = true; let sub_table = self.subscription_inner.read().await; if !sub_table.is_empty() { - let topics = sub_table - .keys() - .map(|item| item.to_string()) - .collect::>(); + let topics = sub_table.keys().cloned().collect::>(); drop(sub_table); for topic in &topics { //try_query_assignment unimplemented diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs index 9b13e522..ceb0d90b 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs @@ -112,6 +112,7 @@ impl RebalancePushImpl { self.rebalance_impl_inner.client_instance = Some(client_instance); } + #[inline] pub async fn put_subscription_data( &mut self, topic: &str, diff --git a/rocketmq-client/src/consumer/default_mq_push_consumer.rs b/rocketmq-client/src/consumer/default_mq_push_consumer.rs index 84f64963..cf38b085 100644 --- a/rocketmq-client/src/consumer/default_mq_push_consumer.rs +++ b/rocketmq-client/src/consumer/default_mq_push_consumer.rs @@ -62,6 +62,7 @@ pub struct ConsumerConfig { pub(crate) consume_from_where: ConsumeFromWhere, pub(crate) consume_timestamp: Option, pub(crate) allocate_message_queue_strategy: Option>, + //this field will be removed in a certain version after April 5, 2020 pub(crate) subscription: ArcRefCellWrapper>, pub(crate) message_listener: Option>, pub(crate) message_queue_listener: Option>>, @@ -243,6 +244,10 @@ impl ConsumerConfig { self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy); } + /** + * This method will be removed in a certain version after April 5, 2020, so please do not + * use this method. + */ pub fn set_subscription(&mut self, subscription: ArcRefCellWrapper>) { self.subscription = subscription; } @@ -414,7 +419,6 @@ impl Default for ConsumerConfig { } } -#[derive(Clone)] pub struct DefaultMQPushConsumer { client_config: ClientConfig, consumer_config: ArcRefCellWrapper, diff --git a/rocketmq-client/src/factory/mq_client_instance.rs b/rocketmq-client/src/factory/mq_client_instance.rs index 6949f58d..d840b475 100644 --- a/rocketmq-client/src/factory/mq_client_instance.rs +++ b/rocketmq-client/src/factory/mq_client_instance.rs @@ -282,12 +282,16 @@ impl MQClientInstance { if self.client_config.namesrv_addr.is_none() { self.mq_client_api_impl .as_mut() - .unwrap() + .expect("mq_client_api_impl is None") .fetch_name_server_addr() .await; } // Start request-response channel - self.mq_client_api_impl.as_mut().unwrap().start().await; + self.mq_client_api_impl + .as_mut() + .expect("mq_client_api_impl is None") + .start() + .await; // Start various schedule tasks self.start_scheduled_task(this.clone()); // Start pull service @@ -337,6 +341,7 @@ impl MQClientInstance { fn start_scheduled_task(&mut self, this: ArcRefCellWrapper) { if self.client_config.namesrv_addr.is_none() { + // Fetch name server address let mut mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone(); self.instance_runtime.get_handle().spawn(async move { info!("ScheduledTask fetchNameServerAddr started"); @@ -352,6 +357,7 @@ impl MQClientInstance { }); } + // Update topic route info from name server let mut client_instance = this.clone(); let poll_name_server_interval = self.client_config.poll_name_server_interval; self.instance_runtime.get_handle().spawn(async move { @@ -370,6 +376,7 @@ impl MQClientInstance { } }); + // Clean offline broker and send heartbeat to all broker let mut client_instance = this.clone(); let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval; self.instance_runtime.get_handle().spawn(async move { @@ -389,6 +396,7 @@ impl MQClientInstance { } }); + // Persist all consumer offset let mut client_instance = this; let persist_consumer_offset_interval = self.client_config.persist_consumer_offset_interval as u64; diff --git a/rocketmq-common/src/common/message/message_accessor.rs b/rocketmq-common/src/common/message/message_accessor.rs index 5fedf761..f221660a 100644 --- a/rocketmq-common/src/common/message/message_accessor.rs +++ b/rocketmq-common/src/common/message/message_accessor.rs @@ -23,60 +23,172 @@ use crate::common::message::MessageTrait; pub struct MessageAccessor; impl MessageAccessor { + /// Sets the properties of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `properties` - A `HashMap` containing the properties to set. #[inline] pub fn set_properties(msg: &mut T, properties: HashMap) { msg.set_properties(properties); } + /// Puts a property into a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `name` - The name of the property. + /// * `value` - The value of the property. + #[inline] pub fn put_property(msg: &mut T, name: &str, value: &str) { msg.put_property(name, value); } + /// Clears a property from a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `name` - The name of the property to clear. + #[inline] pub fn clear_property(msg: &mut T, name: &str) { msg.clear_property(name); } + /// Sets the transfer flag of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `unit` - The transfer flag value. + #[inline] pub fn set_transfer_flag(msg: &mut T, unit: &str) { msg.put_property(MessageConst::PROPERTY_TRANSFER_FLAG, unit); } + /// Gets the transfer flag of a message. + /// + /// # Arguments + /// + /// * `msg` - A reference to a message implementing the `MessageTrait`. + /// + /// # Returns + /// + /// * `Option` - The transfer flag value if it exists. + #[inline] pub fn get_transfer_flag(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_TRANSFER_FLAG) } + /// Sets the correction flag of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `unit` - The correction flag value. + #[inline] pub fn set_correction_flag(msg: &mut T, unit: &str) { msg.put_property(MessageConst::PROPERTY_CORRECTION_FLAG, unit); } + /// Gets the correction flag of a message. + /// + /// # Arguments + /// + /// * `msg` - A reference to a message implementing the `MessageTrait`. + /// + /// # Returns + /// + /// * `Option` - The correction flag value if it exists. + #[inline] pub fn get_correction_flag(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_CORRECTION_FLAG) } + /// Sets the origin message ID of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `origin_message_id` - The origin message ID value. + #[inline] pub fn set_origin_message_id(msg: &mut T, origin_message_id: &str) { msg.put_property(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID, origin_message_id); } + /// Gets the origin message ID of a message. + /// + /// # Arguments + /// + /// * `msg` - A reference to a message implementing the `MessageTrait`. + /// + /// # Returns + /// + /// * `Option` - The origin message ID value if it exists. + #[inline] pub fn get_origin_message_id(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID) } + /// Sets the MQ2 flag of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `flag` - The MQ2 flag value. + #[inline] pub fn set_mq2_flag(msg: &mut T, flag: &str) { msg.put_property(MessageConst::PROPERTY_MQ2_FLAG, flag); } + /// Gets the MQ2 flag of a message. + /// + /// # Arguments + /// + /// * `msg` - A reference to a message implementing the `MessageTrait`. + /// + /// # Returns + /// + /// * `Option` - The MQ2 flag value if it exists. + #[inline] pub fn get_mq2_flag(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_MQ2_FLAG) } + /// Sets the reconsume time of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `reconsume_times` - The reconsume time value. + #[inline] pub fn set_reconsume_time(msg: &mut T, reconsume_times: &str) { msg.put_property(MessageConst::PROPERTY_RECONSUME_TIME, reconsume_times); } + /// Gets the reconsume time of a message. + /// + /// # Arguments + /// + /// * `msg` - A reference to a message implementing the `MessageTrait`. + /// + /// # Returns + /// + /// * `Option` - The reconsume time value if it exists. #[inline] pub fn get_reconsume_time(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_RECONSUME_TIME) } + /// Sets the maximum reconsume times of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `max_reconsume_times` - The maximum reconsume times value. + #[inline] pub fn set_max_reconsume_times(msg: &mut T, max_reconsume_times: &str) { msg.put_property( MessageConst::PROPERTY_MAX_RECONSUME_TIMES, @@ -84,10 +196,27 @@ impl MessageAccessor { ); } + /// Gets the maximum reconsume times of a message. + /// + /// # Arguments + /// + /// * `msg` - A reference to a message implementing the `MessageTrait`. + /// + /// # Returns + /// + /// * `Option` - The maximum reconsume times value if it exists. + #[inline] pub fn get_max_reconsume_times(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_MAX_RECONSUME_TIMES) } + /// Sets the consume start timestamp of a message. + /// + /// # Arguments + /// + /// * `msg` - A mutable reference to a message implementing the `MessageTrait`. + /// * `property_consume_start_time_stamp` - The consume start timestamp value. + #[inline] pub fn set_consume_start_time_stamp( msg: &mut T, property_consume_start_time_stamp: &str, @@ -98,6 +227,16 @@ impl MessageAccessor { ); } + /// Gets the consume start timestamp of a message. + /// + /// # Arguments + /// + /// * `msg` - A reference to a message implementing the `MessageTrait`. + /// + /// # Returns + /// + /// * `Option` - The consume start timestamp value if it exists. + #[inline] pub fn get_consume_start_time_stamp(msg: &T) -> Option { msg.get_property(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP) }