From f9de9d930c28f5329ac8a35b1cb208989f15f3f6 Mon Sep 17 00:00:00 2001 From: Caleb Schoepp Date: Thu, 29 Aug 2024 09:14:05 -0600 Subject: [PATCH] host-guest-host test and wire ObserveContext into outbound http Signed-off-by: Caleb Schoepp --- Cargo.lock | 2 +- crates/factor-observe/Cargo.toml | 3 +- crates/factor-observe/src/future.rs | 80 ------------- crates/factor-observe/src/host.rs | 1 + crates/factor-observe/src/lib.rs | 111 +++++------------- crates/factor-outbound-http/Cargo.toml | 1 + crates/factor-outbound-http/src/lib.rs | 4 + crates/factor-outbound-http/src/spin.rs | 2 + crates/factor-outbound-http/src/wasi.rs | 2 + crates/trigger/src/factors.rs | 4 +- tests/integration.rs | 72 ++++++++++++ .../wasi-observe-tracing/src/lib.rs | 16 ++- .../testcases/wasi-observe-tracing/spin.toml | 2 + 13 files changed, 133 insertions(+), 167 deletions(-) delete mode 100644 crates/factor-observe/src/future.rs diff --git a/Cargo.lock b/Cargo.lock index da91e64110..275f577560 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7436,7 +7436,6 @@ dependencies = [ "opentelemetry 0.22.0", "opentelemetry-otlp 0.15.0", "opentelemetry_sdk 0.22.1", - "pin-project-lite", "serde 1.0.197", "spin-app", "spin-core", @@ -7463,6 +7462,7 @@ dependencies = [ "hyper 1.4.1", "reqwest 0.11.27", "rustls 0.23.7", + "spin-factor-observe", "spin-factor-outbound-networking", "spin-factor-variables", "spin-factors", diff --git a/crates/factor-observe/Cargo.toml b/crates/factor-observe/Cargo.toml index d102a4d5ef..3a519ed189 100644 --- a/crates/factor-observe/Cargo.toml +++ b/crates/factor-observe/Cargo.toml @@ -14,7 +14,6 @@ once_cell = "1" opentelemetry = { version = "0.22.0", features = [ "metrics", "trace"] } opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.15.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client", "metrics", "grpc-tonic"] } -pin-project-lite = "0.2" serde = "1.0.188" spin-app = { path = "../app" } spin-core = { path = "../core" } @@ -35,4 +34,4 @@ toml = "0.5" [lints] workspace = true -# TODO(Caleb): Cleanup these dependencies \ No newline at end of file +# TODO(Caleb): Cleanup these dependencies, use workspace, remove not needed diff --git a/crates/factor-observe/src/future.rs b/crates/factor-observe/src/future.rs deleted file mode 100644 index 0c44c7cefe..0000000000 --- a/crates/factor-observe/src/future.rs +++ /dev/null @@ -1,80 +0,0 @@ -use pin_project_lite::pin_project; -use std::{ - future::Future, - sync::{Arc, RwLock}, -}; - -use crate::State; - -// pin_project! { -// struct Instrumented { -// #[pin] -// inner: F, -// observe_context: ObserveContext, -// } - -// impl PinnedDrop for Instrumented { -// fn drop(this: Pin<&mut Self>) { -// this.project().observe_context.drop_all(); -// } -// } -// } - -// pub trait FutureExt: Future + Sized { -// /// Manage WASI Observe guest spans. -// fn manage_wasi_observe_spans( -// self, -// observe_context: ObserveContext, -// ) -> impl Future; -// } - -// impl FutureExt for F { -// fn manage_wasi_observe_spans( -// self, -// observe_context: ObserveContext, -// ) -> impl Future { -// Instrumented { -// inner: self, -// observe_context, -// } -// } -// } - -// impl Future for Instrumented { -// type Output = F::Output; - -// /// Maintains the invariant that all active spans are entered before polling the inner future -// /// and exited otherwise. If we don't do this then the timing (among many other things) of the -// /// spans becomes wildly incorrect. -// fn poll( -// self: std::pin::Pin<&mut Self>, -// cx: &mut std::task::Context<'_>, -// ) -> std::task::Poll { -// let this = self.project(); - -// // Enter the active spans before entering the inner poll -// { -// this.observe_context.state.write().unwrap().enter_all(); -// } - -// let ret = this.inner.poll(cx); - -// // Exit the active spans after exiting the inner poll -// { -// this.observe_context.state.write().unwrap().exit_all(); -// } - -// ret -// } -// } - -/// The context necessary for the observe host component to function. -pub struct ObserveContext { - pub(crate) state: Arc>, -} - -// impl ObserveContext { -// fn drop_all(&self) { -// self.state.write().unwrap().close_from_back_to(0); -// } -// } diff --git a/crates/factor-observe/src/host.rs b/crates/factor-observe/src/host.rs index 6cef461c22..f5be718617 100644 --- a/crates/factor-observe/src/host.rs +++ b/crates/factor-observe/src/host.rs @@ -123,6 +123,7 @@ impl traces::HostSpan for InstanceState { fn drop(&mut self, _resource: Resource) -> Result<()> { // TODO(Caleb): What do we want the dropping behavior to be? + // TODO(Caleb): How is the drop semantics test passing? Ok(()) } } diff --git a/crates/factor-observe/src/lib.rs b/crates/factor-observe/src/lib.rs index 4bd952f2b3..fb179ad5b6 100644 --- a/crates/factor-observe/src/lib.rs +++ b/crates/factor-observe/src/lib.rs @@ -1,11 +1,11 @@ -pub mod future; mod host; use std::sync::{Arc, RwLock}; -use future::ObserveContext; use indexmap::IndexMap; +use opentelemetry::{global::ObjectSafeSpan, trace::TraceContextExt, Context}; use spin_factors::{Factor, SelfInstanceBuilder}; +use tracing_opentelemetry::OpenTelemetrySpanExt; #[derive(Default)] pub struct ObserveFactor {} @@ -57,29 +57,6 @@ pub struct InstanceState { impl SelfInstanceBuilder for InstanceState {} impl InstanceState { - // /// Close the span associated with the given resource and optionally drop the resource - // /// from the table. Additionally close any other active spans that are more recent on the stack - // /// in reverse order. - // /// - // /// Exiting any spans that were already closed will not cause this to error. - // fn safely_close(&mut self, resource_id: u32, drop_resource: bool) { - // let mut state: std::sync::RwLockWriteGuard = self.state.write().unwrap(); - - // if let Some(index) = state - // .active_spans - // .iter() - // .rposition(|(_, id)| *id == resource_id) - // { - // state.close_from_back_to(index); - // } else { - // tracing::debug!("found no active spans to close") - // } - - // if drop_resource { - // state.guest_spans.remove(resource_id).unwrap(); - // } - // } - pub fn get_observe_context(&self) -> ObserveContext { ObserveContext { state: self.state.clone(), @@ -100,65 +77,37 @@ pub(crate) struct State { pub active_spans: IndexMap, } -// impl State { -// /// Close all active spans from the top of the stack to the given index. Closing entails exiting -// /// the inner [tracing] span and removing it from the active spans stack. -// pub(crate) fn close_from_back_to(&mut self, index: usize) { -// self.active_spans -// .split_off(index) -// .iter() -// .rev() -// .for_each(|(_, id)| { -// if let Some(guest_span) = self.guest_spans.get(*id) { -// guest_span.exit(); -// } else { -// tracing::debug!("active_span {id:?} already removed from resource table"); -// } -// }); -// } - -// /// Enter the inner [tracing] span for all active spans. -// pub(crate) fn enter_all(&self) { -// for (_, guest_span_id) in self.active_spans.iter() { -// if let Some(span_resource) = self.guest_spans.get(*guest_span_id) { -// span_resource.enter(); -// } else { -// tracing::debug!("guest span already dropped") -// } -// } -// } - -// /// Exit the inner [tracing] span for all active spans. -// pub(crate) fn exit_all(&self) { -// for (_, guest_span_id) in self.active_spans.iter().rev() { -// if let Some(span_resource) = self.guest_spans.get(*guest_span_id) { -// span_resource.exit(); -// } else { -// tracing::debug!("guest span already dropped") -// } -// } -// } -// } - /// The WIT resource Span. Effectively wraps an [opentelemetry_sdk::trace::Span]. pub struct GuestSpan { /// The [opentelemetry_sdk::trace::Span] we use to do the actual tracing work. pub inner: opentelemetry_sdk::trace::Span, } -// // Note: We use tracing enter instead of Entered because Entered is not Send -// impl GuestSpan { -// /// Enter the inner [tracing] span. -// pub fn enter(&self) { -// self.inner.with_subscriber(|(id, dispatch)| { -// dispatch.enter(id); -// }); -// } - -// /// Exits the inner [tracing] span. -// pub fn exit(&self) { -// self.inner.with_subscriber(|(id, dispatch)| { -// dispatch.exit(id); -// }); -// } -// } +pub struct ObserveContext { + pub(crate) state: Arc>, +} + +impl ObserveContext { + /// TODO comment + pub fn oh_dear_i_better_get_renamed(&self) { + // TODO: Move this duplicate logic into its own impl + let state = self.state.read().unwrap(); + if state.active_spans.is_empty() { + return; + } + + let parent_context = Context::new().with_remote_span_context( + state + .guest_spans + .get(*state.active_spans.last().unwrap().1) + .unwrap() + .inner + .span_context() + .clone(), + ); + tracing::Span::current().set_parent(parent_context); + } +} + +// TODO(Caleb): Reorder things +// TODO(Caleb): Make otel a workspace dependency diff --git a/crates/factor-outbound-http/Cargo.toml b/crates/factor-outbound-http/Cargo.toml index 9d22b59084..38e2c73f89 100644 --- a/crates/factor-outbound-http/Cargo.toml +++ b/crates/factor-outbound-http/Cargo.toml @@ -11,6 +11,7 @@ http-body-util = "0.1" hyper = "1.4.1" reqwest = { version = "0.11", features = ["gzip"] } rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } +spin-factor-observe = { path = "../factor-observe" } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } spin-telemetry = { path = "../telemetry" } diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index 739be2ab9a..c0b59fae76 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -10,6 +10,7 @@ use http::{ uri::{Authority, Parts, PathAndQuery, Scheme}, HeaderValue, Uri, }; +use spin_factor_observe::{ObserveContext, ObserveFactor}; use spin_factor_outbound_networking::{ ComponentTlsConfigs, OutboundAllowedHosts, OutboundNetworkingFactor, }; @@ -65,6 +66,7 @@ impl Factor for OutboundHttpFactor { let outbound_networking = builders.get_mut::()?; let allowed_hosts = outbound_networking.allowed_hosts(); let component_tls_configs = outbound_networking.component_tls_configs().clone(); + let observe_context = builders.get_mut::()?.get_observe_context(); Ok(InstanceState { wasi_http_ctx: WasiHttpCtx::new(), allowed_hosts, @@ -72,6 +74,7 @@ impl Factor for OutboundHttpFactor { self_request_origin: None, request_interceptor: None, spin_http_client: None, + observe_context, }) } } @@ -84,6 +87,7 @@ pub struct InstanceState { request_interceptor: Option>, // Connection-pooling client for 'fermyon:spin/http' interface spin_http_client: Option, + observe_context: ObserveContext, } impl InstanceState { diff --git a/crates/factor-outbound-http/src/spin.rs b/crates/factor-outbound-http/src/spin.rs index 633df727d9..08691932be 100644 --- a/crates/factor-outbound-http/src/spin.rs +++ b/crates/factor-outbound-http/src/spin.rs @@ -13,6 +13,8 @@ impl spin_http::Host for crate::InstanceState { fields(otel.kind = "client", url.full = Empty, http.request.method = Empty, http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))] async fn send_request(&mut self, req: Request) -> Result { + self.observe_context.oh_dear_i_better_get_renamed(); + let span = Span::current(); record_request_fields(&span, &req); diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index 8d49bad2ab..6965b9982f 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -86,6 +86,8 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> { mut request: Request, mut config: wasmtime_wasi_http::types::OutgoingRequestConfig, ) -> wasmtime_wasi_http::HttpResult { + self.state.observe_context.oh_dear_i_better_get_renamed(); + // wasmtime-wasi-http fills in scheme and authority for relative URLs // (e.g. https://:443/), which makes them hard to reason about. // Undo that here. diff --git a/crates/trigger/src/factors.rs b/crates/trigger/src/factors.rs index 257409444c..a26734013d 100644 --- a/crates/trigger/src/factors.rs +++ b/crates/trigger/src/factors.rs @@ -18,6 +18,7 @@ use spin_runtime_config::TomlRuntimeConfigSource; #[derive(RuntimeFactors)] pub struct TriggerFactors { + pub observe: ObserveFactor, pub wasi: WasiFactor, pub variables: VariablesFactor, pub key_value: KeyValueFactor, @@ -29,7 +30,6 @@ pub struct TriggerFactors { pub pg: OutboundPgFactor, pub mysql: OutboundMysqlFactor, pub llm: LlmFactor, - pub observe: ObserveFactor, } impl TriggerFactors { @@ -42,6 +42,7 @@ impl TriggerFactors { use_gpu: bool, ) -> anyhow::Result { Ok(Self { + observe: ObserveFactor::new(), wasi: wasi_factor(working_dir, allow_transient_writes), variables: VariablesFactor::default(), key_value: KeyValueFactor::new(default_key_value_label_resolver), @@ -56,7 +57,6 @@ impl TriggerFactors { spin_factor_llm::spin::default_engine_creator(state_dir, use_gpu) .context("failed to configure LLM factor")?, ), - observe: ObserveFactor::new(), }) } } diff --git a/tests/integration.rs b/tests/integration.rs index 601998e0f6..041fb011e6 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -434,6 +434,77 @@ mod integration_tests { Ok(()) } + #[tokio::test] + async fn wasi_observe_host_guest_host() -> anyhow::Result<()> { + let collector = FakeCollectorServer::start() + .await + .expect("fake collector server should start"); + let collector_endpoint = collector.endpoint().clone(); + + tokio::task::spawn_blocking(|| { + run_test_inited( + "wasi-observe-tracing", + SpinConfig { + binary_path: spin_binary(), + spin_up_args: Vec::new(), + app_type: SpinAppType::Http, + }, + ServicesConfig::none(), + |env| { + env.set_env_var("OTEL_EXPORTER_OTLP_ENDPOINT", collector_endpoint); + env.set_env_var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc"); + env.set_env_var("OTEL_BSP_SCHEDULE_DELAY", "5"); + Ok(()) + }, + move |env| { + let spin = env.runtime_mut(); + assert_spin_request( + spin, + Request::new(Method::Get, "/host-guest-host"), + Response::new(200), + )?; + + let mut spans: Vec; + assert_eventually!( + { + spans = collector.exported_spans(); + !spans.is_empty() + }, + 5 + ); + + assert_eq!(spans.len(), 4); + + assert!(spans + .iter() + .map(|s| s.trace_id.clone()) + .all(|t| t == spans[0].trace_id)); + + let exec_component_span = spans + .iter() + .find(|s| s.name == "execute_wasm_component wasi-observe-tracing") + .expect("'execute_wasm_component wasi-observe-tracing' span should exist"); + let guest_span = spans + .iter() + .find(|s| s.name == "guest") + .expect("'guest' span should exist"); + let get_span = spans + .iter() + .find(|s| s.name == "GET") + .expect("'GET' span should exist"); + + assert_eq!(guest_span.parent_span_id, exec_component_span.span_id); + assert_eq!(get_span.parent_span_id, guest_span.span_id); + + Ok(()) + }, + ) + }) + .await??; + + Ok(()) + } + // TODO: wasi_observe_set_event // TODO: wasi_observe_update_name // TODO: wasi_observe_set_link @@ -441,6 +512,7 @@ mod integration_tests { // TODO: semantics of closing a parent doesn't close child // TODO: inbound trace propagation // TODO: outbound trace propagation + // TODO: Weird edge cases where a span is closed before we try to load it implicitly as parent or as observecontext thing. #[test] /// Test dynamic environment variables diff --git a/tests/test-components/components/wasi-observe-tracing/src/lib.rs b/tests/test-components/components/wasi-observe-tracing/src/lib.rs index 22eb18c339..b1fa359a4a 100644 --- a/tests/test-components/components/wasi-observe-tracing/src/lib.rs +++ b/tests/test-components/components/wasi-observe-tracing/src/lib.rs @@ -5,7 +5,7 @@ wit_bindgen::generate!({ }); use spin_sdk::{ - http::{Params, Request, Response, Router}, + http::{Method, Params, Request, Response, Router}, http_component, }; use wasi::observe::traces::{KeyValue, Span, Value}; @@ -16,6 +16,7 @@ fn handle(req: http::Request<()>) -> Response { router.get("/nested-spans", nested_spans); router.get("/drop-semantics", drop_semantics); router.get("/setting-attributes", setting_attributes); + router.get_async("/host-guest-host", host_guest_host); router.handle(req) } @@ -56,3 +57,16 @@ fn setting_attributes(_req: Request, _params: Params) -> Response { span.end(); Response::new(200, "") } + +async fn host_guest_host(_req: Request, _params: Params) -> Response { + let span = Span::start("guest"); + + let req = Request::builder() + .method(Method::Get) + .uri("https://asdf.com") + .build(); + let _res: Response = spin_sdk::http::send(req).await.unwrap(); + span.end(); + + Response::new(200, "") +} diff --git a/tests/testcases/wasi-observe-tracing/spin.toml b/tests/testcases/wasi-observe-tracing/spin.toml index a72de91568..cc4698b5f9 100644 --- a/tests/testcases/wasi-observe-tracing/spin.toml +++ b/tests/testcases/wasi-observe-tracing/spin.toml @@ -12,6 +12,8 @@ component = "wasi-observe-tracing" [component.wasi-observe-tracing] source = "%{source=wasi-observe-tracing}" +key_value_stores = ["default"] +allowed_outbound_hosts = ["http://self", "https://asdf.com"] [component.wasi-observe-tracing.build] command = "cargo build --target wasm32-wasi --release" watch = ["src/**/*.rs", "Cargo.toml"]