Skip to content

Commit

Permalink
[ISSUE #1507]♻️Refactor BrokerHeartbeatRequestHeader with derive marc…
Browse files Browse the repository at this point in the history
…o RequestHeaderCodec
  • Loading branch information
Happy-debug-lang authored Dec 5, 2024
1 parent fa1050c commit 1b0556b
Showing 1 changed file with 157 additions and 151 deletions.
Original file line number Diff line number Diff line change
@@ -1,151 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;

use anyhow::Error;
use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;

use crate::protocol::command_custom_header::CommandCustomHeader;
use crate::protocol::command_custom_header::FromMap;

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct BrokerHeartbeatRequestHeader {
#[serde(rename = "clusterName")]
pub cluster_name: CheetahString,
#[serde(rename = "brokerAddr")]
pub broker_addr: CheetahString,
#[serde(rename = "brokerName")]
pub broker_name: CheetahString,
#[serde(rename = "brokerId")]
pub broker_id: Option<i64>,
pub epoch: Option<i32>,
#[serde(rename = "maxOffset")]
pub max_offset: Option<i64>,
#[serde(rename = "confirmOffset")]
pub confirm_offset: Option<i64>,
#[serde(rename = "heartbeatTimeoutMills")]
pub heartbeat_timeout_mills: Option<i64>,
#[serde(rename = "electionPriority")]
pub election_priority: Option<i32>,
}

impl BrokerHeartbeatRequestHeader {
const BROKER_ADDR: &'static str = "brokerAddr";
const BROKER_ID: &'static str = "brokerId";
const BROKER_NAME: &'static str = "brokerName";
const CLUSTER_NAME: &'static str = "clusterName";
const CONFIRM_OFFSET: &'static str = "confirmOffset";
const ELECTION_PRIORITY: &'static str = "electionPriority";
const EPOCH: &'static str = "epoch";
const HEARTBEAT_TIMEOUT_MILLIS: &'static str = "heartbeatTimeoutMills";
const MAX_OFFSET: &'static str = "maxOffset";

pub fn new(
cluster_name: CheetahString,
broker_addr: CheetahString,
broker_name: CheetahString,
broker_id: Option<i64>,
epoch: Option<i32>,
max_offset: Option<i64>,
confirm_offset: Option<i64>,
heartbeat_timeout_mills: Option<i64>,
election_priority: Option<i32>,
) -> Self {
Self {
cluster_name,
broker_addr,
broker_name,
broker_id,
epoch,
max_offset,
confirm_offset,
heartbeat_timeout_mills,
election_priority,
}
}
}

impl FromMap for BrokerHeartbeatRequestHeader {
type Error = crate::remoting_error::RemotingError;

type Target = BrokerHeartbeatRequestHeader;

fn from(map: &HashMap<CheetahString, CheetahString>) -> Result<Self::Target, Self::Error> {
Ok(BrokerHeartbeatRequestHeader {
cluster_name: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::CLUSTER_NAME,
))
.cloned()
.unwrap_or_default(),
broker_addr: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::BROKER_ADDR,
))
.cloned()
.unwrap_or_default(),
broker_name: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::BROKER_NAME,
))
.cloned()
.unwrap_or_default(),
broker_id: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::BROKER_ID,
))
.and_then(|s| s.parse::<i64>().ok()),
epoch: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::EPOCH,
))
.and_then(|s| s.parse::<i32>().ok()),
max_offset: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::MAX_OFFSET,
))
.and_then(|s| s.parse::<i64>().ok()),
confirm_offset: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::CONFIRM_OFFSET,
))
.and_then(|s| s.parse::<i64>().ok()),
heartbeat_timeout_mills: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::HEARTBEAT_TIMEOUT_MILLIS,
))
.and_then(|s| s.parse::<i64>().ok()),
election_priority: map
.get(&CheetahString::from_static_str(
BrokerHeartbeatRequestHeader::ELECTION_PRIORITY,
))
.and_then(|s| s.parse::<i32>().ok()),
})
}
}

impl CommandCustomHeader for BrokerHeartbeatRequestHeader {
fn check_fields(&self) -> anyhow::Result<(), Error> {
todo!()
}

fn to_map(&self) -> Option<HashMap<CheetahString, CheetahString>> {
todo!()
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use cheetah_string::CheetahString;
use rocketmq_macros::RequestHeaderCodec;
use serde::Deserialize;
use serde::Serialize;

#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodec)]

Check warning on line 23 in rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs#L23

Added line #L23 was not covered by tests
pub struct BrokerHeartbeatRequestHeader {
#[serde(rename = "clusterName")]
#[required]
pub cluster_name: CheetahString,

#[serde(rename = "brokerAddr")]
#[required]
pub broker_addr: CheetahString,

#[serde(rename = "brokerName")]
#[required]
pub broker_name: CheetahString,

#[serde(rename = "brokerId")]
pub broker_id: Option<i64>,

pub epoch: Option<i32>,

#[serde(rename = "maxOffset")]
pub max_offset: Option<i64>,

#[serde(rename = "confirmOffset")]
pub confirm_offset: Option<i64>,

#[serde(rename = "heartbeatTimeoutMills")]
pub heartbeat_timeout_mills: Option<i64>,

#[serde(rename = "electionPriority")]
pub election_priority: Option<i32>,
}

#[cfg(test)]
mod tests {
use cheetah_string::CheetahString;

use super::*;

#[test]
fn broker_heartbeat_request_header_with_required_fields() {
let header = BrokerHeartbeatRequestHeader {
cluster_name: CheetahString::from("testCluster"),
broker_addr: CheetahString::from("testAddr"),
broker_name: CheetahString::from("testBroker"),
broker_id: Some(1),
epoch: Some(1),
max_offset: Some(100),
confirm_offset: Some(50),
heartbeat_timeout_mills: Some(3000),
election_priority: Some(1),
};
assert_eq!(header.cluster_name, CheetahString::from("testCluster"));
assert_eq!(header.broker_addr, CheetahString::from("testAddr"));
assert_eq!(header.broker_name, CheetahString::from("testBroker"));
assert_eq!(header.broker_id, Some(1));
assert_eq!(header.epoch, Some(1));
assert_eq!(header.max_offset, Some(100));
assert_eq!(header.confirm_offset, Some(50));
assert_eq!(header.heartbeat_timeout_mills, Some(3000));
assert_eq!(header.election_priority, Some(1));
}

#[test]
fn broker_heartbeat_request_header_with_optional_fields() {
let header = BrokerHeartbeatRequestHeader {
cluster_name: CheetahString::from("testCluster"),
broker_addr: CheetahString::from("testAddr"),
broker_name: CheetahString::from("testBroker"),
broker_id: None,
epoch: None,
max_offset: None,
confirm_offset: None,
heartbeat_timeout_mills: None,
election_priority: None,
};
assert_eq!(header.cluster_name, CheetahString::from("testCluster"));
assert_eq!(header.broker_addr, CheetahString::from("testAddr"));
assert_eq!(header.broker_name, CheetahString::from("testBroker"));
assert!(header.broker_id.is_none());
assert!(header.epoch.is_none());
assert!(header.max_offset.is_none());
assert!(header.confirm_offset.is_none());
assert!(header.heartbeat_timeout_mills.is_none());
assert!(header.election_priority.is_none());
}

#[test]
fn broker_heartbeat_request_header_with_empty_values() {
let header = BrokerHeartbeatRequestHeader {
cluster_name: CheetahString::from(""),
broker_addr: CheetahString::from(""),
broker_name: CheetahString::from(""),
broker_id: None,
epoch: None,
max_offset: None,
confirm_offset: None,
heartbeat_timeout_mills: None,
election_priority: None,
};
assert_eq!(header.cluster_name, CheetahString::from(""));
assert_eq!(header.broker_addr, CheetahString::from(""));
assert_eq!(header.broker_name, CheetahString::from(""));
assert!(header.broker_id.is_none());
assert!(header.epoch.is_none());
assert!(header.max_offset.is_none());
assert!(header.confirm_offset.is_none());
assert!(header.heartbeat_timeout_mills.is_none());
assert!(header.election_priority.is_none());
}

#[test]
fn broker_heartbeat_request_header_with_long_values() {
let long_string = "a".repeat(1000);
let header = BrokerHeartbeatRequestHeader {
cluster_name: CheetahString::from(&long_string),
broker_addr: CheetahString::from(&long_string),
broker_name: CheetahString::from(&long_string),
broker_id: Some(1),
epoch: Some(1),
max_offset: Some(100),
confirm_offset: Some(50),
heartbeat_timeout_mills: Some(3000),
election_priority: Some(1),
};
assert_eq!(header.cluster_name, CheetahString::from(&long_string));
assert_eq!(header.broker_addr, CheetahString::from(&long_string));
assert_eq!(header.broker_name, CheetahString::from(&long_string));
assert_eq!(header.broker_id, Some(1));
assert_eq!(header.epoch, Some(1));
assert_eq!(header.max_offset, Some(100));
assert_eq!(header.confirm_offset, Some(50));
assert_eq!(header.heartbeat_timeout_mills, Some(3000));
assert_eq!(header.election_priority, Some(1));
}
}

0 comments on commit 1b0556b

Please # to comment.