diff --git a/subxt/tests/integration/utils/context.rs b/subxt/tests/integration/utils/context.rs index ec261d9523..b1e85d102b 100644 --- a/subxt/tests/integration/utils/context.rs +++ b/subxt/tests/integration/utils/context.rs @@ -46,7 +46,6 @@ pub async fn test_node_process_with( let proc = TestNodeProcess::::build(path.as_str()) .with_authority(key) - .scan_for_open_ports() .spawn::() .await; proc.unwrap() diff --git a/subxt/tests/integration/utils/node_proc.rs b/subxt/tests/integration/utils/node_proc.rs index 487aef9091..b97b148937 100644 --- a/subxt/tests/integration/utils/node_proc.rs +++ b/subxt/tests/integration/utils/node_proc.rs @@ -20,10 +20,12 @@ use std::{ OsStr, OsString, }, - net::TcpListener, + io::{ + BufRead, + BufReader, + Read, + }, process, - thread, - time, }; use subxt::{ Client, @@ -79,7 +81,6 @@ where pub struct TestNodeProcessBuilder { node_path: OsString, authority: Option, - scan_port_range: bool, } impl TestNodeProcessBuilder { @@ -90,7 +91,6 @@ impl TestNodeProcessBuilder { Self { node_path: node_path.as_ref().into(), authority: None, - scan_port_range: false, } } @@ -100,21 +100,20 @@ impl TestNodeProcessBuilder { self } - /// Enable port scanning to scan for open ports. - /// - /// Allows spawning multiple node instances for tests to run in parallel. - pub fn scan_for_open_ports(&mut self) -> &mut Self { - self.scan_port_range = true; - self - } - /// Spawn the substrate node at the given path, and wait for rpc to be initialized. pub async fn spawn(&self) -> Result, String> where R: Config, { let mut cmd = process::Command::new(&self.node_path); - cmd.env("RUST_LOG", "error").arg("--dev").arg("--tmp"); + cmd.env("RUST_LOG", "info") + .arg("--dev") + .arg("--tmp") + .stdout(process::Stdio::piped()) + .stderr(process::Stdio::piped()) + .arg("--port=0") + .arg("--rpc-port=0") + .arg("--ws-port=0"); if let Some(authority) = self.authority { let authority = format!("{:?}", authority); @@ -122,21 +121,6 @@ impl TestNodeProcessBuilder { cmd.arg(arg); } - let ws_port = if self.scan_port_range { - let (p2p_port, http_port, ws_port) = next_open_port() - .ok_or_else(|| "No available ports in the given port range".to_owned())?; - - cmd.arg(format!("--port={}", p2p_port)); - cmd.arg(format!("--rpc-port={}", http_port)); - cmd.arg(format!("--ws-port={}", ws_port)); - ws_port - } else { - // the default Websockets port - 9944 - }; - - let ws_url = format!("ws://127.0.0.1:{}", ws_port); - let mut proc = cmd.spawn().map_err(|e| { format!( "Error spawning substrate node '{}': {}", @@ -144,37 +128,18 @@ impl TestNodeProcessBuilder { e ) })?; - // wait for rpc to be initialized - const MAX_ATTEMPTS: u32 = 6; - let mut attempts = 1; - let mut wait_secs = 1; - let client = loop { - thread::sleep(time::Duration::from_secs(wait_secs)); - log::info!( - "Connecting to contracts enabled node, attempt {}/{}", - attempts, - MAX_ATTEMPTS - ); - let result = ClientBuilder::new().set_url(ws_url.clone()).build().await; - match result { - Ok(client) => break Ok(client), - Err(err) => { - if attempts < MAX_ATTEMPTS { - attempts += 1; - wait_secs *= 2; // backoff - continue - } - break Err(err) - } - } - }; + + // Wait for RPC port to be logged (it's logged to stderr): + let stderr = proc.stderr.take().unwrap(); + let ws_port = find_substrate_port_from_output(stderr); + let ws_url = format!("ws://127.0.0.1:{}", ws_port); + + // Connect to the node with a subxt client: + let client = ClientBuilder::new().set_url(ws_url.clone()).build().await; match client { Ok(client) => Ok(TestNodeProcess { proc, client }), Err(err) => { - let err = format!( - "Failed to connect to node rpc at {} after {} attempts: {}", - ws_url, attempts, err - ); + let err = format!("Failed to connect to node rpc at {}: {}", ws_url, err); log::error!("{}", err); proc.kill().map_err(|e| { format!("Error killing substrate process '{}': {}", proc.id(), e) @@ -185,25 +150,30 @@ impl TestNodeProcessBuilder { } } -/// Returns the next set of 3 open ports. -/// -/// Returns None if there are not 3 open ports available. -fn next_open_port() -> Option<(u16, u16, u16)> { - // Ask the kernel to allocate a port. - let next_port = || { - match TcpListener::bind(("127.0.0.1", 0)) { - Ok(listener) => { - if let Ok(address) = listener.local_addr() { - Some(address.port()) - } else { - None - } - } - Err(_) => None, - } - }; - - // The ports allocated should be different, unless in - // the unlikely case that the system has less than 3 available ports. - Some((next_port()?, next_port()?, next_port()?)) +// Consume a stderr reader from a spawned substrate command and +// locate the port number that is logged out to it. +fn find_substrate_port_from_output(r: impl Read + Send + 'static) -> u16 { + BufReader::new(r) + .lines() + .find_map(|line| { + let line = line + .expect("failed to obtain next line from stdout for port discovery"); + + // does the line contain our port (we expect this specific output from substrate). + let line_end = match line.rsplit_once("Listening for new connections on 127.0.0.1:") { + None => return None, + Some((_, after)) => after + }; + + // trim non-numeric chars from the end of the port part of the line. + let port_str = line_end.trim_end_matches(|b| !('0'..='9').contains(&b)); + + // expect to have a number here (the chars after '127.0.0.1:') and parse them into a u16. + let port_num = port_str + .parse() + .unwrap_or_else(|_| panic!("valid port expected on 'Listening for new connections' line, got '{port_str}'")); + + Some(port_num) + }) + .expect("We should find a port before the reader ends") } diff --git a/test-runtime/Cargo.toml b/test-runtime/Cargo.toml index f887b1bfd4..7ee7424920 100644 --- a/test-runtime/Cargo.toml +++ b/test-runtime/Cargo.toml @@ -11,5 +11,5 @@ codec = { package = "parity-scale-codec", version = "3.0.0", default-features = [build-dependencies] subxt = { path = "../subxt" } sp-core = "6.0.0" -tokio = { version = "1.8", features = ["rt", "macros"] } +tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] } which = "4.2.2"