diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index e24baff3c..d5ec9bee7 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -845,7 +845,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore self.trigger_log_compaction_if_needed(true).await, } } RaftMsg::Tick { i } => { diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 244b2f34a..2ab3e79cc 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -267,6 +267,13 @@ impl, S: RaftStorage> Raft Result<(), Fatal> { + self.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await + } + async fn send_external_command( &self, cmd: ExternalCommand, @@ -1036,6 +1043,8 @@ pub(crate) enum ExternalCommand { Elect, /// Emit a heartbeat message, only if the node is leader. Heartbeat, + /// Trigger to build a snapshot + Snapshot, } /// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). diff --git a/openraft/tests/snapshot/main.rs b/openraft/tests/snapshot/main.rs index 9fb673424..68f3dd5d7 100644 --- a/openraft/tests/snapshot/main.rs +++ b/openraft/tests/snapshot/main.rs @@ -5,6 +5,7 @@ mod fixtures; mod t20_api_install_snapshot; +mod t20_trigger_snapshot; mod t23_snapshot_chunk_size; mod t24_snapshot_ge_half_threshold; mod t25_snapshot_line_rate_to_snapshot; diff --git a/openraft/tests/snapshot/t20_trigger_snapshot.rs b/openraft/tests/snapshot/t20_trigger_snapshot.rs new file mode 100644 index 000000000..e65f13601 --- /dev/null +++ b/openraft/tests/snapshot/t20_trigger_snapshot.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; +use std::time::Duration; + +use maplit::btreeset; +use openraft::Config; +use openraft::LeaderId; +use openraft::LogId; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// Manually trigger a snapshot with `Raft::trigger_snapshot()` on Leader and Follower. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn snapshot_chunk_size() -> anyhow::Result<()> { + let config = Arc::new( + Config { + enable_heartbeat: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {}).await?; + + tracing::info!("--- trigger snapshot for node-1"); + { + let n1 = router.get_raft_handle(&1)?; + n1.trigger_snapshot().await?; + + router + .wait(&1, timeout()) + .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "node-1 snapshot") + .await?; + } + + tracing::info!("--- send some logs"); + { + router.client_request_many(0, "0", 10).await?; + log_index += 10; + + router.wait(&0, timeout()).log(Some(log_index), "node-0 write logs").await?; + router.wait(&1, timeout()).log(Some(log_index), "node-1 write logs").await?; + } + + tracing::info!("--- trigger snapshot for node-0"); + { + let n0 = router.get_raft_handle(&0)?; + n0.trigger_snapshot().await?; + + router + .wait(&0, timeout()) + .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "node-0 snapshot") + .await?; + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1_000)) +}