Skip to content

Commit

Permalink
[ISSUE #1106]Fix When topic not create the client can not consume (#1108
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mxsm authored Nov 2, 2024
1 parent 1894168 commit 2e8e9ed
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 9 deletions.
3 changes: 2 additions & 1 deletion rocketmq-client/examples/quickstart/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub const TAG: &str = "*";
#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();
//rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = DefaultMQPushConsumer::builder();
Expand All @@ -59,6 +59,7 @@ impl MessageListenerConcurrently for MyMessageListener {
) -> Result<ConsumeConcurrentlyStatus> {
for msg in msgs {
info!("Receive message: {:?}", msg);
println!("Receive message: {:?}", msg);
}
Ok(ConsumeConcurrentlyStatus::ConsumeSuccess)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl {
}
}

async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {
async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>) {
let sub_table = self.rebalance_impl.get_subscription_inner();
let sub_table_inner = sub_table.read().await;
if sub_table_inner.contains_key(topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,16 @@ where
}
}
MessageModel::Clustering => {
let topic_sub_cloned = self.topic_subscribe_info_table.clone();
let topic_subscribe_info_table_inner = topic_sub_cloned.read().await;
let mq_set = topic_subscribe_info_table_inner.get(topic);
//get consumer id list from broker
let cid_all = self
.client_instance
.as_mut()
.unwrap()
.find_consumer_id_list(topic, self.consumer_group.as_ref().unwrap())
.await;
let topic_sub_cloned = self.topic_subscribe_info_table.clone();
let topic_subscribe_info_table_inner = topic_sub_cloned.read().await;
let mq_set = topic_subscribe_info_table_inner.get(topic);
if mq_set.is_none() && !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) {
if let Some(mut sub_rebalance_impl) =
self.sub_rebalance_impl.as_ref().unwrap().upgrade()
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-client/src/consumer/mq_consumer_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub trait MQConsumerInnerLocal: MQConsumerInnerAny + Sync + 'static {
///
/// * `topic` - A string slice that holds the name of the topic.
/// * `info` - A reference to a `HashSet` containing `MessageQueue` information.
async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>);
async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>);

/// Checks if the subscription information for a given topic needs to be updated asynchronously.
///
Expand Down Expand Up @@ -204,7 +204,7 @@ impl MQConsumerInner for MQConsumerInnerImpl {
panic!("default_mqpush_consumer_impl is None");
}

async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {
async fn update_topic_subscribe_info(&self, topic: &str, info: &HashSet<MessageQueue>) {
if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
if let Some(mut default_mqpush_consumer_impl) = default_mqpush_consumer_impl.upgrade() {
return MQConsumerInner::update_topic_subscribe_info(
Expand Down
6 changes: 4 additions & 2 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ impl MQClientInstance {
broker_version_table: Arc::new(Default::default()),
send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
});
let instance_clone = instance.clone();
instance.mq_admin_impl.set_client(instance_clone);
let weak_instance = ArcRefCellWrapper::downgrade(&instance);
let (tx, mut rx) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);

Expand Down Expand Up @@ -583,11 +585,11 @@ impl MQClientInstance {

// Update sub info
{
let mut consumer_table = self.consumer_table.write().await;
let consumer_table = self.consumer_table.read().await;
if !consumer_table.is_empty() {
let subscribe_info =
topic_route_data2topic_subscribe_info(topic, &topic_route_data);
for (_, value) in consumer_table.iter_mut() {
for (_, value) in consumer_table.iter() {
value
.update_topic_subscribe_info(topic, &subscribe_info)
.await;
Expand Down
31 changes: 31 additions & 0 deletions rocketmq-client/src/implementation/mq_admin_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,26 @@ use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
use crate::base::client_config::ClientConfig;
use crate::error::MQClientError::MQClientErr;
use crate::factory::mq_client_instance;
use crate::factory::mq_client_instance::MQClientInstance;
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
use crate::Result;

pub struct MQAdminImpl {
timeout_millis: u64,
client: Option<ArcRefCellWrapper<MQClientInstance>>,
}

impl MQAdminImpl {
pub fn new() -> Self {
MQAdminImpl {
timeout_millis: 60000,
client: None,
}
}

pub fn set_client(&mut self, client: ArcRefCellWrapper<MQClientInstance>) {
self.client = Some(client);
}
}

impl MQAdminImpl {
Expand Down Expand Up @@ -93,6 +100,30 @@ impl MQAdminImpl {
}

pub async fn max_offset(&mut self, mq: &MessageQueue) -> Result<i64> {
let client = self.client.as_mut().expect("client is None");
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let mut broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
if broker_addr.is_none() {
client
.update_topic_route_info_from_name_server_topic(mq.get_topic())
.await;
let broker_name = client.get_broker_name_from_message_queue(mq).await;
broker_addr = client
.find_broker_address_in_publish(broker_name.as_str())
.await;
}
if let Some(ref broker_addr) = broker_addr {
let offset = client
.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.get_max_offset(broker_addr, mq, self.timeout_millis)
.await?;
return Ok(offset);
}

unimplemented!("max_offset")
}
pub async fn search_offset(&mut self, mq: &MessageQueue, timestamp: u64) -> Result<i64> {
Expand Down
49 changes: 49 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ use rocketmq_remoting::protocol::header::client_request_header::GetRouteInfoRequ
use rocketmq_remoting::protocol::header::consumer_send_msg_back_request_header::ConsumerSendMsgBackRequestHeader;
use rocketmq_remoting::protocol::header::end_transaction_request_header::EndTransactionRequestHeader;
use rocketmq_remoting::protocol::header::get_consumer_listby_group_request_header::GetConsumerListByGroupRequestHeader;
use rocketmq_remoting::protocol::header::get_max_offset_request_header::GetMaxOffsetRequestHeader;
use rocketmq_remoting::protocol::header::get_max_offset_response_header::GetMaxOffsetResponseHeader;
use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader;
use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatchMqRequestHeader;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader;
Expand All @@ -68,6 +70,7 @@ use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::protocol::RemotingSerializable;
use rocketmq_remoting::remoting::RemotingService;
use rocketmq_remoting::rpc::rpc_request_header::RpcRequestHeader;
use rocketmq_remoting::rpc::topic_request_header::TopicRequestHeader;
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
use rocketmq_remoting::runtime::RPCHook;
use tracing::error;
Expand Down Expand Up @@ -1147,4 +1150,50 @@ impl MQClientAPIImpl {
.await;
Ok(())
}

pub async fn get_max_offset(
&mut self,
addr: &str,
message_queue: &MessageQueue,
timeout_millis: u64,
) -> Result<i64> {
let request_header = GetMaxOffsetRequestHeader {
topic: message_queue.get_topic().to_string(),
queue_id: message_queue.get_queue_id(),
committed: false,
topic_request_header: Some(TopicRequestHeader {
rpc_request_header: Some(RpcRequestHeader {
broker_name: Some(message_queue.get_broker_name().to_string()),
..Default::default()
}),
lo: None,
}),
};

let request =
RemotingCommand::create_request_command(RequestCode::GetMaxOffset, request_header);

let response = self
.remoting_client
.invoke_async(
Some(mix_all::broker_vip_channel(
self.client_config.vip_channel_enabled,
addr,
)),
request,
timeout_millis,
)
.await?;
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<GetMaxOffsetResponseHeader>()
.expect("decode error");
return Ok(response_header.offset);
}
Err(MQBrokerError(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),
addr.to_string(),
))
}
}

0 comments on commit 2e8e9ed

Please # to comment.