Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add quorum package #380

Merged
merged 1 commit into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2929,7 +2929,7 @@ fn test_restore() {
s.get_metadata().term
);
assert_eq!(
sm.prs().voter_ids(),
*sm.prs().voter_ids(),
s.get_metadata()
.get_conf_state()
.voters
Expand Down Expand Up @@ -3143,7 +3143,7 @@ fn test_add_node() -> Result<()> {
let mut r = new_test_raft(1, vec![1], 10, 1, new_storage(), &l);
r.add_node(2)?;
assert_eq!(
r.prs().voter_ids(),
*r.prs().voter_ids(),
vec![1, 2].into_iter().collect::<HashSet<_>>()
);

Expand Down Expand Up @@ -3237,7 +3237,7 @@ fn test_raft_nodes() {
let r = new_test_raft(1, ids, 10, 1, new_storage(), &l);
let voter_ids = r.prs().voter_ids();
let wids = wids.into_iter().collect::<HashSet<_>>();
if voter_ids != wids {
if *voter_ids != wids {
panic!("#{}: nodes = {:?}, want {:?}", i, voter_ids, wids);
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ macro_rules! fatal {
mod config;
mod errors;
mod log_unstable;
mod quorum;
#[cfg(test)]
pub mod raft;
#[cfg(not(test))]
Expand All @@ -488,9 +489,10 @@ pub mod util;
pub use self::config::Config;
pub use self::errors::{Error, Result, StorageError};
pub use self::log_unstable::Unstable;
pub use self::quorum::majority::Configuration as MajorityConfig;
pub use self::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX};
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::tracker::{Configuration, Inflights, Progress, ProgressSet, ProgressState};
pub use self::tracker::{Inflights, Progress, ProgressSet, ProgressState};

#[allow(deprecated)]
pub use self::raw_node::is_empty_snap;
Expand Down
56 changes: 56 additions & 0 deletions src/quorum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

pub mod majority;

use std::collections::HashMap;
use std::fmt::{self, Debug, Display, Formatter};

/// VoteResult indicates the outcome of a vote.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VoteResult {
    /// Pending indicates that the decision of the vote depends on future
    /// votes, i.e. neither "yes" nor "no" has reached quorum yet.
    Pending,
    /// Lost indicates that the quorum has voted "no".
    Lost,
    /// Won indicates that the quorum has voted "yes".
    Won,
}

/// A Raft log position paired with a group id.
#[derive(Clone, Copy, Default)]
pub struct Index {
    /// The log index. `u64::MAX` is rendered as `∞` when formatted.
    pub index: u64,
    /// The group id associated with this position.
    pub group_id: u64,
}

impl Display for Index {
    /// Writes `[group_id]index`, substituting `∞` for the index when it
    /// equals `u64::MAX`.
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self.index {
            u64::MAX => write!(f, "[{}]∞", self.group_id),
            position => write!(f, "[{}]{}", self.group_id, position),
        }
    }
}

impl Debug for Index {
    /// Debug output is identical to the `Display` output.
    #[inline]
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        <Self as Display>::fmt(self, f)
    }
}

/// Source of acknowledged log positions, looked up by voter id.
///
/// `majority::Configuration::committed_index` takes an implementation of this
/// trait and queries it once for every voter in the configuration.
pub trait AckedIndexer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be better to add some comments for this public trait

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a public trait. It can only be accessed by the crate. We have #![deny(missing_docs)] check.

/// Returns the `Index` recorded for `voter_id`, or `None` when the
/// implementation has nothing recorded for that voter (the `HashMap`
/// implementation returns `None` for a missing key).
fn acked_index(&self, voter_id: u64) -> Option<Index>;
}

/// A hash map from voter id to its `Index`, usable wherever an
/// `AckedIndexer` is expected.
pub type AckIndexer = HashMap<u64, Index>;

impl AckedIndexer for AckIndexer {
    /// Looks `voter` up in the map and returns a copy of its entry, or
    /// `None` when the map has no entry for it.
    #[inline]
    fn acked_index(&self, voter: u64) -> Option<Index> {
        self.get(&voter).copied()
    }
}
132 changes: 132 additions & 0 deletions src/quorum/majority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use super::{AckedIndexer, Index, VoteResult};
use crate::{DefaultHashBuilder, HashSet};
use std::mem::MaybeUninit;
use std::{cmp, slice, u64};

/// A set of IDs that uses majority quorums to make decisions.
///
/// An empty set is a valid configuration: `vote_result` reports it as won and
/// `committed_index` reports `u64::MAX` for it.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Configuration {
/// IDs of the voters that make up this majority quorum.
pub(crate) voters: HashSet<u64>,
}

impl Configuration {
/// Creates a new configuration using the given IDs.
///
/// The set is moved into the configuration as-is; no validation or copying
/// is performed.
pub fn new(voters: HashSet<u64>) -> Configuration {
Configuration { voters }
}

/// Builds a configuration with no voters, pre-sizing the backing set with
/// `cap` and the crate's default hash builder.
pub fn with_capacity(cap: usize) -> Configuration {
    let voters = HashSet::with_capacity_and_hasher(cap, DefaultHashBuilder::default());
    Configuration { voters }
}

/// Returns the voter IDs of this configuration as a `Vec` sorted in
/// ascending order.
pub fn slice(&self) -> Vec<u64> {
    let mut sorted_ids = self.voters.iter().copied().collect::<Vec<u64>>();
    // IDs are plain integers, so an unstable sort yields the same order.
    sorted_ids.sort_unstable();
    sorted_ids
}

/// Computes the committed index from those supplied via the
/// provided AckedIndexer (for the active config).
///
/// Returns `(index, flag)`. The bool flag indicates whether the index is
/// computed by group commit algorithm successfully.
///
/// A voter for which `l.acked_index` returns `None` is treated as
/// `Index::default()`. An empty configuration returns `(u64::MAX, false)`.
///
/// Eg. If the matched indexes are [2,2,2,4,5], it will return 2.
/// If the matched indexes and groups are `[(1, 1), (2, 2), (3, 2)]`, it will return 1.
pub fn committed_index(&self, use_group_commit: bool, l: &impl AckedIndexer) -> (u64, bool) {
if self.voters.is_empty() {
// This plays well with joint quorums which, when one half is the zero
// MajorityConfig, should behave like the other half.
return (u64::MAX, false);
}

// Scratch space for configurations of at most 7 voters, so that they do not
// need a heap allocation.
// SAFETY: the value being "initialized" is an array of `MaybeUninit`, which
// itself requires no initialization; no `Index` is read until it is written.
let mut stack_arr: [MaybeUninit<Index>; 7] = unsafe { MaybeUninit::uninit().assume_init() };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer SmallVec for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't use SmallVec on purpose. On the one hand, it's such a small piece of code that doesn't need to introduce another dependency; on the other hand, it has less condition branch as every operation is inlined and optimized when using array.

// Declared outside the `if` so the heap buffer outlives the slice borrowed
// from it in the `else` branch.
let mut heap_arr;
let matched = if self.voters.len() <= 7 {
for (i, v) in self.voters.iter().enumerate() {
stack_arr[i] = MaybeUninit::new(l.acked_index(*v).unwrap_or_default());
}
// SAFETY: the loop above wrote the first `self.voters.len()` (<= 7)
// elements, and the slice covers exactly those elements.
unsafe {
slice::from_raw_parts_mut(stack_arr.as_mut_ptr() as *mut _, self.voters.len())
}
} else {
let mut buf = Vec::with_capacity(self.voters.len());
for v in &self.voters {
buf.push(l.acked_index(*v).unwrap_or_default());
}
heap_arr = Some(buf);
heap_arr.as_mut().unwrap().as_mut_slice()
};
// Reverse sort.
// After sorting in descending order, position `quorum - 1` holds the
// largest index that at least `quorum` voters have reached.
matched.sort_by(|a, b| b.index.cmp(&a.index));

// NOTE(review): `crate::majority` is defined elsewhere; presumably it
// returns `len / 2 + 1` (consistent with the doc example) — confirm.
let quorum = crate::majority(matched.len());
let quorum_index = matched[quorum - 1];
if !use_group_commit {
return (quorum_index.index, false);
}
let (quorum_commit_index, mut checked_group_id) =
(quorum_index.index, quorum_index.group_id);
// Stays true only while no entry with group id 0 has been seen.
let mut single_group = true;
for m in matched.iter() {
// Group id 0 is skipped for the comparison, but remembered.
if m.group_id == 0 {
single_group = false;
continue;
}
// The quorum entry had group id 0: adopt the first non-zero group id
// as the one to compare against.
if checked_group_id == 0 {
checked_group_id = m.group_id;
continue;
}
if checked_group_id == m.group_id {
continue;
}
// First entry from a different group (entries are visited in descending
// index order): the result is capped by the quorum index.
return (cmp::min(m.index, quorum_commit_index), true);
}
// No second group was found.
if single_group {
// Every entry carried the same non-zero group id.
(quorum_commit_index, false)
} else {
// Some entries had group id 0: fall back to the smallest index.
(matched.last().unwrap().index, false)
}
}

/// Tallies the votes of this configuration's voters.
///
/// `check` is called once per voter and reports `Some(true)` for a "yes",
/// `Some(false)` for a "no" and `None` when that voter has not voted yet.
/// The result is `Won` once the "yes" votes reach a quorum, `Pending` while
/// the "yes" votes plus the missing votes could still reach one, and `Lost`
/// otherwise.
pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
    // By convention, the elections on an empty config win. This comes in
    // handy with joint quorums because it'll make a half-populated joint
    // quorum behave like a majority quorum.
    if self.voters.is_empty() {
        return VoteResult::Won;
    }

    let (mut granted, mut undecided) = (0, 0);
    for &id in &self.voters {
        match check(id) {
            Some(true) => granted += 1,
            Some(false) => {}
            None => undecided += 1,
        }
    }

    let quorum = crate::majority(self.voters.len());
    if granted >= quorum {
        return VoteResult::Won;
    }
    if granted + undecided < quorum {
        // Even if every outstanding vote were a "yes", quorum is out of reach.
        return VoteResult::Lost;
    }
    VoteResult::Pending
}

/// Clears all IDs.
///
/// Afterwards the configuration is empty, which `vote_result` treats as won
/// and for which `committed_index` returns `(u64::MAX, false)`.
pub fn clear(&mut self) {
self.voters.clear();
}
}
59 changes: 29 additions & 30 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::raft_log::RaftLog;
use super::read_only::{ReadOnly, ReadOnlyOption, ReadState};
use super::storage::Storage;
use super::Config;
use crate::tracker::CandidacyStatus;
use crate::quorum::VoteResult;
use crate::{util, HashMap, HashSet};
use crate::{Progress, ProgressSet, ProgressState};

Expand Down Expand Up @@ -1069,29 +1069,28 @@ impl<T: Storage> Raft<T> {
}

// Only send vote request to voters.
self.prs
.voter_ids()
.iter()
.filter(|&id| *id != self_id)
.for_each(|&id| {
info!(
self.logger,
"[logterm: {log_term}, index: {log_index}] sent request to {id}",
log_term = self.raft_log.last_term(),
log_index = self.raft_log.last_index(),
id = id;
"term" => self.term,
"msg" => ?vote_msg,
);
let mut m = new_message(id, vote_msg, None);
m.term = term;
m.index = self.raft_log.last_index();
m.log_term = self.raft_log.last_term();
if campaign_type == CAMPAIGN_TRANSFER {
m.context = campaign_type.to_vec();
}
self.r.send(m, &mut self.msgs);
});
for id in self.prs.voter_ids() {
if *id == self_id {
continue;
}
info!(
self.logger,
"[logterm: {log_term}, index: {log_index}] sent request to {id}",
log_term = self.raft_log.last_term(),
log_index = self.raft_log.last_index(),
id = id;
"term" => self.term,
"msg" => ?vote_msg,
);
let mut m = new_message(*id, vote_msg, None);
m.term = term;
m.index = self.raft_log.last_index();
m.log_term = self.raft_log.last_term();
if campaign_type == CAMPAIGN_TRANSFER {
m.context = campaign_type.to_vec();
}
self.r.send(m, &mut self.msgs);
}
}

/// Sets the vote of `id` to `vote`.
Expand Down Expand Up @@ -1780,8 +1779,8 @@ impl<T: Storage> Raft<T> {

/// Check if it can become leader.
fn check_votes(&mut self) -> Option<bool> {
match self.prs().candidacy_status(&self.votes) {
CandidacyStatus::Elected => {
match self.prs().vote_result(&self.votes) {
VoteResult::Won => {
if self.state == StateRole::PreCandidate {
self.campaign(CAMPAIGN_ELECTION);
} else {
Expand All @@ -1790,14 +1789,14 @@ impl<T: Storage> Raft<T> {
}
Some(true)
}
CandidacyStatus::Ineligible => {
VoteResult::Lost => {
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.term > self.term; reuse self.term
let term = self.term;
self.become_follower(term, INVALID_ID);
Some(false)
}
CandidacyStatus::Eligible => None,
VoteResult::Pending => None,
}
}

Expand Down Expand Up @@ -2144,9 +2143,9 @@ impl<T: Storage> Raft<T> {
let next_idx = self.raft_log.last_index() + 1;
self.prs.restore_snapmeta(meta, next_idx, self.max_inflight);
self.prs.get_mut(self.id).unwrap().matched = next_idx - 1;
if self.prs.configuration().voters().contains(&self.id) {
if self.prs.voter_ids().contains(&self.id) {
self.promotable = true;
} else if self.prs.configuration().learners().contains(&self.id) {
} else if self.prs.learner_ids().contains(&self.id) {
self.promotable = false;
}

Expand Down
Loading