Skip to content

Commit

Permalink
Use hashset for PR groups
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoverbear committed Sep 7, 2018
1 parent 79e9553 commit 8154d37
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 39 deletions.
16 changes: 13 additions & 3 deletions benches/suites/progress_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use DEFAULT_RAFT_SETS;

pub fn bench_progress_set(c: &mut Criterion) {
bench_progress_set_new(c);
bench_progress_set_with_capacity(c);
bench_progress_set_insert_voter(c);
bench_progress_set_insert_learner(c);
bench_progress_set_promote_learner(c);
Expand All @@ -14,7 +15,7 @@ pub fn bench_progress_set(c: &mut Criterion) {
}

fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
let mut set = ProgressSet::new();
let mut set = ProgressSet::with_capacity(voters, learners);
(0..voters).for_each(|id| {
set.insert_voter(id as u64, Default::default()).ok();
});
Expand All @@ -25,16 +26,25 @@ fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
}

pub fn bench_progress_set_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new());
};

c.bench_function(&format!("ProgressSet::new"), bench);
}

pub fn bench_progress_set_with_capacity(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new());
b.iter(|| ProgressSet::with_capacity(voters, learners));
}
};

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::new ({}, {})", voters, learners),
&format!("ProgressSet::with_capacity ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand Down
107 changes: 72 additions & 35 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
// limitations under the License.

use errors::Error;
use fxhash::FxHashMap;
use fxhash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::cmp;
use std::collections::{HashMap, HashSet};

/// The state of the progress.
#[derive(Debug, PartialEq, Clone, Copy)]
Expand All @@ -46,57 +47,97 @@ impl Default for ProgressState {
}
}

#[derive(Clone, Debug, Default)]
struct Configuration {
voters: FxHashSet<u64>,
learners: FxHashSet<u64>,
}

/// `ProgressSet` contains several `Progress`es,
/// which could be `Leader`, `Follower` and `Learner`.
#[derive(Default, Clone)]
pub struct ProgressSet {
progress: FxHashMap<u64, Progress>,
configuration: Configuration,
}

impl ProgressSet {
/// Creates a new ProgressSet.
pub fn new() -> Self {
ProgressSet {
progress: Default::default(),
configuration: Default::default(),
}
}

/// Create a progress sete with the specified sizes already reserved.
pub fn with_capacity(voters: usize, learners: usize) -> Self {
ProgressSet {
progress: HashMap::with_capacity_and_hasher(
voters + learners,
FxBuildHasher::default(),
),
configuration: Configuration {
voters: HashSet::with_capacity_and_hasher(
voters + learners,
FxBuildHasher::default(),
),
learners: HashSet::with_capacity_and_hasher(
voters + learners,
FxBuildHasher::default(),
),
},
}
}

/// Returns the status of voters.
pub fn voters(&self) -> impl Iterator<Item = (&u64, &Progress)> {
self.progress.iter().filter(|&(_, v)| !v.is_learner)
self.progress
.iter()
.filter(move |(k, _)| self.configuration.voters.contains(k))
}

/// Returns the status of learners.
pub fn learners(&self) -> impl Iterator<Item = (&u64, &Progress)> {
self.progress.iter().filter(|&(_, v)| v.is_learner)
self.progress
.iter()
.filter(move |(k, _)| self.configuration.learners.contains(k))
}

/// Returns the mutable status of voters.
pub fn voters_mut(&mut self) -> impl Iterator<Item = (&u64, &mut Progress)> {
self.progress.iter_mut().filter(|(_, v)| !v.is_learner)
let configuration = &self.configuration;
self.progress
.iter_mut()
.filter(move |(k, _)| configuration.voters.contains(k))
}

/// Returns the mutable status of learners.
pub fn learners_mut(&mut self) -> impl Iterator<Item = (&u64, &mut Progress)> {
self.progress.iter_mut().filter(|(_, v)| v.is_learner)
let configuration = &self.configuration;
self.progress
.iter_mut()
.filter(move |(k, _)| configuration.learners.contains(k))
}

/// Returns if the progress set contains a voter by the given id.
pub fn has_voter(&self, id: u64) -> bool {
if let Some(progress) = self.progress.get(&id) {
!progress.is_learner
} else {
false
}
self.configuration.voters.contains(&id)
}

/// Returns the number of voters in the current configuration.
pub fn num_voters(&self) -> usize {
self.configuration.voters.len()
}

/// Returns the number of learners in the current configuration.
pub fn num_learners(&self) -> usize {
self.configuration.learners.len()
}

/// Returns if the progress set contains a learner by the given id.
pub fn has_learner(&self, id: u64) -> bool {
if let Some(progress) = self.progress.get(&id) {
progress.is_learner
} else {
false
}
self.configuration.learners.contains(&id)
}

/// Returns if the progress set contains a peer by the given id.
Expand All @@ -106,12 +147,12 @@ impl ProgressSet {

/// Returns the ids of all known voters.
pub fn voter_ids(&self) -> impl Iterator<Item = &u64> {
self.voters().map(|(k, _v)| k)
self.configuration.voters.iter()
}

/// Returns the ids of all known learners.
pub fn learner_ids(&self) -> impl Iterator<Item = &u64> {
self.learners().map(|(k, _v)| k)
self.configuration.learners.iter()
}

/// Grabs a reference to the progress of a node.
Expand All @@ -136,40 +177,34 @@ impl ProgressSet {

/// Adds a voter node
pub fn insert_voter(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
if let Some(progress) = self.progress.get(&id) {
Err(Error::Exists(
id,
if progress.is_learner {
"learners"
} else {
"voters"
},
))?;
if self.has_learner(id) {
Err(Error::Exists(id, "learners"))?;
} else if self.has_voter(id) {
Err(Error::Exists(id, "voters"))?;
}
pr.is_learner = false;
self.configuration.voters.insert(id);
self.progress.insert(id, pr);
Ok(())
}

/// Adds a learner to the cluster
pub fn insert_learner(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
if let Some(progress) = self.progress.get(&id) {
Err(Error::Exists(
id,
if progress.is_learner {
"learners"
} else {
"voters"
},
))?;
if self.has_learner(id) {
Err(Error::Exists(id, "learners"))?;
} else if self.has_voter(id) {
Err(Error::Exists(id, "voters"))?;
}
pr.is_learner = true;
self.configuration.learners.insert(id);
self.progress.insert(id, pr);
Ok(())
}

/// Removes the peer from the set of voters or learners.
pub fn remove(&mut self, id: u64) -> Option<Progress> {
self.configuration.voters.remove(&id);
self.configuration.learners.remove(&id);
self.progress.remove(&id)
}

Expand All @@ -179,6 +214,8 @@ impl ProgressSet {
Some(ref progress) if !progress.is_learner => Err(Error::Exists(id, "voters"))?,
Some(progress) => {
progress.is_learner = false;
self.configuration.voters.insert(id);
self.configuration.learners.remove(&id);
Ok(())
}
None => Err(Error::NotExists(id, "learners")),
Expand Down
2 changes: 1 addition & 1 deletion src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl<T: Storage> Raft<T> {
}

fn quorum(&self) -> usize {
quorum(self.prs().voters().count())
quorum(self.prs().num_voters())
}

/// For testing leader lease
Expand Down

0 comments on commit 8154d37

Please # to comment.