From c8fccb2225862370ac5e4e7e27c9632f82f332d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 19 Feb 2023 15:17:45 +0800 Subject: [PATCH] Fix: when adding a learner, ensure the last membership is committed Previously, when adding a learner to a Raft cluster, the last membership was not always marked as committed, which could cause issues when a follower tried to truncate logs by reverting to the last committed membership. To prevent this issue, we have updated the code to ensure the last membership is committed when adding a learner. In addition to this fix, we have also made several refactoring changes, including refining method names for trait `Coherent`, renaming `Membership::next_safe()` to `next_coherent()` for consistency, and updating enum `ChangeMembers` to include more variants for adding and removing learners. We have also removed `RaftCore::add_learner()` in favor of using `change_membership()` for all membership operations, and added a `ChangeHandler` to build new membership configurations for change-membership requests. Finally, we have updated the `Membership` API with a new method `new_with_nodes()` for building a new membership configuration, and moved the validation check out into a separate function, `ensure_valid()`. Validation is now done only when needed. --- openraft/src/change_members.rs | 29 +- openraft/src/core/raft_core.rs | 66 ++-- openraft/src/membership/change_handler.rs | 161 ++++++++ openraft/src/membership/membership.rs | 356 +++++++++++++++--- openraft/src/membership/membership_state.rs | 63 +--- .../src/membership/membership_state_test.rs | 69 ---- openraft/src/membership/membership_test.rs | 69 ++-- openraft/src/membership/mod.rs | 2 + openraft/src/quorum/coherent.rs | 7 +- openraft/src/quorum/coherent_impl.rs | 8 +- openraft/src/quorum/coherent_test.rs | 76 ++-- openraft/src/raft.rs | 18 +- openraft/tests/membership/t10_add_learner.rs | 62 +++ .../membership/t16_change_membership_cases.rs | 30 +- .../tests/membership/t20_change_membership.rs | 2 + 15 files changed, 660 insertions(+), 358 deletions(-) create mode 100644 openraft/src/membership/change_handler.rs diff --git a/openraft/src/change_members.rs b/openraft/src/change_members.rs index 91e5caddb..f87cee536 100644 --- a/openraft/src/change_members.rs +++ b/openraft/src/change_members.rs @@ -1,35 +1,30 @@ +use std::collections::BTreeMap; use std::collections::BTreeSet; +use crate::Node; use crate::NodeId; #[derive(Debug, Clone)] #[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub enum ChangeMembers { - Add(BTreeSet), - Remove(BTreeSet), - Replace(BTreeSet), +pub enum ChangeMembers { + AddVoter(BTreeSet), + RemoveVoter(BTreeSet), + ReplaceAllVoters(BTreeSet), + AddNodes(BTreeMap), + RemoveNodes(BTreeSet), + ReplaceAllNodes(BTreeMap), } /// Convert a series of ids to a `Replace` operation. -impl From for ChangeMembers +impl From for ChangeMembers where NID: NodeId, + N: Node, I: IntoIterator, { fn from(r: I) -> Self { let ids = r.into_iter().collect::>(); - ChangeMembers::Replace(ids) - } -} - -impl ChangeMembers { - /// Apply the `ChangeMembers` to `old` node set, return new node set - pub fn apply_to(self, old: &BTreeSet) -> BTreeSet { - match self { - ChangeMembers::Replace(c) => c, - ChangeMembers::Add(add_members) => old.union(&add_members).cloned().collect::>(), - ChangeMembers::Remove(remove_members) => old.difference(&remove_members).cloned().collect::>(), - } + ChangeMembers::ReplaceAllVoters(ids) } } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 1905b6051..2b9b63d3f 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -11,6 +11,7 @@ use futures::future::Either; use futures::stream::FuturesUnordered; use futures::StreamExt; use futures::TryFutureExt; +use maplit::btreemap; use maplit::btreeset; use pin_utils::pin_mut; use tokio::io::AsyncRead; @@ -350,48 +351,31 @@ impl, S: RaftStorage> RaftCore, - ) -> Result<(), Fatal> { - // TODO: move these logic to Engine? - let curr = &self.engine.state.membership_state.effective().membership; - let new_membership = curr.add_learner(target, node); - - tracing::debug!(?new_membership, "new_membership with added learner: {}", target); - - self.write_entry(EntryPayload::Membership(new_membership), Some(tx)).await?; - - Ok(()) - } - - /// Submit change-membership by writing a Membership log entry, if the `expect` is satisfied. + /// Changing membership includes changing voters config or adding/removing learners: /// - /// If `turn_to_learner` is `true`, removed `voter` will becomes `learner`. Otherwise they will - /// be just removed. + /// - To change voters config, it will build a new **joint** config. If it already a joint + /// config, it returns the final uniform config. + /// - Adding a learner does not affect election, thus it does not need to enter joint consensus. + /// But it still has to wait for the previous membership to commit. Otherwise a second + /// proposed membership implies the previous one is committed. + // --- + // TODO: This limit can be removed if membership_state is replaced by a list of membership logs. + // Because allowing this requires the engine to be able to store more than 2 + // membership logs. And it does not need to wait for the previous membership log to commit + // to propose the new membership log. #[tracing::instrument(level = "debug", skip(self, tx))] pub(super) async fn change_membership( &mut self, - changes: ChangeMembers, - turn_to_learner: bool, + changes: ChangeMembers, + retain: bool, tx: RaftRespTx, ClientWriteError>, ) -> Result<(), Fatal> { - let res = self.engine.state.membership_state.create_updated_membership(changes, turn_to_learner); + let res = self.engine.state.membership_state.change_handler().apply(changes, retain); let new_membership = match res { Ok(x) => x, Err(e) => { @@ -1046,18 +1030,10 @@ impl, S: RaftStorage> RaftCore { - if self.engine.state.is_leader(&self.engine.config.id) { - self.add_learner(id, node, tx).await?; - } else { - self.reject_with_forward_to_leader(tx); - } + self.change_membership(ChangeMembers::AddNodes(btreemap! {id=>node}), true, tx).await?; } - RaftMsg::ChangeMembership { - changes, - turn_to_learner, - tx, - } => { - self.change_membership(changes, turn_to_learner, tx).await?; + RaftMsg::ChangeMembership { changes, retain, tx } => { + self.change_membership(changes, retain, tx).await?; } RaftMsg::ExternalRequest { req } => { req(&self.engine.state, &mut self.storage, &mut self.network); diff --git a/openraft/src/membership/change_handler.rs b/openraft/src/membership/change_handler.rs new file mode 100644 index 000000000..34e3693ee --- /dev/null +++ b/openraft/src/membership/change_handler.rs @@ -0,0 +1,161 @@ +use crate::error::ChangeMembershipError; +use crate::error::InProgress; +use crate::ChangeMembers; +use crate::Membership; +use crate::MembershipState; +use crate::Node; +use crate::NodeId; + +pub(crate) struct ChangeHandler<'m, NID, N> +where + NID: NodeId, + N: Node, +{ + pub(crate) state: &'m MembershipState, +} + +impl<'m, NID, N> ChangeHandler<'m, NID, N> +where + NID: NodeId, + N: Node, +{ + /// Builds a new membership configuration by applying changes to the current configuration. + /// + /// * `changes`: The changes to apply to the current membership configuration. + /// * `retain` specifies whether to retain the removed voters as a learners, i.e., nodes that + /// continue to receive log replication from the leader. + /// + /// A Result containing the new membership configuration if the operation succeeds, or a + /// `ChangeMembershipError` if an error occurs. + /// + /// This function ensures that the cluster will have at least one voter in the new membership + /// configuration. + pub(crate) fn apply( + &self, + change: ChangeMembers, + retain: bool, + ) -> Result, ChangeMembershipError> { + self.ensure_committed()?; + + let new_membership = self.state.effective().membership.clone().change(change, retain)?; + Ok(new_membership) + } + + /// Ensures that the latest membership has been committed. + /// + /// Returns Ok if the last membership is committed, or an InProgress error + /// otherwise, to indicate a change-membership request should be rejected. + pub(crate) fn ensure_committed(&self) -> Result<(), InProgress> { + let effective = self.state.effective(); + let committed = self.state.committed(); + + if effective.log_id == committed.log_id { + // Ok: last membership(effective) is committed + Ok(()) + } else { + Err(InProgress { + committed: committed.log_id, + membership_log_id: effective.log_id, + }) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use maplit::btreemap; + use maplit::btreeset; + + use crate::error::ChangeMembershipError; + use crate::error::EmptyMembership; + use crate::error::InProgress; + use crate::error::LearnerNotFound; + use crate::testing::log_id; + use crate::ChangeMembers; + use crate::EffectiveMembership; + use crate::Membership; + use crate::MembershipState; + + /// Create an Arc + fn effmem(term: u64, index: u64, m: Membership) -> Arc> { + let lid = Some(log_id(term, index)); + Arc::new(EffectiveMembership::new(lid, m)) + } + + fn m1() -> Membership { + Membership::new(vec![btreeset! {1}], None) + } + + fn m12() -> Membership { + Membership::new(vec![btreeset! {1,2}], None) + } + + fn m123_345() -> Membership { + Membership::new(vec![btreeset! {1,2,3}, btreeset! {3,4,5}], None) + } + + #[test] + fn test_apply_not_committed() -> anyhow::Result<()> { + let new = || MembershipState::new(effmem(2, 2, m1()), effmem(3, 4, m123_345())); + let res = new().change_handler().apply(ChangeMembers::AddVoter(btreeset! {1}), false); + + assert_eq!( + Err(ChangeMembershipError::InProgress(InProgress { + committed: Some(log_id(2, 2)), + membership_log_id: Some(log_id(3, 4)) + })), + res + ); + + Ok(()) + } + + #[test] + fn test_apply_empty_voters() -> anyhow::Result<()> { + let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1())); + let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1}), false); + + assert_eq!(Err(ChangeMembershipError::EmptyMembership(EmptyMembership {})), res); + + Ok(()) + } + + #[test] + fn test_apply_learner_not_found() -> anyhow::Result<()> { + let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1())); + let res = new().change_handler().apply(ChangeMembers::AddVoter(btreeset! {2}), false); + + assert_eq!( + Err(ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: 2 })), + res + ); + + Ok(()) + } + + #[test] + fn test_apply_retain_learner() -> anyhow::Result<()> { + let new = || MembershipState::new(effmem(3, 4, m12()), effmem(3, 4, m123_345())); + + // Do not leave removed voters as learner + let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), false); + assert_eq!( + Ok(Membership::new(vec![btreeset! {3,4,5}], btreemap! {3=>(),4=>(),5=>()})), + res + ); + + // Leave removed voters as learner + let res = new().change_handler().apply(ChangeMembers::RemoveVoter(btreeset! {1,2}), true); + assert_eq!( + Ok(Membership::new( + vec![btreeset! {3,4,5}], + btreemap! {1=>(),2=>(),3=>(),4=>(),5=>()} + )), + res + ); + + Ok(()) + } +} diff --git a/openraft/src/membership/membership.rs b/openraft/src/membership/membership.rs index af8a31eb9..942e74a94 100644 --- a/openraft/src/membership/membership.rs +++ b/openraft/src/membership/membership.rs @@ -1,8 +1,11 @@ +use core::fmt; use std::collections::BTreeMap; use std::collections::BTreeSet; use maplit::btreemap; +use crate::error::ChangeMembershipError; +use crate::error::EmptyMembership; use crate::error::LearnerNotFound; use crate::membership::NodeRole; use crate::node::Node; @@ -10,6 +13,7 @@ use crate::quorum::AsJoint; use crate::quorum::FindCoherent; use crate::quorum::Joint; use crate::quorum::QuorumSet; +use crate::ChangeMembers; use crate::MessageSummary; use crate::NodeId; @@ -133,7 +137,17 @@ where { fn from(b: BTreeMap) -> Self { let member_ids = b.keys().cloned().collect::>(); - Membership::with_nodes(vec![member_ids], b) + Membership::new_with_nodes(vec![member_ids], b) + } +} + +impl fmt::Display for Membership +where + N: Node, + NID: NodeId, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.summary()) } } @@ -156,8 +170,8 @@ where } res.push(format!("{}", node_id)); - let n = self.get_node(node_id).unwrap(); - res.push(format!(":{{{:?}}}", n)); + let n = self.get_node(node_id).map(|x| format!("{:?}", x)).unwrap_or_else(|| "None".to_string()); + res.push(format!(":{{{}}}", n)); } res.push("}".to_string()); } @@ -174,8 +188,8 @@ where res.push(format!("{}", learner_id)); - let n = self.get_node(learner_id).unwrap(); - res.push(format!(":{{{:?}}}", n)); + let n = self.get_node(learner_id).map(|x| format!("{:?}", x)).unwrap_or_else(|| "None".to_string()); + res.push(format!(":{{{}}}", n)); } res.push("]".to_string()); res.join("") @@ -207,23 +221,9 @@ where /// - `BTreeSet` provides learner node ids whose `Node` data are `Node::default()`, /// - `BTreeMap` provides nodes for every node id. Node ids that are not in /// `configs` are learners. - /// - /// Every node id in `configs` has to present in `nodes`. This is the only difference from - /// [`Membership::new`]. - pub(crate) fn with_nodes(configs: Vec>, nodes: T) -> Self + pub(crate) fn new_with_nodes(configs: Vec>, nodes: T) -> Self where T: IntoNodes { let nodes = nodes.into_nodes(); - - if cfg!(debug_assertions) { - for voter_id in configs.as_joint().ids() { - assert!( - nodes.contains_key(&voter_id), - "nodes has to contain voter id {}", - voter_id - ); - } - } - Membership { configs, nodes } } @@ -249,12 +249,37 @@ where self.configs.len() > 1 } - pub(crate) fn add_learner(&self, node_id: NID, node: N) -> Self { - let configs = self.configs.clone(); + /// Ensure the membership config is valid: + /// - No empty sub-config in it. + /// - Every voter has a corresponding Node. + pub(crate) fn ensure_valid(&self) -> Result<(), ChangeMembershipError> { + self.ensure_non_empty_config()?; + self.ensure_voter_nodes().map_err(|nid| LearnerNotFound { node_id: nid })?; + Ok(()) + } + + /// Ensures that none of the sub config in this joint config are empty. + pub(crate) fn ensure_non_empty_config(&self) -> Result<(), EmptyMembership> { + for c in self.get_joint_config().iter() { + if c.is_empty() { + return Err(EmptyMembership {}); + } + } + + Ok(()) + } - let nodes = Self::extend_nodes(self.nodes.clone(), &btreemap! {node_id=>node}); + /// Ensures that every vote has a corresponding Node. + /// + /// If a voter is found not having a Node, it returns the voter node id in an `Err()` + pub(crate) fn ensure_voter_nodes(&self) -> Result<(), NID> { + for voter_id in self.voter_ids() { + if !self.nodes.contains_key(&voter_id) { + return Err(voter_id); + } + } - Self::with_nodes(configs, nodes) + Ok(()) } } @@ -327,7 +352,11 @@ where N: Node, NID: NodeId, { - /// Returns the next safe membership to change to while the expected final membership is `goal`. + /// Returns the next coherent membership to change to, while the expected final membership is + /// `goal`. + /// + /// `retain` specifies whether to retain the removed voters as a learners, i.e., nodes that + /// continue to receive log replication from the leader. /// /// E.g.(`cicj` is a joint membership of `ci` and `cj`): /// - `c1.next_step(c1)` returns `c1` @@ -339,39 +368,17 @@ where /// With this method the membership change algo is simplified to: /// ```ignore /// while curr != goal { - /// let next = curr.next_step(goal); + /// let next = curr.next_coherent(goal); /// change_membership(next); /// curr = next; /// } /// ``` - pub(crate) fn next_safe(&self, goal: T, removed_to_learner: bool) -> Result> - where T: IntoNodes { - let goal = if goal.has_nodes() { - goal.into_nodes() - } else { - // If `goal` does not contains Node, inherit them from current config. - - let mut voters_map = BTreeMap::new(); - - // There has to be corresponding `Node` for every voter_id - for node_id in goal.node_ids().iter() { - let n = self.get_node(node_id); - if let Some(n) = n { - voters_map.insert(*node_id, n.clone()); - } else { - return Err(LearnerNotFound { node_id: *node_id }); - } - } - voters_map - }; - - let goal_ids = goal.keys().copied().collect::>(); - - let config = Joint::from(self.configs.clone()).find_coherent(goal_ids).children().clone(); + pub(crate) fn next_coherent(&self, goal: BTreeSet, retain: bool) -> Self { + let config = Joint::from(self.configs.clone()).find_coherent(goal).children().clone(); - let mut nodes = Self::extend_nodes(self.nodes.clone(), &goal); + let mut nodes = self.nodes.clone(); - if !removed_to_learner { + if !retain { let old_voter_ids = self.configs.as_joint().ids().collect::>(); let new_voter_ids = config.as_joint().ids().collect::>(); @@ -380,7 +387,58 @@ where } }; - Ok(Membership::with_nodes(config, nodes)) + Membership::new_with_nodes(config, nodes) + } + + /// Apply a change-membership request and return a new instance. + /// + /// It ensures that the returned instance is valid. + /// + /// `retain` specifies whether to retain the removed voters as a learners, i.e., nodes that + /// continue to receive log replication from the leader. + pub(crate) fn change( + mut self, + change: ChangeMembers, + retain: bool, + ) -> Result> { + tracing::debug!(change = debug(&change), "{}", func_name!()); + + let last = self.get_joint_config().last().unwrap(); + + let new_membership = match change { + ChangeMembers::AddVoter(add_voter_ids) => { + let new_voter_ids = last.union(&add_voter_ids).copied().collect::>(); + self.next_coherent(new_voter_ids, retain) + } + ChangeMembers::RemoveVoter(remove_voter_ids) => { + let new_voter_ids = last.difference(&remove_voter_ids).copied().collect::>(); + self.next_coherent(new_voter_ids, retain) + } + ChangeMembers::ReplaceAllVoters(all_voter_ids) => self.next_coherent(all_voter_ids, retain), + ChangeMembers::AddNodes(add_nodes) => { + // When adding nodes, do not override existing node + for (node_id, node) in add_nodes.into_iter() { + self.nodes.entry(node_id).or_insert(node); + } + self + } + ChangeMembers::RemoveNodes(remove_node_ids) => { + for node_id in remove_node_ids.iter() { + self.nodes.remove(node_id); + } + self + } + ChangeMembers::ReplaceAllNodes(all_nodes) => { + self.nodes = all_nodes; + self + } + }; + + tracing::debug!(new_membership = display(&new_membership), "new membership"); + + new_membership.ensure_valid()?; + + Ok(new_membership) } /// Build a QuorumSet from current joint config @@ -392,3 +450,195 @@ where Joint::new(qs) } } + +#[cfg(test)] +mod tests { + use maplit::btreemap; + use maplit::btreeset; + + use crate::error::ChangeMembershipError; + use crate::error::EmptyMembership; + use crate::error::LearnerNotFound; + use crate::ChangeMembers; + use crate::Membership; + + #[test] + fn test_membership_ensure_voter_nodes() -> anyhow::Result<()> { + let m = Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>()}, + }; + assert_eq!(Err(2), m.ensure_voter_nodes()); + Ok(()) + } + + #[test] + fn test_membership_change() -> anyhow::Result<()> { + let m = || Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>(),2=>(),3=>()}, + }; + + // Add: no such learner + { + let res = m().change(ChangeMembers::AddVoter(btreeset! {4}), true); + assert_eq!( + Err(ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: 4 })), + res + ); + } + + // Add: ok + { + let res = m().change(ChangeMembers::AddVoter(btreeset! {3}), true); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}, btreeset! {1,2,3}], + nodes: btreemap! {1=>(),2=>(),3=>()} + }), + res + ); + } + + // Remove: no such voter + { + let res = m().change(ChangeMembers::RemoveVoter(btreeset! {5}), true); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>(),2=>(),3=>()} + }), + res + ); + } + + // Remove: become empty + { + let res = m().change(ChangeMembers::RemoveVoter(btreeset! {1,2}), true); + assert_eq!(Err(ChangeMembershipError::EmptyMembership(EmptyMembership {})), res); + } + + // Remove: OK retain + { + let res = m().change(ChangeMembers::RemoveVoter(btreeset! {1}), true); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}, btreeset! {2}], + nodes: btreemap! {1=>(),2=>(),3=>()} + }), + res + ); + } + + // Remove: OK, not retain; learner not removed + { + let res = m().change(ChangeMembers::RemoveVoter(btreeset! {1}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}, btreeset! {2}], + nodes: btreemap! {1=>(),2=>(),3=>()} + }), + res + ); + } + + // Remove: OK, not retain; learner removed + { + let mem = Membership:: { + configs: vec![btreeset! {1,2}, btreeset! {2}], + nodes: btreemap! {1=>(),2=>(),3=>()}, + }; + let res = mem.change(ChangeMembers::RemoveVoter(btreeset! {1}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {2}], + nodes: btreemap! {2=>(),3=>()} + }), + res + ); + } + + // Replace: + { + let res = m().change(ChangeMembers::ReplaceAllVoters(btreeset! {2}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}, btreeset! {2}], + nodes: btreemap! {1=>(),2=>(),3=>()} + }), + res + ); + } + + // AddNodes: existent voter + { + let res = m().change(ChangeMembers::AddNodes(btreemap! {2=>()}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>(),2=>(),3=>()} + }), + res + ); + } + + // AddNodes: existent learner + { + let res = m().change(ChangeMembers::AddNodes(btreemap! {3=>()}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>(),2=>(),3=>()} + }), + res + ); + } + + // AddNodes: Ok + { + let res = m().change(ChangeMembers::AddNodes(btreemap! {4=>()}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>(),2=>(),3=>(), 4=>()} + }), + res + ); + } + + // RemoveNodes: can not remove node for voter + { + let res = m().change(ChangeMembers::RemoveNodes(btreeset! {2}), false); + assert_eq!( + Err(ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: 2 })), + res + ); + } + + // RemoveNodes: Ok + { + let res = m().change(ChangeMembers::RemoveNodes(btreeset! {3}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>(),2=>()} + }), + res + ); + } + + // ReplaceAllNodes: Ok + { + let res = m().change(ChangeMembers::ReplaceAllNodes(btreemap! {1=>(),2=>(),4=>()}), false); + assert_eq!( + Ok(Membership:: { + configs: vec![btreeset! {1,2}], + nodes: btreemap! {1=>(),2=>(),4=>()} + }), + res + ); + } + + Ok(()) + } +} diff --git a/openraft/src/membership/membership_state.rs b/openraft/src/membership/membership_state.rs index 38d559436..12ae6fabe 100644 --- a/openraft/src/membership/membership_state.rs +++ b/openraft/src/membership/membership_state.rs @@ -1,17 +1,13 @@ use std::error::Error; use std::sync::Arc; -use crate::error::ChangeMembershipError; -use crate::error::EmptyMembership; -use crate::error::InProgress; use crate::less_equal; +use crate::membership::ChangeHandler; use crate::node::Node; use crate::validate::Validate; -use crate::ChangeMembers; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; -use crate::Membership; use crate::MessageSummary; use crate::NodeId; @@ -80,59 +76,6 @@ where self.effective.membership.is_voter(id) } - /// Builds a new membership configuration by applying changes to the current configuration. - /// - /// * `changes`: The changes to apply to the current membership configuration. - /// * `convert_removed_to_learner`: Indicates whether the removed voter should be left in the - /// membership configuration as a learner. - /// - /// A Result containing the new membership configuration if the operation succeeds, or a - /// `ChangeMembershipError` if an error occurs. - /// - /// This function ensures that the cluster will have at least one voter in the new membership - /// configuration. - pub(crate) fn create_updated_membership( - &self, - changes: ChangeMembers, - convert_removed_to_learner: bool, - ) -> Result, ChangeMembershipError> { - let effective = self.effective(); - - self.ensure_last_membership_committed()?; - - let last = effective.membership.get_joint_config().last().unwrap(); - let new_voter_ids = changes.apply_to(last); - - // Ensure cluster will have at least one voter. - if new_voter_ids.is_empty() { - return Err(EmptyMembership {}.into()); - } - - let new_membership = effective.membership.next_safe(new_voter_ids, convert_removed_to_learner)?; - - tracing::debug!(?new_membership, "new membership config"); - Ok(new_membership) - } - - /// Ensures that the latest membership configuration has been committed. - /// - /// Returns Ok if the last membership configuration is committed, or an InProgress error - /// otherwise. - pub(crate) fn ensure_last_membership_committed(&self) -> Result<(), InProgress> { - let effective = self.effective(); - let committed = self.committed(); - - if self.committed().log_id == self.effective().log_id { - // Ok: last membership(effective) is committed - Ok(()) - } else { - Err(InProgress { - committed: committed.log_id, - membership_log_id: effective.log_id, - }) - } - } - /// Update membership state if the specified committed_log_id is greater than `self.effective` pub(crate) fn commit(&mut self, committed_log_id: &Option>) { if committed_log_id >= &self.effective().log_id { @@ -256,6 +199,10 @@ where pub fn effective(&self) -> &Arc> { &self.effective } + + pub(crate) fn change_handler(&self) -> ChangeHandler { + ChangeHandler { state: self } + } } impl Validate for MembershipState diff --git a/openraft/src/membership/membership_state_test.rs b/openraft/src/membership/membership_state_test.rs index 7aeb555c5..73d089f0c 100644 --- a/openraft/src/membership/membership_state_test.rs +++ b/openraft/src/membership/membership_state_test.rs @@ -1,14 +1,8 @@ use std::sync::Arc; -use maplit::btreemap; use maplit::btreeset; -use crate::error::ChangeMembershipError; -use crate::error::EmptyMembership; -use crate::error::InProgress; -use crate::error::LearnerNotFound; use crate::testing::log_id; -use crate::ChangeMembers; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; @@ -180,66 +174,3 @@ fn test_membership_state_truncate() -> anyhow::Result<()> { Ok(()) } - -#[test] -fn test_membership_state_next_membership_not_committed() -> anyhow::Result<()> { - let new = || MembershipState::new(effmem(2, 2, m1()), effmem(3, 4, m123_345())); - let res = new().create_updated_membership(ChangeMembers::Add(btreeset! {1}), false); - - assert_eq!( - Err(ChangeMembershipError::InProgress(InProgress { - committed: Some(log_id(2, 2)), - membership_log_id: Some(log_id(3, 4)) - })), - res - ); - - Ok(()) -} - -#[test] -fn test_membership_state_create_updated_membership_empty_voters() -> anyhow::Result<()> { - let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1())); - let res = new().create_updated_membership(ChangeMembers::Remove(btreeset! {1}), false); - - assert_eq!(Err(ChangeMembershipError::EmptyMembership(EmptyMembership {})), res); - - Ok(()) -} - -#[test] -fn test_membership_state_create_updated_membership_learner_not_found() -> anyhow::Result<()> { - let new = || MembershipState::new(effmem(3, 4, m1()), effmem(3, 4, m1())); - let res = new().create_updated_membership(ChangeMembers::Add(btreeset! {2}), false); - - assert_eq!( - Err(ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: 2 })), - res - ); - - Ok(()) -} - -#[test] -fn test_membership_state_create_updated_membership_removed_to_learner() -> anyhow::Result<()> { - let new = || MembershipState::new(effmem(3, 4, m12()), effmem(3, 4, m123_345())); - - // Do not leave removed voters as learner - let res = new().create_updated_membership(ChangeMembers::Remove(btreeset! {1,2}), false); - assert_eq!( - Ok(Membership::new(vec![btreeset! {3,4,5}], btreemap! {3=>(),4=>(),5=>()})), - res - ); - - // Leave removed voters as learner - let res = new().create_updated_membership(ChangeMembers::Remove(btreeset! {1,2}), true); - assert_eq!( - Ok(Membership::new( - vec![btreeset! {3,4,5}], - btreemap! {1=>(),2=>(),3=>(),4=>(),5=>()} - )), - res - ); - - Ok(()) -} diff --git a/openraft/src/membership/membership_test.rs b/openraft/src/membership/membership_test.rs index b86a6de29..3de9a4439 100644 --- a/openraft/src/membership/membership_test.rs +++ b/openraft/src/membership/membership_test.rs @@ -5,8 +5,8 @@ use std::fmt::Formatter; use maplit::btreemap; use maplit::btreeset; -use crate::error::LearnerNotFound; use crate::membership::IntoNodes; +use crate::ChangeMembers; use crate::Membership; use crate::MessageSummary; @@ -42,7 +42,7 @@ fn test_membership_summary() -> anyhow::Result<()> { let m = Membership::::new(vec![btreeset! {1,2}, btreeset! {3}], Some(btreeset! {4})); assert_eq!("members:[{1:{()},2:{()}},{3:{()}}],learners:[4:{()}]", m.summary()); - let m = Membership::::with_nodes(vec![btreeset! {1,2}, btreeset! {3}], btreemap! { + let m = Membership::::new_with_nodes(vec![btreeset! {1,2}, btreeset! {3}], btreemap! { 1=>node("127.0.0.1", "k1"), 2=>node("127.0.0.2", "k2"), 3=>node("127.0.0.3", "k3"), @@ -91,7 +91,7 @@ fn test_membership_with_learners() -> anyhow::Result<()> { // test multi membership with learners { let m1_2 = Membership::::new(vec![btreeset! {1}], Some(btreeset! {2})); - let m1_23 = m1_2.add_learner(3, ()); + let m1_23 = m1_2.clone().change(ChangeMembers::AddNodes(btreemap! {3=>()}), true)?; // test learner and membership assert_eq!(vec![1], m1_2.voter_ids().collect::>()); @@ -102,12 +102,13 @@ fn test_membership_with_learners() -> anyhow::Result<()> { // Adding a member as learner has no effect: - let m = m1_23.add_learner(1, ()); + let m = m1_23.clone().change(ChangeMembers::AddNodes(btreemap! {1=>()}), true)?; + // let m = m1_23.add_learner(1, ()); assert_eq!(vec![1], m.voter_ids().collect::>()); // Adding a existent learner has no effect: - let m = m1_23.add_learner(3, ()); + let m = m1_23.change(ChangeMembers::AddNodes(btreemap! {3=>()}), true)?; assert_eq!(vec![1], m.voter_ids().collect::>()); assert_eq!(btreeset! {2,3}, m.learner_ids().collect()); } @@ -129,21 +130,21 @@ fn test_membership_add_learner() -> anyhow::Result<()> { data: Default::default(), }; - let m_1_2 = Membership::::with_nodes( + let m_1_2 = Membership::::new_with_nodes( vec![btreeset! {1}, btreeset! {2}], btreemap! {1=>node("1"), 2=>node("2")}, ); // Add learner that presents in old cluster has no effect. - let res = m_1_2.add_learner(1, node("3")); + let res = m_1_2.clone().change(ChangeMembers::AddNodes(btreemap! {1=>node("3")}), true)?; assert_eq!(m_1_2, res); // Success to add a learner - let m_1_2_3 = m_1_2.add_learner(3, node("3")); + let m_1_2_3 = m_1_2.change(ChangeMembers::AddNodes(btreemap! {3=>node("3")}), true)?; assert_eq!( - Membership::::with_nodes( + Membership::::new_with_nodes( vec![btreeset! {1}, btreeset! {2}], btreemap! {1=>node("1"), 2=>node("2"), 3=>node("3")} ), @@ -180,7 +181,7 @@ fn test_membership_extend_nodes() -> anyhow::Result<()> { #[test] fn test_membership_with_nodes() -> anyhow::Result<()> { let node = TestNode::default; - let with_nodes = |nodes| Membership::::with_nodes(vec![btreeset! {1}, btreeset! {2}], nodes); + let with_nodes = |nodes| Membership::::new_with_nodes(vec![btreeset! {1}, btreeset! {2}], nodes); let res = with_nodes(btreemap! {1=>node(), 2=>node()}); assert_eq!( @@ -197,17 +198,8 @@ fn test_membership_with_nodes() -> anyhow::Result<()> { Ok(()) } -// TODO: rename -#[test] -#[should_panic] -fn test_membership_with_nodes_panic() { - // Panic if debug_assertions is enabled: - // voter ids set is {1,2}, while the nodes set is {1} - Membership::::with_nodes(vec![btreeset! {1}, btreeset! {2}], btreemap! {1=>TestNode::default()}); -} - #[test] -fn test_membership_next_safe() -> anyhow::Result<()> { +fn test_membership_next_coherent() -> anyhow::Result<()> { let nodes = || btreeset! {1,2,3,4,5,6,7,8,9}.into_nodes(); let c1 = || btreeset! {1,2,3}; let c2 = || btreeset! {3,4,5}; @@ -219,19 +211,19 @@ fn test_membership_next_safe() -> anyhow::Result<()> { let m1 = new_mem(vec![c1()], nodes()); let m12 = new_mem(vec![c1(), c2()], nodes()); - assert_eq!(m1, m1.next_safe(c1(), false)?); - assert_eq!(m12, m1.next_safe(c2(), false)?); + assert_eq!(m1, m1.next_coherent(c1(), false)); + assert_eq!(m12, m1.next_coherent(c2(), false)); assert_eq!( new_mem(vec![c1()], btreeset! {1,2,3,6,7,8,9}.into_nodes()), - m12.next_safe(c1(), false)? + m12.next_coherent(c1(), false) ); assert_eq!( new_mem(vec![c2()], btreeset! {3,4,5,6,7,8,9}.into_nodes()), - m12.next_safe(c2(), false)? + m12.next_coherent(c2(), false) ); assert_eq!( new_mem(vec![c2(), c3()], btreeset! {3,4,5,6,7,8,9}.into_nodes()), - m12.next_safe(c3(), false)? + m12.next_coherent(c3(), false) ); // Turn removed members to learners @@ -240,30 +232,13 @@ fn test_membership_next_safe() -> anyhow::Result<()> { let learners = || btreeset! {1, 2, 3, 4, 5}; let m23_with_learners_old = Membership::::new(vec![c2(), c3()], Some(old_learners())); let m23_with_learners_new = Membership::::new(vec![c3()], Some(learners())); - assert_eq!(m23_with_learners_new, m23_with_learners_old.next_safe(c3(), true)?); - - Ok(()) -} - -#[test] -fn test_membership_next_safe_learner_not_found() -> anyhow::Result<()> { - let c1 = || btreeset! {1,2,3}; - let c2 = || btreeset! {3,4,5}; - let c3 = || btreeset! {7}; - - #[allow(clippy::redundant_closure)] - let new_mem = |voter_ids, ns| Membership::::new(voter_ids, ns); - - let m12 = new_mem(vec![c1(), c2()], None); - - let res = m12.next_safe(c3(), false); - assert_eq!(Err(LearnerNotFound { node_id: 7 }), res); + assert_eq!(m23_with_learners_new, m23_with_learners_old.next_coherent(c3(), true)); Ok(()) } #[test] -fn test_membership_next_safe_with_nodes() -> anyhow::Result<()> { +fn test_membership_next_coherent_with_nodes() -> anyhow::Result<()> { let node = |s: &str| TestNode { addr: s.to_string(), data: Default::default(), @@ -272,11 +247,11 @@ fn test_membership_next_safe_with_nodes() -> anyhow::Result<()> { let c1 = || btreeset! {1}; let c2 = || btreeset! {2}; - let initial = Membership::::with_nodes(vec![c1(), c2()], btreemap! {1=>node("1"), 2=>node("2")}); + let initial = Membership::::new_with_nodes(vec![c1(), c2()], btreemap! {1=>node("1"), 2=>node("2")}); // joint [{2}, {1,2}] - let res = initial.next_safe(btreeset! {1,2}, false)?; + let res = initial.next_coherent(btreeset! {1,2}, false); assert_eq!( btreemap! {1=>node("1"), 2=>node("2")}, res.nodes().map(|(nid, n)| (*nid, n.clone())).collect::>() @@ -284,7 +259,7 @@ fn test_membership_next_safe_with_nodes() -> anyhow::Result<()> { // Removed to learner - let res = initial.next_safe(btreeset! {1}, true)?; + let res = initial.next_coherent(btreeset! {1}, true); assert_eq!( btreemap! {1=>node("1"), 2=>node("2")}, res.nodes().map(|(nid, n)| (*nid, n.clone())).collect::>() diff --git a/openraft/src/membership/mod.rs b/openraft/src/membership/mod.rs index 3606b6607..333b7d303 100644 --- a/openraft/src/membership/mod.rs +++ b/openraft/src/membership/mod.rs @@ -1,3 +1,4 @@ +mod change_handler; mod effective_membership; #[allow(clippy::module_inception)] mod membership; mod membership_state; @@ -11,6 +12,7 @@ mod bench; #[cfg(test)] mod membership_state_test; #[cfg(test)] mod membership_test; +pub(crate) use change_handler::ChangeHandler; pub use effective_membership::EffectiveMembership; pub use membership::IntoNodes; pub use membership::Membership; diff --git a/openraft/src/quorum/coherent.rs b/openraft/src/quorum/coherent.rs index 0f4733d01..319b69e15 100644 --- a/openraft/src/quorum/coherent.rs +++ b/openraft/src/quorum/coherent.rs @@ -1,7 +1,8 @@ use crate::quorum::QuorumSet; /// **Coherent** quorum set A and B is defined as: `∀ qᵢ ∈ A, ∀ qⱼ ∈ B: qᵢ ∩ qⱼ != ø`, i.e., `A ~ -/// B`. A distributed consensus protocol such as openraft is only allowed to switch membership +/// B`. +/// A distributed consensus protocol such as openraft is only allowed to switch membership /// between two **coherent** quorum sets. Being coherent is one of the two restrictions. The other /// restriction is to disable other smaller candidate to elect. pub(crate) trait Coherent @@ -10,8 +11,8 @@ where Self: QuorumSet, Other: QuorumSet, { - /// Returns if two QuorumSet are coherent. - fn is_coherent(&self, other: &Other) -> bool; + /// Returns `true` if this QuorumSet is coherent with the other quorum set. + fn is_coherent_with(&self, other: &Other) -> bool; } pub(crate) trait FindCoherent diff --git a/openraft/src/quorum/coherent_impl.rs b/openraft/src/quorum/coherent_impl.rs index 052a98db7..0084a9860 100644 --- a/openraft/src/quorum/coherent_impl.rs +++ b/openraft/src/quorum/coherent_impl.rs @@ -12,7 +12,7 @@ where /// /// Read more about: /// [safe-membership-change](https://datafuselabs.github.io/openraft/dynamic-membership.html#the-safe-to-relation) - fn is_coherent(&self, other: &Joint>) -> bool { + fn is_coherent_with(&self, other: &Joint>) -> bool { for a in self.children() { for b in other.children() { if a == b { @@ -30,7 +30,7 @@ where ID: PartialOrd + Ord + 'static, QS: QuorumSet + PartialEq, { - fn is_coherent(&self, other: &Joint) -> bool { + fn is_coherent_with(&self, other: &Joint) -> bool { for a in self.children().iter() { for b in other.children().iter() { if a == b { @@ -48,7 +48,7 @@ where ID: PartialOrd + Ord + 'static, QS: QuorumSet + PartialEq, { - fn is_coherent(&self, other: &QS) -> bool { + fn is_coherent_with(&self, other: &QS) -> bool { for a in self.children().iter() { if a == other { return true; @@ -66,7 +66,7 @@ where QS: QuorumSet + PartialEq + Clone, { fn find_coherent(&self, other: QS) -> Self { - if self.is_coherent(&other) { + if self.is_coherent_with(&other) { Joint::from(vec![other]) } else { Joint::from(vec![self.children().last().unwrap().clone(), other]) diff --git a/openraft/src/quorum/coherent_test.rs b/openraft/src/quorum/coherent_test.rs index ae3f20177..271dec884 100644 --- a/openraft/src/quorum/coherent_test.rs +++ b/openraft/src/quorum/coherent_test.rs @@ -16,25 +16,25 @@ fn test_is_coherent_vec() -> anyhow::Result<()> { let j123_345 = Joint::from(vec![s123(), s345()]); let j345_789 = Joint::from(vec![s345(), s789()]); - assert!(j123.is_coherent(&j123)); - assert!(!j123.is_coherent(&j345)); - assert!(j123.is_coherent(&j123_345)); - assert!(!j123.is_coherent(&j345_789)); - - assert!(!j345.is_coherent(&j123)); - assert!(j345.is_coherent(&j345)); - assert!(j345.is_coherent(&j123_345)); - assert!(j345.is_coherent(&j345_789)); - - assert!(j123_345.is_coherent(&j123)); - assert!(j123_345.is_coherent(&j345)); - assert!(j123_345.is_coherent(&j123_345)); - assert!(j123_345.is_coherent(&j345_789)); - - assert!(!j345_789.is_coherent(&j123)); - assert!(j345_789.is_coherent(&j345)); - assert!(j345_789.is_coherent(&j123_345)); - assert!(j345_789.is_coherent(&j345_789)); + assert!(j123.is_coherent_with(&j123)); + assert!(!j123.is_coherent_with(&j345)); + assert!(j123.is_coherent_with(&j123_345)); + assert!(!j123.is_coherent_with(&j345_789)); + + assert!(!j345.is_coherent_with(&j123)); + assert!(j345.is_coherent_with(&j345)); + assert!(j345.is_coherent_with(&j123_345)); + assert!(j345.is_coherent_with(&j345_789)); + + assert!(j123_345.is_coherent_with(&j123)); + assert!(j123_345.is_coherent_with(&j345)); + assert!(j123_345.is_coherent_with(&j123_345)); + assert!(j123_345.is_coherent_with(&j345_789)); + + assert!(!j345_789.is_coherent_with(&j123)); + assert!(j345_789.is_coherent_with(&j345)); + assert!(j345_789.is_coherent_with(&j123_345)); + assert!(j345_789.is_coherent_with(&j345_789)); Ok(()) } @@ -55,25 +55,25 @@ fn test_is_coherent_slice() -> anyhow::Result<()> { let j123_345 = v123_345.as_joint(); let j345_789 = v345_789.as_joint(); - assert!(j123.is_coherent(&j123)); - assert!(!j123.is_coherent(&j345)); - assert!(j123.is_coherent(&j123_345)); - assert!(!j123.is_coherent(&j345_789)); - - assert!(!j345.is_coherent(&j123)); - assert!(j345.is_coherent(&j345)); - assert!(j345.is_coherent(&j123_345)); - assert!(j345.is_coherent(&j345_789)); - - assert!(j123_345.is_coherent(&j123)); - assert!(j123_345.is_coherent(&j345)); - assert!(j123_345.is_coherent(&j123_345)); - assert!(j123_345.is_coherent(&j345_789)); - - assert!(!j345_789.is_coherent(&j123)); - assert!(j345_789.is_coherent(&j345)); - assert!(j345_789.is_coherent(&j123_345)); - assert!(j345_789.is_coherent(&j345_789)); + assert!(j123.is_coherent_with(&j123)); + assert!(!j123.is_coherent_with(&j345)); + assert!(j123.is_coherent_with(&j123_345)); + assert!(!j123.is_coherent_with(&j345_789)); + + assert!(!j345.is_coherent_with(&j123)); + assert!(j345.is_coherent_with(&j345)); + assert!(j345.is_coherent_with(&j123_345)); + assert!(j345.is_coherent_with(&j345_789)); + + assert!(j123_345.is_coherent_with(&j123)); + assert!(j123_345.is_coherent_with(&j345)); + assert!(j123_345.is_coherent_with(&j123_345)); + assert!(j123_345.is_coherent_with(&j345_789)); + + assert!(!j345_789.is_coherent_with(&j123)); + assert!(j345_789.is_coherent_with(&j345)); + assert!(j345_789.is_coherent_with(&j123_345)); + assert!(j345_789.is_coherent_with(&j345_789)); Ok(()) } diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 4437cb5a0..5d7371a5b 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -607,10 +607,10 @@ impl, S: RaftStorage> Raft>, + members: impl Into>, turn_to_learner: bool, ) -> Result, RaftError>> { - let changes: ChangeMembers = members.into(); + let changes: ChangeMembers = members.into(); tracing::info!( changes = debug(&changes), @@ -625,7 +625,7 @@ impl, S: RaftStorage> Raft, S: RaftStorage> Raft, S: RaftStor }, ChangeMembership { - changes: ChangeMembers, + changes: ChangeMembers, - /// If `turn_to_learner` is `true`, then all the members which do not exist in the new - /// membership will be turned into learners, otherwise they will be removed. - turn_to_learner: bool, + /// If `retain` is `true`, then the voters that are not in the new + /// config will be converted into learners, otherwise they will be removed. + retain: bool, tx: RaftRespTx, ClientWriteError>, }, @@ -991,7 +991,7 @@ where } RaftMsg::ChangeMembership { changes: members, - turn_to_learner, + retain: turn_to_learner, .. } => { format!( diff --git a/openraft/tests/membership/t10_add_learner.rs b/openraft/tests/membership/t10_add_learner.rs index 2baff7e42..3f21d82ce 100644 --- a/openraft/tests/membership/t10_add_learner.rs +++ b/openraft/tests/membership/t10_add_learner.rs @@ -3,7 +3,12 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::error::ChangeMembershipError; +use openraft::error::ClientWriteError; +use openraft::error::InProgress; +use openraft::CommittedLeaderId; use openraft::Config; +use openraft::LogId; use openraft::Membership; use openraft::RaftLogReader; use openraft::StorageHelper; @@ -136,6 +141,56 @@ async fn add_learner_non_blocking() -> Result<()> { Ok(()) } +/// When the previous membership is not yet committed, add-learner should fail. +/// +/// Because adding learner is also a change-membership operation, a new membership config log will +/// let raft consider the previous membership config log as committed, which is actually not. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn add_learner_when_previous_membership_not_committed() -> Result<()> { + let config = Arc::new( + Config { + enable_tick: false, + ..Default::default() + } + .validate()?, + ); + let mut router = RaftRouter::new(config.clone()); + + let log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?; + + tracing::info!("--- block replication to prevent committing any log"); + { + router.isolate_node(1); + + let node = router.get_raft_handle(&0)?; + tokio::spawn(async move { + let res = node.change_membership(btreeset![0, 1], false).await; + tracing::info!("do not expect res: {:?}", res); + unreachable!("do not expect any res"); + }); + + sleep(Duration::from_millis(500)).await; + } + + tracing::info!("--- add new node node-1, in non blocking mode"); + { + let node = router.get_raft_handle(&0)?; + let res = node.add_learner(2, (), true).await; + tracing::debug!("res: {:?}", res); + + let err = res.unwrap_err().into_api_error().unwrap(); + assert_eq!( + ClientWriteError::ChangeMembershipError(ChangeMembershipError::InProgress(InProgress { + committed: Some(log_id(1, 0, 2)), + membership_log_id: Some(log_id(1, 0, log_index + 1)) + })), + err + ); + } + + Ok(()) +} + /// add a learner, then shutdown the leader to make leader transferred, /// check after new leader come, the learner can receive new log. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] @@ -232,3 +287,10 @@ async fn check_learner_after_leader_transferred() -> Result<()> { fn timeout() -> Option { Some(Duration::from_millis(3_000)) } + +pub fn log_id(term: u64, node_id: u64, index: u64) -> LogId { + LogId:: { + leader_id: CommittedLeaderId::new(term, node_id), + index, + } +} diff --git a/openraft/tests/membership/t16_change_membership_cases.rs b/openraft/tests/membership/t16_change_membership_cases.rs index 4f0493c9e..65fe8f9cb 100644 --- a/openraft/tests/membership/t16_change_membership_cases.rs +++ b/openraft/tests/membership/t16_change_membership_cases.rs @@ -15,47 +15,47 @@ use crate::fixtures::RaftRouter; #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m0_change_m12() -> anyhow::Result<()> { - change_from_to(btreeset! {0}, ChangeMembers::Replace(btreeset! {1,2})).await + change_from_to(btreeset! {0}, btreeset! {1,2}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m0_change_m123() -> anyhow::Result<()> { - change_from_to(btreeset! {0}, ChangeMembers::Replace(btreeset! {1,2,3})).await + change_from_to(btreeset! {0}, btreeset! {1,2,3}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m01_change_m12() -> anyhow::Result<()> { - change_from_to(btreeset! {0, 1}, ChangeMembers::Replace(btreeset! {1,2})).await + change_from_to(btreeset! {0, 1}, btreeset! {1,2}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m01_change_m1() -> anyhow::Result<()> { - change_from_to(btreeset! {0, 1}, ChangeMembers::Replace(btreeset! {1})).await + change_from_to(btreeset! {0, 1}, btreeset! {1}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m01_change_m2() -> anyhow::Result<()> { - change_from_to(btreeset! {0, 1}, ChangeMembers::Replace(btreeset! {2})).await + change_from_to(btreeset! {0, 1}, btreeset! {2}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m01_change_m3() -> anyhow::Result<()> { - change_from_to(btreeset! {0, 1}, ChangeMembers::Replace(btreeset! {3})).await + change_from_to(btreeset! {0, 1}, btreeset! {3}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m012_change_m4() -> anyhow::Result<()> { - change_from_to(btreeset! {0, 1, 2}, ChangeMembers::Replace(btreeset! {4})).await + change_from_to(btreeset! {0, 1, 2}, btreeset! {4}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m012_change_m456() -> anyhow::Result<()> { - change_from_to(btreeset! {0, 1, 2}, ChangeMembers::Replace(btreeset! {4,5,6})).await + change_from_to(btreeset! {0, 1, 2}, btreeset! {4,5,6}).await } #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn m01234_change_m0123() -> anyhow::Result<()> { - change_from_to(btreeset! {0, 1, 2, 3, 4}, ChangeMembers::Replace(btreeset! {0,1,2,3})).await + change_from_to(btreeset! {0, 1, 2, 3, 4}, btreeset! {0,1,2,3}).await } // --- add --- @@ -98,8 +98,8 @@ async fn m012_remove_m13() -> anyhow::Result<()> { } #[tracing::instrument(level = "debug")] -async fn change_from_to(old: BTreeSet, change_members: ChangeMembers) -> anyhow::Result<()> { - let new = change_members.apply_to(&old); +async fn change_from_to(old: BTreeSet, change_members: BTreeSet) -> anyhow::Result<()> { + let new = change_members; let mes = format!("from {:?} to {:?}", old, new); @@ -244,11 +244,11 @@ async fn change_from_to(old: BTreeSet, change_members: ChangeMembers< /// Test change-membership by adding voters. #[tracing::instrument(level = "debug")] async fn change_by_add(old: BTreeSet, add: &[MemNodeId]) -> anyhow::Result<()> { - let change = ChangeMembers::Add(add.iter().copied().collect()); + let change = ChangeMembers::AddVoter(add.iter().copied().collect()); let mes = format!("from {:?} {:?}", old, change); - let new = change.clone().apply_to(&old); + let new = old.clone().union(&add.iter().copied().collect()).copied().collect::>(); let only_in_new = new.difference(&old); let config = Arc::new( @@ -312,11 +312,11 @@ async fn change_by_add(old: BTreeSet, add: &[MemNodeId]) -> anyhow::R #[tracing::instrument(level = "debug")] async fn change_by_remove(old: BTreeSet, remove: &[MemNodeId]) -> anyhow::Result<()> { - let change = ChangeMembers::Remove(remove.iter().copied().collect()); + let change = ChangeMembers::RemoveVoter(remove.iter().copied().collect()); let mes = format!("from {:?} {:?}", old, change); - let new = change.clone().apply_to(&old); + let new = old.clone().difference(&remove.iter().copied().collect()).copied().collect::>(); let only_in_old = old.difference(&new); let config = Arc::new( diff --git a/openraft/tests/membership/t20_change_membership.rs b/openraft/tests/membership/t20_change_membership.rs index d322c352c..37edb0fcd 100644 --- a/openraft/tests/membership/t20_change_membership.rs +++ b/openraft/tests/membership/t20_change_membership.rs @@ -112,6 +112,8 @@ async fn change_without_adding_learner() -> anyhow::Result<()> { { let res = leader.change_membership(btreeset! {0,1}, false).await; let raft_err = res.unwrap_err(); + tracing::debug!("raft_err: {:?}", raft_err); + match raft_err.api_error().unwrap() { ClientWriteError::ChangeMembershipError(ChangeMembershipError::LearnerNotFound(err)) => { assert_eq!(1, err.node_id);