Skip to content

Commit

Permalink
host-guest-host test and wire ObserveContext into outbound http
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Schoepp <[email protected]>
  • Loading branch information
calebschoepp committed Aug 29, 2024
1 parent f78566b commit f9de9d9
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 167 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/factor-observe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ once_cell = "1"
opentelemetry = { version = "0.22.0", features = [ "metrics", "trace"] }
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.15.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client", "metrics", "grpc-tonic"] }
pin-project-lite = "0.2"
serde = "1.0.188"
spin-app = { path = "../app" }
spin-core = { path = "../core" }
Expand All @@ -35,4 +34,4 @@ toml = "0.5"
[lints]
workspace = true

# TODO(Caleb): Cleanup these dependencies
# TODO(Caleb): Cleanup these dependencies, use workspace, remove not needed
80 changes: 0 additions & 80 deletions crates/factor-observe/src/future.rs

This file was deleted.

1 change: 1 addition & 0 deletions crates/factor-observe/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl traces::HostSpan for InstanceState {

fn drop(&mut self, _resource: Resource<WitSpan>) -> Result<()> {
// TODO(Caleb): What do we want the dropping behavior to be?
// TODO(Caleb): How is the drop semantics test passing?
Ok(())
}
}
Expand Down
111 changes: 30 additions & 81 deletions crates/factor-observe/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
pub mod future;
mod host;

use std::sync::{Arc, RwLock};

use future::ObserveContext;
use indexmap::IndexMap;
use opentelemetry::{global::ObjectSafeSpan, trace::TraceContextExt, Context};
use spin_factors::{Factor, SelfInstanceBuilder};
use tracing_opentelemetry::OpenTelemetrySpanExt;

#[derive(Default)]
pub struct ObserveFactor {}
Expand Down Expand Up @@ -57,29 +57,6 @@ pub struct InstanceState {
impl SelfInstanceBuilder for InstanceState {}

impl InstanceState {
// /// Close the span associated with the given resource and optionally drop the resource
// /// from the table. Additionally close any other active spans that are more recent on the stack
// /// in reverse order.
// ///
// /// Exiting any spans that were already closed will not cause this to error.
// fn safely_close(&mut self, resource_id: u32, drop_resource: bool) {
// let mut state: std::sync::RwLockWriteGuard<State> = self.state.write().unwrap();

// if let Some(index) = state
// .active_spans
// .iter()
// .rposition(|(_, id)| *id == resource_id)
// {
// state.close_from_back_to(index);
// } else {
// tracing::debug!("found no active spans to close")
// }

// if drop_resource {
// state.guest_spans.remove(resource_id).unwrap();
// }
// }

pub fn get_observe_context(&self) -> ObserveContext {
ObserveContext {
state: self.state.clone(),
Expand All @@ -100,65 +77,37 @@ pub(crate) struct State {
pub active_spans: IndexMap<String, u32>,
}

// impl State {
// /// Close all active spans from the top of the stack to the given index. Closing entails exiting
// /// the inner [tracing] span and removing it from the active spans stack.
// pub(crate) fn close_from_back_to(&mut self, index: usize) {
// self.active_spans
// .split_off(index)
// .iter()
// .rev()
// .for_each(|(_, id)| {
// if let Some(guest_span) = self.guest_spans.get(*id) {
// guest_span.exit();
// } else {
// tracing::debug!("active_span {id:?} already removed from resource table");
// }
// });
// }

// /// Enter the inner [tracing] span for all active spans.
// pub(crate) fn enter_all(&self) {
// for (_, guest_span_id) in self.active_spans.iter() {
// if let Some(span_resource) = self.guest_spans.get(*guest_span_id) {
// span_resource.enter();
// } else {
// tracing::debug!("guest span already dropped")
// }
// }
// }

// /// Exit the inner [tracing] span for all active spans.
// pub(crate) fn exit_all(&self) {
// for (_, guest_span_id) in self.active_spans.iter().rev() {
// if let Some(span_resource) = self.guest_spans.get(*guest_span_id) {
// span_resource.exit();
// } else {
// tracing::debug!("guest span already dropped")
// }
// }
// }
// }

/// The WIT resource Span. Effectively wraps an [opentelemetry_sdk::trace::Span].
pub struct GuestSpan {
/// The [opentelemetry_sdk::trace::Span] we use to do the actual tracing work.
pub inner: opentelemetry_sdk::trace::Span,
}

// // Note: We use tracing enter instead of Entered because Entered is not Send
// impl GuestSpan {
// /// Enter the inner [tracing] span.
// pub fn enter(&self) {
// self.inner.with_subscriber(|(id, dispatch)| {
// dispatch.enter(id);
// });
// }

// /// Exits the inner [tracing] span.
// pub fn exit(&self) {
// self.inner.with_subscriber(|(id, dispatch)| {
// dispatch.exit(id);
// });
// }
// }
pub struct ObserveContext {
pub(crate) state: Arc<RwLock<State>>,
}

impl ObserveContext {
/// TODO comment
pub fn oh_dear_i_better_get_renamed(&self) {
// TODO: Move this duplicate logic into its own impl
let state = self.state.read().unwrap();
if state.active_spans.is_empty() {
return;
}

let parent_context = Context::new().with_remote_span_context(
state
.guest_spans
.get(*state.active_spans.last().unwrap().1)
.unwrap()
.inner
.span_context()
.clone(),
);
tracing::Span::current().set_parent(parent_context);
}
}

// TODO(Caleb): Reorder things
// TODO(Caleb): Make otel a workspace dependency
1 change: 1 addition & 0 deletions crates/factor-outbound-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ http-body-util = "0.1"
hyper = "1.4.1"
reqwest = { version = "0.11", features = ["gzip"] }
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
spin-factor-observe = { path = "../factor-observe" }
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
spin-factors = { path = "../factors" }
spin-telemetry = { path = "../telemetry" }
Expand Down
4 changes: 4 additions & 0 deletions crates/factor-outbound-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use http::{
uri::{Authority, Parts, PathAndQuery, Scheme},
HeaderValue, Uri,
};
use spin_factor_observe::{ObserveContext, ObserveFactor};
use spin_factor_outbound_networking::{
ComponentTlsConfigs, OutboundAllowedHosts, OutboundNetworkingFactor,
};
Expand Down Expand Up @@ -65,13 +66,15 @@ impl Factor for OutboundHttpFactor {
let outbound_networking = builders.get_mut::<OutboundNetworkingFactor>()?;
let allowed_hosts = outbound_networking.allowed_hosts();
let component_tls_configs = outbound_networking.component_tls_configs().clone();
let observe_context = builders.get_mut::<ObserveFactor>()?.get_observe_context();
Ok(InstanceState {
wasi_http_ctx: WasiHttpCtx::new(),
allowed_hosts,
component_tls_configs,
self_request_origin: None,
request_interceptor: None,
spin_http_client: None,
observe_context,
})
}
}
Expand All @@ -84,6 +87,7 @@ pub struct InstanceState {
request_interceptor: Option<Box<dyn OutboundHttpInterceptor>>,
// Connection-pooling client for 'fermyon:spin/http' interface
spin_http_client: Option<reqwest::Client>,
observe_context: ObserveContext,
}

impl InstanceState {
Expand Down
2 changes: 2 additions & 0 deletions crates/factor-outbound-http/src/spin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ impl spin_http::Host for crate::InstanceState {
fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
self.observe_context.oh_dear_i_better_get_renamed();

let span = Span::current();
record_request_fields(&span, &req);

Expand Down
2 changes: 2 additions & 0 deletions crates/factor-outbound-http/src/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
self.state.observe_context.oh_dear_i_better_get_renamed();

// wasmtime-wasi-http fills in scheme and authority for relative URLs
// (e.g. https://:443/<path>), which makes them hard to reason about.
// Undo that here.
Expand Down
4 changes: 2 additions & 2 deletions crates/trigger/src/factors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use spin_runtime_config::TomlRuntimeConfigSource;

#[derive(RuntimeFactors)]
pub struct TriggerFactors {
pub observe: ObserveFactor,
pub wasi: WasiFactor,
pub variables: VariablesFactor,
pub key_value: KeyValueFactor,
Expand All @@ -29,7 +30,6 @@ pub struct TriggerFactors {
pub pg: OutboundPgFactor,
pub mysql: OutboundMysqlFactor,
pub llm: LlmFactor,
pub observe: ObserveFactor,
}

impl TriggerFactors {
Expand All @@ -42,6 +42,7 @@ impl TriggerFactors {
use_gpu: bool,
) -> anyhow::Result<Self> {
Ok(Self {
observe: ObserveFactor::new(),
wasi: wasi_factor(working_dir, allow_transient_writes),
variables: VariablesFactor::default(),
key_value: KeyValueFactor::new(default_key_value_label_resolver),
Expand All @@ -56,7 +57,6 @@ impl TriggerFactors {
spin_factor_llm::spin::default_engine_creator(state_dir, use_gpu)
.context("failed to configure LLM factor")?,
),
observe: ObserveFactor::new(),
})
}
}
Expand Down
Loading

0 comments on commit f9de9d9

Please sign in to comment.