Skip to content

Commit

Permalink
Feature: add Raft::trigger_snapshot() to manually trigger to build sn…
Browse files Browse the repository at this point in the history
…apshot at once
  • Loading branch information
drmingdrmer committed Aug 7, 2022
1 parent 2285992 commit f437cda
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 1 deletion.
3 changes: 2 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

/// Trigger a log compaction (snapshot) job if needed.
/// If force is True, it will skip the threshold check and start creating snapshot as demanded.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn trigger_log_compaction_if_needed(&mut self, force: bool) {
if self.snapshot_state.is_some() {
return;
Expand Down Expand Up @@ -1348,6 +1348,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let log_id = self.write_entry(EntryPayload::Blank, None).await?;
tracing::debug!(log_id = display(&log_id), "ExternalCommand: sent heartbeat log");
}
ExternalCommand::Snapshot => self.trigger_log_compaction_if_needed(true).await,
}
}
RaftMsg::Tick { i } => {
Expand Down
9 changes: 9 additions & 0 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
self.send_external_command(ExternalCommand::Heartbeat, "trigger_heartbeat").await
}

/// Trigger to build a snapshot at once and return at once.
///
/// Returns error when RaftCore has Fatal error, e.g. shut down or having storage error.
pub async fn trigger_snapshot(&self) -> Result<(), Fatal<C::NodeId>> {
self.send_external_command(ExternalCommand::Snapshot, "trigger_snapshot").await
}

async fn send_external_command(
&self,
cmd: ExternalCommand,
Expand Down Expand Up @@ -1036,6 +1043,8 @@ pub(crate) enum ExternalCommand {
Elect,
/// Emit a heartbeat message, only if the node is leader.
Heartbeat,
/// Trigger to build a snapshot
Snapshot,
}

/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2).
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/snapshot/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
mod fixtures;

mod t20_api_install_snapshot;
mod t20_trigger_snapshot;
mod t23_snapshot_chunk_size;
mod t24_snapshot_ge_half_threshold;
mod t25_snapshot_line_rate_to_snapshot;
Expand Down
64 changes: 64 additions & 0 deletions openraft/tests/snapshot/t20_trigger_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::Config;
use openraft::LeaderId;
use openraft::LogId;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Manually trigger a snapshot with `Raft::trigger_snapshot()` on Leader and Follower.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn snapshot_chunk_size() -> anyhow::Result<()> {
let config = Arc::new(
Config {
enable_heartbeat: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let mut log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {}).await?;

tracing::info!("--- trigger snapshot for node-1");
{
let n1 = router.get_raft_handle(&1)?;
n1.trigger_snapshot().await?;

router
.wait(&1, timeout())
.snapshot(LogId::new(LeaderId::new(1, 0), log_index), "node-1 snapshot")
.await?;
}

tracing::info!("--- send some logs");
{
router.client_request_many(0, "0", 10).await?;
log_index += 10;

router.wait(&0, timeout()).log(Some(log_index), "node-0 write logs").await?;
router.wait(&1, timeout()).log(Some(log_index), "node-1 write logs").await?;
}

tracing::info!("--- trigger snapshot for node-0");
{
let n0 = router.get_raft_handle(&0)?;
n0.trigger_snapshot().await?;

router
.wait(&0, timeout())
.snapshot(LogId::new(LeaderId::new(1, 0), log_index), "node-0 snapshot")
.await?;
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}

0 comments on commit f437cda

Please # to comment.