Change: Stop replication to removed node at once when new membership is seen

Before this commit, when membership changes, e.g., from a joint config
`[(1,2,3), (3,4,5)]` to the uniform config `[3,4,5]` (assuming the leader is
`3`), the leader stops replication to `1,2` only when `[3,4,5]` is
committed.

This is an unnecessarily complicated solution.
It is OK for the leader to stop replication to `1,2` as soon as config `[3,4,5]` is seen, instead of when config `[3,4,5]` is committed.

- If the leader (`3`) eventually commits `[3,4,5]`, it will stop replication to `1,2` anyway.
- If the leader (`3`) crashes before committing `[3,4,5]`:
  - If a new leader sees the membership config log `[3,4,5]`, it will continue to commit it and eventually stop replication to `1,2`.
  - If a new leader does not see the membership config log `[3,4,5]`, it will re-establish replication to `1,2`.

In any case, stopping replication at once is OK.
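
To make the new behavior concrete, here is a minimal, self-contained Rust sketch; the type and function names are hypothetical stand-ins, not openraft's actual API. The leader reconciles its replication streams against the membership config it has last *seen*, so streams to removed nodes are torn down immediately:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-ins for openraft's real types, only to keep the sketch
// self-contained.
type NodeId = u64;

struct ReplicationStream {
    target: NodeId,
}

impl ReplicationStream {
    fn spawn(target: NodeId) -> Self {
        Self { target }
    }

    fn shutdown(self) {
        println!("stopped replication to {}", self.target);
    }
}

/// Reconcile replication streams against the membership config that was last
/// seen (appended), not the one that was last committed.
fn update_replications(
    seen_members: &BTreeSet<NodeId>,
    streams: &mut BTreeMap<NodeId, ReplicationStream>,
) {
    // Stop streams to nodes that are no longer in the seen config, at once.
    let removed: Vec<NodeId> =
        streams.keys().filter(|id| !seen_members.contains(*id)).copied().collect();
    for id in removed {
        if let Some(s) = streams.remove(&id) {
            s.shutdown();
        }
    }

    // Spawn streams for nodes newly added by the seen config.
    for id in seen_members {
        streams.entry(*id).or_insert_with(|| ReplicationStream::spawn(*id));
    }
}

fn main() {
    // Leader `3` currently replicates to the other members of the joint
    // config `[(1,2,3), (3,4,5)]`.
    let mut streams: BTreeMap<NodeId, ReplicationStream> =
        [1, 2, 4, 5].into_iter().map(|id| (id, ReplicationStream::spawn(id))).collect();

    // The uniform config `[3,4,5]` is seen; `4` and `5` are the remaining
    // targets (the leader does not replicate to itself).
    let seen: BTreeSet<NodeId> = [4, 5].into_iter().collect();

    // Streams to `1` and `2` are stopped immediately, without waiting for
    // `[3,4,5]` to be committed.
    update_replications(&seen, &mut streams);
}
```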

One consideration about this modification:
the removed nodes, e.g., `1,2`, do not know they have been removed from the cluster:

- A removed node will enter the candidate state and keep increasing its term, electing itself over and over.
  This won't affect the working cluster:

  - The nodes in the working cluster have more up-to-date logs; thus, such an election can never succeed (see the sketch after this list).

  - The leader won't try to communicate with the removed nodes, thus it won't see their higher `term`.

- Removed nodes should eventually be shut down. Whether or not the
  leader replicates the membership that excludes them to these nodes,
  there should always be an external process that shuts them down,
  because there is no guarantee that a removed node can receive the
  membership log within a finite time.
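
The first point can be illustrated with a small sketch of the standard Raft up-to-dateness check (simplified types, not openraft's actual vote-handling code): a vote is granted only if the candidate's last log id is at least as large as the voter's, no matter how high the candidate's term has grown.

```rust
// Sketch of the standard Raft vote-granting rule with simplified stand-in
// types; openraft's real `Vote`/`LogId` handling differs in detail.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct LogId {
    term: u64,
    index: u64,
}

// A voter grants a vote only if the candidate's log is at least as
// up-to-date as its own, comparing last log ids as (term, index).
fn grant_vote(candidate_last: Option<LogId>, my_last: Option<LogId>) -> bool {
    candidate_last >= my_last
}

fn main() {
    // A removed node stopped receiving entries at index 5, while the working
    // cluster advanced to index 9: its vote requests are always rejected,
    // however large its term becomes.
    let removed_node_last = Some(LogId { term: 2, index: 5 });
    let member_last = Some(LogId { term: 2, index: 9 });
    assert!(!grant_vote(removed_node_last, member_last));
}
```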

Changes:

- Change: remove the config option `remove_replication`, since replication
  to a removed node is now stopped at once.

- Refactor: Engine outputs `Command::UpdateReplicationStream` to inform
  the Runtime to update replication when membership changes (see the
  sketch after this list).

- Refactor: remove `ReplicationState.failures`; replication no longer
  needs to count failures in order to be removed.

- Refactor: remove `ReplicationState.matched`: the **matched** log id
  is already tracked by `Engine.state.leader.progress`.

- Fix: #446
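
As a rough illustration of the Engine/Runtime split mentioned above: the command name `Command::UpdateReplicationStream` comes from this commit, but the payload and surrounding types below are simplified assumptions. The Engine only records that replication must be updated as soon as a membership config is seen, and the Runtime performs the actual work:

```rust
// Simplified sketch of the Engine emitting a command and the Runtime
// executing it; these are not the exact openraft types.
#[derive(Debug)]
enum Command {
    UpdateReplicationStream { members: Vec<u64> },
}

#[derive(Default)]
struct Engine {
    output: Vec<Command>,
}

impl Engine {
    // Pure decision logic: as soon as a membership entry is appended (seen),
    // the Engine records that replication streams must be updated.
    fn append_membership(&mut self, members: Vec<u64>) {
        self.output.push(Command::UpdateReplicationStream { members });
    }
}

struct Runtime;

impl Runtime {
    // The Runtime performs the I/O for each command the Engine emitted:
    // spawning streams to new members and stopping streams to removed ones.
    fn run(&mut self, cmd: Command) {
        match cmd {
            Command::UpdateReplicationStream { members } => {
                println!("reconcile replication streams against {:?}", members);
            }
        }
    }
}

fn main() {
    let mut engine = Engine::default();
    let mut runtime = Runtime;

    // The uniform config `[3,4,5]` is seen: the command is emitted right away,
    // not only when the config is committed.
    engine.append_membership(vec![3, 4, 5]);

    for cmd in engine.output.drain(..) {
        runtime.run(cmd);
    }
}
```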
drmingdrmer committed Jul 12, 2022
1 parent d67199b commit a010fdd
Showing 19 changed files with 187 additions and 337 deletions.
53 changes: 0 additions & 53 deletions openraft/src/config/config.rs
@@ -53,50 +53,6 @@ fn parse_snapshot_policy(src: &str) -> Result<SnapshotPolicy, ConfigError> {
Ok(SnapshotPolicy::LogsSinceLast(n_logs))
}

/// Policy to remove a replication.
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum RemoveReplicationPolicy {
/// Leader will remove a replication to a node that is removed from membership,
/// if the `committed` index advanced too many the index of the **uniform** membership log in which the node is
/// removed.
CommittedAdvance(u64),

/// Leader removes a replication if it encountered the specified number of network failures.
MaxNetworkFailures(u64),
}

fn parse_remove_replication_policy(src: &str) -> Result<RemoveReplicationPolicy, ConfigError> {
let elts = src.split(':').collect::<Vec<_>>();
if elts.len() != 2 {
return Err(ConfigError::InvalidRemoveReplicationPolicy {
syntax: "committed_advance:<num>|max_network_failures:<num>".to_string(),
invalid: src.to_string(),
});
}

if elts[0] == "committed_advance" {
let n_logs = elts[1].parse::<u64>().map_err(|e| ConfigError::InvalidNumber {
invalid: src.to_string(),
reason: e.to_string(),
})?;
return Ok(RemoveReplicationPolicy::CommittedAdvance(n_logs));
}

if elts[0] == "max_network_failures" {
let n = elts[1].parse::<u64>().map_err(|e| ConfigError::InvalidNumber {
invalid: src.to_string(),
reason: e.to_string(),
})?;
return Ok(RemoveReplicationPolicy::MaxNetworkFailures(n));
}

Err(ConfigError::InvalidRemoveReplicationPolicy {
syntax: "committed_advance:<num>|max_network_failures:<num>".to_string(),
invalid: src.to_string(),
})
}

/// The runtime configuration for a Raft node.
///
/// The default values used by this type should generally work well for Raft clusters which will
@@ -178,15 +134,6 @@ pub struct Config {
/// The minimal number of applied logs to purge in a batch.
#[clap(long, default_value = "1")]
pub purge_batch_size: u64,

/// Policy to remove a replication stream for an unreachable removed node.
#[clap(
long,
env = "RAFT_FORCE_REMOVE_REPLICATION",
default_value = "max_network_failures:10",
parse(try_from_str=parse_remove_replication_policy)
)]
pub remove_replication: RemoveReplicationPolicy,
}

impl Default for Config {
24 changes: 0 additions & 24 deletions openraft/src/config/config_test.rs
@@ -1,4 +1,3 @@
use crate::config::config::RemoveReplicationPolicy;
use crate::config::error::ConfigError;
use crate::Config;
use crate::SnapshotPolicy;
@@ -59,7 +58,6 @@ fn test_build() -> anyhow::Result<()> {
"--snapshot-policy=since_last:203",
"--snapshot-max-chunk-size=204",
"--max-applied-log-to-keep=205",
"--remove-replication=max_network_failures:206",
"--purge-batch-size=207",
])?;

@@ -73,29 +71,7 @@ fn test_build() -> anyhow::Result<()> {
assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy);
assert_eq!(204, config.snapshot_max_chunk_size);
assert_eq!(205, config.max_applied_log_to_keep);
assert_eq!(
RemoveReplicationPolicy::MaxNetworkFailures(206),
config.remove_replication
);
assert_eq!(207, config.purge_batch_size);

Ok(())
}

#[test]
fn test_option_remove_replication() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--remove-replication=max_network_failures:206"])?;

assert_eq!(
RemoveReplicationPolicy::MaxNetworkFailures(206),
config.remove_replication
);

let config = Config::build(&["foo", "--remove-replication=committed_advance:206"])?;

assert_eq!(
RemoveReplicationPolicy::CommittedAdvance(206),
config.remove_replication
);
Ok(())
}
3 changes: 0 additions & 3 deletions openraft/src/config/error.rs
@@ -18,7 +18,4 @@ pub enum ConfigError {

#[error("{reason} when parsing {invalid:?}")]
InvalidNumber { invalid: String, reason: String },

#[error("remove replication policy string is invalid: '{invalid:?}' expect: '{syntax}'")]
InvalidRemoveReplicationPolicy { invalid: String, syntax: String },
}
1 change: 0 additions & 1 deletion openraft/src/config/mod.rs
@@ -4,6 +4,5 @@ mod error;
#[cfg(test)] mod config_test;

pub use config::Config;
pub use config::RemoveReplicationPolicy;
pub use config::SnapshotPolicy;
pub use error::ConfigError;
187 changes: 40 additions & 147 deletions openraft/src/core/admin.rs
@@ -4,9 +4,7 @@ use std::option::Option::None;

use tracing::Level;

use crate::config::RemoveReplicationPolicy;
use crate::core::replication_state::replication_lag;
use crate::core::replication_state::ReplicationState;
use crate::core::Expectation;
use crate::core::LeaderState;
use crate::core::ServerState;
@@ -20,10 +18,10 @@ use crate::error::InProgress;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::metrics::RemoveTarget;
use crate::progress::Progress;
use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::RaftRespTx;
use crate::raft_types::LogIdOptionExt;
use crate::raft_types::RaftLogId;
use crate::runtime::RaftRuntime;
use crate::summary::MessageSummary;
Expand All @@ -38,23 +36,6 @@ use crate::RaftTypeConfig;
use crate::StorageError;

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
// add node into learner,return true if the node is already a member or learner
#[tracing::instrument(level = "debug", skip(self))]
async fn write_add_learner_entry(
&mut self,
target: C::NodeId,
node: Option<Node>,
) -> Result<LogId<C::NodeId>, AddLearnerError<C::NodeId>> {
let curr = &self.core.engine.state.membership_state.effective.membership;
let new_membership = curr.add_learner(target, node)?;

tracing::debug!(?new_membership, "new_config");

let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?;

Ok(log_id)
}

/// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
/// on the given channel.
///
@@ -79,51 +60,41 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

// Ensure the node doesn't already exist in the current
// config, in the set of new nodes already being synced, or in the nodes being removed.
// TODO: remove this
if target == self.core.id {
tracing::debug!("target node is this node");

let curr = &self.core.engine.state.membership_state.effective;
if curr.contains(&target) {
let matched = if let Some(l) = &self.core.engine.state.leader {
*l.progress.get(&target)
} else {
unreachable!("it has to be a leader!!!");
};

tracing::debug!(
"target {:?} already member or learner, can't add; matched:{:?}",
target,
matched
);

let _ = tx.send(Ok(AddLearnerResponse {
membership_log_id: self.core.engine.state.membership_state.effective.log_id,
matched: self.core.engine.state.last_log_id(),
matched,
}));
return Ok(());
}

let curr = &self.core.engine.state.membership_state.effective;
if curr.contains(&target) {
tracing::debug!("target {:?} already member or learner, can't add", target);

if let Some(t) = self.nodes.get(&target) {
tracing::debug!("target node is already a cluster member or is being synced");
let _ = tx.send(Ok(AddLearnerResponse {
membership_log_id: self.core.engine.state.membership_state.effective.log_id,
matched: t.matched,
}));
return Ok(());
} else {
unreachable!(
"node {} in membership but there is no replication stream for it",
target
)
}
}

// TODO(xp): when new membership log is appended, write_entry() should be responsible to setup new replication
// stream.
let res = self.write_add_learner_entry(target, node).await;
let log_id = match res {
let curr = &self.core.engine.state.membership_state.effective.membership;
let res = curr.add_learner(target, node);
let new_membership = match res {
Ok(x) => x,
Err(e) => {
let _ = tx.send(Err(e));
let _ = tx.send(Err(AddLearnerError::MissingNodeInfo(e)));
return Ok(());
}
};

// TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for
// sending vote requests etc?
let state = self.spawn_replication_stream(target).await;
self.nodes.insert(target, state);
tracing::debug!(?new_membership, "new_membership with added learner: {}", target);

let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?;

tracing::debug!(
"after add target node {} as learner {:?}",
@@ -238,7 +209,12 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
Expectation::AtLineRate => {
// Expect to be at line rate but not.

let matched = self.nodes.get(node_id).map(|x| x.matched).unwrap();
let matched = if let Some(l) = &self.core.engine.state.leader {
*l.progress.get(node_id)
} else {
unreachable!("it has to be a leader!!!");
};

let distance = replication_lag(&matched, &last_log_id);

if distance <= self.core.config.replication_lag_threshold {
@@ -317,64 +293,28 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
/// This is ony called by leader.
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_uniform_consensus_committed(&mut self, log_id: &LogId<C::NodeId>) {
let index = log_id.index;

// Step down if needed.
if !self.core.engine.state.membership_state.effective.membership.is_voter(&self.core.id) {

let _ = log_id;

// TODO: Leader does not need to step down. It can keep working.
// This requires to separate Leader(Proposer) and Acceptors.
if !self.core.engine.state.membership_state.effective.is_voter(&self.core.id) {
tracing::debug!("raft node is stepping down");

// TODO(xp): transfer leadership
self.core.set_target_state(ServerState::Learner);
return;
self.core.engine.metrics_flags.set_cluster_changed();
}

let membership = &self.core.engine.state.membership_state.effective.membership;

// remove nodes which not included in nodes and learners
for (id, state) in self.nodes.iter_mut() {
if membership.contains(id) {
continue;
}

tracing::info!(
"set remove_after_commit for {} = {}, membership: {:?}",
id,
index,
self.core.engine.state.membership_state.effective
);

state.remove_since = Some(index)
}

let targets = self.nodes.keys().cloned().collect::<Vec<_>>();
for target in targets {
self.try_remove_replication(target).await;
}

self.core.engine.metrics_flags.set_replication_changed();
}

/// Remove a replication if the membership that does not include it has committed.
///
/// Return true if removed.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn try_remove_replication(&mut self, target: C::NodeId) -> bool {
tracing::debug!(target = display(target), "try_remove_replication");
pub async fn remove_replication(&mut self, target: C::NodeId) -> bool {
tracing::info!("removed_replication to: {}", target);

{
let n = self.nodes.get(&target);

if let Some(n) = n {
if !self.need_to_remove_replication(n) {
return false;
}
} else {
tracing::warn!("trying to remove absent replication to {}", target);
return false;
}
}

tracing::info!("removed replication to: {}", target);
let repl_state = self.nodes.remove(&target);
if let Some(s) = repl_state {
let handle = s.repl_stream.handle;
@@ -385,6 +325,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
tracing::debug!("joining removed replication: {}", target);
let _x = handle.await;
tracing::info!("Done joining removed replication : {}", target);
} else {
unreachable!("try to nonexistent replication to {}", target);
}

self.replication_metrics.update(RemoveTarget { target });
@@ -394,53 +336,4 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

true
}

fn need_to_remove_replication(&self, node: &ReplicationState<C::NodeId>) -> bool {
tracing::debug!(node=?node, "check if to remove a replication");

let cfg = &self.core.config;
let policy = &cfg.remove_replication;

let st = &self.core.engine.state;
let committed = st.committed;

// `remove_since` is set only when the uniform membership log is committed.
// Do not remove replication if it is not committed.
let since = if let Some(since) = node.remove_since {
since
} else {
return false;
};

if node.matched.index() >= Some(since) {
tracing::debug!(
node = debug(node),
committed = debug(committed),
"remove replication: uniform membership log committed and replicated to target"
);
return true;
}

match policy {
RemoveReplicationPolicy::CommittedAdvance(n) => {
// TODO(xp): test this. but not for now. It is meaningless without blank-log heartbeat.
if committed.next_index() - since > *n {
tracing::debug!(
node = debug(node),
committed = debug(committed),
"remove replication: committed index is head of remove_since too much"
);
return true;
}
}
RemoveReplicationPolicy::MaxNetworkFailures(n) => {
if node.failures >= *n {
tracing::debug!(node = debug(node), "remove replication: too many replication failure");
return true;
}
}
}

false
}
}