Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wasi-observe): WASI Observe factor implementation #2787

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ openssl = { version = "0.10" }
anyhow = { workspace = true, features = ["backtrace"] }
conformance = { path = "tests/conformance-tests" }
conformance-tests = { workspace = true }
fake-opentelemetry-collector = "0.21.1"
hex = "0.4"
http-body-util = { workspace = true }
hyper = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/factor-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ anyhow = { workspace = true }
lru = "0.12"
serde = { workspace = true }
spin-core = { path = "../core" }
spin-factor-observe = { path = "../factor-observe" }
spin-factors = { path = "../factors" }
spin-locked-app = { path = "../locked-app" }
spin-world = { path = "../world" }
Expand Down
24 changes: 23 additions & 1 deletion crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_factor_observe::ObserveContext;
use spin_world::v2::key_value;
use std::{collections::HashSet, sync::Arc};
use table::Table;
Expand Down Expand Up @@ -36,22 +37,25 @@ pub struct KeyValueDispatch {
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
stores: Table<Arc<dyn Store>>,
observe_context: Option<ObserveContext>,
}

impl KeyValueDispatch {
pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY, None)
}

pub fn new_with_capacity(
allowed_stores: HashSet<String>,
manager: Arc<dyn StoreManager>,
capacity: u32,
observe_context: Option<ObserveContext>,
) -> Self {
Self {
allowed_stores,
manager,
stores: Table::new(capacity),
observe_context,
}
}

Expand All @@ -71,6 +75,9 @@ impl key_value::Host for KeyValueDispatch {}
impl key_value::HostStore for KeyValueDispatch {
#[instrument(name = "spin_key_value.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
Ok(async {
if self.allowed_stores.contains(&name) {
let store = self
Expand All @@ -91,6 +98,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<Option<Vec<u8>>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.get(&key).await)
}
Expand All @@ -102,6 +112,9 @@ impl key_value::HostStore for KeyValueDispatch {
key: String,
value: Vec<u8>,
) -> Result<Result<(), Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.set(&key, &value).await)
}
Expand All @@ -112,6 +125,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<(), Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.delete(&key).await)
}
Expand All @@ -122,6 +138,9 @@ impl key_value::HostStore for KeyValueDispatch {
store: Resource<key_value::Store>,
key: String,
) -> Result<Result<bool, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.exists(&key).await)
}
Expand All @@ -131,6 +150,9 @@ impl key_value::HostStore for KeyValueDispatch {
&mut self,
store: Resource<key_value::Store>,
) -> Result<Result<Vec<String>, Error>> {
if let Some(observe_context) = self.observe_context.as_ref() {
observe_context.reparent_tracing_span()
}
let store = self.get_store(store)?;
Ok(store.get_keys().await)
}
Expand Down
8 changes: 7 additions & 1 deletion crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use anyhow::ensure;
use spin_factor_observe::ObserveContext;
use spin_factors::{
ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors,
};
Expand Down Expand Up @@ -81,17 +82,19 @@ impl Factor for KeyValueFactor {

fn prepare<T: RuntimeFactors>(
&self,
ctx: PrepareContext<T, Self>,
mut ctx: PrepareContext<T, Self>,
) -> anyhow::Result<InstanceBuilder> {
let app_state = ctx.app_state();
let allowed_stores = app_state
.component_allowed_stores
.get(ctx.app_component().id())
.expect("component should be in component_stores")
.clone();
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;
Ok(InstanceBuilder {
store_manager: app_state.store_manager.clone(),
allowed_stores,
observe_context,
})
}
}
Expand Down Expand Up @@ -140,6 +143,7 @@ pub struct InstanceBuilder {
store_manager: Arc<AppStoreManager>,
/// The allowed stores for this component instance.
allowed_stores: HashSet<String>,
observe_context: ObserveContext,
}

impl FactorInstanceBuilder for InstanceBuilder {
Expand All @@ -149,11 +153,13 @@ impl FactorInstanceBuilder for InstanceBuilder {
let Self {
store_manager,
allowed_stores,
observe_context,
} = self;
Ok(KeyValueDispatch::new_with_capacity(
allowed_stores,
store_manager,
u32::MAX,
Some(observe_context),
))
}
}
1 change: 1 addition & 0 deletions crates/factor-llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ llm-cublas = ["llm", "spin-llm-local/cublas"]
anyhow = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
spin-factor-observe = { path = "../factor-observe" }
spin-factors = { path = "../factors" }
spin-llm-local = { path = "../llm-local", optional = true }
spin-llm-remote-http = { path = "../llm-remote-http" }
Expand Down
4 changes: 4 additions & 0 deletions crates/factor-llm/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ impl v2::Host for InstanceState {
prompt: String,
params: Option<v2::InferencingParams>,
) -> Result<v2::InferencingResult, v2::Error> {
self.observe_context.reparent_tracing_span();

if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
Expand Down Expand Up @@ -42,6 +44,8 @@ impl v2::Host for InstanceState {
model: v1::EmbeddingModel,
data: Vec<String>,
) -> Result<v2::EmbeddingsResult, v2::Error> {
self.observe_context.reparent_tracing_span();

if !self.allowed_models.contains(&model) {
return Err(access_denied_error(&model));
}
Expand Down
6 changes: 5 additions & 1 deletion crates/factor-llm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use async_trait::async_trait;
use spin_factor_observe::ObserveContext;
use spin_factors::{
ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
};
Expand Down Expand Up @@ -76,7 +77,7 @@ impl Factor for LlmFactor {

fn prepare<T: RuntimeFactors>(
&self,
ctx: PrepareContext<T, Self>,
mut ctx: PrepareContext<T, Self>,
) -> anyhow::Result<Self::InstanceBuilder> {
let allowed_models = ctx
.app_state()
Expand All @@ -85,10 +86,12 @@ impl Factor for LlmFactor {
.cloned()
.unwrap_or_default();
let engine = ctx.app_state().engine.clone();
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;

Ok(InstanceState {
engine,
allowed_models,
observe_context,
})
}
}
Expand All @@ -103,6 +106,7 @@ pub struct AppState {
pub struct InstanceState {
engine: Arc<Mutex<dyn LlmEngine>>,
pub allowed_models: Arc<HashSet<String>>,
observe_context: ObserveContext,
}

/// The runtime configuration for the LLM factor.
Expand Down
Loading
Loading