Skip to content

Commit

Permalink
[ISSUE #2340]🤡Add ReplicasInfo for rust🧑‍💻 (#2343)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 19, 2025
1 parent 09f394e commit 8ce52c8
Showing 1 changed file with 211 additions and 0 deletions.
211 changes: 211 additions & 0 deletions rocketmq-remoting/src/protocol/body/broker_replicas_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,121 @@ use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;

#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ReplicasInfo {
master_broker_id: u64,
master_address: CheetahString,
master_epoch: i32,
sync_state_set_epoch: i32,
in_sync_replicas: Vec<ReplicaIdentity>,
not_in_sync_replicas: Vec<ReplicaIdentity>,
}

impl ReplicasInfo {
pub fn new(
master_broker_id: u64,
master_address: impl Into<CheetahString>,
master_epoch: i32,
sync_state_set_epoch: i32,
in_sync_replicas: Vec<ReplicaIdentity>,
not_in_sync_replicas: Vec<ReplicaIdentity>,
) -> Self {
Self {
master_broker_id,
master_address: master_address.into(),
master_epoch,
sync_state_set_epoch,
in_sync_replicas,
not_in_sync_replicas,
}
}

pub fn get_master_address(&self) -> &str {
&self.master_address
}

pub fn set_master_address(&mut self, master_address: impl Into<CheetahString>) {
self.master_address = master_address.into();
}

pub fn get_master_epoch(&self) -> i32 {
self.master_epoch
}

pub fn set_master_epoch(&mut self, master_epoch: i32) {
self.master_epoch = master_epoch;
}

pub fn get_sync_state_set_epoch(&self) -> i32 {
self.sync_state_set_epoch
}

pub fn set_sync_state_set_epoch(&mut self, sync_state_set_epoch: i32) {
self.sync_state_set_epoch = sync_state_set_epoch;
}

pub fn get_in_sync_replicas(&self) -> &Vec<ReplicaIdentity> {
&self.in_sync_replicas
}

pub fn set_in_sync_replicas(&mut self, in_sync_replicas: Vec<ReplicaIdentity>) {
self.in_sync_replicas = in_sync_replicas;
}

pub fn get_not_in_sync_replicas(&self) -> &Vec<ReplicaIdentity> {
&self.not_in_sync_replicas
}

pub fn set_not_in_sync_replicas(&mut self, not_in_sync_replicas: Vec<ReplicaIdentity>) {
self.not_in_sync_replicas = not_in_sync_replicas;
}

pub fn get_master_broker_id(&self) -> u64 {
self.master_broker_id
}

pub fn set_master_broker_id(&mut self, master_broker_id: u64) {
self.master_broker_id = master_broker_id;
}

pub fn is_exist_in_sync(
&self,
broker_name: &str,
broker_id: u64,
broker_address: &str,
) -> bool {
self.in_sync_replicas.iter().any(|replica| {
replica.broker_name == broker_name
&& replica.broker_id == broker_id
&& replica.broker_address == broker_address
})
}

pub fn is_exist_in_not_sync(
&self,
broker_name: &str,
broker_id: u64,
broker_address: &str,
) -> bool {
self.not_in_sync_replicas.iter().any(|replica| {
replica.broker_name == broker_name
&& replica.broker_id == broker_id
&& replica.broker_address == broker_address
})
}

pub fn is_exist_in_all_replicas(
&self,
broker_name: &str,
broker_id: u64,
broker_address: &str,
) -> bool {
self.is_exist_in_sync(broker_name, broker_id, broker_address)
|| self.is_exist_in_not_sync(broker_name, broker_id, broker_address)
}
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ReplicaIdentity {
Expand Down Expand Up @@ -173,4 +288,100 @@ mod tests {
alive: true }"
);
}

#[test]
fn new_creates_instance_with_all_fields() {
let in_sync_replicas = vec![ReplicaIdentity::new("broker1", 1, "address1")];
let not_in_sync_replicas = vec![ReplicaIdentity::new("broker2", 2, "address2")];
let replicas_info = ReplicasInfo::new(
1,
"master_address",
100,
200,
in_sync_replicas.clone(),
not_in_sync_replicas.clone(),
);
assert_eq!(replicas_info.get_master_broker_id(), 1);
assert_eq!(replicas_info.get_master_address(), "master_address");
assert_eq!(replicas_info.get_master_epoch(), 100);
assert_eq!(replicas_info.get_sync_state_set_epoch(), 200);
}

#[test]
fn set_master_address_updates_master_address() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_master_address("new_master_address");
assert_eq!(replicas_info.get_master_address(), "new_master_address");
}

#[test]
fn set_master_epoch_updates_master_epoch() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_master_epoch(101);
assert_eq!(replicas_info.get_master_epoch(), 101);
}

#[test]
fn set_sync_state_set_epoch_updates_sync_state_set_epoch() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_sync_state_set_epoch(201);
assert_eq!(replicas_info.get_sync_state_set_epoch(), 201);
}

#[test]
fn set_master_broker_id_updates_master_broker_id() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_master_broker_id(2);
assert_eq!(replicas_info.get_master_broker_id(), 2);
}

#[test]
fn is_exist_in_sync_returns_true_for_existing_replica() {
let in_sync_replicas = vec![ReplicaIdentity::new("broker1", 1, "address1")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, in_sync_replicas, vec![]);
assert!(replicas_info.is_exist_in_sync("broker1", 1, "address1"));
}

#[test]
fn is_exist_in_sync_returns_false_for_non_existing_replica() {
let replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
assert!(!replicas_info.is_exist_in_sync("broker1", 1, "address1"));
}

#[test]
fn is_exist_in_not_sync_returns_true_for_existing_replica() {
let not_in_sync_replicas = vec![ReplicaIdentity::new("broker2", 2, "address2")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, vec![], not_in_sync_replicas);
assert!(replicas_info.is_exist_in_not_sync("broker2", 2, "address2"));
}

#[test]
fn is_exist_in_not_sync_returns_false_for_non_existing_replica() {
let replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
assert!(!replicas_info.is_exist_in_not_sync("broker2", 2, "address2"));
}

#[test]
fn is_exist_in_all_replicas_returns_true_for_existing_replica_in_sync() {
let in_sync_replicas = vec![ReplicaIdentity::new("broker1", 1, "address1")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, in_sync_replicas, vec![]);
assert!(replicas_info.is_exist_in_all_replicas("broker1", 1, "address1"));
}

#[test]
fn is_exist_in_all_replicas_returns_true_for_existing_replica_in_not_sync() {
let not_in_sync_replicas = vec![ReplicaIdentity::new("broker2", 2, "address2")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, vec![], not_in_sync_replicas);
assert!(replicas_info.is_exist_in_all_replicas("broker2", 2, "address2"));
}

#[test]
fn is_exist_in_all_replicas_returns_false_for_non_existing_replica() {
let replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
assert!(!replicas_info.is_exist_in_all_replicas("broker1", 1, "address1"));
}
}

0 comments on commit 8ce52c8

Please # to comment.