Skip to content

Commit

Permalink
add some wait_for func:
Browse files Browse the repository at this point in the history
- wait_for_log()
- wait_for_log_timeout()
- wait_for_state()
- wait_for_state_timeout()
  • Loading branch information
drmingdrmer committed Jun 1, 2021
1 parent ea53906 commit 668ad47
Showing 1 changed file with 50 additions and 23 deletions.
73 changes: 50 additions & 23 deletions async-raft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use anyhow::{anyhow, Result};
use tokio::sync::RwLock;
use tracing_subscriber::prelude::*;

use async_raft::{Config, NodeId, Raft, RaftMetrics, RaftNetwork, State};
use async_raft::async_trait::async_trait;
use async_raft::error::{ChangeConfigError, ClientReadError, ClientWriteError};
use async_raft::raft::ClientWriteRequest;
use async_raft::raft::MembershipConfig;
use async_raft::raft::{AppendEntriesRequest, AppendEntriesResponse};
use async_raft::raft::{InstallSnapshotRequest, InstallSnapshotResponse};
use async_raft::raft::{VoteRequest, VoteResponse};
use async_raft::raft::ClientWriteRequest;
use async_raft::raft::MembershipConfig;
use async_raft::storage::RaftStorage;
use async_raft::{Config, NodeId, Raft, RaftMetrics, RaftNetwork, State};
use memstore::{ClientRequest as MemClientRequest, ClientResponse as MemClientResponse, MemStore};

/// A concrete Raft type used during testing.
Expand Down Expand Up @@ -119,16 +119,18 @@ impl RaftRouter {
/// Get a handle to the storage backend for the target node.
pub async fn get_storage_handle(&self, node_id: &NodeId) -> Result<Arc<MemStore>> {
let rt = self.routing_table.read().await;
let addr = rt.get(node_id).ok_or_else(||anyhow::anyhow!("could not find node {} in routing table", node_id))?;
let addr = rt
.get(node_id)
.ok_or_else(|| anyhow::anyhow!("could not find node {} in routing table", node_id))?;
let sto = addr.clone().1;
Ok(sto)
}

/// Wait for metrics until it satisfies some condition.
#[tracing::instrument(level = "info", skip(self, func))]
pub async fn wait_for_metrics<T>(&self, node_id: &NodeId, func: T, timeout: tokio::time::Duration, msg: &str) -> Result<RaftMetrics>
where
T: Fn(&RaftMetrics) -> bool,
where
T: Fn(&RaftMetrics) -> bool,
{
let rt = self.routing_table.read().await;
let node = rt.get(node_id).ok_or_else(|| anyhow::anyhow!("node {} not found", node_id))?;
Expand Down Expand Up @@ -157,30 +159,55 @@ impl RaftRouter {
}
}

/// Same as wait_for_log() but provides additional timeout argument.
#[tracing::instrument(level = "info", skip(self))]
pub async fn wait_for_log_timeout(&self, node_ids: &HashSet<u64>, want_log: u64, timeout: tokio::time::Duration, msg: &str) -> Result<()> {
for i in node_ids.iter() {
self.wait_for_metrics(
&i,
|x| x.last_log_index == want_log,
timeout,
&format!("{} n{}.last_log_index -> {}", msg, i, want_log),
)
.await?;
self.wait_for_metrics(
&i,
|x| x.last_applied == want_log,
timeout,
&format!("{} n{}.last_applied -> {}", msg, i, want_log),
)
.await?;
}
Ok(())
}

/// Wait for specified nodes until they applied upto `want_log`(inclusive) logs.
#[tracing::instrument(level = "info", skip(self))]
pub async fn wait_for_nodes_log(&self, node_ids: &HashSet<u64>, want_log: u64, timeout: tokio::time::Duration, msg: &str) -> Result<()> {
pub async fn wait_for_log(&self, node_ids: &HashSet<u64>, want_log: u64, msg: &str) -> Result<()> {
let timeout = tokio::time::Duration::from_millis(500);
self.wait_for_log_timeout(node_ids, want_log, timeout, msg).await
}

/// Same as wait_for_state() but provides additional timeout argument.
#[tracing::instrument(level = "info", skip(self))]
pub async fn wait_for_state_timeout(&self, node_ids: &HashSet<u64>, want_state: State, timeout: tokio::time::Duration, msg: &str) -> Result<()> {
for i in node_ids.iter() {
self
.wait_for_metrics(
&i,
|x| x.last_log_index == want_log,
timeout,
&format!("{} n{}.last_log_index -> {}", msg, i, want_log),
)
.await?;
self
.wait_for_metrics(
&i,
|x| x.last_applied == want_log,
timeout,
&format!("{} n{}.last_applied -> {}", msg, i, want_log),
)
.await?;
self.wait_for_metrics(
&i,
|x| x.state == want_state,
timeout,
&format!("{} n{}.state -> {:?}", msg, i, want_state),
)
.await?;
}
Ok(())
}

pub async fn wait_for_state(&self, node_ids: &HashSet<u64>, want_state: State, msg: &str) -> Result<()> {
let timeout = tokio::time::Duration::from_millis(500);
self.wait_for_state_timeout(node_ids, want_state, timeout, msg).await
}

/// Get the ID of the current leader.
pub async fn leader(&self) -> Option<NodeId> {
let isolated = self.isolated_nodes.read().await;
Expand Down

0 comments on commit 668ad47

Please # to comment.