Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #2340]🤡Add ReplicasInfo for rust🧑‍💻 #2343

Merged
merged 1 commit into from
Jan 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions rocketmq-remoting/src/protocol/body/broker_replicas_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,121 @@
use serde::Deserialize;
use serde::Serialize;

#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ReplicasInfo {
master_broker_id: u64,
master_address: CheetahString,
master_epoch: i32,
sync_state_set_epoch: i32,
in_sync_replicas: Vec<ReplicaIdentity>,
not_in_sync_replicas: Vec<ReplicaIdentity>,
}

impl ReplicasInfo {
pub fn new(
master_broker_id: u64,
master_address: impl Into<CheetahString>,
master_epoch: i32,
sync_state_set_epoch: i32,
in_sync_replicas: Vec<ReplicaIdentity>,
not_in_sync_replicas: Vec<ReplicaIdentity>,
) -> Self {
Self {
master_broker_id,
master_address: master_address.into(),
master_epoch,
sync_state_set_epoch,
in_sync_replicas,
not_in_sync_replicas,
}
}

pub fn get_master_address(&self) -> &str {
&self.master_address
}

pub fn set_master_address(&mut self, master_address: impl Into<CheetahString>) {
self.master_address = master_address.into();
}

pub fn get_master_epoch(&self) -> i32 {
self.master_epoch
}

pub fn set_master_epoch(&mut self, master_epoch: i32) {
self.master_epoch = master_epoch;
}

pub fn get_sync_state_set_epoch(&self) -> i32 {
self.sync_state_set_epoch
}

pub fn set_sync_state_set_epoch(&mut self, sync_state_set_epoch: i32) {
self.sync_state_set_epoch = sync_state_set_epoch;
}

pub fn get_in_sync_replicas(&self) -> &Vec<ReplicaIdentity> {
&self.in_sync_replicas
}

Check warning on line 80 in rocketmq-remoting/src/protocol/body/broker_replicas_info.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/broker_replicas_info.rs#L78-L80

Added lines #L78 - L80 were not covered by tests

pub fn set_in_sync_replicas(&mut self, in_sync_replicas: Vec<ReplicaIdentity>) {
self.in_sync_replicas = in_sync_replicas;
}

Check warning on line 84 in rocketmq-remoting/src/protocol/body/broker_replicas_info.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/broker_replicas_info.rs#L82-L84

Added lines #L82 - L84 were not covered by tests

pub fn get_not_in_sync_replicas(&self) -> &Vec<ReplicaIdentity> {
&self.not_in_sync_replicas
}

Check warning on line 88 in rocketmq-remoting/src/protocol/body/broker_replicas_info.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/broker_replicas_info.rs#L86-L88

Added lines #L86 - L88 were not covered by tests

pub fn set_not_in_sync_replicas(&mut self, not_in_sync_replicas: Vec<ReplicaIdentity>) {
self.not_in_sync_replicas = not_in_sync_replicas;
}

Check warning on line 92 in rocketmq-remoting/src/protocol/body/broker_replicas_info.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-remoting/src/protocol/body/broker_replicas_info.rs#L90-L92

Added lines #L90 - L92 were not covered by tests

pub fn get_master_broker_id(&self) -> u64 {
self.master_broker_id
}

pub fn set_master_broker_id(&mut self, master_broker_id: u64) {
self.master_broker_id = master_broker_id;
}

pub fn is_exist_in_sync(
&self,
broker_name: &str,
broker_id: u64,
broker_address: &str,
) -> bool {
self.in_sync_replicas.iter().any(|replica| {
replica.broker_name == broker_name
&& replica.broker_id == broker_id
&& replica.broker_address == broker_address
})
}

pub fn is_exist_in_not_sync(
&self,
broker_name: &str,
broker_id: u64,
broker_address: &str,
) -> bool {
self.not_in_sync_replicas.iter().any(|replica| {
replica.broker_name == broker_name
&& replica.broker_id == broker_id
&& replica.broker_address == broker_address
})
}

pub fn is_exist_in_all_replicas(
&self,
broker_name: &str,
broker_id: u64,
broker_address: &str,
) -> bool {
self.is_exist_in_sync(broker_name, broker_id, broker_address)
|| self.is_exist_in_not_sync(broker_name, broker_id, broker_address)
}
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ReplicaIdentity {
Expand Down Expand Up @@ -173,4 +288,100 @@
alive: true }"
);
}

#[test]
fn new_creates_instance_with_all_fields() {
let in_sync_replicas = vec![ReplicaIdentity::new("broker1", 1, "address1")];
let not_in_sync_replicas = vec![ReplicaIdentity::new("broker2", 2, "address2")];
let replicas_info = ReplicasInfo::new(
1,
"master_address",
100,
200,
in_sync_replicas.clone(),
not_in_sync_replicas.clone(),
);
assert_eq!(replicas_info.get_master_broker_id(), 1);
assert_eq!(replicas_info.get_master_address(), "master_address");
assert_eq!(replicas_info.get_master_epoch(), 100);
assert_eq!(replicas_info.get_sync_state_set_epoch(), 200);
}

#[test]
fn set_master_address_updates_master_address() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_master_address("new_master_address");
assert_eq!(replicas_info.get_master_address(), "new_master_address");
}

#[test]
fn set_master_epoch_updates_master_epoch() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_master_epoch(101);
assert_eq!(replicas_info.get_master_epoch(), 101);
}

#[test]
fn set_sync_state_set_epoch_updates_sync_state_set_epoch() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_sync_state_set_epoch(201);
assert_eq!(replicas_info.get_sync_state_set_epoch(), 201);
}

#[test]
fn set_master_broker_id_updates_master_broker_id() {
let mut replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
replicas_info.set_master_broker_id(2);
assert_eq!(replicas_info.get_master_broker_id(), 2);
}

#[test]
fn is_exist_in_sync_returns_true_for_existing_replica() {
let in_sync_replicas = vec![ReplicaIdentity::new("broker1", 1, "address1")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, in_sync_replicas, vec![]);
assert!(replicas_info.is_exist_in_sync("broker1", 1, "address1"));
}

#[test]
fn is_exist_in_sync_returns_false_for_non_existing_replica() {
let replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
assert!(!replicas_info.is_exist_in_sync("broker1", 1, "address1"));
}

#[test]
fn is_exist_in_not_sync_returns_true_for_existing_replica() {
let not_in_sync_replicas = vec![ReplicaIdentity::new("broker2", 2, "address2")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, vec![], not_in_sync_replicas);
assert!(replicas_info.is_exist_in_not_sync("broker2", 2, "address2"));
}

#[test]
fn is_exist_in_not_sync_returns_false_for_non_existing_replica() {
let replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
assert!(!replicas_info.is_exist_in_not_sync("broker2", 2, "address2"));
}

#[test]
fn is_exist_in_all_replicas_returns_true_for_existing_replica_in_sync() {
let in_sync_replicas = vec![ReplicaIdentity::new("broker1", 1, "address1")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, in_sync_replicas, vec![]);
assert!(replicas_info.is_exist_in_all_replicas("broker1", 1, "address1"));
}

#[test]
fn is_exist_in_all_replicas_returns_true_for_existing_replica_in_not_sync() {
let not_in_sync_replicas = vec![ReplicaIdentity::new("broker2", 2, "address2")];
let replicas_info =
ReplicasInfo::new(1, "master_address", 100, 200, vec![], not_in_sync_replicas);
assert!(replicas_info.is_exist_in_all_replicas("broker2", 2, "address2"));
}

#[test]
fn is_exist_in_all_replicas_returns_false_for_non_existing_replica() {
let replicas_info = ReplicasInfo::new(1, "master_address", 100, 200, vec![], vec![]);
assert!(!replicas_info.is_exist_in_all_replicas("broker1", 1, "address1"));
}
}
Loading