Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #1692]🍻Implement ChangeInvisibleTimeProcessor#append_check_point method🚀 #1734

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 65 additions & 7 deletions rocketmq-broker/src/processor/change_invisible_time_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
use rocketmq_store::base::message_status_enum::PutMessageStatus;
use rocketmq_store::log_file::MessageStore;
use rocketmq_store::pop::ack_msg::AckMsg;
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager;
use tracing::error;
use tracing::info;
Expand Down Expand Up @@ -203,10 +204,12 @@
.append_check_point(
&request_header,
revive_qid,
request_header.queue_id,
request_header.offset,

Check warning on line 208 in rocketmq-broker/src/processor/change_invisible_time_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/change_invisible_time_processor.rs#L207-L208

Added lines #L207 - L208 were not covered by tests
now,
CheetahString::from_string(ExtraInfoUtil::get_broker_name(extra_info.as_slice())?),
)
.await;
.await?;

Check warning on line 212 in rocketmq-broker/src/processor/change_invisible_time_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/change_invisible_time_processor.rs#L212

Added line #L212 was not covered by tests
match ck_result.put_message_status() {
PutMessageStatus::PutOk
| PutMessageStatus::FlushDiskTimeout
Expand Down Expand Up @@ -313,12 +316,67 @@

async fn append_check_point(
&mut self,
_request_header: &ChangeInvisibleTimeRequestHeader,
_revive_qid: i32,
_pop_time: u64,
_broker_name: CheetahString,
) -> PutMessageResult {
unimplemented!("ChangeInvisibleTimeProcessor append_check_point")
request_header: &ChangeInvisibleTimeRequestHeader,
revive_qid: i32,
queue_id: i32,
offset: i64,
pop_time: u64,
broker_name: CheetahString,
) -> crate::Result<PutMessageResult> {
let mut ck = PopCheckPoint {
bit_map: 0,
num: 1,
pop_time: pop_time as i64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential overflow when casting pop_time from u64 to i64

Casting pop_time from u64 to i64 may result in an overflow if the value of pop_time exceeds i64::MAX. Consider validating pop_time before casting or using u64 for the pop_time field in PopCheckPoint.

Apply this diff to change the field type to u64:

 pub struct PopCheckPoint {
-    #[serde(rename = "pt")]
-    pub pop_time: i64,
+    #[serde(rename = "pt")]
+    pub pop_time: u64,
     // ...
 }

Committable suggestion skipped: line range outside the PR's diff.

invisible_time: request_header.invisible_time,
start_offset: offset,
cid: request_header.consumer_group.clone(),
topic: request_header.topic.clone(),
queue_id,
broker_name: Some(broker_name),
..Default::default()
};

ck.add_diff(0);

let mut inner = MessageExtBrokerInner::default();
inner.set_topic(self.revive_topic.clone());
inner.set_body(Bytes::from(ck.encode()?));
inner.message_ext_inner.queue_id = revive_qid;
inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
inner.message_ext_inner.born_timestamp = get_current_millis() as i64;
inner.message_ext_inner.born_host = self.store_host;
inner.message_ext_inner.store_host = self.store_host;
let deliver_time_ms = ck.get_revive_time() - PopAckConstants::ACK_TIME_INTERVAL;
let deliver_time_ms = if deliver_time_ms > 0 {
deliver_time_ms as u64

Check warning on line 351 in rocketmq-broker/src/processor/change_invisible_time_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/change_invisible_time_processor.rs#L319-L351

Added lines #L319 - L351 were not covered by tests
} else {
0

Check warning on line 353 in rocketmq-broker/src/processor/change_invisible_time_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/change_invisible_time_processor.rs#L353

Added line #L353 was not covered by tests
};
inner.set_delay_time_ms(deliver_time_ms);
inner.message_ext_inner.put_property(
CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
CheetahString::from(PopMessageProcessor::gen_ck_unique_id(&ck)),
);
inner.properties_string =
message_decoder::message_properties_to_string(inner.get_properties());
let put_message_result = self
.escape_bridge
.put_message_to_specific_queue(inner)
.await;
if self.broker_config.enable_pop_log {
info!(
"change Invisible , appendCheckPoint, topic {}, queueId {},reviveId {}, cid {}, \
startOffset {}, rt {}, result {}",
request_header.topic,
queue_id,
revive_qid,
request_header.consumer_group,
offset,
ck.get_revive_time(),
put_message_result.put_message_status()

Check warning on line 376 in rocketmq-broker/src/processor/change_invisible_time_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/change_invisible_time_processor.rs#L355-L376

Added lines #L355 - L376 were not covered by tests
)
}
Ok(put_message_result)

Check warning on line 379 in rocketmq-broker/src/processor/change_invisible_time_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/change_invisible_time_processor.rs#L378-L379

Added lines #L378 - L379 were not covered by tests
}

async fn process_change_invisible_time_for_order(
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ pub struct BrokerConfig {
pub load_balance_poll_name_server_interval: u64,
pub server_load_balancer_enable: bool,
pub enable_remote_escape: bool,
pub enable_pop_log: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure enable_pop_log is included in properties serialization

The new field enable_pop_log is added to BrokerConfig but is not included in the get_properties method. If this property needs to be shared or persisted, consider adding it to the properties map.

Apply this diff to include enable_pop_log in get_properties:

         properties.insert(
             "forwardTimeout".into(),
             self.forward_timeout.to_string().into(),
         );
+        properties.insert(
+            "enablePopLog".into(),
+            self.enable_pop_log.to_string().into(),
+        );
     }

Committable suggestion skipped: line range outside the PR's diff.

}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -266,6 +267,7 @@ impl Default for BrokerConfig {
load_balance_poll_name_server_interval: 30_000,
server_load_balancer_enable: true,
enable_remote_escape: false,
enable_pop_log: false,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions rocketmq-store/src/pop/pop_check_point.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@
}
i32::MAX
}

pub fn get_revive_time(&self) -> i64 {
self.pop_time + self.invisible_time
}

Check warning on line 104 in rocketmq-store/src/pop/pop_check_point.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/pop/pop_check_point.rs#L102-L104

Added lines #L102 - L104 were not covered by tests
Comment on lines +102 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential integer overflow in get_revive_time

Adding pop_time and invisible_time may cause an overflow if the sum exceeds i64::MAX. To prevent this, consider using checked_add to handle potential overflows safely.

Apply this diff to use checked_add and handle the overflow:

-    pub fn get_revive_time(&self) -> i64 {
-        self.pop_time + self.invisible_time
+    pub fn get_revive_time(&self) -> Option<i64> {
+        self.pop_time.checked_add(self.invisible_time)
    }

Ensure to handle the Option<i64> return type appropriately in the calling code.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn get_revive_time(&self) -> i64 {
self.pop_time + self.invisible_time
}
pub fn get_revive_time(&self) -> Option<i64> {
self.pop_time.checked_add(self.invisible_time)
}

}

impl Ord for PopCheckPoint {
Expand Down
Loading