From a010fddda6294ee3155e15df15a6b67bd27b33a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?=
Date: Tue, 12 Jul 2022 16:24:37 +0800
Subject: [PATCH] Change: Stop replication to removed node at once when new membership is seen

Before this commit, when the membership changes, e.g., from a joint config
`[(1,2,3), (3,4,5)]` to the uniform config `[3,4,5]` (assuming the leader is
`3`), the leader stops replication to `1,2` only when `[3,4,5]` is committed.

This is an unnecessarily complicated solution. It is OK for the leader to stop
replication to `1,2` as soon as config `[3,4,5]` is seen, instead of waiting
until `[3,4,5]` is committed:

- If the leader (`3`) finally commits `[3,4,5]`, it will eventually stop
  replication to `1,2` anyway.
- If the leader (`3`) crashes before committing `[3,4,5]`:
  - If a new leader sees the membership config log `[3,4,5]`, it will continue
    to commit it and finally stop replication to `1,2`.
  - If a new leader does not see the membership config log `[3,4,5]`, it will
    re-establish replication to `1,2`.

In any case, stopping replication at once is OK.

One consideration about this change is that the removed nodes, e.g., `1,2`,
do not know they have been removed from the cluster:

- A removed node will enter the candidate state and keep increasing its term,
  electing itself. This does not affect the working cluster:
  - The nodes in the working cluster have more up-to-date logs; thus the
    election will never succeed.
  - The leader does not try to communicate with the removed nodes, thus it
    will never see their higher `term`.
- Removed nodes have to be shut down eventually. Whether or not the leader
  replicates the membership that excludes them to these nodes, an external
  process is always required to shut them down, because there is no guarantee
  that a removed node receives the membership log within a finite time.

Changes:

- Change: remove config `remove_replication`, since a replication stream is
  now removed at once.
- Refactor: `Engine` outputs `Command::UpdateReplicationStreams` to inform the
  runtime to update replication when the membership changes.
- Refactor: remove `ReplicationState.failures`: replication no longer needs to
  count failures in order to be removed.
- Refactor: remove `ReplicationState.matched`: the **matched** log id is
  already tracked by `Engine.state.leader.progress`.
- Fix: #446
---
 openraft/src/config/config.rs                 |  53 -----
 openraft/src/config/config_test.rs            |  24 ---
 openraft/src/config/error.rs                  |   3 -
 openraft/src/config/mod.rs                    |   1 -
 openraft/src/core/admin.rs                    | 187 ++++--------------
 openraft/src/core/client.rs                   |  19 +-
 openraft/src/core/leader_state.rs             |  12 +-
 openraft/src/core/raft_core.rs                |   7 +-
 openraft/src/core/replication.rs              |  38 +---
 openraft/src/core/replication_state.rs        |  25 +--
 openraft/src/engine/command.rs                |  16 +-
 openraft/src/engine/engine_impl.rs            |  35 +++-
 .../src/engine/leader_append_entries_test.rs  |  18 +-
 .../update_effective_membership_test.rs       |  15 +-
 openraft/src/lib.rs                           |   1 -
 .../membership/t15_add_remove_follower.rs     |  17 +-
 .../tests/membership/t40_removed_follower.rs  |  14 +-
 .../t45_remove_unreachable_follower.rs        |  15 +-
 openraft/tests/metrics/t30_leader_metrics.rs  |  24 +--
 19 files changed, 187 insertions(+), 337 deletions(-)

diff --git a/openraft/src/config/config.rs b/openraft/src/config/config.rs
index fbd401e8f..4d2882d12 100644
--- a/openraft/src/config/config.rs
+++ b/openraft/src/config/config.rs
@@ -53,50 +53,6 @@ fn parse_snapshot_policy(src: &str) -> Result<SnapshotPolicy, ConfigError> {
     Ok(SnapshotPolicy::LogsSinceLast(n_logs))
 }
 
-/// Policy to remove a replication.
-#[derive(Clone, Debug, PartialEq)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] -pub enum RemoveReplicationPolicy { - /// Leader will remove a replication to a node that is removed from membership, - /// if the `committed` index advanced too many the index of the **uniform** membership log in which the node is - /// removed. - CommittedAdvance(u64), - - /// Leader removes a replication if it encountered the specified number of network failures. - MaxNetworkFailures(u64), -} - -fn parse_remove_replication_policy(src: &str) -> Result { - let elts = src.split(':').collect::>(); - if elts.len() != 2 { - return Err(ConfigError::InvalidRemoveReplicationPolicy { - syntax: "committed_advance:|max_network_failures:".to_string(), - invalid: src.to_string(), - }); - } - - if elts[0] == "committed_advance" { - let n_logs = elts[1].parse::().map_err(|e| ConfigError::InvalidNumber { - invalid: src.to_string(), - reason: e.to_string(), - })?; - return Ok(RemoveReplicationPolicy::CommittedAdvance(n_logs)); - } - - if elts[0] == "max_network_failures" { - let n = elts[1].parse::().map_err(|e| ConfigError::InvalidNumber { - invalid: src.to_string(), - reason: e.to_string(), - })?; - return Ok(RemoveReplicationPolicy::MaxNetworkFailures(n)); - } - - Err(ConfigError::InvalidRemoveReplicationPolicy { - syntax: "committed_advance:|max_network_failures:".to_string(), - invalid: src.to_string(), - }) -} - /// The runtime configuration for a Raft node. /// /// The default values used by this type should generally work well for Raft clusters which will @@ -178,15 +134,6 @@ pub struct Config { /// The minimal number of applied logs to purge in a batch. #[clap(long, default_value = "1")] pub purge_batch_size: u64, - - /// Policy to remove a replication stream for an unreachable removed node. 
- #[clap( - long, - env = "RAFT_FORCE_REMOVE_REPLICATION", - default_value = "max_network_failures:10", - parse(try_from_str=parse_remove_replication_policy) - )] - pub remove_replication: RemoveReplicationPolicy, } impl Default for Config { diff --git a/openraft/src/config/config_test.rs b/openraft/src/config/config_test.rs index 31c611346..288214fea 100644 --- a/openraft/src/config/config_test.rs +++ b/openraft/src/config/config_test.rs @@ -1,4 +1,3 @@ -use crate::config::config::RemoveReplicationPolicy; use crate::config::error::ConfigError; use crate::Config; use crate::SnapshotPolicy; @@ -59,7 +58,6 @@ fn test_build() -> anyhow::Result<()> { "--snapshot-policy=since_last:203", "--snapshot-max-chunk-size=204", "--max-applied-log-to-keep=205", - "--remove-replication=max_network_failures:206", "--purge-batch-size=207", ])?; @@ -73,29 +71,7 @@ fn test_build() -> anyhow::Result<()> { assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy); assert_eq!(204, config.snapshot_max_chunk_size); assert_eq!(205, config.max_applied_log_to_keep); - assert_eq!( - RemoveReplicationPolicy::MaxNetworkFailures(206), - config.remove_replication - ); assert_eq!(207, config.purge_batch_size); Ok(()) } - -#[test] -fn test_option_remove_replication() -> anyhow::Result<()> { - let config = Config::build(&["foo", "--remove-replication=max_network_failures:206"])?; - - assert_eq!( - RemoveReplicationPolicy::MaxNetworkFailures(206), - config.remove_replication - ); - - let config = Config::build(&["foo", "--remove-replication=committed_advance:206"])?; - - assert_eq!( - RemoveReplicationPolicy::CommittedAdvance(206), - config.remove_replication - ); - Ok(()) -} diff --git a/openraft/src/config/error.rs b/openraft/src/config/error.rs index f4becbcdd..7946e6789 100644 --- a/openraft/src/config/error.rs +++ b/openraft/src/config/error.rs @@ -18,7 +18,4 @@ pub enum ConfigError { #[error("{reason} when parsing {invalid:?}")] InvalidNumber { invalid: String, reason: String }, - - #[error("remove replication policy string is invalid: '{invalid:?}' expect: '{syntax}'")] - InvalidRemoveReplicationPolicy { invalid: String, syntax: String }, } diff --git a/openraft/src/config/mod.rs b/openraft/src/config/mod.rs index 357776c78..2bec5bfd1 100644 --- a/openraft/src/config/mod.rs +++ b/openraft/src/config/mod.rs @@ -4,6 +4,5 @@ mod error; #[cfg(test)] mod config_test; pub use config::Config; -pub use config::RemoveReplicationPolicy; pub use config::SnapshotPolicy; pub use error::ConfigError; diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs index c56ed9a5b..fb42731c7 100644 --- a/openraft/src/core/admin.rs +++ b/openraft/src/core/admin.rs @@ -4,9 +4,7 @@ use std::option::Option::None; use tracing::Level; -use crate::config::RemoveReplicationPolicy; use crate::core::replication_state::replication_lag; -use crate::core::replication_state::ReplicationState; use crate::core::Expectation; use crate::core::LeaderState; use crate::core::ServerState; @@ -20,10 +18,10 @@ use crate::error::InProgress; use crate::error::LearnerIsLagging; use crate::error::LearnerNotFound; use crate::metrics::RemoveTarget; +use crate::progress::Progress; use crate::raft::AddLearnerResponse; use crate::raft::ClientWriteResponse; use crate::raft::RaftRespTx; -use crate::raft_types::LogIdOptionExt; use crate::raft_types::RaftLogId; use crate::runtime::RaftRuntime; use crate::summary::MessageSummary; @@ -38,23 +36,6 @@ use crate::RaftTypeConfig; use crate::StorageError; impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: 
RaftStorage> LeaderState<'a, C, N, S> { - // add node into learner,return true if the node is already a member or learner - #[tracing::instrument(level = "debug", skip(self))] - async fn write_add_learner_entry( - &mut self, - target: C::NodeId, - node: Option, - ) -> Result, AddLearnerError> { - let curr = &self.core.engine.state.membership_state.effective.membership; - let new_membership = curr.add_learner(target, node)?; - - tracing::debug!(?new_membership, "new_config"); - - let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?; - - Ok(log_id) - } - /// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding /// on the given channel. /// @@ -79,51 +60,41 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS // Ensure the node doesn't already exist in the current // config, in the set of new nodes already being synced, or in the nodes being removed. - // TODO: remove this - if target == self.core.id { - tracing::debug!("target node is this node"); + + let curr = &self.core.engine.state.membership_state.effective; + if curr.contains(&target) { + let matched = if let Some(l) = &self.core.engine.state.leader { + *l.progress.get(&target) + } else { + unreachable!("it has to be a leader!!!"); + }; + + tracing::debug!( + "target {:?} already member or learner, can't add; matched:{:?}", + target, + matched + ); let _ = tx.send(Ok(AddLearnerResponse { membership_log_id: self.core.engine.state.membership_state.effective.log_id, - matched: self.core.engine.state.last_log_id(), + matched, })); return Ok(()); } - let curr = &self.core.engine.state.membership_state.effective; - if curr.contains(&target) { - tracing::debug!("target {:?} already member or learner, can't add", target); - - if let Some(t) = self.nodes.get(&target) { - tracing::debug!("target node is already a cluster member or is being synced"); - let _ = tx.send(Ok(AddLearnerResponse { - membership_log_id: self.core.engine.state.membership_state.effective.log_id, - matched: t.matched, - })); - return Ok(()); - } else { - unreachable!( - "node {} in membership but there is no replication stream for it", - target - ) - } - } - - // TODO(xp): when new membership log is appended, write_entry() should be responsible to setup new replication - // stream. - let res = self.write_add_learner_entry(target, node).await; - let log_id = match res { + let curr = &self.core.engine.state.membership_state.effective.membership; + let res = curr.add_learner(target, node); + let new_membership = match res { Ok(x) => x, Err(e) => { - let _ = tx.send(Err(e)); + let _ = tx.send(Err(AddLearnerError::MissingNodeInfo(e))); return Ok(()); } }; - // TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for - // sending vote requests etc? - let state = self.spawn_replication_stream(target).await; - self.nodes.insert(target, state); + tracing::debug!(?new_membership, "new_membership with added learner: {}", target); + + let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?; tracing::debug!( "after add target node {} as learner {:?}", @@ -238,7 +209,12 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS Expectation::AtLineRate => { // Expect to be at line rate but not. 
- let matched = self.nodes.get(node_id).map(|x| x.matched).unwrap(); + let matched = if let Some(l) = &self.core.engine.state.leader { + *l.progress.get(node_id) + } else { + unreachable!("it has to be a leader!!!"); + }; + let distance = replication_lag(&matched, &last_log_id); if distance <= self.core.config.replication_lag_threshold { @@ -317,64 +293,28 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS /// This is ony called by leader. #[tracing::instrument(level = "debug", skip(self))] pub(super) async fn handle_uniform_consensus_committed(&mut self, log_id: &LogId) { - let index = log_id.index; - // Step down if needed. - if !self.core.engine.state.membership_state.effective.membership.is_voter(&self.core.id) { + + let _ = log_id; + + // TODO: Leader does not need to step down. It can keep working. + // This requires to separate Leader(Proposer) and Acceptors. + if !self.core.engine.state.membership_state.effective.is_voter(&self.core.id) { tracing::debug!("raft node is stepping down"); // TODO(xp): transfer leadership self.core.set_target_state(ServerState::Learner); - return; + self.core.engine.metrics_flags.set_cluster_changed(); } - - let membership = &self.core.engine.state.membership_state.effective.membership; - - // remove nodes which not included in nodes and learners - for (id, state) in self.nodes.iter_mut() { - if membership.contains(id) { - continue; - } - - tracing::info!( - "set remove_after_commit for {} = {}, membership: {:?}", - id, - index, - self.core.engine.state.membership_state.effective - ); - - state.remove_since = Some(index) - } - - let targets = self.nodes.keys().cloned().collect::>(); - for target in targets { - self.try_remove_replication(target).await; - } - - self.core.engine.metrics_flags.set_replication_changed(); } /// Remove a replication if the membership that does not include it has committed. /// /// Return true if removed. #[tracing::instrument(level = "trace", skip(self))] - pub async fn try_remove_replication(&mut self, target: C::NodeId) -> bool { - tracing::debug!(target = display(target), "try_remove_replication"); + pub async fn remove_replication(&mut self, target: C::NodeId) -> bool { + tracing::info!("removed_replication to: {}", target); - { - let n = self.nodes.get(&target); - - if let Some(n) = n { - if !self.need_to_remove_replication(n) { - return false; - } - } else { - tracing::warn!("trying to remove absent replication to {}", target); - return false; - } - } - - tracing::info!("removed replication to: {}", target); let repl_state = self.nodes.remove(&target); if let Some(s) = repl_state { let handle = s.repl_stream.handle; @@ -385,6 +325,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS tracing::debug!("joining removed replication: {}", target); let _x = handle.await; tracing::info!("Done joining removed replication : {}", target); + } else { + unreachable!("try to nonexistent replication to {}", target); } self.replication_metrics.update(RemoveTarget { target }); @@ -394,53 +336,4 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS true } - - fn need_to_remove_replication(&self, node: &ReplicationState) -> bool { - tracing::debug!(node=?node, "check if to remove a replication"); - - let cfg = &self.core.config; - let policy = &cfg.remove_replication; - - let st = &self.core.engine.state; - let committed = st.committed; - - // `remove_since` is set only when the uniform membership log is committed. 
- // Do not remove replication if it is not committed. - let since = if let Some(since) = node.remove_since { - since - } else { - return false; - }; - - if node.matched.index() >= Some(since) { - tracing::debug!( - node = debug(node), - committed = debug(committed), - "remove replication: uniform membership log committed and replicated to target" - ); - return true; - } - - match policy { - RemoveReplicationPolicy::CommittedAdvance(n) => { - // TODO(xp): test this. but not for now. It is meaningless without blank-log heartbeat. - if committed.next_index() - since > *n { - tracing::debug!( - node = debug(node), - committed = debug(committed), - "remove replication: committed index is head of remove_since too much" - ); - return true; - } - } - RemoveReplicationPolicy::MaxNetworkFailures(n) => { - if node.failures >= *n { - tracing::debug!(node = debug(node), "remove replication: too many replication failure"); - return true; - } - } - } - - false - } } diff --git a/openraft/src/core/client.rs b/openraft/src/core/client.rs index 8d756480a..ed4d4e842 100644 --- a/openraft/src/core/client.rs +++ b/openraft/src/core/client.rs @@ -13,6 +13,7 @@ use crate::error::CheckIsLeaderError; use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::Timeout; +use crate::progress::Progress; use crate::quorum::QuorumSet; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; @@ -56,22 +57,30 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS // Spawn parallel requests, all with the standard timeout for heartbeats. let mut pending = FuturesUnordered::new(); - let membership = &self.core.engine.state.membership_state.effective.membership; - for (target, node) in self.nodes.iter() { - if !membership.is_voter(target) { + let voter_progresses = if let Some(l) = &self.core.engine.state.leader { + l.progress + .iter() + .filter(|(id, _v)| l.progress.is_voter(id) == Some(true)) + .copied() + .collect::>() + } else { + unreachable!("it has to be a leader!!!"); + }; + + for (target, matched) in voter_progresses { + if target == self.core.id { continue; } let rpc = AppendEntriesRequest { vote: self.core.engine.state.vote, - prev_log_id: node.matched, + prev_log_id: matched, entries: vec![], leader_commit: self.core.engine.state.committed, }; let my_id = self.core.id; - let target = *target; let target_node = self.core.engine.state.membership_state.effective.get_node(&target).cloned(); let mut network = self.core.network.connect(target, target_node.as_ref()).await; diff --git a/openraft/src/core/leader_state.rs b/openraft/src/core/leader_state.rs index 49bee22ad..64b3f8f25 100644 --- a/openraft/src/core/leader_state.rs +++ b/openraft/src/core/leader_state.rs @@ -75,6 +75,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS node_ids.filter(|elem| elem != &self.core.id).collect::>() }; + // TODO(xp): make this Engine::Command driven. for target in targets { let state = self.spawn_replication_stream(target).await; self.nodes.insert(target, state); @@ -198,9 +199,14 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> RaftRun self.replicate_entry(*input_entries[last].get_log_id()); } } - Command::UpdateMembership { .. } => { - // TODO: rebuild replication streams. not used yet. Currently replication stream management is done - // before this step. 
+ Command::UpdateReplicationStreams { remove, add } => { + for (node_id, _matched) in remove.iter() { + self.remove_replication(*node_id).await; + } + for (node_id, _matched) in add.iter() { + let state = self.spawn_replication_stream(*node_id).await; + self.nodes.insert(*node_id, state); + } } _ => self.core.run_command(input_entries, curr, cmd).await?, } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 8221b2976..2e962e8fc 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -979,7 +979,12 @@ impl, S: RaftStorage> RaftRuntime Command::ReplicateInputEntries { .. } => { unreachable!("leader specific command") } - Command::UpdateMembership { .. } => {} + Command::UpdateReplicationStreams { .. } => { + unreachable!("leader specific command") + } + Command::UpdateMembership { .. } => { + // TODO: not used + } } Ok(()) diff --git a/openraft/src/core/replication.rs b/openraft/src/core/replication.rs index a244566d7..f9dc8c57a 100644 --- a/openraft/src/core/replication.rs +++ b/openraft/src/core/replication.rs @@ -38,12 +38,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS self.replication_tx.clone(), ); - ReplicationState { - matched: None, - repl_stream, - remove_since: None, - failures: 0, - } + ReplicationState { repl_stream } } /// Handle a replication event coming from one of the replication streams. @@ -97,42 +92,21 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS ) -> Result<(), StorageError> { // Update target's match index & check if it is awaiting removal. - let state = if let Some(state) = self.nodes.get_mut(&target) { - state - } else { + // TODO(xp): a leader has to refuse a message from a previous leader. + if !self.nodes.contains_key(&target) { return Ok(()); }; - tracing::debug!("state.matched: {:?}, update to matched: {:?}", state.matched, result); + tracing::debug!("update matched: {:?}", result); let matched = match result { - Ok(matched) => { - // - state.failures = 0; - matched - } + Ok(matched) => matched, Err(_err_str) => { - state.failures += 1; - - self.try_remove_replication(target).await; return Ok(()); } }; - assert!(Some(matched) >= state.matched, "the matched increments monotonically"); - - state.matched = Some(matched); - - // Drop replication stream if needed. - if self.try_remove_replication(target).await { - // nothing to do - } else { - self.update_replication_metrics(target, matched); - } - - if Some(matched) <= self.core.engine.state.committed { - return Ok(()); - } + self.update_replication_metrics(target, matched); self.core.engine.update_progress(target, Some(matched)); self.run_engine_commands(&[]).await?; diff --git a/openraft/src/core/replication_state.rs b/openraft/src/core/replication_state.rs index 36b3b4811..5998fcaa4 100644 --- a/openraft/src/core/replication_state.rs +++ b/openraft/src/core/replication_state.rs @@ -4,39 +4,16 @@ use std::fmt::Formatter; use crate::raft_types::LogIdOptionExt; use crate::replication::ReplicationStream; use crate::LogId; -use crate::MessageSummary; use crate::NodeId; /// A struct tracking the state of a replication stream from the perspective of the Raft actor. pub(crate) struct ReplicationState { - pub matched: Option>, - - pub remove_since: Option, - pub repl_stream: ReplicationStream, - - /// Count of replication failures. - /// - /// It will be reset once a successful replication is done. 
- pub failures: u64, -} - -impl MessageSummary> for ReplicationState { - fn summary(&self) -> String { - format!( - "matched: {:?}, remove_after_commit: {:?}, failures: {}", - self.matched, self.remove_since, self.failures - ) - } } impl Debug for ReplicationState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ReplicationState") - .field("matched", &self.matched) - .field("remove_since", &self.remove_since) - .field("failures", &self.failures) - .finish() + f.debug_struct("ReplicationState").finish() } } diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index abd3afb31..b889f3c1e 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -39,8 +39,19 @@ pub(crate) enum Command { /// Replicate a `range` of entries in the input buffer. ReplicateInputEntries { range: Range }, - /// Membership config changed, need to update replication stream etc. - UpdateMembership { membership: Arc> }, + /// Membership config changed, need to update replication streams. + UpdateMembership { + // TODO: not used yet. + membership: Arc>, + }, + + /// Membership config changed, need to update replication streams. + UpdateReplicationStreams { + /// Replication to remove. + remove: Vec<(NID, Option>)>, + /// Replication to add. + add: Vec<(NID, Option>)>, + }, /// Move the cursor pointing to an entry in the input buffer. MoveInputCursorBy { n: usize }, @@ -87,6 +98,7 @@ impl Command { Command::FollowerCommit { .. } => flags.set_data_changed(), Command::ReplicateInputEntries { .. } => {} Command::UpdateMembership { .. } => flags.set_cluster_changed(), + Command::UpdateReplicationStreams { .. } => flags.set_replication_changed(), Command::MoveInputCursorBy { .. } => {} Command::SaveVote { .. } => flags.set_data_changed(), Command::SendVote { .. } => {} diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index d24e32866..ce658702b 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::sync::Arc; use crate::core::ServerState; @@ -625,24 +626,50 @@ impl Engine { pub(crate) fn update_effective_membership(&mut self, log_id: &LogId, m: &Membership) { tracing::debug!("update effective membership: log_id:{} {}", log_id, m.summary()); + self.metrics_flags.set_cluster_changed(); + let server_state = self.calc_server_state(); let em = Arc::new(EffectiveMembership::new(Some(*log_id), m.clone())); self.state.membership_state.effective = em.clone(); + self.push_command(Command::UpdateMembership { + membership: self.state.membership_state.effective.clone(), + }); + // If membership changes, the progress should be upgraded. if let Some(leader) = &mut self.state.leader { let old_progress = leader.progress.clone(); + let old_repls = old_progress.iter().copied().collect::>(); + let learner_ids = em.learner_ids().collect::>(); leader.progress = old_progress.upgrade_quorum_set(em, &learner_ids); - } - self.push_command(Command::UpdateMembership { - membership: self.state.membership_state.effective.clone(), - }); + // If it is leader, update replication to reflect membership change. 
+ + let new_repls = leader.progress.iter().copied().collect::>(); + + // TODO: test + let mut add = vec![]; + let mut remove = vec![]; + for (node_id, matched) in new_repls.iter() { + if !old_repls.contains_key(node_id) { + add.push((*node_id, *matched)); + } + } + + for (node_id, matched) in old_repls.iter() { + // A leader that is removed will be shut down when this membership log is committed. + if !new_repls.contains_key(node_id) && node_id != &self.id { + remove.push((*node_id, *matched)); + } + } + + self.push_command(Command::UpdateReplicationStreams { remove, add }); + } // Leader should not quit at once. // A leader should always keep replicating logs. diff --git a/openraft/src/engine/leader_append_entries_test.rs b/openraft/src/engine/leader_append_entries_test.rs index 50f16c56a..4cba188de 100644 --- a/openraft/src/engine/leader_append_entries_test.rs +++ b/openraft/src/engine/leader_append_entries_test.rs @@ -261,7 +261,7 @@ fn test_leader_append_entries_fast_commit_upto_membership_entry() -> anyhow::Res assert_eq!( MetricsChangeFlags { - leader: false, + leader: true, other_metrics: true }, eng.metrics_flags @@ -283,6 +283,10 @@ fn test_leader_append_entries_fast_commit_upto_membership_entry() -> anyhow::Res m34() )), }, + Command::UpdateReplicationStreams { + remove: vec![], + add: vec![(3, None), (4, None)] + }, Command::ReplicateInputEntries { range: 0..3 }, Command::MoveInputCursorBy { n: 3 }, ], @@ -336,7 +340,7 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow assert_eq!( MetricsChangeFlags { - leader: false, + leader: true, other_metrics: true }, eng.metrics_flags @@ -359,6 +363,10 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow m1_2() )), }, + Command::UpdateReplicationStreams { + remove: vec![], + add: vec![(2, None)] + }, // second commit upto the end. Command::ReplicateCommitted { committed: Some(log_id(3, 6)) @@ -422,7 +430,7 @@ fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> a assert_eq!( MetricsChangeFlags { - leader: false, + leader: true, other_metrics: true }, eng.metrics_flags @@ -437,6 +445,10 @@ fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> a m1_2() )), }, + Command::UpdateReplicationStreams { + remove: vec![(3, None)], + add: vec![(2, None)] + }, // It is correct to commit if the membership change ot a one node cluster. 
Command::ReplicateCommitted { committed: Some(log_id(3, 6)) diff --git a/openraft/src/engine/update_effective_membership_test.rs b/openraft/src/engine/update_effective_membership_test.rs index 7fde351d0..6af5d73f4 100644 --- a/openraft/src/engine/update_effective_membership_test.rs +++ b/openraft/src/engine/update_effective_membership_test.rs @@ -119,16 +119,23 @@ fn test_update_effective_membership_for_leader() -> anyhow::Result<()> { assert_eq!( MetricsChangeFlags { - leader: false, + leader: true, other_metrics: true }, eng.metrics_flags ); assert_eq!( - vec![Command::UpdateMembership { - membership: Arc::new(EffectiveMembership::new(Some(log_id(3, 4)), m34())), - },], + vec![ + // + Command::UpdateMembership { + membership: Arc::new(EffectiveMembership::new(Some(log_id(3, 4)), m34())), + }, + Command::UpdateReplicationStreams { + add: vec![(4, None)], + remove: vec![], // node-2 is leader, won't be removed + } + ], eng.commands ); diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index fcb293940..29adff3eb 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -55,7 +55,6 @@ pub use metrics::ReplicationTargetMetrics; pub use crate::change_members::ChangeMembers; pub use crate::config::Config; pub use crate::config::ConfigError; -pub use crate::config::RemoveReplicationPolicy; pub use crate::config::SnapshotPolicy; pub use crate::core::ServerState; pub use crate::defensive::DefensiveCheck; diff --git a/openraft/tests/membership/t15_add_remove_follower.rs b/openraft/tests/membership/t15_add_remove_follower.rs index 7bb322c66..9da276329 100644 --- a/openraft/tests/membership/t15_add_remove_follower.rs +++ b/openraft/tests/membership/t15_add_remove_follower.rs @@ -18,30 +18,29 @@ use crate::fixtures::RaftRouter; /// - asserts node-4 becomes learner and the leader stops sending logs to it. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn add_remove_voter() -> Result<()> { - let cluster_of_5 = btreeset![0, 1, 2, 3, 4]; - let cluster_of_4 = btreeset![0, 1, 2, 3]; + let c01234 = btreeset![0, 1, 2, 3, 4]; + let c0123 = btreeset![0, 1, 2, 3]; let config = Arc::new(Config::default().validate()?); let mut router = RaftRouter::new(config.clone()); - let mut log_index = router.new_nodes_from_single(cluster_of_5.clone(), btreeset! {}).await?; + let mut log_index = router.new_nodes_from_single(c01234.clone(), btreeset! 
{}).await?; tracing::info!("--- write 100 logs"); { router.client_request_many(0, "client", 100).await?; log_index += 100; - router.wait_for_log(&cluster_of_5, Some(log_index), timeout(), "write 100 logs").await?; + router.wait_for_log(&c01234, Some(log_index), timeout(), "write 100 logs").await?; } tracing::info!("--- remove n{}", 4); { let node = router.get_raft_handle(&0)?; - node.change_membership(cluster_of_4.clone(), true, false).await?; + node.change_membership(c0123.clone(), true, false).await?; log_index += 2; // two member-change logs - router.wait_for_log(&cluster_of_4, Some(log_index), timeout(), "removed node-4").await?; - router.wait(&4, timeout()).state(ServerState::Learner, "").await?; + router.wait_for_log(&c0123, Some(log_index), timeout(), "removed node-4 from membership").await?; } tracing::info!("--- write another 100 logs"); @@ -50,7 +49,7 @@ async fn add_remove_voter() -> Result<()> { log_index += 100; } - router.wait_for_log(&cluster_of_4, Some(log_index), timeout(), "4 nodes recv logs 100~200").await?; + router.wait_for_log(&c0123, Some(log_index), timeout(), "4 nodes recv logs 100~200").await?; tracing::info!("--- log will not be sync to removed node"); { @@ -58,6 +57,8 @@ async fn add_remove_voter() -> Result<()> { assert!(x[4].last_log_index < Some(log_index - 50)); } + router.wait(&4, timeout()).state(ServerState::Candidate, "node-4 tries to elect").await?; + Ok(()) } diff --git a/openraft/tests/membership/t40_removed_follower.rs b/openraft/tests/membership/t40_removed_follower.rs index 9f40a3282..50298eaed 100644 --- a/openraft/tests/membership/t40_removed_follower.rs +++ b/openraft/tests/membership/t40_removed_follower.rs @@ -35,12 +35,22 @@ async fn stop_replication_to_removed_follower() -> Result<()> { node.change_membership(btreeset![0, 3, 4], true, false).await?; log_index += 2; - for i in 0..5 { + for i in [0, 3, 4] { router .wait(&i, timeout()) .metrics( |x| x.last_log_index >= Some(log_index), - "all nodes recv change-membership logs", + "new cluster nodes recv 2 change-membership logs", + ) + .await?; + } + + for i in [1, 2] { + router + .wait(&i, timeout()) + .metrics( + |x| x.last_log_index >= Some(log_index - 1), + "removed nodes recv at least 1 change-membership log", ) .await?; } diff --git a/openraft/tests/membership/t45_remove_unreachable_follower.rs b/openraft/tests/membership/t45_remove_unreachable_follower.rs index bf7c1fd20..686606a5d 100644 --- a/openraft/tests/membership/t45_remove_unreachable_follower.rs +++ b/openraft/tests/membership/t45_remove_unreachable_follower.rs @@ -13,7 +13,8 @@ use crate::fixtures::RaftRouter; #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn stop_replication_to_removed_unreachable_follower_network_failure() -> Result<()> { // If the uniform membership is committed and replication to a node encountered 2 network failure, just remove it. 
- let config = Arc::new(Config::build(&["foo", "--remove-replication=max_network_failures:2"])?); + // let config = Arc::new(Config::build(&["foo", "--remove-replication=max_network_failures:2"])?); + let config = Arc::new(Config::build(&["foo"])?); let mut router = RaftRouter::new(config.clone()); router.new_raft_node(0); @@ -36,15 +37,23 @@ async fn stop_replication_to_removed_unreachable_follower_network_failure() -> R node.change_membership(btreeset![0, 1, 2], true, false).await?; log_index += 2; - for i in &[0, 1, 2, 3] { + for i in &[0, 1, 2] { router .wait(i, timeout()) .metrics( |x| x.last_log_index >= Some(log_index), - "0,1,2,3 recv change-membership logs", + "0,1,2 recv 2 change-membership logs", ) .await?; } + + router + .wait(&3, timeout()) + .metrics( + |x| x.last_log_index >= Some(log_index - 1), + "node-3 recv at least 1 change-membership log", + ) + .await?; } tracing::info!("--- replication to node 4 will be removed"); diff --git a/openraft/tests/metrics/t30_leader_metrics.rs b/openraft/tests/metrics/t30_leader_metrics.rs index 8652784f4..11d04dc47 100644 --- a/openraft/tests/metrics/t30_leader_metrics.rs +++ b/openraft/tests/metrics/t30_leader_metrics.rs @@ -32,8 +32,8 @@ use crate::fixtures::RaftRouter; /// - asserts node-4 becomes learner and the leader stops sending logs to it. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn leader_metrics() -> Result<()> { - let all_members = btreeset![0, 1, 2, 3, 4]; - let left_members = btreeset![0, 1, 2, 3]; + let c01234 = btreeset![0, 1, 2, 3, 4]; + let c0123 = btreeset![0, 1, 2, 3]; // Setup test dependencies. let config = Arc::new(Config::default().validate()?); @@ -90,15 +90,15 @@ async fn leader_metrics() -> Result<()> { } } log_index += 4; // 4 add_learner log - router.wait_for_log(&all_members, Some(log_index), timeout(), "add learner 1,2,3,4").await?; + router.wait_for_log(&c01234, Some(log_index), timeout(), "add learner 1,2,3,4").await?; tracing::info!("--- changing cluster config to 012"); let node = router.get_raft_handle(&0)?; - node.change_membership(all_members.clone(), true, false).await?; + node.change_membership(c01234.clone(), true, false).await?; log_index += 2; // 2 member-change logs - router.wait_for_log(&all_members, Some(log_index), timeout(), "change members to 0,1,2,3,4").await?; + router.wait_for_log(&c01234, Some(log_index), timeout(), "change members to 0,1,2,3,4").await?; router.assert_stable_cluster(Some(1), Some(log_index)); // Still in term 1, so leader is still node 0. @@ -126,22 +126,12 @@ async fn leader_metrics() -> Result<()> { tracing::info!("--- remove n{}", 4); { let node = router.get_raft_handle(&0)?; - node.change_membership(left_members.clone(), true, false).await?; + node.change_membership(c0123.clone(), true, false).await?; log_index += 2; // two member-change logs - tracing::info!("--- n{} should revert to learner", 4); - router - .wait_for_metrics( - &4, - |x| x.state == ServerState::Learner, - timeout(), - &format!("n{}.state -> {:?}", 4, ServerState::Learner), - ) - .await?; - router .wait_for_log( - &left_members, + &c0123, Some(log_index), timeout(), "other nodes should commit the membership change log",