Skip to content

Commit

Permalink
Tests: parse port from substrate binary output to avoid races (#501)
Browse files Browse the repository at this point in the history
* parse port from substrate binary output to avoid races

* cargo fmt

* clippy

* remove "rt" feature from tokio

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
  • Loading branch information
jsdw and niklasad1 authored Apr 6, 2022
1 parent f7e9491 commit 10627fb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 80 deletions.
1 change: 0 additions & 1 deletion subxt/tests/integration/utils/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub async fn test_node_process_with(

let proc = TestNodeProcess::<DefaultConfig>::build(path.as_str())
.with_authority(key)
.scan_for_open_ports()
.spawn::<DefaultConfig>()
.await;
proc.unwrap()
Expand Down
126 changes: 48 additions & 78 deletions subxt/tests/integration/utils/node_proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use std::{
OsStr,
OsString,
},
net::TcpListener,
io::{
BufRead,
BufReader,
Read,
},
process,
thread,
time,
};
use subxt::{
Client,
Expand Down Expand Up @@ -79,7 +81,6 @@ where
pub struct TestNodeProcessBuilder {
node_path: OsString,
authority: Option<AccountKeyring>,
scan_port_range: bool,
}

impl TestNodeProcessBuilder {
Expand All @@ -90,7 +91,6 @@ impl TestNodeProcessBuilder {
Self {
node_path: node_path.as_ref().into(),
authority: None,
scan_port_range: false,
}
}

Expand All @@ -100,81 +100,46 @@ impl TestNodeProcessBuilder {
self
}

/// Enable port scanning to scan for open ports.
///
/// Allows spawning multiple node instances for tests to run in parallel.
pub fn scan_for_open_ports(&mut self) -> &mut Self {
self.scan_port_range = true;
self
}

/// Spawn the substrate node at the given path, and wait for rpc to be initialized.
pub async fn spawn<R>(&self) -> Result<TestNodeProcess<R>, String>
where
R: Config,
{
let mut cmd = process::Command::new(&self.node_path);
cmd.env("RUST_LOG", "error").arg("--dev").arg("--tmp");
cmd.env("RUST_LOG", "info")
.arg("--dev")
.arg("--tmp")
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.arg("--port=0")
.arg("--rpc-port=0")
.arg("--ws-port=0");

if let Some(authority) = self.authority {
let authority = format!("{:?}", authority);
let arg = format!("--{}", authority.as_str().to_lowercase());
cmd.arg(arg);
}

let ws_port = if self.scan_port_range {
let (p2p_port, http_port, ws_port) = next_open_port()
.ok_or_else(|| "No available ports in the given port range".to_owned())?;

cmd.arg(format!("--port={}", p2p_port));
cmd.arg(format!("--rpc-port={}", http_port));
cmd.arg(format!("--ws-port={}", ws_port));
ws_port
} else {
// the default Websockets port
9944
};

let ws_url = format!("ws://127.0.0.1:{}", ws_port);

let mut proc = cmd.spawn().map_err(|e| {
format!(
"Error spawning substrate node '{}': {}",
self.node_path.to_string_lossy(),
e
)
})?;
// wait for rpc to be initialized
const MAX_ATTEMPTS: u32 = 6;
let mut attempts = 1;
let mut wait_secs = 1;
let client = loop {
thread::sleep(time::Duration::from_secs(wait_secs));
log::info!(
"Connecting to contracts enabled node, attempt {}/{}",
attempts,
MAX_ATTEMPTS
);
let result = ClientBuilder::new().set_url(ws_url.clone()).build().await;
match result {
Ok(client) => break Ok(client),
Err(err) => {
if attempts < MAX_ATTEMPTS {
attempts += 1;
wait_secs *= 2; // backoff
continue
}
break Err(err)
}
}
};

// Wait for RPC port to be logged (it's logged to stderr):
let stderr = proc.stderr.take().unwrap();
let ws_port = find_substrate_port_from_output(stderr);
let ws_url = format!("ws://127.0.0.1:{}", ws_port);

// Connect to the node with a subxt client:
let client = ClientBuilder::new().set_url(ws_url.clone()).build().await;
match client {
Ok(client) => Ok(TestNodeProcess { proc, client }),
Err(err) => {
let err = format!(
"Failed to connect to node rpc at {} after {} attempts: {}",
ws_url, attempts, err
);
let err = format!("Failed to connect to node rpc at {}: {}", ws_url, err);
log::error!("{}", err);
proc.kill().map_err(|e| {
format!("Error killing substrate process '{}': {}", proc.id(), e)
Expand All @@ -185,25 +150,30 @@ impl TestNodeProcessBuilder {
}
}

/// Returns the next set of 3 open ports.
///
/// Returns None if there are not 3 open ports available.
fn next_open_port() -> Option<(u16, u16, u16)> {
// Ask the kernel to allocate a port.
let next_port = || {
match TcpListener::bind(("127.0.0.1", 0)) {
Ok(listener) => {
if let Ok(address) = listener.local_addr() {
Some(address.port())
} else {
None
}
}
Err(_) => None,
}
};

// The ports allocated should be different, unless in
// the unlikely case that the system has less than 3 available ports.
Some((next_port()?, next_port()?, next_port()?))
// Consume a stderr reader from a spawned substrate command and
// locate the port number that is logged out to it.
fn find_substrate_port_from_output(r: impl Read + Send + 'static) -> u16 {
BufReader::new(r)
.lines()
.find_map(|line| {
let line = line
.expect("failed to obtain next line from stdout for port discovery");

// does the line contain our port (we expect this specific output from substrate).
let line_end = match line.rsplit_once("Listening for new connections on 127.0.0.1:") {
None => return None,
Some((_, after)) => after
};

// trim non-numeric chars from the end of the port part of the line.
let port_str = line_end.trim_end_matches(|b| !('0'..='9').contains(&b));

// expect to have a number here (the chars after '127.0.0.1:') and parse them into a u16.
let port_num = port_str
.parse()
.unwrap_or_else(|_| panic!("valid port expected on 'Listening for new connections' line, got '{port_str}'"));

Some(port_num)
})
.expect("We should find a port before the reader ends")
}
2 changes: 1 addition & 1 deletion test-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ codec = { package = "parity-scale-codec", version = "3.0.0", default-features =
[build-dependencies]
subxt = { path = "../subxt" }
sp-core = "6.0.0"
tokio = { version = "1.8", features = ["rt", "macros"] }
tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] }
which = "4.2.2"

0 comments on commit 10627fb

Please # to comment.