diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs b/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs index 70f25157a..d8b41a657 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/event_loop/state.rs @@ -75,18 +75,14 @@ impl State { mut transaction: Transaction, options: &InterpreterOptions, ) -> Result<(), ExecutionError> { - let result = match transaction.start(options).await { + match transaction.start(options).await { Ok(_) => { self.transactions.init_tx(transaction.id(), transaction); + trace!("transaction started"); Ok(()) } - Err(e) => { - error!(tx_error = %e); - Err(e) - } - }; - trace!("transaction started"); - result + Err(e) => Err(e), + } } #[allow(clippy::unused_async)] @@ -103,7 +99,7 @@ impl State { let tx = match self.get_tx(&tx_id) { Ok(tx) => tx, Err(e) => { - error!( + debug!( port = %port, error=%e, "error handling port input" ); return Err(e); @@ -137,7 +133,7 @@ impl State { let tx = match self.get_tx(&tx_id) { Ok(tx) => tx, Err(e) => { - error!( + debug!( port = %port, error=%e, "error handling port output" ); return Err(e); diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction.rs b/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction.rs index 4f626e851..6db67a117 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction.rs @@ -176,6 +176,8 @@ impl Transaction { while let Some(Ok(packet)) = payloads.next().await { if let Ok(port) = input.find_input(packet.port()) { accept_input(tx_id, port, &input, &channel, packet).await; + } else if packet.is_noop() { + // TODO: propagate this and/or its context if it becomes an issue. } else { warn!(port = packet.port(), "dropping packet for unconnected port"); } @@ -216,6 +218,7 @@ impl Transaction { pub(crate) async fn emit_done(&self) -> Result<()> { if !self.finished.load(Ordering::Relaxed) { + self.span.in_scope(|| trace!("tx finished, dispatching done")); self.finished.store(true, Ordering::Relaxed); self.channel.dispatch_done(self.id()).await; } @@ -249,6 +252,7 @@ impl Transaction { pub(crate) async fn check_hung(&self) -> Result { if self.done() { + error!("transaction done but not cleaned up yet"); self.channel.dispatch_done(self.id()).await; Ok(false) } else { diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation.rs b/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation.rs index 834d6d185..fda0e1094 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation.rs @@ -173,14 +173,7 @@ impl InstanceHandler { let current_status = port.status(); let new_status = match current_status { - PortStatus::Open => { - // Note: A port can still be "Open" after a call if the operation panics. - // if port.is_empty() { - // PortStatus::DoneClosed - // } else { - PortStatus::DoneClosing - // } - } + PortStatus::Open => PortStatus::UpstreamComplete, orig => orig, }; @@ -219,7 +212,11 @@ impl InstanceHandler { let namespace = self.namespace().to_owned(); let mut associated_data = self.schematic.nodes()[self.index()].data().clone(); - associated_data.config.set_root(config.root().cloned()); + + if associated_data.config.root().is_none() { + associated_data.config.set_root(config.root().cloned()); + } + if associated_data.config.value().is_none() { associated_data.config.set_value(config.value().cloned()); } diff --git a/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation/port.rs b/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation/port.rs index c10152e1e..d5611fa8e 100644 --- a/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation/port.rs +++ b/crates/wick/flow-graph-interpreter/src/interpreter/executor/transaction/operation/port.rs @@ -15,6 +15,7 @@ pub(crate) enum PortStatus { Open, DoneClosing, DoneClosed, + UpstreamComplete, } impl PortStatus { @@ -45,6 +46,7 @@ impl std::fmt::Display for PortStatus { PortStatus::Open => "Open", PortStatus::DoneClosing => "DoneClosing", PortStatus::DoneClosed => "DoneClosed", + PortStatus::UpstreamComplete => "UpstreamComplete", } ) } diff --git a/crates/wick/wick-logger/src/logger.rs b/crates/wick/wick-logger/src/logger.rs index 88a23a7d4..719745855 100644 --- a/crates/wick/wick-logger/src/logger.rs +++ b/crates/wick/wick-logger/src/logger.rs @@ -59,6 +59,9 @@ fn silly_modules(module: &str) -> bool { "flow_graph_interpreter", "wasmtime_provider", "wasmrs", + "wasmrs_rx", + "wasmrs_runtime", + "wasmrs_guest", "wick_wascap", "flow_graph", ] @@ -74,37 +77,42 @@ where // This is split up into an if/else because FilterFn needs an fn type. // If the closure captures opts.silly then it won't be coercable to an fn. if opts.silly { - filter::dynamic_filter_fn(move |_metadata, _cx| true) - } else { - filter::dynamic_filter_fn(move |metadata, cx| { - #[allow(clippy::option_if_let_else)] - if let Some(md) = cx.current_span().metadata() { - let module = &metadata + filter::dynamic_filter_fn(move |_metadata, _cx| { + !hushed_modules( + _metadata .module_path() .unwrap_or_default() .split("::") .next() - .unwrap_or_default(); - if hushed_modules(module) { - return false; - } - if silly_modules(module) { - matches!(*md.level(), tracing::Level::ERROR | tracing::Level::WARN) - } else { - true - } + .unwrap_or_default(), + ) + }) + } else { + filter::dynamic_filter_fn(move |metadata, _cx| { + let module = &metadata + .module_path() + .unwrap_or_default() + .split("::") + .next() + .unwrap_or_default(); + + #[cfg(feature = "audit")] + if _cx.current_span().metadata().is_none() && !hushed_modules(module) { + warn!( + "Logging without a span: {} at {}:{}", + metadata.module_path().unwrap_or_default(), + metadata.file().unwrap_or_default(), + metadata.line().unwrap_or_default() + ); + } + + if hushed_modules(module) { + return false; + } + if silly_modules(module) { + matches!(*metadata.level(), tracing::Level::ERROR | tracing::Level::WARN) } else { - let module = &metadata - .module_path() - .unwrap_or_default() - .split("::") - .next() - .unwrap_or_default(); - if !hushed_modules(module) { - !silly_modules(module) - } else { - false - } + true } }) } @@ -229,10 +237,12 @@ where Some( tracing_subscriber::fmt::layer() .with_writer(stderr_writer) - .with_thread_names(true) .with_ansi(with_color) - .with_target(true) .with_timer(timer) + .with_thread_names(cfg!(debug_assertions)) + .with_target(cfg!(debug_assertions)) + .with_file(cfg!(debug_assertions)) + .with_line_number(cfg!(debug_assertions)) .with_filter(get_levelfilter(opts)) .with_filter(wick_filter(opts)), ),