Skip to content

Commit

Permalink
Small API Improvements (#102)
Browse files Browse the repository at this point in the history
* Use impl trait in progress

* Make ProgressSet not panic.

Moves failable functions to returning `Result<(), Error>` and adds
useful error types for determining the problem.

Adds an `panic` to all calling code to preserve existing behavior.

* clippy/fmt run

* Reflect comment on checks in promote_learner

* Satisfy clippy
  • Loading branch information
A. Hobden authored Aug 22, 2018
1 parent 19973a0 commit 8b547c5
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 53 deletions.
9 changes: 8 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ quick_error! {
description(err.description())
display("protobuf error {:?}", err)
}

/// The node exists, but should not.
Exists(id: u64, set: &'static str) {
display("The node {} aleady exists in the {} set.", id, set)
}
/// The node does not exist, but should.
NotExists(id: u64, set: &'static str) {
display("The node {} is not in the {} set.", id, set)
}
}
}

Expand Down
24 changes: 12 additions & 12 deletions src/log_unstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ mod test {
for (entries, offset, snapshot, wok, windex) in tests {
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
offset: offset,
snapshot: snapshot,
offset,
snapshot,
..Default::default()
};
let index = u.maybe_first_index();
Expand All @@ -244,8 +244,8 @@ mod test {
for (entries, offset, snapshot, wok, windex) in tests {
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
offset: offset,
snapshot: snapshot,
offset,
snapshot,
..Default::default()
};
let index = u.maybe_last_index();
Expand Down Expand Up @@ -305,8 +305,8 @@ mod test {
for (entries, offset, snapshot, index, wok, wterm) in tests {
let u = Unstable {
entries: entries.map_or(vec![], |entry| vec![entry]),
offset: offset,
snapshot: snapshot,
offset,
snapshot,
..Default::default()
};
let term = u.maybe_term(index);
Expand Down Expand Up @@ -402,9 +402,9 @@ mod test {

for (entries, offset, snapshot, index, term, woffset, wlen) in tests {
let mut u = Unstable {
entries: entries,
offset: offset,
snapshot: snapshot,
entries,
offset,
snapshot,
..Default::default()
};
u.stable_to(index, term);
Expand Down Expand Up @@ -469,9 +469,9 @@ mod test {

for (entries, offset, snapshot, to_append, woffset, wentries) in tests {
let mut u = Unstable {
entries: entries,
offset: offset,
snapshot: snapshot,
entries,
offset,
snapshot,
..Default::default()
};
u.truncate_and_append(&to_append);
Expand Down
60 changes: 29 additions & 31 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use errors::Error;
use fxhash::FxHashMap;
use std::cmp;
use std::collections::hash_map::{HashMap, Iter, IterMut};
use std::iter::Chain;
use std::collections::hash_map::HashMap;

/// The state of the progress.
#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -105,41 +105,37 @@ impl ProgressSet {
}

/// Returns an iterator across all the nodes and their progress.
pub fn iter(&self) -> Chain<Iter<u64, Progress>, Iter<u64, Progress>> {
pub fn iter(&self) -> impl Iterator<Item = (&u64, &Progress)> {
self.voters.iter().chain(&self.learners)
}

/// Returns a mutable iterator across all the nodes and their progress.
pub fn iter_mut(&mut self) -> Chain<IterMut<u64, Progress>, IterMut<u64, Progress>> {
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&u64, &mut Progress)> {
self.voters.iter_mut().chain(&mut self.learners)
}

/// Adds a voter node
///
/// # Panics
///
/// Panics if the node already has been added.
pub fn insert_voter(&mut self, id: u64, pr: Progress) {
if self.learners.contains_key(&id) {
panic!("insert voter {} but already in learners", id);
pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?
}
if self.voters.insert(id, pr).is_some() {
panic!("insert voter {} twice", id);
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?;
}
self.voters.insert(id, pr);
Ok(())
}

/// Adds a learner to the cluster
///
/// # Panics
///
/// Panics if the node already has been added.
pub fn insert_learner(&mut self, id: u64, pr: Progress) {
pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
panic!("insert learner {} but already in voters", id);
Err(Error::Exists(id, "voters"))?
}
if self.learners.insert(id, pr).is_some() {
panic!("insert learner {} twice", id);
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?
}
self.learners.insert(id, pr);
Ok(())
}

/// Removes the peer from the set of voters or learners.
Expand All @@ -151,17 +147,19 @@ impl ProgressSet {
}

/// Promote a learner to a peer.
///
/// # Panics
///
/// Panics if the node doesn't exist.
pub fn promote_learner(&mut self, id: u64) {
if let Some(mut pr) = self.learners.remove(&id) {
pr.is_learner = false;
self.voters.insert(id, pr);
return;
pub fn promote_learner(&mut self, id: u64) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?;
}
// We don't want to remove it unless it's there.
if self.learners.contains_key(&id) {
let mut learner = self.learners.remove(&id).unwrap(); // We just checked!
learner.is_learner = false;
self.voters.insert(id, learner);
Ok(())
} else {
Err(Error::NotExists(id, "learners"))
}
panic!("promote not exists learner: {}", id);
}
}

Expand Down
20 changes: 14 additions & 6 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,16 @@ impl<T: Storage> Raft<T> {
};
for p in peers {
let pr = new_progress(1, r.max_inflight);
r.mut_prs().insert_voter(*p, pr);
if let Err(e) = r.mut_prs().insert_voter(*p, pr) {
panic!("{}", e);
}
}
for p in learners {
let mut pr = new_progress(1, r.max_inflight);
pr.is_learner = true;
r.mut_prs().insert_learner(*p, pr);
if let Err(e) = r.mut_prs().insert_learner(*p, pr) {
panic!("{}", e);
};
if *p == r.id {
r.is_learner = true;
}
Expand Down Expand Up @@ -1852,7 +1856,9 @@ impl<T: Storage> Raft<T> {
// Ignore redundant add learner.
return;
}
self.mut_prs().promote_learner(id);
if let Err(e) = self.mut_prs().promote_learner(id) {
panic!("{}", e)
}
if id == self.id {
self.is_learner = false;
}
Expand Down Expand Up @@ -1903,9 +1909,11 @@ impl<T: Storage> Raft<T> {
p.matched = matched;
p.is_learner = is_learner;
if is_learner {
self.mut_prs().insert_learner(id, p);
} else {
self.mut_prs().insert_voter(id, p);
if let Err(e) = self.mut_prs().insert_learner(id, p) {
panic!("{}", e);
}
} else if let Err(e) = self.mut_prs().insert_voter(id, p) {
panic!("{}", e);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl MemStorageCore {
// truncate compacted entries
let te: &[Entry] = if first > ents[0].get_index() {
let start_ent = (first - ents[0].get_index()) as usize;
&ents[start_ent..ents.len()]
&ents[start_ent..]
} else {
ents
};
Expand Down
8 changes: 6 additions & 2 deletions tests/cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,16 @@ impl Interface {
is_learner: true,
..Default::default()
};
self.mut_prs().insert_learner(*id, progress);
if let Err(e) = self.mut_prs().insert_learner(*id, progress) {
panic!("{}", e);
}
} else {
let progress = Progress {
..Default::default()
};
self.mut_prs().insert_voter(*id, progress);
if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
}
}
let term = self.term;
Expand Down

0 comments on commit 8b547c5

Please # to comment.