Skip to content

Commit

Permalink
Feature: add config: send_snapshot_timeout
Browse files Browse the repository at this point in the history
The timeouts for sending non-last and last snapshot segments may be
different: when a node receives the last segment, it installs the
snapshot thus it takes longer time.

Thus `send_snapshot_timeout` is for sending non-last segment.
The `install_snapshot_timeout` is for sending last segment, and is used
as default timeout for non-last segment if `send_snapshot_timeout` is 0.
  • Loading branch information
zach-schoenberger authored and drmingdrmer committed Nov 15, 2022
1 parent e86555c commit 2877be0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 7 deletions.
25 changes: 24 additions & 1 deletion openraft/src/config/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Raft runtime configuration.
use std::sync::atomic::AtomicBool;
use std::time::Duration;

use clap::Parser;
use rand::thread_rng;
Expand Down Expand Up @@ -99,10 +100,18 @@ pub struct Config {
#[clap(long, default_value = "50")]
pub heartbeat_interval: u64,

/// The timeout for sending a snapshot segment, in millisecond
/// The timeout for sending then installing the last snapshot segment,
/// in millisecond. It is also used as the timeout for sending a non-last segment, if `send_snapshot_timeout` is 0.
#[clap(long, default_value = "200")]
pub install_snapshot_timeout: u64,

/// The timeout for sending a **non-last** snapshot segment, in milliseconds.
///
/// It is disabled by default, by setting it to `0`.
/// The timeout for sending every segment is `install_snapshot_timeout`.
#[clap(long, default_value = "0")]
pub send_snapshot_timeout: u64,

/// The maximum number of entries per payload allowed to be transmitted during replication
///
/// If this is too low, it will take longer for the nodes to be brought up to
Expand Down Expand Up @@ -199,6 +208,20 @@ impl Config {
thread_rng().gen_range(self.election_timeout_min..self.election_timeout_max)
}

/// Get the timeout for sending and installing the last snapshot segment.
pub fn install_snapshot_timeout(&self) -> Duration {
Duration::from_millis(self.install_snapshot_timeout)
}

/// Get the timeout for sending a non-last snapshot segment.
pub fn send_snapshot_timeout(&self) -> Duration {
if self.send_snapshot_timeout > 0 {
Duration::from_millis(self.send_snapshot_timeout)
} else {
self.install_snapshot_timeout()
}
}

pub fn build(args: &[&str]) -> Result<Config, ConfigError> {
let config = <Self as Parser>::parse_from(args);
config.validate()
Expand Down
17 changes: 17 additions & 0 deletions openraft/src/config/config_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use core::time::Duration;

use crate::config::error::ConfigError;
use crate::Config;
use crate::SnapshotPolicy;
Expand Down Expand Up @@ -52,6 +54,7 @@ fn test_build() -> anyhow::Result<()> {
"--election-timeout-min=10",
"--election-timeout-max=20",
"--heartbeat-interval=5",
"--send-snapshot-timeout=199",
"--install-snapshot-timeout=200",
"--max-payload-entries=201",
"--replication-lag-threshold=202",
Expand All @@ -65,6 +68,7 @@ fn test_build() -> anyhow::Result<()> {
assert_eq!(10, config.election_timeout_min);
assert_eq!(20, config.election_timeout_max);
assert_eq!(5, config.heartbeat_interval);
assert_eq!(199, config.send_snapshot_timeout);
assert_eq!(200, config.install_snapshot_timeout);
assert_eq!(201, config.max_payload_entries);
assert_eq!(202, config.replication_lag_threshold);
Expand All @@ -73,6 +77,19 @@ fn test_build() -> anyhow::Result<()> {
assert_eq!(205, config.max_in_snapshot_log_to_keep);
assert_eq!(207, config.purge_batch_size);

// Test config methods
{
let mut c = config;
assert_eq!(Duration::from_millis(199), c.send_snapshot_timeout());
assert_eq!(Duration::from_millis(200), c.install_snapshot_timeout());

c.send_snapshot_timeout = 0;
assert_eq!(
Duration::from_millis(200),
c.send_snapshot_timeout(),
"by default send_snapshot_timeout is install_snapshot_timeout"
);
}
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
} else {
unreachable!("It has to be Streaming")
};

if stream_changed {
self.begin_installing_snapshot(&req).await?;
}
Expand Down
13 changes: 7 additions & 6 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ pub(crate) struct ReplicationCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S
/// The last possible matching entry on a follower.
max_possible_matched_index: Option<u64>,

/// The timeout for sending snapshot segment.
install_snapshot_timeout: Duration,

/// if or not need to replicate log entries or states, e.g., `commit_index` etc.
need_to_replicate: bool,
}
Expand All @@ -147,7 +144,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
) -> ReplicationStream<C::NodeId> {
// other component to ReplicationStream
let (repl_tx, repl_rx) = mpsc::unbounded_channel();
let install_snapshot_timeout = Duration::from_millis(config.install_snapshot_timeout);

let this = Self {
target,
Expand All @@ -162,7 +158,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
max_possible_matched_index: progress_entry.max_possible_matching(),
raft_core_tx,
repl_rx,
install_snapshot_timeout,
need_to_replicate: true,
};

Expand Down Expand Up @@ -750,7 +745,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
"sending snapshot chunk"
);

let res = timeout(self.install_snapshot_timeout, self.network.send_install_snapshot(req)).await;
let snap_timeout = if done {
self.config.install_snapshot_timeout()
} else {
self.config.send_snapshot_timeout()
};

let res = timeout(snap_timeout, self.network.send_install_snapshot(req)).await;

let res = match res {
Ok(outer_res) => match outer_res {
Expand Down

0 comments on commit 2877be0

Please # to comment.