From 70bdcd33efb037ea1155f3f6a67b5134297e885b Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Tue, 5 Mar 2024 16:41:47 -0800 Subject: [PATCH 1/8] feat: Add workflow spans and every cli logging --- homestar-runtime/src/logger.rs | 67 ++++++++++++++++------ homestar-runtime/src/tasks/wasm.rs | 3 +- homestar-runtime/src/worker.rs | 49 +++++++++++++--- homestar-runtime/src/worker/resolver.rs | 3 +- homestar-wasm/src/wasmtime/host/helpers.rs | 2 + homestar-wasm/src/wasmtime/world.rs | 6 +- 6 files changed, 101 insertions(+), 29 deletions(-) diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 744a21c5..65ad5584 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -43,31 +43,60 @@ fn init( guard: WorkerGuard, #[allow(unused_variables)] settings: &settings::Monitoring, ) -> WorkerGuard { + // RUST_LOG ignored when EVERY_CLI is true + let every_cli = std::env::var("EVERY_CLI").is_ok_and(|val| val == "true"); + // TODO: Add support for customizing logger(s) / specialzed formatters. - let format_layer = tracing_logfmt::builder() - .with_level(true) - .with_target(true) - .with_span_name(true) - .with_span_path(true) - .with_location(true) - .with_module_path(true) - .layer() - .with_writer(writer); + let format_layer = if every_cli { + tracing_logfmt::builder() + .with_level(true) + .with_target(false) + .with_span_name(false) + .with_span_path(false) + .with_location(false) + .with_module_path(false) + .layer() + .with_writer(writer) + } else { + tracing_logfmt::builder() + .with_level(true) + .with_target(true) + .with_span_name(true) + .with_span_path(true) + .with_location(true) + .with_module_path(true) + .layer() + .with_writer(writer) + }; - let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { - EnvFilter::new("info") - .add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) + let filter = if every_cli { + EnvFilter::new("off") + .add_directive( + "homestar_runtime::worker[run]=info" + .parse() + .expect(DIRECTIVE_EXPECT), + ) .add_directive( - "libp2p_gossipsub::behaviour=info" + "homestar_runtime::worker[spawn_tasks]=info" .parse() .expect(DIRECTIVE_EXPECT), ) - .add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT)) - }); + } else { + EnvFilter::try_from_default_env().unwrap_or_else(|_| { + EnvFilter::new("info") + .add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive( + "libp2p_gossipsub::behaviour=info" + .parse() + .expect(DIRECTIVE_EXPECT), + ) + .add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT)) + }) + }; #[cfg(all( feature = "console", diff --git a/homestar-runtime/src/tasks/wasm.rs b/homestar-runtime/src/tasks/wasm.rs index 5b51e5df..d14341bc 100644 --- a/homestar-runtime/src/tasks/wasm.rs +++ b/homestar-runtime/src/tasks/wasm.rs @@ -8,6 +8,7 @@ use homestar_wasm::{ io::{Arg, Output}, wasmtime::{world::Env, Error as WasmRuntimeError, State, World}, }; +use tracing::Instrument; #[allow(dead_code)] #[allow(missing_debug_implementations)] @@ -32,7 +33,7 @@ impl WasmContext { args: Args, ) -> Result { let env = World::instantiate_with_current_env(bytes, fun_name, &mut self.env).await?; - env.execute(args).await + env.execute(args).in_current_span().await } } diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index a5f2e31e..d5830606 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -36,7 +36,7 @@ use indexmap::IndexMap; use libipld::{Cid, Ipld}; use std::{collections::BTreeMap, sync::Arc}; use tokio::task::JoinSet; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, info_span, instrument, Instrument}; mod poller; mod resolver; @@ -157,6 +157,7 @@ where /// [Instruction]: homestar_invocation::task::Instruction /// [Swarm]: crate::network::swarm /// [LinkMap]: homestar_workflow::LinkMap + #[instrument(skip_all)] pub(crate) async fn run(self, running_tasks: Arc, fetch_fn: F) -> Result<()> where F: FnOnce(FnvHashSet) -> BoxFuture<'a, Result>>>, @@ -169,6 +170,15 @@ where .await { Ok(ctx) => { + let workflow_cid = self.workflow_info.cid.to_string(); + + info!( + subject = "worker.init_workflow", + category = "worker.run", + workflow_cid, + "initializing workflow" + ); + let promises_to_resolve = ctx.scheduler.promises_to_resolve.clone(); let resolver = DHTResolver::new( promises_to_resolve, @@ -181,7 +191,7 @@ where info!( subject = "worker.resolve_receipts", category = "worker.run", - workflow_cid = self.workflow_info.cid.to_string(), + workflow_cid, "resolving receipts in the background" ); poller::poll( @@ -209,8 +219,26 @@ where )?; } + info!( + subject = "worker.start_workflow", + category = "worker.run", + workflow_cid, + "starting workflow" + ); + // Run the queue of tasks. - self.run_queue(ctx.scheduler, running_tasks).await + let result = self.run_queue(ctx.scheduler, running_tasks).await; + + if let Ok(()) = result { + info!( + subject = "worker.end_workflow", + category = "worker.run", + workflow_cid, + "workflow completed" + ); + } + + result } Err(err) => { error!(subject = "worker.init.err", @@ -223,6 +251,7 @@ where } #[allow(unused_mut)] + #[instrument(skip_all)] async fn run_queue( mut self, mut scheduler: TaskScheduler<'a>, @@ -321,7 +350,7 @@ where category = "worker.run", workflow_cid = workflow_cid.to_string(), cid = cid.to_string(), - "attempting to resolve cid in workflow" + "attempting to resolve workflow args by cid" ); cid.resolve(linkmap.clone(), resources.clone(), db.clone()) @@ -329,9 +358,11 @@ where }); let handle = task_set.spawn(async move { - match resolved.await { + match resolved.await { Ok(inst_result) => { - match wasm_ctx.run(wasm, &fun, inst_result).await { + match wasm_ctx.run(wasm, &fun, inst_result).instrument({ + info_span!("wasm_run").or_current() + }).await { Ok(output) => Ok(( output, instruction_ptr, @@ -352,7 +383,11 @@ where }) } } - }); + } + .instrument({ + info_span!("spawn_tasks").or_current() + })); + handles.push(handle); } None => error!( diff --git a/homestar-runtime/src/worker/resolver.rs b/homestar-runtime/src/worker/resolver.rs index 8629cfa5..9edadde4 100644 --- a/homestar-runtime/src/worker/resolver.rs +++ b/homestar-runtime/src/worker/resolver.rs @@ -23,7 +23,7 @@ use tokio::{ sync::RwLock, time::{timeout_at, Instant}, }; -use tracing::debug; +use tracing::{debug, instrument}; pub(crate) trait Resolver { async fn resolve( @@ -35,6 +35,7 @@ pub(crate) trait Resolver { } impl Resolver for Cid { + #[instrument(level = "debug", name = "cid_resolve", skip_all)] async fn resolve( self, linkmap: Arc>>>, diff --git a/homestar-wasm/src/wasmtime/host/helpers.rs b/homestar-wasm/src/wasmtime/host/helpers.rs index 2dd9fb74..5309230d 100644 --- a/homestar-wasm/src/wasmtime/host/helpers.rs +++ b/homestar-wasm/src/wasmtime/host/helpers.rs @@ -6,6 +6,7 @@ use crate::wasmtime::{ }; use async_trait::async_trait; use std::time::Instant; +use tracing::instrument; #[async_trait] impl helpers::Host for State { @@ -30,6 +31,7 @@ impl helpers::Host for State { #[async_trait] impl wasi::logging::logging::Host for State { /// Log a message, formatted by the runtime subscriber. + #[instrument(name = "wasi_log", skip_all)] async fn log( &mut self, level: wasi::logging::logging::Level, diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index fc980239..f0ecf217 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -20,6 +20,7 @@ use homestar_invocation::{ task::instruction::{Args, Input}, }; use std::{iter, time::Instant}; +use tracing::{instrument, Instrument}; use wasmtime::{ component::{self, Component, Func, Instance, Linker}, Config, Engine, Store, @@ -145,6 +146,7 @@ impl Env { /// Types must conform to [Wit] IDL types when Wasm was compiled/generated. /// /// [Wit]: + #[instrument(name = "execute", skip_all)] pub async fn execute(&mut self, args: Args) -> Result where T: Send, @@ -196,6 +198,7 @@ impl Env { .ok_or(Error::WasmInstantiation)? .func() .call_async(&mut self.store, ¶ms, &mut results_alloc) + .in_current_span() .await?; self.bindings @@ -203,6 +206,7 @@ impl Env { .ok_or(Error::WasmInstantiation)? .func() .post_return_async(&mut self.store) + .in_current_span() .await?; let results = match &results_alloc[..] { @@ -415,7 +419,7 @@ fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result Date: Thu, 7 Mar 2024 10:11:35 -0800 Subject: [PATCH 2/8] chore: Rename spawn_tasks span to spawn_workflow_tasks --- homestar-runtime/src/logger.rs | 2 +- homestar-runtime/src/worker.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 65ad5584..af923bae 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -77,7 +77,7 @@ fn init( .expect(DIRECTIVE_EXPECT), ) .add_directive( - "homestar_runtime::worker[spawn_tasks]=info" + "homestar_runtime::worker[spawn_workflow_tasks]=info" .parse() .expect(DIRECTIVE_EXPECT), ) diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index d5830606..baf6381f 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -385,7 +385,7 @@ where } } .instrument({ - info_span!("spawn_tasks").or_current() + info_span!("spawn_workflow_tasks").or_current() })); handles.push(handle); From c378f12cfd77671fb7fcdfe76e50ff71843e8d6f Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Thu, 7 Mar 2024 10:46:15 -0800 Subject: [PATCH 3/8] refactor: Re-order workflow lifecycle logs --- homestar-runtime/src/worker.rs | 42 +++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index baf6381f..7fd1fdd1 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -206,12 +206,26 @@ where // Set the workflow status to running. let conn = &mut self.db.conn()?; if ctx.scheduler.run_length() > 0 { + info!( + subject = "worker.start_workflow", + category = "worker.run", + workflow_cid, + "starting workflow" + ); + Db::set_workflow_status( self.workflow_info.cid, workflow::Status::Running, conn, )?; } else { + info!( + subject = "worker.start_workflow", + category = "worker.run", + workflow_cid, + "replaying workflow" + ); + Db::set_workflow_status( self.workflow_info.cid, workflow::Status::Completed, @@ -219,26 +233,8 @@ where )?; } - info!( - subject = "worker.start_workflow", - category = "worker.run", - workflow_cid, - "starting workflow" - ); - // Run the queue of tasks. - let result = self.run_queue(ctx.scheduler, running_tasks).await; - - if let Ok(()) = result { - info!( - subject = "worker.end_workflow", - category = "worker.run", - workflow_cid, - "workflow completed" - ); - } - - result + self.run_queue(ctx.scheduler, running_tasks).await } Err(err) => { error!(subject = "worker.init.err", @@ -477,6 +473,14 @@ where // Set the workflow status to `completed` let conn = &mut self.db.conn()?; Db::set_workflow_status(self.workflow_info.cid, workflow::Status::Completed, conn)?; + + info!( + subject = "worker.end_workflow", + category = "worker.run", + workflow_cid = self.workflow_info.cid.to_string(), + "workflow completed" + ); + Ok(()) } } From 49d4628db3453cf55a3eaefb3627f9e8af9bd8d2 Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Thu, 7 Mar 2024 11:09:51 -0800 Subject: [PATCH 4/8] chore: Add WASI logging at trace level --- homestar-runtime/src/logger.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index af923bae..3675994e 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -81,6 +81,11 @@ fn init( .parse() .expect(DIRECTIVE_EXPECT), ) + .add_directive( + "homestar_wasm[wasi_log]=trace" + .parse() + .expect(DIRECTIVE_EXPECT), + ) } else { EnvFilter::try_from_default_env().unwrap_or_else(|_| { EnvFilter::new("info") From de83f18c8eb7fe17fe3d986e21f32ccc72de8cfb Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Thu, 7 Mar 2024 11:26:29 -0800 Subject: [PATCH 5/8] refactor: Change wasm_run to a debug span --- homestar-runtime/src/worker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 7fd1fdd1..3c62c18d 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -36,7 +36,7 @@ use indexmap::IndexMap; use libipld::{Cid, Ipld}; use std::{collections::BTreeMap, sync::Arc}; use tokio::task::JoinSet; -use tracing::{debug, error, info, info_span, instrument, Instrument}; +use tracing::{debug, debug_span, error, info, info_span, instrument, Instrument}; mod poller; mod resolver; @@ -357,7 +357,7 @@ where match resolved.await { Ok(inst_result) => { match wasm_ctx.run(wasm, &fun, inst_result).instrument({ - info_span!("wasm_run").or_current() + debug_span!("wasm_run").or_current() }).await { Ok(output) => Ok(( output, From 60c4d4117c13fa576a7001bd7a7cb35987d199b2 Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Thu, 7 Mar 2024 11:43:58 -0800 Subject: [PATCH 6/8] chore: const EVERY_CLI string --- homestar-runtime/src/logger.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 3675994e..4b4c9fc8 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -7,6 +7,7 @@ use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter}; const LOG_FILE: &str = "homestar.log"; const DIRECTIVE_EXPECT: &str = "Invalid tracing directive"; +const EVERY_CLI: &str = "EVERY_CLI"; /// Logger interface. #[derive(Debug)] @@ -44,7 +45,7 @@ fn init( #[allow(unused_variables)] settings: &settings::Monitoring, ) -> WorkerGuard { // RUST_LOG ignored when EVERY_CLI is true - let every_cli = std::env::var("EVERY_CLI").is_ok_and(|val| val == "true"); + let every_cli: bool = std::env::var(EVERY_CLI).is_ok_and(|val| val == "true"); // TODO: Add support for customizing logger(s) / specialzed formatters. let format_layer = if every_cli { From 4e307112806c6f7fd8d7f868e12b71fb400ac1b7 Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Tue, 12 Mar 2024 22:20:48 -0700 Subject: [PATCH 7/8] chore: Add computed and replayed receipt logs --- homestar-runtime/src/logger.rs | 5 +++++ homestar-runtime/src/runner.rs | 25 +++++++++++++++++++++---- homestar-runtime/src/worker.rs | 7 +++++++ 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 4b4c9fc8..9c585271 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -72,6 +72,11 @@ fn init( let filter = if every_cli { EnvFilter::new("off") + .add_directive( + "homestar_runtime::runner[run_worker]=info" + .parse() + .expect(DIRECTIVE_EXPECT), + ) .add_directive( "homestar_runtime::worker[run]=info" .parse() diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index c8e6eec6..0de197b3 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -40,7 +40,7 @@ use tokio::{ time, }; use tokio_util::time::{delay_queue, DelayQueue}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, info_span, instrument, warn, Instrument}; mod error; pub(crate) mod file; @@ -702,6 +702,7 @@ impl Runner { } } + #[instrument(skip_all)] async fn run_worker>( &self, workflow: Workflow<'static, Arg>, @@ -767,9 +768,11 @@ impl Runner { async move { Fetch::get_resources(rscs, workflow_settings).await }.boxed() }; - let handle = self - .runtime - .spawn(worker.run(self.running_tasks(), fetch_fn)); + let handle = self.runtime.spawn( + worker + .run(self.running_tasks(), fetch_fn) + .instrument(info_span!("run").or_current()), + ); // Add Cid to expirations timing wheel let delay_key = self @@ -790,6 +793,20 @@ impl Runner { .collect(); let replayed_receipt_info = find_receipt_info_by_pointers(&receipt_pointers, db)?; + // Log replayed receipts if any + if !replayed_receipt_info.is_empty() { + info!( + subject = "workflow.receipts", + category = "workflow", + receipt_cids = replayed_receipt_info + .iter() + .map(|info| info.0.to_string()) + .collect::>() + .join(","), + "replaying receipts", + ); + }; + Ok(WorkflowData { info: initial_info, name: workflow_name, diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 3c62c18d..d5d89733 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -459,6 +459,13 @@ where "committed to database" ); + info!( + subject = "worker.receipt", + category = "worker.run", + receipt_cid = stored_receipt.cid().to_string(), + "computed receipt" + ); + let _ = self .event_sender .send_async(Event::CapturedReceipt(Captured::with( From 45cb6cf9f707623bf671653e98ad8a1f2d75ecd0 Mon Sep 17 00:00:00 2001 From: Brian Ginsburg Date: Wed, 13 Mar 2024 09:17:01 -0700 Subject: [PATCH 8/8] chore: Address feedback --- homestar-runtime/src/logger.rs | 1 + homestar-wasm/src/wasmtime/world.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 9c585271..f397e6be 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -7,6 +7,7 @@ use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter}; const LOG_FILE: &str = "homestar.log"; const DIRECTIVE_EXPECT: &str = "Invalid tracing directive"; +// Sets simplified logging filter and format for Every CLI const EVERY_CLI: &str = "EVERY_CLI"; /// Logger interface. diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index f0ecf217..7c21b469 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -146,7 +146,7 @@ impl Env { /// Types must conform to [Wit] IDL types when Wasm was compiled/generated. /// /// [Wit]: - #[instrument(name = "execute", skip_all)] + #[instrument(skip_all)] pub async fn execute(&mut self, args: Args) -> Result where T: Send,