diff --git a/examples/single_mem_node/main.rs b/examples/single_mem_node/main.rs index 2b62a7286..d89faf72b 100644 --- a/examples/single_mem_node/main.rs +++ b/examples/single_mem_node/main.rs @@ -126,20 +126,20 @@ fn on_ready(r: &mut RawNode, cbs: &mut HashMap) } } - if !raft::is_empty_snap(&ready.snapshot) { + if !raft::is_empty_snap(ready.snapshot()) { // This is a snapshot, we need to apply the snapshot at first. r.mut_store() .wl() - .apply_snapshot(ready.snapshot.clone()) + .apply_snapshot(ready.snapshot().clone()) .unwrap(); } - if !ready.entries.is_empty() { + if !ready.entries().is_empty() { // Append entries to the Raft log - r.mut_store().wl().append(&ready.entries).unwrap(); + r.mut_store().wl().append(ready.entries()).unwrap(); } - if let Some(ref hs) = ready.hs { + if let Some(hs) = ready.hs() { // Raft HardState changed, and we need to persist it. r.mut_store().wl().set_hardstate(hs.clone()); } diff --git a/src/lib.rs b/src/lib.rs index 21d4446c7..805e05d3c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,11 +185,11 @@ The `Ready` state contains quite a bit of information, and you need to check and 1. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has received a Raft snapshot from the leader and we must apply the snapshot: ```rust,ignore - if !raft::is_empty_snap(&ready.snapshot) { + if !raft::is_empty_snap(ready.snapshot()) { // This is a snapshot, we need to apply the snapshot at first. node.mut_store() .wl() - .apply_snapshot(ready.snapshot.clone()) + .apply_snapshot(ready.snapshot().clone()) .unwrap(); } @@ -200,7 +200,7 @@ The `Ready` state contains quite a bit of information, and you need to check and ```rust,ignore if !ready.entries.is_empty() { // Append entries to the Raft log - node.mut_store().wl().append(&ready.entries).unwrap(); + node.mut_store().wl().append(ready.entries()).unwrap(); } ``` @@ -208,7 +208,7 @@ The `Ready` state contains quite a bit of information, and you need to check and 3. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node has changed. For example, the node may vote for a new leader, or the commit index has been increased. We must persist the changed `HardState`: ```rust,ignore - if let Some(ref hs) = ready.hs { + if let Some(hs) = ready.hs() { // Raft HardState changed, and we need to persist it. node.mut_store().wl().set_hardstate(hs.clone()); } diff --git a/src/raw_node.rs b/src/raw_node.rs index 3478289cc..aaecaf5f4 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -96,28 +96,15 @@ pub fn is_empty_snap(s: &Snapshot) -> bool { /// All fields in Ready are read-only. #[derive(Default, Debug, PartialEq)] pub struct Ready { - /// The current volatile state of a Node. - /// SoftState will be nil if there is no update. - /// It is not required to consume or store SoftState. - pub ss: Option, + ss: Option, - /// The current state of a Node to be saved to stable storage BEFORE - /// Messages are sent. - /// HardState will be equal to empty state if there is no update. - pub hs: Option, + hs: Option, - /// States can be used for node to serve linearizable read requests locally - /// when its applied index is greater than the index in ReadState. - /// Note that the read_state will be returned when raft receives MsgReadIndex. - /// The returned is only valid for the request that requested to read. - pub read_states: Vec, + read_states: Vec, - /// Entries specifies entries to be saved to stable storage BEFORE - /// Messages are sent. - pub entries: Vec, + entries: Vec, - /// Snapshot specifies the snapshot to be saved to stable storage. - pub snapshot: Snapshot, + snapshot: Snapshot, /// CommittedEntries specifies entries to be committed to a /// store/state-machine. These have previously been committed to stable @@ -130,9 +117,7 @@ pub struct Ready { /// when the snapshot has been received or has failed by calling ReportSnapshot. pub messages: Vec, - /// MustSync indicates whether the HardState and Entries must be synchronously - /// written to disk or if an asynchronous write is permissible. - pub must_sync: bool, + must_sync: bool, } impl Ready { @@ -174,6 +159,51 @@ impl Ready { } rd } + + /// The current volatile state of a Node. + /// SoftState will be nil if there is no update. + /// It is not required to consume or store SoftState. + #[inline] + pub fn ss(&self) -> Option<&SoftState> { + self.ss.as_ref() + } + + /// The current state of a Node to be saved to stable storage BEFORE + /// Messages are sent. + /// HardState will be equal to empty state if there is no update. + #[inline] + pub fn hs(&self) -> Option<&HardState> { + self.hs.as_ref() + } + + /// States can be used for node to serve linearizable read requests locally + /// when its applied index is greater than the index in ReadState. + /// Note that the read_state will be returned when raft receives MsgReadIndex. + /// The returned is only valid for the request that requested to read. + #[inline] + pub fn read_states(&self) -> &[ReadState] { + &self.read_states + } + + /// Entries specifies entries to be saved to stable storage BEFORE + /// Messages are sent. + #[inline] + pub fn entries(&self) -> &[Entry] { + &self.entries + } + + /// Snapshot specifies the snapshot to be saved to stable storage. + #[inline] + pub fn snapshot(&self) -> &Snapshot { + &self.snapshot + } + + /// MustSync indicates whether the HardState and Entries must be synchronously + /// written to disk or if an asynchronous write is permissible. + #[inline] + pub fn must_sync(&self) -> bool { + self.must_sync + } } /// RawNode is a thread-unsafe Node. diff --git a/tests/integration_cases/test_raw_node.rs b/tests/integration_cases/test_raw_node.rs index cd60c8301..95221ad31 100644 --- a/tests/integration_cases/test_raw_node.rs +++ b/tests/integration_cases/test_raw_node.rs @@ -56,21 +56,22 @@ fn conf_change(t: ConfChangeType, node_id: u64) -> ConfChange { cc } -fn new_ready( +fn cmp_ready( + r: &Ready, ss: Option, hs: Option, entries: Vec, committed_entries: Vec, must_sync: bool, -) -> Ready { - Ready { - ss, - hs, - entries, - committed_entries: Some(committed_entries), - must_sync, - ..Default::default() - } +) -> bool { + r.ss() == ss.as_ref() + && r.hs() == hs.as_ref() + && r.entries() == entries.as_slice() + && r.committed_entries == Some(committed_entries) + && r.must_sync() == must_sync + && r.read_states().is_empty() + && r.snapshot() == &Snapshot::default() + && r.messages.is_empty() } fn new_raw_node( @@ -178,7 +179,7 @@ fn test_raw_node_propose_and_conf_change() { let s = new_storage(); let mut raw_node = new_raw_node(1, vec![], 10, 1, s.clone(), vec![new_peer(1)]); let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); + s.wl().append(rd.entries()).expect(""); raw_node.advance(rd); raw_node.campaign().expect(""); let mut proposed = false; @@ -186,9 +187,9 @@ fn test_raw_node_propose_and_conf_change() { let mut ccdata = vec![]; loop { let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); + s.wl().append(rd.entries()).expect(""); // Once we are the leader, propose a command and a ConfChange. - if !proposed && rd.ss.is_some() && rd.ss.as_ref().unwrap().leader_id == raw_node.raft.id { + if !proposed && rd.ss().is_some() && rd.ss().unwrap().leader_id == raw_node.raft.id { raw_node.propose(vec![], b"somedata".to_vec()).expect(""); let cc = conf_change(ConfChangeType::AddNode, 1); @@ -222,14 +223,14 @@ fn test_raw_node_propose_add_duplicate_node() { let s = new_storage(); let mut raw_node = new_raw_node(1, vec![], 10, 1, s.clone(), vec![new_peer(1)]); let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); + s.wl().append(rd.entries()).expect(""); raw_node.advance(rd); raw_node.campaign().expect(""); loop { let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); - if rd.ss.is_some() && rd.ss.as_ref().unwrap().leader_id == raw_node.raft.id { + s.wl().append(rd.entries()).expect(""); + if rd.ss().is_some() && rd.ss().unwrap().leader_id == raw_node.raft.id { raw_node.advance(rd); break; } @@ -239,7 +240,7 @@ fn test_raw_node_propose_add_duplicate_node() { let mut propose_conf_change_and_apply = |cc| { raw_node.propose_conf_change(vec![], cc).expect(""); let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); + s.wl().append(rd.entries()).expect(""); for e in rd.committed_entries.as_ref().unwrap() { if e.get_entry_type() == EntryType::EntryConfChange { let conf_change = protobuf::parse_from_bytes(e.get_data()).unwrap(); @@ -276,14 +277,14 @@ fn test_raw_node_propose_add_learner_node() { let s = new_storage(); let mut raw_node = new_raw_node(1, vec![], 10, 1, s.clone(), vec![new_peer(1)]); let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); + s.wl().append(rd.entries()).expect(""); raw_node.advance(rd); raw_node.campaign().expect(""); loop { let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); - if rd.ss.is_some() && rd.ss.as_ref().unwrap().leader_id == raw_node.raft.id { + s.wl().append(rd.entries()).expect(""); + if rd.ss().is_some() && rd.ss().unwrap().leader_id == raw_node.raft.id { raw_node.advance(rd); break; } @@ -295,7 +296,7 @@ fn test_raw_node_propose_add_learner_node() { raw_node.propose_conf_change(vec![], cc).expect(""); let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); + s.wl().append(rd.entries()).expect(""); assert!( rd.committed_entries.is_some() && rd.committed_entries.as_ref().unwrap().len() == 1, @@ -323,17 +324,13 @@ fn test_raw_node_read_index() { let s = new_storage(); let mut raw_node = new_raw_node(1, vec![], 10, 1, s.clone(), vec![new_peer(1)]); let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); + s.wl().append(rd.entries()).expect(""); raw_node.advance(rd); raw_node.campaign().expect(""); loop { let rd = raw_node.ready(); - s.wl().append(&rd.entries).expect(""); - if rd - .ss - .as_ref() - .map_or(false, |ss| ss.leader_id == raw_node.raft.id) - { + s.wl().append(rd.entries()).expect(""); + if rd.ss().map_or(false, |ss| ss.leader_id == raw_node.raft.id) { raw_node.advance(rd); // Once we are the leader, issue a read index request @@ -347,8 +344,8 @@ fn test_raw_node_read_index() { assert!(!raw_node.raft.read_states.is_empty()); assert!(raw_node.has_ready()); let rd = raw_node.ready(); - assert_eq!(rd.read_states, wrs); - s.wl().append(&rd.entries).expect(""); + assert_eq!(rd.read_states(), wrs.as_slice()); + s.wl().append(&rd.entries()).expect(""); raw_node.advance(rd); // ensure raft.read_states is reset after advance @@ -364,54 +361,51 @@ fn test_raw_node_start() { setup_for_test(); let cc = conf_change(ConfChangeType::AddNode, 1); let ccdata = protobuf::Message::write_to_bytes(&cc).unwrap(); - let wants = vec![ - new_ready( - None, - Some(hard_state(1, 1, 0)), - vec![entry( - EntryType::EntryConfChange, - 1, - 1, - Some(ccdata.clone()), - )], - vec![entry( - EntryType::EntryConfChange, - 1, - 1, - Some(ccdata.clone()), - )], - true, - ), - new_ready( - None, - Some(hard_state(2, 3, 1)), - vec![new_entry(2, 3, Some("foo"))], - vec![new_entry(2, 3, Some("foo"))], - false, - ), - ]; - let store = new_storage(); let mut raw_node = new_raw_node(1, vec![], 10, 1, store.clone(), vec![new_peer(1)]); let rd = raw_node.ready(); info!("rd {:?}", &rd); - assert_eq!(rd, wants[0]); - store.wl().append(&rd.entries).expect(""); + assert!(cmp_ready( + &rd, + None, + Some(hard_state(1, 1, 0)), + vec![entry( + EntryType::EntryConfChange, + 1, + 1, + Some(ccdata.clone()), + )], + vec![entry( + EntryType::EntryConfChange, + 1, + 1, + Some(ccdata.clone()), + )], + true, + )); + store.wl().append(rd.entries()).expect(""); raw_node.advance(rd); let rd = raw_node.ready(); - store.wl().append(&rd.entries).expect(""); + store.wl().append(rd.entries()).expect(""); raw_node.advance(rd); raw_node.campaign().expect(""); let rd = raw_node.ready(); - store.wl().append(&rd.entries).expect(""); + store.wl().append(rd.entries()).expect(""); raw_node.advance(rd); raw_node.propose(vec![], b"foo".to_vec()).expect(""); let rd = raw_node.ready(); - assert_eq!(rd, wants[1]); - store.wl().append(&rd.entries).expect(""); + assert!(cmp_ready( + &rd, + None, + Some(hard_state(2, 3, 1)), + vec![new_entry(2, 3, Some("foo"))], + vec![new_entry(2, 3, Some("foo"))], + false, + )); + store.wl().append(rd.entries()).expect(""); raw_node.advance(rd); assert!(!raw_node.has_ready()); } @@ -422,14 +416,19 @@ fn test_raw_node_restart() { let entries = vec![empty_entry(1, 1), new_entry(1, 2, Some("foo"))]; let st = hard_state(1, 1, 0); - let want = new_ready(None, None, vec![], entries[..1].to_vec(), false); - let store = new_storage(); store.wl().set_hardstate(st); store.wl().append(&entries).expect(""); let mut raw_node = new_raw_node(1, vec![], 10, 1, store, vec![]); let rd = raw_node.ready(); - assert_eq!(rd, want); + assert!(cmp_ready( + &rd, + None, + None, + vec![], + entries[..1].to_vec(), + false + )); raw_node.advance(rd); assert!(!raw_node.has_ready()); } @@ -441,15 +440,13 @@ fn test_raw_node_restart_from_snapshot() { let entries = vec![new_entry(1, 3, Some("foo"))]; let st = hard_state(1, 3, 0); - let want = new_ready(None, None, vec![], entries.clone(), false); - let s = new_storage(); s.wl().set_hardstate(st); s.wl().apply_snapshot(snap).expect(""); s.wl().append(&entries).expect(""); let mut raw_node = new_raw_node(1, vec![], 10, 1, s, vec![]); let rd = raw_node.ready(); - assert_eq!(rd, want); + assert!(cmp_ready(&rd, None, None, vec![], entries.clone(), false)); raw_node.advance(rd); assert!(!raw_node.has_ready()); }