From 8aabc8af27162571db6c884f1f3bcb042d768616 Mon Sep 17 00:00:00 2001 From: frankchen7788 <44548832+frankchen7788@users.noreply.github.com> Date: Fri, 1 Nov 2024 17:22:28 +0800 Subject: [PATCH 1/3] fix the bug " subtract with overflow" fix bug --- rocketmq-client/examples/quickstart/consumer.rs | 2 +- .../allocate_message_queue_averagely.rs | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/rocketmq-client/examples/quickstart/consumer.rs b/rocketmq-client/examples/quickstart/consumer.rs index 3d5e236c..b0f46500 100644 --- a/rocketmq-client/examples/quickstart/consumer.rs +++ b/rocketmq-client/examples/quickstart/consumer.rs @@ -58,7 +58,7 @@ impl MessageListenerConcurrently for MyMessageListener { _context: &ConsumeConcurrentlyContext, ) -> Result { for msg in msgs { - info!("Receive message: {:?}", msg); + info!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaReceive message: {:?}", msg); } Ok(ConsumeConcurrentlyStatus::ConsumeSuccess) } diff --git a/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs b/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs index 8ed6e069..b1da778f 100644 --- a/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs +++ b/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs @@ -52,7 +52,13 @@ impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragely { } else { index * average_size + mod_val }; - let range = average_size.min(mq_all.len() - start_index); + //fix the bug " subtract with overflow" caused by (mq_all.len() - start_index ) + let mut range: usize = 0; + if mq_all.len() > start_index { + range = average_size.min(mq_all.len() - start_index); + } + //in case of mq_all.len() < start_index , means the customers is much more than queue + // so let range ==0 ,the for loop not work, then no queue alloced to this customerID for i in 0..range { result.push(mq_all[start_index + i].clone()); } From a341e29210824e96d58435fa3e7869b92de19a11 Mon Sep 17 00:00:00 2001 From: frankchen7788 <44548832+frankchen7788@users.noreply.github.com> Date: Fri, 1 Nov 2024 21:59:56 +0800 Subject: [PATCH 2/3] remove comment remove comment --- rocketmq-client/examples/quickstart/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-client/examples/quickstart/consumer.rs b/rocketmq-client/examples/quickstart/consumer.rs index b0f46500..3d5e236c 100644 --- a/rocketmq-client/examples/quickstart/consumer.rs +++ b/rocketmq-client/examples/quickstart/consumer.rs @@ -58,7 +58,7 @@ impl MessageListenerConcurrently for MyMessageListener { _context: &ConsumeConcurrentlyContext, ) -> Result { for msg in msgs { - info!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaReceive message: {:?}", msg); + info!("Receive message: {:?}", msg); } Ok(ConsumeConcurrentlyStatus::ConsumeSuccess) } From 12bc2eb219da8a76fa924e6985ade6ae7c59a545 Mon Sep 17 00:00:00 2001 From: frankchen7788 <44548832+frankchen7788@users.noreply.github.com> Date: Fri, 1 Nov 2024 22:03:17 +0800 Subject: [PATCH 3/3] fix the bug " subtract with overflow" fix the bug " subtract with overflow" --- .../rebalance_strategy/allocate_message_queue_averagely.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs b/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs index b1da778f..7e9e8f1a 100644 --- a/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs +++ b/rocketmq-client/src/consumer/rebalance_strategy/allocate_message_queue_averagely.rs @@ -52,13 +52,14 @@ impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragely { } else { index * average_size + mod_val }; + //let range = average_size.min(mq_all.len() - start_index); //fix the bug " subtract with overflow" caused by (mq_all.len() - start_index ) let mut range: usize = 0; if mq_all.len() > start_index { range = average_size.min(mq_all.len() - start_index); } //in case of mq_all.len() < start_index , means the customers is much more than queue - // so let range ==0 ,the for loop not work, then no queue alloced to this customerID + // so let range ==0 ,the for loop not work, and then no queue alloced to this customerID for i in 0..range { result.push(mq_all[start_index + i].clone()); }