From 8772e69fd124ae675ef7c0ec50142ead330703cc Mon Sep 17 00:00:00 2001 From: Kevin Choubacha Date: Tue, 10 Jul 2018 00:05:25 +0200 Subject: [PATCH] Require documentation on public interfaces Previously, compiling would not warn or error for lack of documentation. Now the compiler will fail if public interfaces are left without documentation. To prevent the library from failing, all public interfaces have been documented. resolves #72 --- proto/eraftpb.proto | 10 ++++ src/errors.rs | 15 +++++ src/lib.rs | 3 + src/log_unstable.rs | 44 +++++++++----- src/progress.rs | 119 +++++++++++++++++++++++++------------ src/raft.rs | 139 ++++++++++++++++++++++++++++++++++---------- src/raft_log.rs | 117 +++++++++++++++++++++++++++---------- src/raw_node.rs | 119 ++++++++++++++++++++++--------------- src/read_only.rs | 24 ++++---- src/status.rs | 9 ++- src/storage.rs | 38 ++++++++---- src/util.rs | 31 ++++++++++ 12 files changed, 485 insertions(+), 183 deletions(-) diff --git a/proto/eraftpb.proto b/proto/eraftpb.proto index a38de98aa..e7c25f50c 100644 --- a/proto/eraftpb.proto +++ b/proto/eraftpb.proto @@ -6,6 +6,16 @@ enum EntryType { EntryConfChange = 1; } +// The entry is a type of change that needs to be applied. It contains two data fields. +// While the fields are built into the model, their usage is determined by the entry_type. +// +// For normal entries, the data field should contain the data change that should be applied. +// The context field can be used for any contextual data that might be relevant to the +// application of the data. +// +// For configuration changes, the data will contain the ConfChange message and the +// context will provide anything needed to assist the configuration change. The context +// is for the user to set and use in this case.
message Entry { EntryType entry_type = 1; uint64 term = 2; diff --git a/src/errors.rs b/src/errors.rs index 51b2b1dfb..0b5935c69 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -17,30 +17,38 @@ use std::{cmp, io, result}; use protobuf::ProtobufError; quick_error! { + /// The base error type for raft #[derive(Debug)] pub enum Error { + /// An IO error occurred Io(err: io::Error) { from() cause(err) description(err.description()) } + /// A storage error occurred. Store(err: StorageError) { from() cause(err) description(err.description()) } + /// Raft cannot step the local message. StepLocalMsg { description("raft: cannot step raft local message") } + /// The raft peer is not found and thus cannot step. StepPeerNotFound { description("raft: cannot step as peer not found") } + /// The proposal of changes was dropped. ProposalDropped { description("raft: proposal dropped") } + /// The configuration is invalid. ConfigInvalid(desc: String) { description(desc) } + /// A Protobuf message failed in some manner. Codec(err: ProtobufError) { from() cause(err) @@ -67,20 +75,26 @@ impl cmp::PartialEq for Error { } quick_error! { + /// An error with the storage. #[derive(Debug)] pub enum StorageError { + /// The storage was compacted and not accessible Compacted { description("log compacted") } + /// The log is not available. Unavailable { description("log unavailable") } + /// The snapshot is out of date. SnapshotOutOfDate { description("snapshot out of date") } + /// The snapshot is being created. SnapshotTemporarilyUnavailable { description("snapshot is temporarily unavailable") } + /// Some other error occurred. Other(err: Box) { from() cause(err.as_ref()) @@ -106,6 +120,7 @@ impl cmp::PartialEq for StorageError { } } +/// A result type that wraps up the raft errors. 
pub type Result = result::Result; #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index 7360b501e..ef189d497 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -215,6 +215,7 @@ For more information, check out an [example](examples/single_mem_node/main.rs#L1 #![cfg_attr(feature = "dev", feature(plugin))] #![cfg_attr(feature = "dev", plugin(clippy))] #![cfg_attr(not(feature = "dev"), allow(unknown_lints))] +#![deny(missing_docs)] extern crate fxhash; #[macro_use] @@ -224,6 +225,8 @@ extern crate protobuf; extern crate quick_error; extern crate rand; +/// This module supplies the needed message types. However, it is autogenerated and thus cannot be +/// documented by field. pub mod eraftpb; mod errors; mod log_unstable; diff --git a/src/log_unstable.rs b/src/log_unstable.rs index 67fd029a2..dea591e0d 100644 --- a/src/log_unstable.rs +++ b/src/log_unstable.rs @@ -1,3 +1,5 @@ +//! A representation of not-yet-committed log entries and state. + // Copyright 2016 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -27,22 +29,27 @@ use eraftpb::{Entry, Snapshot}; -// unstable.entries[i] has raft log position i+unstable.offset. -// Note that unstable.offset may be less than the highest log -// position in storage; this means that the next write to storage -// might need to truncate the log before persisting unstable.entries. +/// The unstable.entries[i] has raft log position i+unstable.offset. +/// Note that unstable.offset may be less than the highest log +/// position in storage; this means that the next write to storage +/// might need to truncate the log before persisting unstable.entries. #[derive(Debug, PartialEq, Default)] pub struct Unstable { - // the incoming unstable snapshot, if any. + /// The incoming unstable snapshot, if any. pub snapshot: Option, - // all entries that have not yet been written to storage. + + /// All entries that have not yet been written to storage. pub entries: Vec, + + /// The offset from the vector index. 
pub offset: u64, + /// The tag to use when logging. pub tag: String, } impl Unstable { + /// Creates a new log of unstable entries. pub fn new(offset: u64, tag: String) -> Unstable { Unstable { offset, @@ -51,16 +58,16 @@ impl Unstable { tag, } } - // maybe_first_index returns the index of the first possible entry in entries - // if it has a snapshot. + + /// Returns the index of the first possible entry in entries + /// if it has a snapshot. pub fn maybe_first_index(&self) -> Option { self.snapshot .as_ref() .map(|snap| snap.get_metadata().get_index() + 1) } - // maybe_last_index returns the last index if it has at least one - // unstable entry or snapshot. + /// Returns the last index if it has at least one unstable entry or snapshot. pub fn maybe_last_index(&self) -> Option { match self.entries.len() { 0 => self.snapshot @@ -70,8 +77,7 @@ impl Unstable { } } - // maybe_term returns the term of the entry at index idx, if there - // is any. + /// Returns the term of the entry at index idx, if there is any. pub fn maybe_term(&self, idx: u64) -> Option { if idx < self.offset { let snapshot = self.snapshot.as_ref()?; @@ -91,6 +97,8 @@ impl Unstable { } } + /// Moves the stable offset up to the index. Provided that the index + /// is in the same election term. pub fn stable_to(&mut self, idx: u64, term: u64) { let t = self.maybe_term(idx); if t.is_none() { @@ -104,6 +112,7 @@ impl Unstable { } } + /// Removes the snapshot from self if the index of the snapshot matches pub fn stable_snap_to(&mut self, idx: u64) { if self.snapshot.is_none() { return; @@ -113,13 +122,14 @@ impl Unstable { } } + /// From a given snapshot, restores the snapshot to self, but doesn't unpack. pub fn restore(&mut self, snap: Snapshot) { self.entries.clear(); self.offset = snap.get_metadata().get_index() + 1; self.snapshot = Some(snap); } - // append entries to unstable, truncate local block first if overlapped. + /// Append entries to unstable, truncate local block first if overlapped. 
pub fn truncate_and_append(&mut self, ents: &[Entry]) { let after = ents[0].get_index(); if after == self.offset + self.entries.len() as u64 { @@ -140,6 +150,12 @@ impl Unstable { } } + /// Returns a slice of entries between the high and low. + /// + /// # Panics + /// + /// Panics if the `lo` or `hi` are out of bounds. + /// Panics if `lo > hi`. pub fn slice(&self, lo: u64, hi: u64) -> &[Entry] { self.must_check_outofbounds(lo, hi); let l = lo as usize; @@ -148,6 +164,8 @@ impl Unstable { &self.entries[l - off..h - off] } + /// Asserts the `hi` and `lo` values against each other and against the + /// entries themselves. pub fn must_check_outofbounds(&self, lo: u64, hi: u64) { if lo > hi { panic!("{} invalid unstable.slice {} > {}", self.tag, lo, hi) diff --git a/src/progress.rs b/src/progress.rs index 5245e48ef..3e19e595b 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -30,10 +30,14 @@ use std::cmp; use std::collections::hash_map::{HashMap, Iter, IterMut}; use std::iter::Chain; +/// The state of the progress. #[derive(Debug, PartialEq, Clone, Copy)] pub enum ProgressState { + /// Whether it's probing. Probe, + /// Whether it's replicating. Replicate, + /// Whether it's a snapshot. Snapshot, } @@ -52,6 +56,7 @@ pub struct ProgressSet { } impl ProgressSet { + /// Creates a new ProgressSet. pub fn new(voter_size: usize, learner_size: usize) -> Self { ProgressSet { voters: HashMap::with_capacity_and_hasher(voter_size, Default::default()), @@ -59,14 +64,17 @@ impl ProgressSet { } } + /// Returns the status of voters. pub fn voters(&self) -> &FxHashMap { &self.voters } + /// Returns the status of learners. pub fn learners(&self) -> &FxHashMap { &self.learners } + /// Returns the ids of all known nodes. pub fn nodes(&self) -> Vec { let mut nodes = Vec::with_capacity(self.voters.len()); nodes.extend(self.voters.keys()); @@ -74,6 +82,7 @@ impl ProgressSet { nodes } + /// Returns the ids of all known learners.
pub fn learner_nodes(&self) -> Vec { let mut ids = Vec::with_capacity(self.learners.len()); ids.extend(self.learners.keys()); @@ -81,10 +90,12 @@ impl ProgressSet { ids } + /// Grabs a reference to the progress of a node. pub fn get(&self, id: u64) -> Option<&Progress> { self.voters.get(&id).or_else(|| self.learners.get(&id)) } + /// Grabs a mutable reference to the progress of a node. pub fn get_mut(&mut self, id: u64) -> Option<&mut Progress> { let progress = self.voters.get_mut(&id); if progress.is_none() { @@ -93,14 +104,21 @@ impl ProgressSet { progress } + /// Returns an iterator across all the nodes and their progress. pub fn iter(&self) -> Chain, Iter> { self.voters.iter().chain(&self.learners) } + /// Returns a mutable iterator across all the nodes and their progress. pub fn iter_mut(&mut self) -> Chain, IterMut> { self.voters.iter_mut().chain(&mut self.learners) } + /// Adds a voter node + /// + /// # Panics + /// + /// Panics if the node already has been added. pub fn insert_voter(&mut self, id: u64, pr: Progress) { if self.learners.contains_key(&id) { panic!("insert voter {} but already in learners", id); @@ -110,6 +128,11 @@ impl ProgressSet { } } + /// Adds a learner to the cluster + /// + /// # Panics + /// + /// Panics if the node already has been added. pub fn insert_learner(&mut self, id: u64, pr: Progress) { if self.voters.contains_key(&id) { panic!("insert learner {} but already in voters", id); @@ -119,6 +142,7 @@ impl ProgressSet { } } + /// Removes the peer from the set of voters or learners. pub fn remove(&mut self, id: u64) -> Option { match self.voters.remove(&id) { None => self.learners.remove(&id), @@ -126,6 +150,11 @@ impl ProgressSet { } } + /// Promote a learner to a peer. + /// + /// # Panics + /// + /// Panics if the node doesn't exist. 
pub fn promote_learner(&mut self, id: u64) { if let Some(mut pr) = self.learners.remove(&id) { pr.is_learner = false; @@ -136,45 +165,48 @@ impl ProgressSet { } } +/// The progress of catching up from a restart. #[derive(Debug, Default, Clone)] pub struct Progress { + /// How much state is matched. pub matched: u64, + /// The next index to apply pub next_idx: u64, - // When in ProgressStateProbe, leader sends at most one replication message - // per heartbeat interval. It also probes actual progress of the follower. - // - // When in ProgressStateReplicate, leader optimistically increases next - // to the latest entry sent after sending replication message. This is - // an optimized state for fast replicating log entries to the follower. - // - // When in ProgressStateSnapshot, leader should have sent out snapshot - // before and stops sending any replication message. + /// When in ProgressStateProbe, leader sends at most one replication message + /// per heartbeat interval. It also probes actual progress of the follower. + /// + /// When in ProgressStateReplicate, leader optimistically increases next + /// to the latest entry sent after sending replication message. This is + /// an optimized state for fast replicating log entries to the follower. + /// + /// When in ProgressStateSnapshot, leader should have sent out snapshot + /// before and stop sending any replication message. pub state: ProgressState, - // Paused is used in ProgressStateProbe. - // When Paused is true, raft should pause sending replication message to this peer. + /// Paused is used in ProgressStateProbe. + /// When Paused is true, raft should pause sending replication message to this peer. pub paused: bool, - // pending_snapshot is used in ProgressStateSnapshot. - // If there is a pending snapshot, the pendingSnapshot will be set to the - // index of the snapshot. If pendingSnapshot is set, the replication process of - // this Progress will be paused. 
raft will not resend snapshot until the pending one - // is reported to be failed. + /// This field is used in ProgressStateSnapshot. + /// If there is a pending snapshot, the pendingSnapshot will be set to the + /// index of the snapshot. If pendingSnapshot is set, the replication process of + /// this Progress will be paused. raft will not resend snapshot until the pending one + /// is reported to be failed. pub pending_snapshot: u64, - // recent_active is true if the progress is recently active. Receiving any messages - // from the corresponding follower indicates the progress is active. - // RecentActive can be reset to false after an election timeout. + /// This is true if the progress is recently active. Receiving any messages + /// from the corresponding follower indicates the progress is active. + /// RecentActive can be reset to false after an election timeout. pub recent_active: bool, - // Inflights is a sliding window for the inflight messages. - // When inflights is full, no more message should be sent. - // When a leader sends out a message, the index of the last - // entry should be added to inflights. The index MUST be added - // into inflights in order. - // When a leader receives a reply, the previous inflights should - // be freed by calling inflights.freeTo. + /// Inflights is a sliding window for the inflight messages. + /// When inflights is full, no more message should be sent. + /// When a leader sends out a message, the index of the last + /// entry should be added to inflights. The index MUST be added + /// into inflights in order. + /// When a leader receives a reply, the previous inflights should + /// be freed by calling inflights.freeTo. pub ins: Inflights, - // Indicates the Progress is a learner or not. + /// Indicates the Progress is a learner or not. pub is_learner: bool, } @@ -186,6 +218,7 @@ impl Progress { self.ins.reset(); } + /// Changes the progress to a probe. 
pub fn become_probe(&mut self) { // If the original state is ProgressStateSnapshot, progress knows that // the pending snapshot has been sent to this peer successfully, then @@ -200,28 +233,31 @@ impl Progress { } } + /// Changes the progress to a Replicate. pub fn become_replicate(&mut self) { self.reset_state(ProgressState::Replicate); self.next_idx = self.matched + 1; } + /// Changes the progress to a snapshot. pub fn become_snapshot(&mut self, snapshot_idx: u64) { self.reset_state(ProgressState::Snapshot); self.pending_snapshot = snapshot_idx; } + /// Sets the snapshot to failure. pub fn snapshot_failure(&mut self) { self.pending_snapshot = 0; } - // maybe_snapshot_abort unsets pendingSnapshot if Match is equal or higher than - // the pendingSnapshot + /// Unsets pendingSnapshot if Match is equal or higher than + /// the pendingSnapshot pub fn maybe_snapshot_abort(&self) -> bool { self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot } - // maybe_update returns false if the given n index comes from an outdated message. - // Otherwise it updates the progress and returns true. + /// Returns false if the given n index comes from an outdated message. + /// Otherwise it updates the progress and returns true. pub fn maybe_update(&mut self, n: u64) -> bool { let need_update = self.matched < n; if need_update { @@ -236,12 +272,14 @@ impl Progress { need_update } + /// Optimistically advance the index pub fn optimistic_update(&mut self, n: u64) { self.next_idx = n + 1; } - // maybe_decr_to returns false if the given to index comes from an out of order message. - // Otherwise it decreases the progress next index to min(rejected, last) and returns true. + /// Returns false if the given index comes from an out of order message. + /// Otherwise it decreases the progress next index to min(rejected, last) + /// and returns true. 
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool { if self.state == ProgressState::Replicate { // the rejection must be stale if the progress has matched and "rejected" @@ -266,6 +304,7 @@ impl Progress { true } + /// Determine whether progress is paused. pub fn is_paused(&self) -> bool { match self.state { ProgressState::Probe => self.paused, @@ -274,15 +313,18 @@ impl Progress { } } + /// Resume progress pub fn resume(&mut self) { self.paused = false; } + /// Pause progress. pub fn pause(&mut self) { self.paused = true; } } +/// A buffer of inflight messages. #[derive(Debug, Default, Clone, PartialEq)] pub struct Inflights { // the starting index in the buffer @@ -295,6 +337,7 @@ pub struct Inflights { } impl Inflights { + /// Creates a new buffer for inflight messages. pub fn new(cap: usize) -> Inflights { Inflights { buffer: Vec::with_capacity(cap), @@ -302,16 +345,17 @@ impl Inflights { } } - // full returns true if the inflights is full. + /// Returns true if the inflights is full. pub fn full(&self) -> bool { self.count == self.cap() } + /// The buffer capacity. pub fn cap(&self) -> usize { self.buffer.capacity() } - // add adds an inflight into inflights + /// Adds an inflight into inflights pub fn add(&mut self, inflight: u64) { if self.full() { panic!("cannot add into a full inflights") @@ -330,7 +374,7 @@ impl Inflights { self.count += 1; } - // free_to frees the inflights smaller or equal to the given `to` flight. + /// Frees the inflights smaller or equal to the given `to` flight. pub fn free_to(&mut self, to: u64) { if self.count == 0 || to < self.buffer[self.start] { // out of the left side of the window @@ -359,12 +403,13 @@ impl Inflights { self.start = idx; } + /// Frees the first buffer entry. pub fn free_first_one(&mut self) { let start = self.buffer[self.start]; self.free_to(start); } - // resets frees all inflights. + /// Frees all inflights. 
pub fn reset(&mut self) { self.count = 0; self.start = 0; diff --git a/src/raft.rs b/src/raft.rs index dbf35a2c1..2464400f4 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -47,11 +47,16 @@ const CAMPAIGN_ELECTION: &[u8] = b"CampaignElection"; // CAMPAIGN_TRANSFER represents the type of leader transfer. const CAMPAIGN_TRANSFER: &[u8] = b"CampaignTransfer"; +/// The role of the node. #[derive(Debug, PartialEq, Clone, Copy)] pub enum StateRole { + /// The node is a follower of the leader. Follower, + /// The node could become a leader. Candidate, + /// The node is a leader. Leader, + /// The node could become a candidate, if `prevote` is enabled. PreCandidate, } @@ -61,14 +66,14 @@ impl Default for StateRole { } } -// A constant represents invalid id of raft. +/// A constant represents invalid id of raft. pub const INVALID_ID: u64 = 0; -// A constant represents invalid index of raft log. +/// A constant represents invalid index of raft log. pub const INVALID_INDEX: u64 = 0; /// Config contains the parameters to start a raft. pub struct Config { - /// id is the identity of the local raft. It cannot be 0, and must be unique in the group. + /// The identity of the local raft. It cannot be 0, and must be unique in the group. pub id: u64, /// The IDs of all nodes (including self) in @@ -120,7 +125,7 @@ pub struct Config { /// quorum is not active for an electionTimeout. pub check_quorum: bool, - /// pre_vote enables the Pre-Vote algorithm described in raft thesis section + /// Enables the Pre-Vote algorithm described in raft thesis section /// 9.6. This prevents disruption when a node that has been partitioned away /// rejoins the cluster. pub pre_vote: bool, @@ -142,7 +147,7 @@ pub struct Config { /// May affect proposal forwarding and follower read. pub skip_bcast_commit: bool, - /// tag is only used for logging + /// A human-friendly tag used for logging. pub tag: String, } @@ -170,6 +175,7 @@ impl Default for Config { } impl Config { + /// Creates a new config. 
pub fn new(id: u64) -> Self { Self { id, @@ -178,6 +184,7 @@ impl Config { } } + /// The minimum number of ticks before an election. #[inline] pub fn min_election_tick(&self) -> usize { if self.min_election_tick == 0 { @@ -187,6 +194,7 @@ impl Config { } } + /// The maximum number of ticks before an election. #[inline] pub fn max_election_tick(&self) -> usize { if self.max_election_tick == 0 { @@ -196,6 +204,7 @@ impl Config { } } + /// Runs validations against the config. pub fn validate(&self) -> Result<()> { if self.id == INVALID_ID { return Err(Error::ConfigInvalid("invalid node id".to_owned())); @@ -239,42 +248,59 @@ impl Config { } } -// SoftState provides state that is useful for logging and debugging. -// The state is volatile and does not need to be persisted to the WAL. +/// SoftState provides state that is useful for logging and debugging. +/// The state is volatile and does not need to be persisted to the WAL. #[derive(Default, PartialEq, Debug)] pub struct SoftState { + /// The potential leader of the cluster. pub leader_id: u64, + /// The soft role this node may take. pub raft_state: StateRole, } +/// A struct that represents the raft consensus itself. Stores details concerning the current +/// and possible state the system can take. #[derive(Default)] pub struct Raft { + /// The current election term. pub term: u64, + + /// Which peer this raft is voting for. pub vote: u64, + /// The ID of this node. pub id: u64, + /// The current read states pub read_states: Vec, - /// the log + /// The log pub raft_log: RaftLog, + /// The maximum number of messages that can be inflight. pub max_inflight: usize, + + /// The maximum length (in bytes) of all the entries. pub max_msg_size: u64, + prs: Option, + /// The current role of this node. pub state: StateRole, + /// Whether this is a learner node. pub is_learner: bool, + /// The current votes. pub votes: FxHashMap, + /// The list of messages. 
pub msgs: Vec, - /// the leader id + /// The leader id pub leader_id: u64, - /// lead_transferee is id of the leader transfer target when its value is not None. + /// ID of the leader transfer target when its value is not None. /// Follow the procedure defined in raft thesis 3.10. pub lead_transferee: Option, @@ -286,18 +312,19 @@ pub struct Raft { /// value. pub pending_conf_index: u64, + /// The queue of read-only requests. pub read_only: ReadOnly, - /// number of ticks since it reached last electionTimeout when it is leader - /// or candidate. - /// number of ticks since it reached last electionTimeout or received a + /// Ticks since it reached last electionTimeout when it is leader or candidate. + /// Number of ticks since it reached last electionTimeout or received a /// valid message from current leader when it is a follower. pub election_elapsed: usize, - /// number of ticks since it reached last heartbeatTimeout. + /// Number of ticks since it reached last heartbeatTimeout. /// only leader keeps heartbeatElapsed. heartbeat_elapsed: usize, + /// Whether to check the quorum pub check_quorum: bool, #[doc(hidden)] pub pre_vote: bool, @@ -318,7 +345,7 @@ pub struct Raft { #[doc(hidden)] pub before_step_state: Option bool + Send>>, - /// tag is only used for logging + /// Tag is only used for logging tag: String, } @@ -344,7 +371,7 @@ fn new_message(to: u64, field_type: MessageType, from: Option) -> Message { m } -// vote_resp_msg_type maps vote and pre_vote message types to their correspond responses. +/// Maps vote and pre_vote message types to their corresponding responses. pub fn vote_resp_msg_type(t: MessageType) -> MessageType { match t { MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse, @@ -353,12 +380,13 @@ pub fn vote_resp_msg_type(t: MessageType) -> MessageType { } } -// Calculate the quorum of a Raft cluster with the specified total nodes. +/// Calculate the quorum of a Raft cluster with the specified total nodes.
pub fn quorum(total: usize) -> usize { total / 2 + 1 } impl Raft { + /// Creates a new raft for use on the node. pub fn new(c: &Config, store: T) -> Raft { c.validate().expect("configuration is invalid"); let rs = store.initial_state().expect(""); @@ -444,31 +472,37 @@ impl Raft { r } + /// Grabs an immutable reference to the store. #[inline] pub fn get_store(&self) -> &T { self.raft_log.get_store() } + /// Grabs a mutable reference to the store. #[inline] pub fn mut_store(&mut self) -> &mut T { self.raft_log.mut_store() } + /// Grabs a reference to the snapshot #[inline] pub fn get_snap(&self) -> Option<&Snapshot> { self.raft_log.get_unstable().snapshot.as_ref() } + /// Returns the number of pending read-only messages. #[inline] pub fn pending_read_count(&self) -> usize { self.read_only.pending_read_count() } + /// Returns how many read states exist. #[inline] pub fn ready_read_count(&self) -> usize { self.read_states.len() } + /// Returns a value representing the softstate at the time of calling. pub fn soft_state(&self) -> SoftState { SoftState { leader_id: self.leader_id, @@ -476,6 +510,7 @@ impl Raft { } } + /// Returns a value representing the hardstate at the time of calling. pub fn hard_state(&self) -> HardState { let mut hs = HardState::new(); hs.set_term(self.term); @@ -484,6 +519,7 @@ impl Raft { hs } + /// Returns whether the current raft is in lease. pub fn in_lease(&self) -> bool { self.state == StateRole::Leader && self.check_quorum } @@ -492,20 +528,23 @@ impl Raft { quorum(self.prs().voters().len()) } - // for testing leader lease + /// For testing leader lease pub fn set_randomized_election_timeout(&mut self, t: usize) { assert!(self.min_election_timeout <= t && t < self.max_election_timeout); self.randomized_election_timeout = t; } + /// Fetch the length of the election timeout. 
pub fn get_election_timeout(&self) -> usize { self.election_timeout } + /// Fetch the length of the heartbeat timeout pub fn get_heartbeat_timeout(&self) -> usize { self.heartbeat_timeout } + /// Return the length of the current randomized election timeout. pub fn get_randomized_election_timeout(&self) -> usize { self.randomized_election_timeout } @@ -637,7 +676,7 @@ impl Raft { } } - // send_append sends RPC, with entries to the given peer. + /// Sends RPC, with entries to the given peer. pub fn send_append(&mut self, to: u64, pr: &mut Progress) { if pr.is_paused() { return; @@ -676,8 +715,8 @@ impl Raft { self.send(m); } - // bcast_append sends RPC, with entries to all peers that are not up-to-date - // according to the progress recorded in r.prs(). + /// Sends RPC, with entries to all peers that are not up-to-date + /// according to the progress recorded in r.prs(). pub fn bcast_append(&mut self) { let self_id = self.id; let mut prs = self.take_prs(); @@ -687,7 +726,7 @@ impl Raft { self.set_prs(prs); } - // bcast_heartbeat sends RPC, without entries to all the peers. + /// Sends RPC, without entries to all the peers. pub fn bcast_heartbeat(&mut self) { let ctx = self.read_only.last_pending_request_ctx(); self.bcast_heartbeat_with_ctx(ctx) @@ -703,9 +742,8 @@ impl Raft { self.set_prs(prs); } - // maybe_commit attempts to advance the commit index. Returns true if - // the commit index changed (in which case the caller should call - // r.bcast_append). + /// Attempts to advance the commit index. Returns true if the commit index + /// changed (in which case the caller should call `r.bcast_append`). pub fn maybe_commit(&mut self) -> bool { let mut mis_arr = [0; 5]; let mut mis_vec; @@ -724,6 +762,7 @@ impl Raft { self.raft_log.maybe_commit(mci, self.term) } + /// Resets the current node to a given term. pub fn reset(&mut self, term: u64) { if self.term != term { self.term = term; @@ -753,6 +792,8 @@ impl Raft { } } + /// Appends a slice of entries to the log. 
The entries are updated to match + /// the current index and term. pub fn append_entry(&mut self, es: &mut [Entry]) { let mut li = self.raft_log.last_index(); for (i, e) in es.iter_mut().enumerate() { @@ -779,9 +820,10 @@ impl Raft { } } - // tick_election is run by followers and candidates after self.election_timeout. // TODO: revoke pub when there is a better way to test. - // Returns true to indicate that there will probably be some readiness need to be handled. + /// Run by followers and candidates after self.election_timeout. + /// + /// Returns true to indicate that there will probably be some readiness that needs to be handled. pub fn tick_election(&mut self) -> bool { self.election_elapsed += 1; if !self.pass_election_timeout() || !self.promotable() { @@ -826,6 +868,7 @@ impl Raft { has_ready } + /// Converts this node to a follower. pub fn become_follower(&mut self, term: u64, leader_id: u64) { self.reset(term); self.leader_id = leader_id; @@ -834,6 +877,11 @@ impl Raft { } // TODO: revoke pub when there is a better way to test. + /// Converts this node to a candidate + /// + /// # Panics + /// + /// Panics if a leader already exists. pub fn become_candidate(&mut self) { assert_ne!( self.state, @@ -848,6 +896,11 @@ impl Raft { info!("{} became candidate at term {}", self.tag, self.term); } + /// Converts this node to a pre-candidate + /// + /// # Panics + /// + /// Panics if a leader already exists. pub fn become_pre_candidate(&mut self) { assert_ne!( self.state, @@ -866,6 +919,11 @@ impl Raft { } // TODO: revoke pub when there is a better way to test. + /// Makes this raft the leader. + /// + /// # Panics + /// + /// Panics if this is a follower node. pub fn become_leader(&mut self) { assert_ne!( self.state, @@ -961,6 +1019,8 @@ impl Raft { self.votes.values().filter(|x| **x).count() } + /// Steps the raft along via a message. This should be called every time your raft receives a + /// message from a peer.
pub fn step(&mut self, m: Message) -> Result<()> { // Handle the message term, which may result in our stepping down to a follower. @@ -1408,7 +1468,7 @@ impl Raft { pr.pause(); } - /// check message's progress to decide which action should be taken. + /// Check message's progress to decide which action should be taken. fn check_message_with_progress( &mut self, m: &mut Message, @@ -1772,6 +1832,7 @@ impl Raft { } // TODO: revoke pub when there is a better way to test. + /// For a given message, append the entries to the log. pub fn handle_append_entries(&mut self, m: &Message) { if m.get_index() < self.raft_log.committed { let mut to_send = Message::new(); @@ -1814,6 +1875,7 @@ impl Raft { } // TODO: revoke pub when there is a better way to test. + /// For a message, commit and send out heartbeat. pub fn handle_heartbeat(&mut self, mut m: Message) { self.raft_log.commit_to(m.get_commit()); let mut to_send = Message::new(); @@ -1920,8 +1982,8 @@ impl Raft { None } - // restore recovers the state machine from a snapshot. It restores the log and the - // configuration of state machine. + /// Recovers the state machine from a snapshot. It restores the log and the + /// configuration of state machine. pub fn restore(&mut self, snap: Snapshot) -> bool { if snap.get_metadata().get_index() < self.raft_log.committed { return false; @@ -1942,12 +2004,13 @@ impl Raft { self.pending_conf_index > self.raft_log.applied } + /// Specifies if the commit should be broadcast. pub fn should_bcast_commit(&self) -> bool { !self.skip_bcast_commit || self.has_pending_conf() } - // promotable indicates whether state machine can be promoted to leader, - // which is true when its own id is in progress list. + /// Indicates whether state machine can be promoted to leader, + /// which is true when its own id is in progress list. 
pub fn promotable(&self) -> bool { self.prs().voters().contains_key(&self.id) } @@ -1982,14 +2045,17 @@ impl Raft { self.mut_prs().get_mut(id).unwrap().recent_active = true; } + /// Adds a new node to the cluster. pub fn add_node(&mut self, id: u64) { self.add_voter_or_learner(id, false); } + /// Adds a learner node. pub fn add_learner(&mut self, id: u64) { self.add_voter_or_learner(id, true); } + /// Removes a node from the raft. pub fn remove_node(&mut self, id: u64) { self.mut_prs().remove(id); @@ -2009,6 +2075,7 @@ impl Raft { } } + /// Updates the progress of the learner or voter. pub fn set_progress(&mut self, id: u64, matched: u64, next_idx: u64, is_learner: bool) { let mut p = new_progress(next_idx, self.max_inflight); p.matched = matched; @@ -2020,23 +2087,28 @@ impl Raft { } } + /// Takes the progress set (destructively turns to `None`). pub fn take_prs(&mut self) -> ProgressSet { self.prs.take().unwrap() } + /// Sets the progress set. pub fn set_prs(&mut self, prs: ProgressSet) { self.prs = Some(prs); } + /// Returns a read-only reference to the progress set. pub fn prs(&self) -> &ProgressSet { self.prs.as_ref().unwrap() } + /// Returns a mutable reference to the progress set. pub fn mut_prs(&mut self) -> &mut ProgressSet { self.prs.as_mut().unwrap() } // TODO: revoke pub when there is a better way to test. + /// For a given hardstate, load the state into self. pub fn load_state(&mut self, hs: &HardState) { if hs.get_commit() < self.raft_log.committed || hs.get_commit() > self.raft_log.last_index() { @@ -2060,6 +2132,7 @@ impl Raft { self.election_elapsed >= self.randomized_election_timeout } + /// Regenerates and stores the election timeout. pub fn reset_randomized_election_timeout(&mut self) { let prev_timeout = self.randomized_election_timeout; let timeout = @@ -2092,11 +2165,13 @@ impl Raft { act >= self.quorum() } + /// Issues a message to timeout immediately. 
pub fn send_timeout_now(&mut self, to: u64) { let msg = new_message(to, MessageType::MsgTimeoutNow, None); self.send(msg); } + /// Stops the transfer of a leader. pub fn abort_leader_transfer(&mut self) { self.lead_transferee = None; } diff --git a/src/raft_log.rs b/src/raft_log.rs index bb154bfe9..3fbaa1534 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -39,22 +39,24 @@ pub use util::NO_LIMIT; /// Raft log implementation #[derive(Default)] pub struct RaftLog { - // storage contains all stable entries since the last snapshot. + /// Contains all stable entries since the last snapshot. pub store: T, - // unstable contains all unstable entries and snapshot. - // they will be saved into storage. + /// Contains all unstable entries and snapshot. + /// They will be saved into storage. pub unstable: Unstable, - // committed is the highest log position that is known to be in - // stable storage on a quorum of nodes. + /// The highest log position that is known to be in stable storage + /// on a quorum of nodes. pub committed: u64, - // applied is the highest log position that the application has - // been instructed to apply to its state machine. - // Invariant: applied <= committed + /// The highest log position that the application has been instructed + /// to apply to its state machine. + /// + /// Invariant: applied <= committed pub applied: u64, + /// A tag associated with this raft for logging purposes. pub tag: String, } @@ -74,6 +76,7 @@ where } impl RaftLog { + /// Creates a new raft log with a given storage and tag. pub fn new(storage: T, tag: String) -> RaftLog { let first_index = storage.first_index().unwrap(); let last_index = storage.last_index().unwrap(); @@ -88,6 +91,11 @@ impl RaftLog { } } + /// Grabs the term from the last entry. + /// + /// # Panics + /// + /// Panics if there are entries but the last term has been discarded.
pub fn last_term(&self) -> u64 { match self.term(self.last_index()) { Ok(t) => t, @@ -98,16 +106,19 @@ impl RaftLog { } } + /// Grab a read-only reference to the underlying storage. #[inline] pub fn get_store(&self) -> &T { &self.store } + /// Grab a mutable reference to the underlying storage. #[inline] pub fn mut_store(&mut self) -> &mut T { &mut self.store } + /// For a given index, finds the term associated with it. pub fn term(&self, idx: u64) -> Result { // the valid term range is [index of dummy entry, last index] let dummy_idx = self.first_index() - 1; @@ -128,6 +139,11 @@ impl RaftLog { } } + /// Returns the first index in the store that is available via entries. + /// + /// # Panics + /// + /// Panics if the store doesn't have a first index. pub fn first_index(&self) -> u64 { match self.unstable.maybe_first_index() { Some(idx) => idx, @@ -135,6 +151,11 @@ impl RaftLog { } } + /// Returns the last index in the store that is available via entries. + /// + /// # Panics + /// + /// Panics if the store doesn't have a last index. pub fn last_index(&self) -> u64 { match self.unstable.maybe_last_index() { Some(idx) => idx, @@ -142,17 +163,22 @@ impl RaftLog { } } - // find_conflict finds the index of the conflict. - // It returns the first index of conflicting entries between the existing - // entries and the given entries, if there are any. - // If there is no conflicting entries, and the existing entries contain - // all the given entries, zero will be returned. - // If there is no conflicting entries, but the given entries contains new - // entries, the index of the first new entry will be returned. - // An entry is considered to be conflicting if it has the same index but - // a different term. - // The first entry MUST have an index equal to the argument 'from'. - // The index of the given entries MUST be continuously increasing. + /// Finds the index of the conflict.
+ /// + /// It returns the first index of conflicting entries between the existing + /// entries and the given entries, if there are any. + /// + /// If there are no conflicting entries, and the existing entries contain + /// all the given entries, zero will be returned. + /// + /// If there are no conflicting entries, but the given entries contains new + /// entries, the index of the first new entry will be returned. + /// + /// An entry is considered to be conflicting if it has the same index but + /// a different term. + /// + /// The first entry MUST have an index equal to the argument 'from'. + /// The index of the given entries MUST be continuously increasing. pub fn find_conflict(&self, ents: &[Entry]) -> u64 { for e in ents { if !self.match_term(e.get_index(), e.get_term()) { @@ -171,12 +197,17 @@ impl RaftLog { 0 } + /// Answers the question: Does this index belong to this term? pub fn match_term(&self, idx: u64, term: u64) -> bool { self.term(idx).map(|t| t == term).unwrap_or(false) } - // maybe_append returns None if the entries cannot be appended. Otherwise, - // it returns Some(last index of new entries). + /// Returns None if the entries cannot be appended. Otherwise, + /// it returns Some(last index of new entries). + /// + /// # Panics + /// + /// Panics if it finds a conflicting index. pub fn maybe_append( &mut self, idx: u64, @@ -203,6 +234,11 @@ impl RaftLog { None } + /// Sets the last committed value to the passed in value. + /// + /// # Panics + /// + /// Panics if the index goes past the last index. pub fn commit_to(&mut self, to_commit: u64) { // never decrease commit if self.committed >= to_commit { @@ -219,6 +255,11 @@ impl RaftLog { self.committed = to_commit; } + /// Advance the applied index to the passed in value. + /// + /// # Panics + /// + /// Panics if the value passed in is not new or known. 
pub fn applied_to(&mut self, idx: u64) { if idx == 0 { return; @@ -232,22 +273,27 @@ impl RaftLog { self.applied = idx; } + /// Returns the last applied index. pub fn get_applied(&self) -> u64 { self.applied } + /// Attempts to set the stable up to a given index. pub fn stable_to(&mut self, idx: u64, term: u64) { self.unstable.stable_to(idx, term) } + /// Snaps the unstable up to a current index. pub fn stable_snap_to(&mut self, idx: u64) { self.unstable.stable_snap_to(idx) } + /// Returns a reference to the unstable log. pub fn get_unstable(&self) -> &Unstable { &self.unstable } + /// Appends a set of entries to the unstable list. pub fn append(&mut self, ents: &[Entry]) -> u64 { if ents.is_empty() { return self.last_index(); @@ -264,6 +310,7 @@ impl RaftLog { self.last_index() } + /// Returns a slice of the unstable entries, or `None` if there are none. pub fn unstable_entries(&self) -> Option<&[Entry]> { if self.unstable.entries.is_empty() { return None; @@ -271,6 +318,7 @@ impl RaftLog { Some(&self.unstable.entries) } + /// Returns entries starting from a particular index and not exceeding a bytesize. pub fn entries(&self, idx: u64, max_size: u64) -> Result> { let last = self.last_index(); if idx > last { @@ -279,6 +327,7 @@ impl RaftLog { self.slice(idx, last + 1, max_size) } + /// Returns all the entries. pub fn all_entries(&self) -> Vec { let first_index = self.first_index(); match self.entries(first_index, NO_LIMIT) { @@ -293,16 +342,17 @@ impl RaftLog { } } - // is_up_to_date determines if the given (lastIndex,term) log is more up-to-date - // by comparing the index and term of the last entry in the existing logs. - // If the logs have last entry with different terms, then the log with the - // later term is more up-to-date. If the logs end with the same term, then - // whichever log has the larger last_index is more up-to-date. If the logs are - // the same, the given log is up-to-date.
+ /// Determines if the given (lastIndex,term) log is more up-to-date + /// by comparing the index and term of the last entry in the existing logs. + /// If the logs have last entry with different terms, then the log with the + /// later term is more up-to-date. If the logs end with the same term, then + /// whichever log has the larger last_index is more up-to-date. If the logs are + /// the same, the given log is up-to-date. pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool { term > self.last_term() || (term == self.last_term() && last_index >= self.last_index()) } + /// Returns any entries since a particular index. pub fn next_entries_since(&self, since_idx: u64) -> Option> { let offset = cmp::max(since_idx + 1, self.first_index()); let committed = self.committed; @@ -315,22 +365,25 @@ impl RaftLog { None } - // next_entries returns all the available entries for execution. - // If applied is smaller than the index of snapshot, it returns all committed - // entries after the index of snapshot. + /// Returns all the available entries for execution. + /// If applied is smaller than the index of snapshot, it returns all committed + /// entries after the index of snapshot. pub fn next_entries(&self) -> Option> { self.next_entries_since(self.applied) } + /// Returns whether there are entries since a particular index. pub fn has_next_entries_since(&self, since_idx: u64) -> bool { let offset = cmp::max(since_idx + 1, self.first_index()); self.committed + 1 > offset } + /// Returns whether there are new entries. pub fn has_next_entries(&self) -> bool { self.has_next_entries_since(self.applied) } + /// Returns the current snapshot pub fn snapshot(&self) -> Result { self.unstable .snapshot @@ -361,6 +414,7 @@ impl RaftLog { None } + /// Attempts to commit the index and term and returns whether it did.
pub fn maybe_commit(&mut self, max_index: u64, term: u64) -> bool { if max_index > self.committed && self.term(max_index).unwrap_or(0) == term { self.commit_to(max_index); @@ -370,6 +424,8 @@ impl RaftLog { } } + /// Grabs a slice of entries from the raft. Unlike a rust slice pointer, these are + /// returned by value. The result is truncated to the max_size in bytes. pub fn slice(&self, low: u64, high: u64, max_size: u64) -> Result> { let err = self.must_check_outofbounds(low, high); if err.is_some() { @@ -413,6 +469,7 @@ impl RaftLog { Ok(ents) } + /// Restores the current log from a snapshot. pub fn restore(&mut self, snapshot: Snapshot) { info!( "{} log [{}] starts to restore snapshot [index: {}, term: {}]", diff --git a/src/raw_node.rs b/src/raw_node.rs index 400d8d2ea..011c8b791 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -1,3 +1,8 @@ +//! The raw node of the raft module. +//! +//! This module contains the value types for the node and its connection to other +//! nodes but not the raft consensus itself. Generally, you'll interact with the +//! RawNode first and use it to access the inner workings of the consensus protocol. // Copyright 2016 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -39,15 +44,22 @@ use super::read_only::ReadState; use super::Status; use super::Storage; +/// Represents a Peer node in the cluster. #[derive(Debug, Default)] pub struct Peer { + /// The ID of the peer. pub id: u64, + /// If there is context associated with the peer (like connection information), it can be + /// serialized and stored here. pub context: Option>, } +/// The status of the snapshot. #[derive(Debug, PartialEq, Copy, Clone)] pub enum SnapshotStatus { + /// Represents that the snapshot is finished being created. Finish, + /// Indicates that the snapshot failed to build or is not ready.
Failure, } @@ -73,51 +85,52 @@ fn is_response_msg(t: MessageType) -> bool { } } +/// For a given snapshot, determine if it's empty or not. pub fn is_empty_snap(s: &Snapshot) -> bool { s.get_metadata().get_index() == 0 } -// Ready encapsulates the entries and messages that are ready to read, -// be saved to stable storage, committed or sent to other peers. -// All fields in Ready are read-only. +/// Ready encapsulates the entries and messages that are ready to read, +/// be saved to stable storage, committed or sent to other peers. +/// All fields in Ready are read-only. #[derive(Default, Debug, PartialEq)] pub struct Ready { - // The current volatile state of a Node. - // SoftState will be nil if there is no update. - // It is not required to consume or store SoftState. + /// The current volatile state of a Node. + /// SoftState will be nil if there is no update. + /// It is not required to consume or store SoftState. pub ss: Option, - // The current state of a Node to be saved to stable storage BEFORE - // Messages are sent. - // HardState will be equal to empty state if there is no update. + /// The current state of a Node to be saved to stable storage BEFORE + /// Messages are sent. + /// HardState will be equal to empty state if there is no update. pub hs: Option, - // read_states states can be used for node to serve linearizable read requests locally - // when its applied index is greater than the index in ReadState. - // Note that the read_state will be returned when raft receives MsgReadIndex. - // The returned is only valid for the request that requested to read. + /// States can be used for node to serve linearizable read requests locally + /// when its applied index is greater than the index in ReadState. + /// Note that the read_state will be returned when raft receives MsgReadIndex. + /// The returned is only valid for the request that requested to read. 
pub read_states: Vec, - // Entries specifies entries to be saved to stable storage BEFORE - // Messages are sent. + /// Entries specifies entries to be saved to stable storage BEFORE + /// Messages are sent. pub entries: Vec, - // Snapshot specifies the snapshot to be saved to stable storage. + /// Snapshot specifies the snapshot to be saved to stable storage. pub snapshot: Snapshot, - // CommittedEntries specifies entries to be committed to a - // store/state-machine. These have previously been committed to stable - // store. + /// CommittedEntries specifies entries to be committed to a + /// store/state-machine. These have previously been committed to stable + /// store. pub committed_entries: Option>, - // Messages specifies outbound messages to be sent AFTER Entries are - // committed to stable storage. - // If it contains a MsgSnap message, the application MUST report back to raft - // when the snapshot has been received or has failed by calling ReportSnapshot. + /// Messages specifies outbound messages to be sent AFTER Entries are + /// committed to stable storage. + /// If it contains a MsgSnap message, the application MUST report back to raft + /// when the snapshot has been received or has failed by calling ReportSnapshot. pub messages: Vec, - // MustSync indicates whether the HardState and Entries must be synchronously - // written to disk or if an asynchronous write is permissible. + /// MustSync indicates whether the HardState and Entries must be synchronously + /// written to disk or if an asynchronous write is permissible. pub must_sync: bool, } @@ -162,10 +175,11 @@ impl Ready { } } -// RawNode is a thread-unsafe Node. -// The methods of this struct correspond to the methods of Node and are described -// more fully there. +/// RawNode is a thread-unsafe Node. +/// The methods of this struct correspond to the methods of Node and are described +/// more fully there. pub struct RawNode { + /// The internal raft state. 
pub raft: Raft, prev_ss: SoftState, prev_hs: HardState, @@ -243,21 +257,22 @@ impl RawNode { self.raft.raft_log.applied_to(applied); } - // Tick advances the internal logical clock by a single tick. - // - // Returns true to indicate that there will probably be some readiness need to be handled. + /// Tick advances the internal logical clock by a single tick. + /// + /// Returns true to indicate that there will probably be some readiness which + /// needs to be handled. pub fn tick(&mut self) -> bool { self.raft.tick() } - // Campaign causes this RawNode to transition to candidate state. + /// Campaign causes this RawNode to transition to candidate state. pub fn campaign(&mut self) -> Result<()> { let mut m = Message::new(); m.set_msg_type(MessageType::MsgHup); self.raft.step(m) } - // Propose proposes data be appended to the raft log. + /// Propose proposes data be appended to the raft log. pub fn propose(&mut self, context: Vec, data: Vec) -> Result<()> { let mut m = Message::new(); m.set_msg_type(MessageType::MsgPropose); @@ -269,7 +284,7 @@ impl RawNode { self.raft.step(m) } - // ProposeConfChange proposes a config change. + /// ProposeConfChange proposes a config change. #[allow(needless_pass_by_value)] pub fn propose_conf_change(&mut self, context: Vec, cc: ConfChange) -> Result<()> { let data = protobuf::Message::write_to_bytes(&cc)?; @@ -283,6 +298,7 @@ impl RawNode { self.raft.step(m) } + /// Takes the conf change and applies it. pub fn apply_conf_change(&mut self, cc: &ConfChange) -> ConfState { if cc.get_node_id() == INVALID_ID { let mut cs = ConfState::new(); @@ -302,7 +318,7 @@ impl RawNode { cs } - // Step advances the state machine using the given message. + /// Step advances the state machine using the given message. 
pub fn step(&mut self, m: Message) -> Result<()> { // ignore unexpected local messages receiving over network if is_local_msg(m.get_msg_type()) { @@ -314,6 +330,7 @@ impl RawNode { Err(Error::StepPeerNotFound) } + /// Given an index, creates a new Ready value from that index. pub fn ready_since(&mut self, applied_idx: u64) -> Ready { Ready::new( &mut self.raft, @@ -323,11 +340,12 @@ impl RawNode { ) } - // Ready returns the current point-in-time state of this RawNode. + /// Ready returns the current point-in-time state of this RawNode. pub fn ready(&mut self) -> Ready { Ready::new(&mut self.raft, &self.prev_ss, &self.prev_hs, None) } + /// Given an index, can determine if there is a ready state from that time. pub fn has_ready_since(&self, applied_idx: Option) -> bool { let raft = &self.raft; if !raft.msgs.is_empty() || raft.raft_log.unstable_entries().is_some() { @@ -356,19 +374,20 @@ impl RawNode { false } - // HasReady called when RawNode user need to check if any Ready pending. - // Checking logic in this method should be consistent with Ready.containsUpdates(). + /// HasReady called when RawNode user need to check if any Ready pending. + /// Checking logic in this method should be consistent with Ready.containsUpdates(). pub fn has_ready(&self) -> bool { self.has_ready_since(None) } + /// Grabs the snapshot from the raft if available. #[inline] pub fn get_snap(&self) -> Option<&Snapshot> { self.raft.get_snap() } - // Advance notifies the RawNode that the application has applied and saved progress in the - // last Ready results. + /// Advance notifies the RawNode that the application has applied and saved progress in the + /// last Ready results. pub fn advance(&mut self, rd: Ready) { self.advance_append(rd); @@ -387,20 +406,22 @@ impl RawNode { } } + /// Appends and commits the ready value. pub fn advance_append(&mut self, rd: Ready) { self.commit_ready(rd); } + /// Advance apply to the passed index. 
pub fn advance_apply(&mut self, applied: u64) { self.commit_apply(applied); } - // Status returns the current status of the given group. + /// Status returns the current status of the given group. pub fn status(&self) -> Status { Status::new(&self.raft) } - // ReportUnreachable reports the given node is not reachable for the last send. + /// ReportUnreachable reports the given node is not reachable for the last send. pub fn report_unreachable(&mut self, id: u64) { let mut m = Message::new(); m.set_msg_type(MessageType::MsgUnreachable); @@ -409,7 +430,7 @@ impl RawNode { self.raft.step(m).is_ok(); } - // ReportSnapshot reports the status of the sent snapshot. + /// ReportSnapshot reports the status of the sent snapshot. pub fn report_snapshot(&mut self, id: u64, status: SnapshotStatus) { let rej = status == SnapshotStatus::Failure; let mut m = Message::new(); @@ -420,7 +441,7 @@ impl RawNode { self.raft.step(m).is_ok(); } - // TransferLeader tries to transfer leadership to the given transferee. + /// TransferLeader tries to transfer leadership to the given transferee. pub fn transfer_leader(&mut self, transferee: u64) { let mut m = Message::new(); m.set_msg_type(MessageType::MsgTransferLeader); @@ -428,10 +449,10 @@ impl RawNode { self.raft.step(m).is_ok(); } - // ReadIndex requests a read state. The read state will be set in ready. - // Read State has a read index. Once the application advances further than the read - // index, any linearizable read requests issued before the read request can be - // processed safely. The read state will have the same rctx attched. + /// ReadIndex requests a read state. The read state will be set in ready. + /// Read State has a read index. Once the application advances further than the read + /// index, any linearizable read requests issued before the read request can be + /// processed safely. The read state will have the same rctx attached. 
pub fn read_index(&mut self, rctx: Vec) { let mut m = Message::new(); m.set_msg_type(MessageType::MsgReadIndex); @@ -441,11 +462,13 @@ impl RawNode { self.raft.step(m).is_ok(); } + /// Returns the store as an immutable reference. #[inline] pub fn get_store(&self) -> &T { self.raft.get_store() } + /// Returns the store as a mutable reference. #[inline] pub fn mut_store(&mut self) -> &mut T { self.raft.mut_store() diff --git a/src/read_only.rs b/src/read_only.rs index c2f1b692b..0dbd6a279 100644 --- a/src/read_only.rs +++ b/src/read_only.rs @@ -31,6 +31,7 @@ use eraftpb::Message; use fxhash::{FxHashMap, FxHashSet}; +/// Determines the relative safety of and consistency of read only requests. #[derive(Debug, PartialEq, Clone, Copy)] pub enum ReadOnlyOption { /// Safe guarantees the linearizability of the read only request by @@ -50,14 +51,16 @@ impl Default for ReadOnlyOption { } } -// ReadState provides state for read only query. -// It's caller's responsibility to send MsgReadIndex first before getting -// this state from ready. It's also caller's duty to differentiate if this -// state is what it requests through request_ctx, e.g. given a unique id as -// request_ctx. +/// ReadState provides state for read only query. +/// It's caller's responsibility to send MsgReadIndex first before getting +/// this state from ready. It's also caller's duty to differentiate if this +/// state is what it requests through request_ctx, e.g. given a unique id as +/// request_ctx. #[derive(Default, Debug, PartialEq, Clone)] pub struct ReadState { + /// The index of the read state. pub index: u64, + /// A datagram consisting of context about the request. pub request_ctx: Vec, } @@ -84,9 +87,11 @@ impl ReadOnly { } } - /// add_request adds a read only request into readonly struct. + /// Adds a read only request into readonly struct. + /// /// `index` is the commit index of the raft state machine when it received /// the read only request. 
+ /// /// `m` is the original read only request message from the local or remote node. pub fn add_request(&mut self, index: u64, m: Message) { let ctx = { @@ -105,7 +110,7 @@ impl ReadOnly { self.read_index_queue.push_back(ctx); } - /// rev_ack notifies the ReadOnly struct that the raft state machine received + /// Notifies the ReadOnly struct that the raft state machine received /// an acknowledgment of the heartbeat that attached with the read only request /// context. pub fn recv_ack(&mut self, m: &Message) -> usize { @@ -119,7 +124,7 @@ impl ReadOnly { } } - /// advance advances the read only request queue kept by the ReadOnly struct. + /// Advances the read only request queue kept by the ReadOnly struct. /// It dequeues the requests until it finds the read only request that has /// the same context as the given `m`. pub fn advance(&mut self, m: &Message) -> Vec { @@ -139,8 +144,7 @@ impl ReadOnly { rss } - /// last_pending_request_ctx returns the context of the last pending read only - /// request in ReadOnly struct. + /// Returns the context of the last pending read only request in ReadOnly struct. pub fn last_pending_request_ctx(&self) -> Option> { self.read_index_queue.back().cloned() } diff --git a/src/status.rs b/src/status.rs index fef9d7662..9851971cd 100644 --- a/src/status.rs +++ b/src/status.rs @@ -32,18 +32,25 @@ use progress::Progress; use raft::{Raft, SoftState, StateRole}; use storage::Storage; +/// Represents the current status of the raft #[derive(Default)] pub struct Status { + /// The ID of the current node. pub id: u64, + /// The hardstate of the raft, representing voted state. pub hs: HardState, + /// The softstate of the raft, representing proposed state. pub ss: SoftState, + /// The index of the last entry to have been applied. pub applied: u64, + /// The progress towards catching up and applying logs. pub progress: FxHashMap, + /// The progress of learners in catching up and applying logs. 
pub learner_progress: FxHashMap, } impl Status { - // new gets a copy of the current raft status. + /// Gets a copy of the current raft status. pub fn new(raft: &Raft) -> Status { let mut s = Status { id: raft.id, diff --git a/src/storage.rs b/src/storage.rs index 13488ac6b..cffdec8d8 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,3 +1,9 @@ +//! Represents the storage trait and example implementation. +//! +//! The storage trait is used to house and eventually serialize the state of the system. +//! Custom implementations of this are normal and this is likely to be a key integration +//! point for your distributed storage. + // Copyright 2016 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -32,6 +38,8 @@ use eraftpb::{ConfState, Entry, HardState, Snapshot}; use errors::{Error, Result, StorageError}; use util; +/// Holds both the hard state (commit index, vote leader, term) and the configuration state +/// (Current node IDs) #[derive(Debug, Clone)] pub struct RaftState { /// Contains the last meta information including commit index, the vote leader, and the vote term. @@ -63,7 +71,7 @@ pub trait Storage { /// into the latest snapshot; if storage only contains the dummy entry the /// first log entry is not available). fn first_index(&self) -> Result; - /// last_index returns the index of the last entry in the log. + /// The index of the last entry in the log. fn last_index(&self) -> Result; /// Returns the most recent snapshot. /// @@ -73,6 +81,8 @@ pub trait Storage { fn snapshot(&self) -> Result; } +/// The Memory Storage Core instance holds the actual state of the storage struct. To access this +/// value, use the `rl` and `wl` functions on the main MemStorage implementation. pub struct MemStorageCore { hard_state: HardState, snapshot: Snapshot, @@ -93,7 +103,7 @@ impl Default for MemStorageCore { } impl MemStorageCore { - /// set_hardstate saves the current HardState. + /// Saves the current HardState. 
pub fn set_hardstate(&mut self, hs: HardState) { self.hard_state = hs; } @@ -102,8 +112,7 @@ impl MemStorageCore { self.entries[0].get_index() + self.entries.len() as u64 - 1 } - /// apply_snapshot overwrites the contents of this Storage object with - /// those of the given snapshot. + /// Overwrites the contents of this Storage object with those of the given snapshot. pub fn apply_snapshot(&mut self, snapshot: Snapshot) -> Result<()> { // handle check for old snapshot being applied let index = self.snapshot.get_metadata().get_index(); @@ -120,7 +129,7 @@ impl MemStorageCore { Ok(()) } - /// create_snapshot makes a snapshot which can be retrieved with snapshot() and + /// Makes a snapshot which can be retrieved with snapshot() and /// can be used to reconstruct the state at that point. /// If any configuration changes have been made since the last compaction, /// the result of the last apply_conf_change must be passed in. @@ -153,7 +162,7 @@ impl MemStorageCore { Ok(&self.snapshot) } - /// compact discards all log entries prior to compact_index. + /// Discards all log entries prior to compact_index. /// It is the application's responsibility to not attempt to compact an index /// greater than RaftLog.applied. pub fn compact(&mut self, compact_index: u64) -> Result<()> { @@ -224,23 +233,28 @@ pub struct MemStorage { } impl MemStorage { + /// Returns a new memory storage value. pub fn new() -> MemStorage { MemStorage { ..Default::default() } } + /// Opens up a read lock on the storage and returns a guard handle. Use this + /// with functions that don't require mutation. pub fn rl(&self) -> RwLockReadGuard { self.core.read().unwrap() } + /// Opens up a write lock on the storage and returns guard handle. Use this + /// with functions that take a mutable reference to self. pub fn wl(&self) -> RwLockWriteGuard { self.core.write().unwrap() } } impl Storage for MemStorage { - /// initial_state implements the Storage trait. + /// Implements the Storage trait. 
fn initial_state(&self) -> Result { let core = self.rl(); Ok(RaftState { @@ -249,7 +263,7 @@ impl Storage for MemStorage { }) } - /// entries implements the Storage trait. + /// Implements the Storage trait. fn entries(&self, low: u64, high: u64, max_size: u64) -> Result> { let core = self.rl(); let offset = core.entries[0].get_index(); @@ -272,7 +286,7 @@ impl Storage for MemStorage { Ok(ents) } - /// term implements the Storage trait. + /// Implements the Storage trait. fn term(&self, idx: u64) -> Result { let core = self.rl(); let offset = core.entries[0].get_index(); @@ -285,19 +299,19 @@ impl Storage for MemStorage { Ok(core.entries[(idx - offset) as usize].get_term()) } - /// first_index implements the Storage trait. + /// Implements the Storage trait. fn first_index(&self) -> Result { let core = self.rl(); Ok(core.entries[0].get_index() + 1) } - /// last_index implements the Storage trait. + /// Implements the Storage trait. fn last_index(&self) -> Result { let core = self.rl(); Ok(core.inner_last_index()) } - /// snapshot implements the Storage trait. + /// Implements the Storage trait. fn snapshot(&self) -> Result { let core = self.rl(); Ok(core.snapshot.clone()) diff --git a/src/util.rs b/src/util.rs index 1f77942d1..f74a76390 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,3 +1,6 @@ +//! This module contains a collection of various tools to use to manipulate +//! and control messages and data associated with raft. + // Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,8 +18,36 @@ use std::u64; use protobuf::Message; +/// A number to represent that there is no limit. pub const NO_LIMIT: u64 = u64::MAX; +/// Truncates the list of entries down to a specific byte-length of +/// all entries together. 
+/// +/// # Examples +/// +/// ``` +/// use raft::{util::limit_size, prelude::*}; +/// +/// let template = { +/// let mut entry = Entry::new(); +/// entry.set_data("*".repeat(100).into_bytes()); +/// entry +/// }; +/// +/// // Make a bunch of entries that are ~100 bytes long +/// let mut entries = vec![ +/// template.clone(), +/// template.clone(), +/// template.clone(), +/// template.clone(), +/// template.clone(), +/// ]; +/// +/// assert_eq!(entries.len(), 5); +/// limit_size(&mut entries, 220); +/// assert_eq!(entries.len(), 2); +/// ``` pub fn limit_size(entries: &mut Vec, max: u64) { if max == NO_LIMIT || entries.len() <= 1 { return;