diff --git a/.travis.yml b/.travis.yml index ec6e39ef3..b6ccbd8c6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ rust: - stable - nightly # Officially the oldest compiler we support. - - 1.39.0 + - 1.41.0 matrix: include: - os: windows diff --git a/README.md b/README.md index 669238133..72823aa45 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ You can use raft with either [rust-protobuf](https://github.com/pingcap/rust-pro ## Developing the Raft crate `Raft` is built using the latest version of `stable` Rust, using [the 2018 edition](https://doc.rust-lang.org/edition-guide/rust-2018/). -Minimum supported version is `1.33.0`. +Minimum supported version is `1.41.0`. Using `rustup` you can get started this way: diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 35d4ec6b8..08e11d71b 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -4724,3 +4724,199 @@ fn test_request_snapshot_on_role_change() { nt.peers[&2].pending_request_snapshot ); } + +/// Tests group commit. +/// +/// 1. Logs should be replicated to at least different groups before committed; +/// 2. all peers are configured to the same group, simple quorum should be used. +#[test] +fn test_group_commit() { + let l = default_logger(); + let mut tests = vec![ + // Single + (vec![1], vec![0], 1, 1), + (vec![1], vec![1], 1, 1), + // Odd + (vec![2, 2, 1], vec![1, 2, 1], 2, 2), + (vec![2, 2, 1], vec![1, 1, 2], 1, 2), + (vec![2, 2, 1], vec![1, 0, 1], 1, 2), + (vec![2, 2, 1], vec![0, 0, 0], 1, 2), + // Even + (vec![4, 2, 1, 3], vec![0, 0, 0, 0], 1, 2), + (vec![4, 2, 1, 3], vec![1, 0, 0, 0], 1, 2), + (vec![4, 2, 1, 3], vec![0, 1, 0, 2], 2, 2), + (vec![4, 2, 1, 3], vec![0, 2, 1, 0], 1, 2), + (vec![4, 2, 1, 3], vec![1, 1, 1, 1], 2, 2), + (vec![4, 2, 1, 3], vec![1, 1, 2, 1], 1, 2), + (vec![4, 2, 1, 3], vec![1, 2, 1, 1], 2, 2), + (vec![4, 2, 1, 3], vec![4, 3, 2, 1], 2, 2), + ]; + + for (i, (matches, group_ids, g_w, q_w)) in tests.drain(..).enumerate() { + let store = MemStorage::new_with_conf_state((vec![1], vec![])); + let min_index = *matches.iter().min().unwrap(); + let max_index = *matches.iter().max().unwrap(); + let logs: Vec<_> = (min_index..=max_index).map(|i| empty_entry(1, i)).collect(); + store.wl().append(&logs).unwrap(); + let mut hs = HardState::default(); + hs.term = 1; + store.wl().set_hardstate(hs); + let cfg = new_test_config(1, 5, 1); + let mut sm = new_test_raft_with_config(&cfg, store, &l); + + let mut groups = vec![]; + for (j, (m, g)) in matches.into_iter().zip(group_ids).enumerate() { + let id = j as u64 + 1; + if sm.mut_prs().get(id).is_none() { + sm.set_progress(id, m, m + 1, false); + } + if g != 0 { + groups.push((id, g)); + } + } + sm.enable_group_commit(true); + sm.assign_commit_groups(&groups); + if sm.raft_log.committed != 0 { + panic!( + "#{}: follower group committed {}, want 0", + i, sm.raft_log.committed + ); + } + sm.state = StateRole::Leader; + sm.assign_commit_groups(&groups); + if sm.raft_log.committed != g_w { + panic!( + "#{}: leader group committed {}, want {}", + i, sm.raft_log.committed, g_w + ); + } + sm.enable_group_commit(false); + if sm.raft_log.committed != q_w { + panic!( + "#{}: quorum committed {}, want {}", + i, sm.raft_log.committed, q_w + ); + } + } +} + +#[test] +fn test_group_commit_consistent() { + let l = default_logger(); + let mut logs = vec![]; + for i in 1..6 { + logs.push(empty_entry(1, i)); + } + for i in 6..=8 { + logs.push(empty_entry(2, i)); + } + let mut tests = vec![ + // Single node is not using group commit + (vec![8], vec![0], 8, 6, StateRole::Leader, Some(false)), + (vec![8], vec![1], 8, 5, StateRole::Leader, None), + (vec![8], vec![1], 8, 6, StateRole::Follower, None), + // Not commit to current term should return None, as old leader may + // have reach consistent. + (vec![8, 2, 0], vec![1, 2, 1], 2, 2, StateRole::Leader, None), + ( + vec![8, 2, 6], + vec![1, 1, 2], + 6, + 6, + StateRole::Leader, + Some(true), + ), + // Not apply to current term should return None, as there maybe pending conf change. + (vec![8, 2, 6], vec![1, 1, 2], 6, 5, StateRole::Leader, None), + // It should be false when not using group commit. + ( + vec![8, 6, 6], + vec![0, 0, 0], + 6, + 6, + StateRole::Leader, + Some(false), + ), + // It should be false when there is only one group. + ( + vec![8, 6, 6], + vec![1, 1, 1], + 6, + 6, + StateRole::Leader, + Some(false), + ), + ( + vec![8, 6, 6], + vec![1, 1, 0], + 6, + 6, + StateRole::Leader, + Some(false), + ), + // Only leader knows what's the current state. + ( + vec![8, 2, 6], + vec![1, 1, 2], + 6, + 6, + StateRole::Follower, + None, + ), + ( + vec![8, 2, 6], + vec![1, 1, 2], + 6, + 6, + StateRole::Candidate, + None, + ), + ( + vec![8, 2, 6], + vec![1, 1, 2], + 6, + 6, + StateRole::PreCandidate, + None, + ), + ]; + + for (i, (matches, group_ids, committed, applied, role, exp)) in tests.drain(..).enumerate() { + let store = MemStorage::new_with_conf_state((vec![1], vec![])); + store.wl().append(&logs).unwrap(); + let mut hs = HardState::default(); + hs.term = 2; + hs.commit = committed; + store.wl().set_hardstate(hs); + let mut cfg = new_test_config(1, 5, 1); + cfg.applied = applied; + let mut sm = new_test_raft_with_config(&cfg, store, &l); + sm.state = role; + + let mut groups = vec![]; + for (j, (m, g)) in matches.into_iter().zip(group_ids).enumerate() { + let id = j as u64 + 1; + if sm.mut_prs().get(id).is_none() { + sm.set_progress(id, m, m + 1, false); + } + if g != 0 { + groups.push((id, g)); + } + } + sm.assign_commit_groups(&groups); + if Some(true) == exp { + let is_consistent = sm.check_group_commit_consistent(); + if is_consistent != Some(false) { + panic!( + "#{}: consistency = {:?}, want Some(false)", + i, is_consistent + ); + } + } + sm.enable_group_commit(true); + let is_consistent = sm.check_group_commit_consistent(); + if is_consistent != exp { + panic!("#{}: consistency = {:?}, want {:?}", i, is_consistent, exp); + } + } +} diff --git a/src/progress/mod.rs b/src/progress/mod.rs index 70c4b9778..c5ac1e1ce 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -81,6 +81,9 @@ pub struct Progress { /// When a leader receives a reply, the previous inflights should /// be freed by calling inflights.freeTo. pub ins: Inflights, + + /// Only logs replicated to different group will be committed if any group is configured. + pub commit_group_id: u64, } impl Progress { @@ -95,6 +98,7 @@ impl Progress { pending_request_snapshot: 0, recent_active: false, ins: Inflights::new(ins_size), + commit_group_id: 0, } } diff --git a/src/progress/progress_set.rs b/src/progress/progress_set.rs index 26ba30b9d..61275b25f 100644 --- a/src/progress/progress_set.rs +++ b/src/progress/progress_set.rs @@ -14,9 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cell::RefCell; - use slog::Logger; +use std::cmp; use crate::eraftpb::{ConfState, SnapshotMetadata}; use crate::errors::{Error, Result}; @@ -142,7 +141,8 @@ pub struct ProgressSet { // A preallocated buffer for sorting in the maximal_committed_index function. // You should not depend on these values unless you just set them. // We use a cell to avoid taking a `&mut self`. - sort_buffer: RefCell>, + sort_buffer: Vec<(u64, u64)>, + group_commit: bool, pub(crate) logger: Logger, } @@ -159,12 +159,23 @@ impl ProgressSet { voters + learners, DefaultHashBuilder::default(), ), - sort_buffer: RefCell::from(Vec::with_capacity(voters)), + sort_buffer: Vec::with_capacity(voters), configuration: Configuration::with_capacity(voters, learners), + group_commit: false, logger, } } + /// Configures group commit. + pub fn enable_group_commit(&mut self, enable: bool) { + self.group_commit = enable; + } + + /// Whether enable group commit. + pub fn group_commit(&self) -> bool { + self.group_commit + } + fn clear(&mut self) { self.progress.clear(); self.configuration.voters.clear(); @@ -369,21 +380,47 @@ impl ProgressSet { ); } - /// Returns the maximal committed index for the cluster. + /// Returns the maximal committed index for the cluster. The bool flag indicates whether + /// the index is computed by group commit algorithm successfully. /// /// Eg. If the matched indexes are [2,2,2,4,5], it will return 2. - pub fn maximal_committed_index(&self) -> u64 { - let mut matched = self.sort_buffer.borrow_mut(); + /// If the matched indexes and groups are `[(1, 1), (2, 2), (3, 2)]`, it will return 1. + pub fn maximal_committed_index(&mut self) -> (u64, bool) { + let matched = &mut self.sort_buffer; matched.clear(); + let progress = &self.progress; self.configuration.voters().iter().for_each(|id| { - let peer = &self.progress[id]; - matched.push(peer.matched); + let p = &progress[id]; + matched.push((p.matched, p.commit_group_id)); }); // Reverse sort. - matched.sort_by(|a, b| b.cmp(a)); + matched.sort_by(|a, b| b.0.cmp(&a.0)); let quorum = crate::majority(matched.len()); - matched[quorum - 1] + if !self.group_commit { + return (matched[quorum - 1].0, false); + } + let (quorum_commit_index, mut checked_group_id) = matched[quorum - 1]; + let mut single_group = true; + for (index, group_id) in matched.iter() { + if *group_id == 0 { + single_group = false; + continue; + } + if checked_group_id == 0 { + checked_group_id = *group_id; + continue; + } + if checked_group_id == *group_id { + continue; + } + return (cmp::min(*index, quorum_commit_index), true); + } + if single_group { + (matched[quorum - 1].0, false) + } else { + (matched.last().unwrap().0, false) + } } /// Returns the Candidate's eligibility in the current election. diff --git a/src/raft.rs b/src/raft.rs index cba1983b4..742f873da 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -395,6 +395,93 @@ impl Raft { self.batch_append = batch_append; } + /// Configures group commit. + /// + /// If group commit is enabled, only logs replicated to at least two + /// different groups are committed. + /// + /// You should use `assign_commit_groups` to configure peer groups. + pub fn enable_group_commit(&mut self, enable: bool) { + self.mut_prs().enable_group_commit(enable); + if StateRole::Leader == self.state && !enable && self.maybe_commit() { + self.bcast_append(); + } + } + + /// Whether enable group commit. + pub fn group_commit(&self) -> bool { + self.prs().group_commit() + } + + /// Assigns groups to peers. + /// + /// The tuple is (`peer_id`, `group_id`). `group_id` should be larger than 0. + /// + /// The group information is only stored in memory. So you need to configure + /// it every time a raft state machine is initialized or a snapshot is applied. + pub fn assign_commit_groups(&mut self, ids: &[(u64, u64)]) { + let prs = self.mut_prs(); + for (peer_id, group_id) in ids { + assert!(*group_id > 0); + if let Some(pr) = prs.get_mut(*peer_id) { + pr.commit_group_id = *group_id; + } else { + continue; + } + } + if StateRole::Leader == self.state && self.group_commit() && self.maybe_commit() { + self.bcast_append(); + } + } + + /// Removes all commit group configurations. + pub fn clear_commit_group(&mut self) { + for (_, pr) in self.mut_prs().iter_mut() { + pr.commit_group_id = 0; + } + } + + /// Checks whether the raft group is using group commit and consistent + /// over group. + /// + /// If it can't get a correct answer, `None` is returned. + pub fn check_group_commit_consistent(&mut self) -> Option { + if self.state != StateRole::Leader { + return None; + } + // Previous leader may have reach consistency already. + // + // check applied_index instead of committed_index to avoid pending conf change. + if !self.apply_to_current_term() { + return None; + } + let (index, use_group_commit) = self.mut_prs().maximal_committed_index(); + debug!( + self.logger, + "check group commit consistent"; + "index" => index, + "use_group_commit" => use_group_commit, + "committed" => self.raft_log.committed + ); + Some(use_group_commit && index == self.raft_log.committed) + } + + /// Checks if logs are committed to its term. + /// + /// The check is useful usually when raft is leader. + pub fn commit_to_current_term(&self) -> bool { + self.raft_log + .term(self.raft_log.committed) + .map_or(false, |t| t == self.term) + } + + /// Checks if logs are applied to current term. + pub fn apply_to_current_term(&self) -> bool { + self.raft_log + .term(self.raft_log.applied) + .map_or(false, |t| t == self.term) + } + // send persists state to stable storage and then sends to its mailbox. fn send(&mut self, mut m: Message) { debug!( @@ -652,7 +739,7 @@ impl Raft { /// Attempts to advance the commit index. Returns true if the commit index /// changed (in which case the caller should call `r.bcast_append`). pub fn maybe_commit(&mut self) -> bool { - let mci = self.prs().maximal_committed_index(); + let mci = self.mut_prs().maximal_committed_index().0; self.raft_log.maybe_commit(mci, self.term) } @@ -1543,7 +1630,7 @@ impl Raft { return Ok(()); } MessageType::MsgReadIndex => { - if self.raft_log.term(self.raft_log.committed).unwrap_or(0) != self.term { + if !self.commit_to_current_term() { // Reject read only request when this leader has not committed any log entry // in its term. return Ok(()); diff --git a/src/raft_log.rs b/src/raft_log.rs index 83973cf1f..33c560c5b 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -420,7 +420,7 @@ impl RaftLog { /// Attempts to commit the index and term and returns whether it did. pub fn maybe_commit(&mut self, max_index: u64, term: u64) -> bool { - if max_index > self.committed && self.term(max_index).unwrap_or(0) == term { + if max_index > self.committed && self.term(max_index).map_or(false, |t| t == term) { debug!( self.unstable.logger, "committing index {index}",