Skip to content

Commit

Permalink
Merge pull request #254 from drmingdrmer/async-entry
Browse files Browse the repository at this point in the history
Refactor: use crate async-entry to define async test functions
  • Loading branch information
drmingdrmer authored Mar 31, 2022
2 parents 41f2463 + e53ffce commit 6409602
Show file tree
Hide file tree
Showing 46 changed files with 154 additions and 296 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# Directory Ignores ##########################################################
guide/book
target
Expand All @@ -11,3 +12,4 @@ Cargo.lock
perf.data*
nohup*
*svg
.DS_Store
2 changes: 1 addition & 1 deletion example-raft-kv/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::runtime::Runtime;

/// Setup a cluster of 3 nodes.
/// Write to it and read from it.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_cluster() -> anyhow::Result<()> {
// --- The client itself does not store addresses for all nodes, but just node id.
// Thus we need a supporting component to provide mapping from node id to node address.
Expand Down
1 change: 1 addition & 0 deletions openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tracing-futures = "0.2.4"

[dev-dependencies]
anyhow = "1.0.32"
async-entry = "0.2.0"
lazy_static = "1.4.0"
memstore = { version="0.2.0", path="../memstore" }
pretty_assertions = "1.0.0"
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics_wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::RaftTypeConfig;
use crate::State;

/// Test wait for different state changes
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_wait() -> anyhow::Result<()> {
{
// wait for leader
Expand Down
7 changes: 3 additions & 4 deletions openraft/tests/api_install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use openraft::SnapshotMeta;
use openraft::State;
use openraft::Vote;

use crate::fixtures::init_default_ut_tracing;

#[macro_use]
mod fixtures;

Expand All @@ -21,11 +23,8 @@ mod fixtures;
///
/// - build a stable single node cluster.
/// - send install_snapshot request with matched/mismatched id and offset
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn snapshot_ge_half_threshold() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use openraft::RaftNetworkFactory;
use openraft::Vote;

use crate::fixtures::blank;
use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Cluster conflict_with_empty_entries test.
Expand All @@ -33,11 +34,8 @@ use crate::fixtures::RaftRouter;
///
/// - send `append_logs` message with conflicting prev_log_index and empty `entries`.
/// - asserts that a response with ConflictOpt set.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn conflict_with_empty_entries() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());
Expand Down
6 changes: 2 additions & 4 deletions openraft/tests/append_entries/t20_append_conflicts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ use openraft::State;
use openraft::Vote;

use crate::fixtures::blank;
use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Test append-entries response in every case.
///
/// - bring up a learner and send to it append_entries request. Check the response in every case.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn append_conflicts() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());
Expand Down
6 changes: 2 additions & 4 deletions openraft/tests/append_entries/t30_append_inconsistent_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use openraft::RaftStorage;
use openraft::State;
use openraft::Vote;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Too many inconsistent log should not block replication.
Expand All @@ -31,11 +32,8 @@ use crate::fixtures::RaftRouter;
///
/// - Start the cluster and node 2 start to replicate logs.
/// - test the log should be replicated to node 0.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn append_inconsistent_log() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@ use openraft::State;
use openraft::Vote;

use crate::fixtures::blank;
use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// append-entries should update membership correctly when adding new logs and deleting
/// inconsistent logs.
///
/// - bring up a learner and send to it append_entries request. Check the membership updated.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn append_updates_membership() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ use openraft::RaftNetwork;
use openraft::RaftNetworkFactory;
use openraft::Vote;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// append-entries should update hard state when adding new logs with bigger term
///
/// - bring up a learner and send to it append_entries request. Check the hard state updated.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn append_entries_with_bigger_term() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());
Expand Down
6 changes: 2 additions & 4 deletions openraft/tests/append_entries/t60_large_heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ use anyhow::Result;
use maplit::btreeset;
use openraft::Config;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Large heartbeat should not block replication.
/// I.e., replication should not be driven by heartbeat.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn large_heartbeat() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(
Config {
Expand Down
84 changes: 39 additions & 45 deletions openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::time::Duration;
use anyhow::Result;
use maplit::btreeset;
use openraft::Config;
use tracing::Instrument;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Ensures the stale value of ReplicationCore.last_log_id won't affect replication.
Expand All @@ -14,57 +14,51 @@ use crate::fixtures::RaftRouter;
/// TODO(xp): `max_applied_log_to_keep` to be 0 makes it very easy to enter snapshot replication and it will keeps
/// replicating every log by snapshot and get timeout.
/// Thus it is disabled until we find another way to test it.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
#[ignore]
async fn stale_last_log_id() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();

async {
// Setup test dependencies.
let config = Arc::new(
Config {
heartbeat_interval: 50,
election_timeout_min: 500,
election_timeout_max: 1000,
max_payload_entries: 1,
max_applied_log_to_keep: 0,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());
router.network_send_delay(5);
// Setup test dependencies.
let config = Arc::new(
Config {
heartbeat_interval: 50,
election_timeout_min: 500,
election_timeout_max: 1000,
max_payload_entries: 1,
max_applied_log_to_keep: 0,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());
router.network_send_delay(5);

let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3,4}).await?;
let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3,4}).await?;

let n_threads = 4;
let n_ops = 500;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let n_threads = 4;
let n_ops = 500;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

for i in 0..n_threads {
tokio::spawn({
let router = router.clone();
let tx = tx.clone();
for i in 0..n_threads {
tokio::spawn({
let router = router.clone();
let tx = tx.clone();

async move {
router.client_request_many(0, &format!("{}", i), n_ops).await;
let _ = tx.send(());
}
});
}
async move {
router.client_request_many(0, &format!("{}", i), n_ops).await;
let _ = tx.send(());
}
});
}

for _i in 0..n_threads {
let _ = rx.recv().await;
log_index += n_ops as u64;
}
for _i in 0..n_threads {
let _ = rx.recv().await;
log_index += n_ops as u64;
}

router.wait(&1, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;
router.wait(&2, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;
router.wait(&3, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;
router.wait(&4, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;
router.wait(&1, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;
router.wait(&2, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;
router.wait(&3, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;
router.wait(&4, Some(Duration::from_millis(1000))).await?.log(Some(log_index), "").await?;

Ok(())
}
.instrument(ut_span)
.await
Ok(())
}
6 changes: 2 additions & 4 deletions openraft/tests/client_api/t10_client_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use openraft::LogId;
use openraft::SnapshotPolicy;
use openraft::State;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Client write tests.
Expand All @@ -18,11 +19,8 @@ use crate::fixtures::RaftRouter;
/// - create a stable 3-node cluster.
/// - write a lot of data to it.
/// - assert that the cluster stayed stable and has all of the expected data.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn client_writes() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(
Config {
Expand Down
6 changes: 2 additions & 4 deletions openraft/tests/client_api/t20_client_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use maplit::btreeset;
use openraft::Config;
use openraft::State;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Client read tests.
Expand All @@ -14,11 +15,8 @@ use crate::fixtures::RaftRouter;
/// - create a stable 3-node cluster.
/// - call the is_leader interface on the leader, and assert success.
/// - call the is_leader interface on the followers, and assert failure.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn client_reads() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());
Expand Down
6 changes: 2 additions & 4 deletions openraft/tests/client_api/t50_lagging_network_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use maplit::btreeset;
use openraft::Config;
use openraft::State;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Lagging network test.
Expand All @@ -16,11 +17,8 @@ use crate::fixtures::RaftRouter;
/// - bring a single-node cluster online.
/// - add two Learner and then try to commit one log.
/// - change config to a 3 members cluster and commit another log.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn lagging_network_write() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let config = Arc::new(
Config {
heartbeat_interval: 100,
Expand Down
8 changes: 3 additions & 5 deletions openraft/tests/concurrent_write_and_add_learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use maplit::btreeset;
use openraft::Config;
use openraft::LogIdOptionExt;
use openraft::State;
use tracing_futures::Instrument;

use crate::fixtures::init_default_ut_tracing;

#[macro_use]
mod fixtures;
Expand Down Expand Up @@ -36,11 +37,8 @@ mod fixtures;
/// - brings a 3 candidates cluster online.
/// - add another learner and at the same time write a log.
/// - asserts that all of the leader, followers and the learner receives all logs.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn concurrent_write_and_add_learner() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let timeout = Duration::from_millis(500);
let candidates = btreeset![0, 1, 2];

Expand Down
6 changes: 2 additions & 4 deletions openraft/tests/elect_compare_last_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use openraft::State;
use openraft::Vote;

use crate::fixtures::blank;
use crate::fixtures::init_default_ut_tracing;

#[macro_use]
mod fixtures;
Expand All @@ -24,11 +25,8 @@ mod fixtures;
///
/// - Fake a cluster with two node: with last log {2,1} and {1,2}.
/// - Bring up the cluster and only node 0 can become leader.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn elect_compare_last_log() -> Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

// Setup test dependencies.
let config = Arc::new(Config::default().validate()?);
let mut router = RaftRouter::new(config.clone());
Expand Down
Loading

0 comments on commit 6409602

Please # to comment.