Skip to content

Commit

Permalink
Fix: when adding a learner, ensure the last membership is committed
Browse files Browse the repository at this point in the history
Previously, when adding a learner to a Raft cluster, the last membership was not
always marked as committed, which could cause issues when a follower tried to
truncate logs by reverting to the last committed membership. To prevent this
issue, we have updated the code to ensure the last membership is committed when
adding a learner.

In addition to this fix, we have also made several refactoring changes,
including refining method names for trait `Coherent`, renaming
`Membership::next_safe()` to `next_coherent()` for consistency, and updating
enum `ChangeMembers` to include more variants for adding and removing learners.
We have also removed `RaftCore::add_learner()` in favor of using
`change_membership()` for all membership operations, and added a `ChangeHandler`
to build new membership configurations for change-membership requests.

Finally, we have updated the `Membership` API with a new method `new_with_nodes()`
for building a new membership configuration, and moved the validation check
out into a separate function, `ensure_valid()`. Validation is now done only
when needed.
  • Loading branch information
drmingdrmer committed Feb 20, 2023
1 parent 0b145e2 commit c8fccb2
Show file tree
Hide file tree
Showing 15 changed files with 660 additions and 358 deletions.
29 changes: 12 additions & 17 deletions openraft/src/change_members.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;

use crate::Node;
use crate::NodeId;

#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum ChangeMembers<NID: NodeId> {
Add(BTreeSet<NID>),
Remove(BTreeSet<NID>),
Replace(BTreeSet<NID>),
pub enum ChangeMembers<NID: NodeId, N: Node> {
AddVoter(BTreeSet<NID>),
RemoveVoter(BTreeSet<NID>),
ReplaceAllVoters(BTreeSet<NID>),
AddNodes(BTreeMap<NID, N>),
RemoveNodes(BTreeSet<NID>),
ReplaceAllNodes(BTreeMap<NID, N>),
}

/// Convert a series of ids to a `Replace` operation.
impl<NID, I> From<I> for ChangeMembers<NID>
impl<NID, N, I> From<I> for ChangeMembers<NID, N>
where
NID: NodeId,
N: Node,
I: IntoIterator<Item = NID>,
{
fn from(r: I) -> Self {
let ids = r.into_iter().collect::<BTreeSet<NID>>();
ChangeMembers::Replace(ids)
}
}

impl<NID: NodeId> ChangeMembers<NID> {
/// Apply the `ChangeMembers` to `old` node set, return new node set
pub fn apply_to(self, old: &BTreeSet<NID>) -> BTreeSet<NID> {
match self {
ChangeMembers::Replace(c) => c,
ChangeMembers::Add(add_members) => old.union(&add_members).cloned().collect::<BTreeSet<_>>(),
ChangeMembers::Remove(remove_members) => old.difference(&remove_members).cloned().collect::<BTreeSet<_>>(),
}
ChangeMembers::ReplaceAllVoters(ids)
}
}
66 changes: 21 additions & 45 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::future::Either;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryFutureExt;
use maplit::btreemap;
use maplit::btreeset;
use pin_utils::pin_mut;
use tokio::io::AsyncRead;
Expand Down Expand Up @@ -350,48 +351,31 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

/// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
/// on the given channel.
/// Submit change-membership by writing a Membership log entry.
///
/// Adding a learner does not affect election, thus it does not need to enter joint consensus.
/// If `retain` is `true`, removed `voter` will becomes `learner`. Otherwise they will
/// be just removed.
///
/// TODO: It has to wait for the previous membership to commit.
/// TODO: Otherwise a second proposed membership implies the previous one is committed.
/// TODO: Test it.
/// TODO: This limit can be removed if membership_state is replaced by a list of membership
/// logs. TODO: Because allowing this requires the engine to be able to store more than 2
/// membership logs. And it does not need to wait for the previous membership log to commit
/// to propose the new membership log.
#[tracing::instrument(level = "debug", skip_all)]
pub(super) async fn add_learner(
&mut self,
target: C::NodeId,
node: C::Node,
tx: ClientWriteTx<C>,
) -> Result<(), Fatal<C::NodeId>> {
// TODO: move these logic to Engine?
let curr = &self.engine.state.membership_state.effective().membership;
let new_membership = curr.add_learner(target, node);

tracing::debug!(?new_membership, "new_membership with added learner: {}", target);

self.write_entry(EntryPayload::Membership(new_membership), Some(tx)).await?;

Ok(())
}

/// Submit change-membership by writing a Membership log entry, if the `expect` is satisfied.
/// Changing membership includes changing voters config or adding/removing learners:
///
/// If `turn_to_learner` is `true`, removed `voter` will becomes `learner`. Otherwise they will
/// be just removed.
/// - To change voters config, it will build a new **joint** config. If it already a joint
/// config, it returns the final uniform config.
/// - Adding a learner does not affect election, thus it does not need to enter joint consensus.
/// But it still has to wait for the previous membership to commit. Otherwise a second
/// proposed membership implies the previous one is committed.
// ---
// TODO: This limit can be removed if membership_state is replaced by a list of membership logs.
// Because allowing this requires the engine to be able to store more than 2
// membership logs. And it does not need to wait for the previous membership log to commit
// to propose the new membership log.
#[tracing::instrument(level = "debug", skip(self, tx))]
pub(super) async fn change_membership(
&mut self,
changes: ChangeMembers<C::NodeId>,
turn_to_learner: bool,
changes: ChangeMembers<C::NodeId, C::Node>,
retain: bool,
tx: RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
) -> Result<(), Fatal<C::NodeId>> {
let res = self.engine.state.membership_state.create_updated_membership(changes, turn_to_learner);
let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
let new_membership = match res {
Ok(x) => x,
Err(e) => {
Expand Down Expand Up @@ -1046,18 +1030,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.handle_initialize(members, tx).await?;
}
RaftMsg::AddLearner { id, node, tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
self.add_learner(id, node, tx).await?;
} else {
self.reject_with_forward_to_leader(tx);
}
self.change_membership(ChangeMembers::AddNodes(btreemap! {id=>node}), true, tx).await?;
}
RaftMsg::ChangeMembership {
changes,
turn_to_learner,
tx,
} => {
self.change_membership(changes, turn_to_learner, tx).await?;
RaftMsg::ChangeMembership { changes, retain, tx } => {
self.change_membership(changes, retain, tx).await?;
}
RaftMsg::ExternalRequest { req } => {
req(&self.engine.state, &mut self.storage, &mut self.network);
Expand Down
161 changes: 161 additions & 0 deletions openraft/src/membership/change_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use crate::error::ChangeMembershipError;
use crate::error::InProgress;
use crate::ChangeMembers;
use crate::Membership;
use crate::MembershipState;
use crate::Node;
use crate::NodeId;

pub(crate) struct ChangeHandler<'m, NID, N>
where
NID: NodeId,
N: Node,
{
pub(crate) state: &'m MembershipState<NID, N>,
}

impl<'m, NID, N> ChangeHandler<'m, NID, N>
where
NID: NodeId,
N: Node,
{
/// Builds a new membership configuration by applying changes to the current configuration.
///
/// * `changes`: The changes to apply to the current membership configuration.
/// * `retain` specifies whether to retain the removed voters as a learners, i.e., nodes that
/// continue to receive log replication from the leader.
///
/// A Result containing the new membership configuration if the operation succeeds, or a
/// `ChangeMembershipError` if an error occurs.
///
/// This function ensures that the cluster will have at least one voter in the new membership
/// configuration.
pub(crate) fn apply(
&self,
change: ChangeMembers<NID, N>,
retain: bool,
) -> Result<Membership<NID, N>, ChangeMembershipError<NID>> {
self.ensure_committed()?;

let new_membership = self.state.effective().membership.clone().change(change, retain)?;
Ok(new_membership)
}

/// Ensures that the latest membership has been committed.
///
/// Returns Ok if the last membership is committed, or an InProgress error
/// otherwise, to indicate a change-membership request should be rejected.
pub(crate) fn ensure_committed(&self) -> Result<(), InProgress<NID>> {
let effective = self.state.effective();
let committed = self.state.committed();

if effective.log_id == committed.log_id {
// Ok: last membership(effective) is committed
Ok(())
} else {
Err(InProgress {
committed: committed.log_id,
membership_log_id: effective.log_id,
})
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use maplit::btreemap;
use maplit::btreeset;

use crate::error::ChangeMembershipError;
use crate::error::EmptyMembership;
use crate::error::InProgress;
use crate::error::LearnerNotFound;
use crate::testing::log_id;
use crate::ChangeMembers;
use crate::EffectiveMembership;
use crate::Membership;
use crate::MembershipState;

/// Create an Arc<EffectiveMembership>
fn effmem(term: u64, index: u64, m: Membership<u64, ()>) -> Arc<EffectiveMembership<u64, ()>> {
let lid = Some(log_id(term, index));
Arc::new(EffectiveMembership::new(lid, m))
}

fn m1() -> Membership<u64, ()> {
Membership::new(vec![btreeset! {1}], None)
}

fn m12() -> Membership<u64, ()> {
Membership::new(vec![btreeset! {1,2}], None)
}

fn m123_345() -> Membership<u64, ()> {
Membership::new(vec![btreeset! {1,2,3}, btreeset! {3,4,5}], None)
}

#[test]
fn test_apply_not_committed() -> anyhow::Result<()> {
let new = || MembershipState::new(effmem(2, 2, m1()), effmem(3, 4, m123_345()));
let res = new().change_handler().apply(ChangeMembers::AddVoter(btreeset! {1}), false);

assert_eq!(
Err(ChangeMembershipError::InProgress(InProgress {
committed: Some(log_id(2, 2)),
membership_log_id: Some(log_id(3, 4))
})),
res
);

Ok(())
}

#[test]
fn test_apply_empty_voters() -> anyhow::Result<()> {
let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1()));
let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1}), false);

assert_eq!(Err(ChangeMembershipError::EmptyMembership(EmptyMembership {})), res);

Ok(())
}

#[test]
fn test_apply_learner_not_found() -> anyhow::Result<()> {
let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1()));
let res = new().change_handler().apply(ChangeMembers::AddVoter(btreeset! {2}), false);

assert_eq!(
Err(ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: 2 })),
res
);

Ok(())
}

#[test]
fn test_apply_retain_learner() -> anyhow::Result<()> {
let new = || MembershipState::new(effmem(3, 4, m12()), effmem(3, 4, m123_345()));

// Do not leave removed voters as learner
let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), false);
assert_eq!(
Ok(Membership::new(vec![btreeset! {3,4,5}], btreemap! {3=>(),4=>(),5=>()})),
res
);

// Leave removed voters as learner
let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), true);
assert_eq!(
Ok(Membership::new(
vec![btreeset! {3,4,5}],
btreemap! {1=>(),2=>(),3=>(),4=>(),5=>()}
)),
res
);

Ok(())
}
}
Loading

0 comments on commit c8fccb2

Please # to comment.