Skip to content

Commit

Permalink
feat: Add workflow spans and every cli logging
Browse files Browse the repository at this point in the history
  • Loading branch information
bgins committed Mar 7, 2024
1 parent 90c6246 commit 0041da0
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 29 deletions.
67 changes: 48 additions & 19 deletions homestar-runtime/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,60 @@ fn init(
guard: WorkerGuard,
#[allow(unused_variables)] settings: &settings::Monitoring,
) -> WorkerGuard {
// RUST_LOG ignored when EVERY_CLI is true
let every_cli = std::env::var("EVERY_CLI").is_ok_and(|val| val == "true");

// TODO: Add support for customizing logger(s) / specialzed formatters.
let format_layer = tracing_logfmt::builder()
.with_level(true)
.with_target(true)
.with_span_name(true)
.with_span_path(true)
.with_location(true)
.with_module_path(true)
.layer()
.with_writer(writer);
let format_layer = if every_cli {
tracing_logfmt::builder()
.with_level(true)
.with_target(false)
.with_span_name(false)
.with_span_path(false)
.with_location(false)
.with_module_path(false)
.layer()
.with_writer(writer)
} else {
tracing_logfmt::builder()
.with_level(true)
.with_target(true)
.with_span_name(true)
.with_span_path(true)
.with_location(true)
.with_module_path(true)
.layer()
.with_writer(writer)
};

let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("info")
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
let filter = if every_cli {
EnvFilter::new("off")
.add_directive(
"homestar_runtime::worker[run]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive(
"libp2p_gossipsub::behaviour=info"
"homestar_runtime::worker[spawn_tasks]=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
});
} else {
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("info")
.add_directive("homestar_wasm=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive(
"libp2p_gossipsub::behaviour=info"
.parse()
.expect(DIRECTIVE_EXPECT),
)
.add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("moka=info".parse().expect(DIRECTIVE_EXPECT))
.add_directive("jsonrpsee=info".parse().expect(DIRECTIVE_EXPECT))
})
};

#[cfg(all(
feature = "console",
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/tasks/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use homestar_wasm::{
io::{Arg, Output},
wasmtime::{world::Env, Error as WasmRuntimeError, State, World},
};
use tracing::Instrument;

#[allow(dead_code)]
#[allow(missing_debug_implementations)]
Expand All @@ -32,7 +33,7 @@ impl WasmContext {
args: Args<Arg>,
) -> Result<Output, WasmRuntimeError> {
let env = World::instantiate_with_current_env(bytes, fun_name, &mut self.env).await?;
env.execute(args).await
env.execute(args).in_current_span().await
}
}

Expand Down
49 changes: 42 additions & 7 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use indexmap::IndexMap;
use libipld::{Cid, Ipld};
use std::{collections::BTreeMap, sync::Arc};
use tokio::task::JoinSet;
use tracing::{debug, error, info};
use tracing::{debug, error, info, info_span, instrument, Instrument};

mod poller;
mod resolver;
Expand Down Expand Up @@ -157,6 +157,7 @@ where
/// [Instruction]: homestar_invocation::task::Instruction
/// [Swarm]: crate::network::swarm
/// [LinkMap]: homestar_workflow::LinkMap
#[instrument(skip_all)]
pub(crate) async fn run<F>(self, running_tasks: Arc<RunningTaskSet>, fetch_fn: F) -> Result<()>
where
F: FnOnce(FnvHashSet<Resource>) -> BoxFuture<'a, Result<IndexMap<Resource, Vec<u8>>>>,
Expand All @@ -169,6 +170,15 @@ where
.await
{
Ok(ctx) => {
let workflow_cid = self.workflow_info.cid.to_string();

info!(
subject = "worker.init_workflow",
category = "worker.run",
workflow_cid,
"initializing workflow"
);

let promises_to_resolve = ctx.scheduler.promises_to_resolve.clone();
let resolver = DHTResolver::new(
promises_to_resolve,
Expand All @@ -181,7 +191,7 @@ where
info!(
subject = "worker.resolve_receipts",
category = "worker.run",
workflow_cid = self.workflow_info.cid.to_string(),
workflow_cid,
"resolving receipts in the background"
);
poller::poll(
Expand Down Expand Up @@ -209,8 +219,26 @@ where
)?;
}

info!(
subject = "worker.start_workflow",
category = "worker.run",
workflow_cid,
"starting workflow"
);

// Run the queue of tasks.
self.run_queue(ctx.scheduler, running_tasks).await
let result = self.run_queue(ctx.scheduler, running_tasks).await;

if let Ok(()) = result {
info!(
subject = "worker.end_workflow",
category = "worker.run",
workflow_cid,
"workflow completed"
);
}

result
}
Err(err) => {
error!(subject = "worker.init.err",
Expand All @@ -223,6 +251,7 @@ where
}

#[allow(unused_mut)]
#[instrument(skip_all)]
async fn run_queue(
mut self,
mut scheduler: TaskScheduler<'a>,
Expand Down Expand Up @@ -321,17 +350,19 @@ where
category = "worker.run",
workflow_cid = workflow_cid.to_string(),
cid = cid.to_string(),
"attempting to resolve cid in workflow"
"attempting to resolve workflow args by cid"
);

cid.resolve(linkmap.clone(), resources.clone(), db.clone())
.boxed()
});

let handle = task_set.spawn(async move {
match resolved.await {
match resolved.await {
Ok(inst_result) => {
match wasm_ctx.run(wasm, &fun, inst_result).await {
match wasm_ctx.run(wasm, &fun, inst_result).instrument({
info_span!("wasm_run").or_current()
}).await {
Ok(output) => Ok((
output,
instruction_ptr,
Expand All @@ -352,7 +383,11 @@ where
})
}
}
});
}
.instrument({
info_span!("spawn_tasks").or_current()
}));

handles.push(handle);
}
None => error!(
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/worker/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::{
sync::RwLock,
time::{timeout_at, Instant},
};
use tracing::debug;
use tracing::{debug, instrument};

pub(crate) trait Resolver {
async fn resolve(
Expand All @@ -35,6 +35,7 @@ pub(crate) trait Resolver {
}

impl Resolver for Cid {
#[instrument(level = "debug", name = "cid_resolve", skip_all)]
async fn resolve(
self,
linkmap: Arc<RwLock<LinkMap<task::Result<Arg>>>>,
Expand Down
2 changes: 2 additions & 0 deletions homestar-wasm/src/wasmtime/host/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::wasmtime::{
};
use async_trait::async_trait;
use std::time::Instant;
use tracing::instrument;

#[async_trait]
impl helpers::Host for State {
Expand All @@ -30,6 +31,7 @@ impl helpers::Host for State {
#[async_trait]
impl wasi::logging::logging::Host for State {
/// Log a message, formatted by the runtime subscriber.
#[instrument(name = "wasi_log", skip_all)]
async fn log(
&mut self,
level: wasi::logging::logging::Level,
Expand Down
6 changes: 5 additions & 1 deletion homestar-wasm/src/wasmtime/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use homestar_invocation::{
task::instruction::{Args, Input},
};
use std::{iter, time::Instant};
use tracing::{instrument, Instrument};
use wasmtime::{
component::{self, Component, Func, Instance, Linker},
Config, Engine, Store,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl<T> Env<T> {
/// Types must conform to [Wit] IDL types when Wasm was compiled/generated.
///
/// [Wit]: <https://github.com/WebAssembly/component-model/blob/main/design/mvp/WIT.md>
#[instrument(name = "execute", skip_all)]
pub async fn execute(&mut self, args: Args<Arg>) -> Result<Output, Error>
where
T: Send,
Expand Down Expand Up @@ -196,13 +198,15 @@ impl<T> Env<T> {
.ok_or(Error::WasmInstantiation)?
.func()
.call_async(&mut self.store, &params, &mut results_alloc)
.in_current_span()
.await?;

self.bindings
.as_mut()
.ok_or(Error::WasmInstantiation)?
.func()
.post_return_async(&mut self.store)
.in_current_span()
.await?;

let results = match &results_alloc[..] {
Expand Down Expand Up @@ -409,7 +413,7 @@ fn component_from_bytes(bytes: &[u8], engine: Engine) -> Result<Component, Error
if is_component(chunk) {
Component::from_binary(&engine, bytes).map_err(Error::IntoWasmComponent)
} else {
tracing::info!("Converting Wasm binary into a Wasm component");
tracing::info!("converting Wasm binary into a Wasm component");

let component = ComponentEncoder::default()
.module(bytes)?
Expand Down

0 comments on commit 0041da0

Please sign in to comment.