diff --git a/.changeset/shy-countries-press.md b/.changeset/shy-countries-press.md new file mode 100644 index 00000000..96da7047 --- /dev/null +++ b/.changeset/shy-countries-press.md @@ -0,0 +1,6 @@ +--- +"eppo_core": patch +"ruby-sdk": patch +--- + +[Unstable] Add ability for attaching context to events dispatched diff --git a/eppo_core/src/event_ingestion/context.rs b/eppo_core/src/event_ingestion/context.rs new file mode 100644 index 00000000..3f547294 --- /dev/null +++ b/eppo_core/src/event_ingestion/context.rs @@ -0,0 +1,73 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::Str; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, derive_more::From)] +#[serde(untagged)] +pub enum ContextValue { + String(Str), + Number(f64), + Boolean(bool), + Null, +} + +#[derive(thiserror::Error, Debug, Clone, PartialEq)] +pub enum ContextError { + #[error("JSON value cannot be an object or an array")] + InvalidContextValueType, +} + +impl ContextValue { + pub fn try_from_json(value: Value) -> Result { + match value { + Value::String(s) => Ok(ContextValue::String(s.into())), + Value::Number(n) => Ok(ContextValue::Number(n.as_f64().unwrap())), // Safe unwrap since it's always f64 or i64 + Value::Bool(b) => Ok(ContextValue::Boolean(b)), + Value::Null => Ok(ContextValue::Null), + _ => Err(ContextError::InvalidContextValueType), + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use super::*; + use serde_json::json; + + #[test] + fn test_serialization() { + #[derive(Debug, Serialize)] + struct IngestionRequestBody<'a> { + context: &'a BTreeMap, + } + + let mut context = BTreeMap::new(); + context.insert("key1".to_string(), ContextValue::String("value1".into())); + context.insert("key2".to_string(), ContextValue::Number(42.0)); + context.insert("key3".to_string(), ContextValue::Boolean(true)); + context.insert("key4".to_string(), ContextValue::Null); + + let body = IngestionRequestBody { context: &context }; + + let json = serde_json::to_string(&body).unwrap(); + assert_eq!( + json, + "{\"context\":{\"key1\":\"value1\",\"key2\":42.0,\"key3\":true,\"key4\":null}}" + ); + } + + #[test] + fn test_context_invalid_values() { + assert_eq!( + ContextValue::try_from_json(json!({"foo": "bar"})), + Err(ContextError::InvalidContextValueType) + ); + assert_eq!( + ContextValue::try_from_json(json!([1, 2, 3])), + Err(ContextError::InvalidContextValueType) + ); + } +} diff --git a/eppo_core/src/event_ingestion/delivery.rs b/eppo_core/src/event_ingestion/delivery.rs index ec937019..06aa5c24 100644 --- a/eppo_core/src/event_ingestion/delivery.rs +++ b/eppo_core/src/event_ingestion/delivery.rs @@ -1,7 +1,7 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use exponential_backoff::Backoff; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; use super::{event::Event, event_delivery::EventDelivery, BatchedMessage}; @@ -56,7 +56,7 @@ pub(super) struct DeliveryConfig { pub(super) async fn delivery( mut uplink: mpsc::Receiver>, delivery_status: mpsc::Sender, - event_delivery: EventDelivery, + event_delivery: Arc>, config: DeliveryConfig, ) -> Option<()> { // We use this unbounded channel to loop back messages that need retrying. @@ -72,7 +72,10 @@ pub(super) async fn delivery( let BatchedMessage { batch, flush } = msg; - let mut result = event_delivery.deliver(batch).await; + let mut result = { + let delivery = event_delivery.lock().await; + delivery.deliver(batch).await + }; if attempts >= config.max_retries { // Exceeded max retries -> promote retriable errors to permanent ones. diff --git a/eppo_core/src/event_ingestion/event.rs b/eppo_core/src/event_ingestion/event.rs index edfb77b4..12724e9d 100644 --- a/eppo_core/src/event_ingestion/event.rs +++ b/eppo_core/src/event_ingestion/event.rs @@ -15,4 +15,4 @@ pub(super) struct Event { pub event_type: String, pub payload: serde_json::Value, -} \ No newline at end of file +} diff --git a/eppo_core/src/event_ingestion/event_delivery.rs b/eppo_core/src/event_ingestion/event_delivery.rs index 3c1f5ba9..5fd58e94 100644 --- a/eppo_core/src/event_ingestion/event_delivery.rs +++ b/eppo_core/src/event_ingestion/event_delivery.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use log::debug; use reqwest::StatusCode; @@ -8,7 +8,7 @@ use uuid::Uuid; use crate::sdk_key::SdkKey; -use super::{delivery::DeliveryStatus, event::Event}; +use super::{delivery::DeliveryStatus, event::Event, ContextValue}; const MAX_EVENT_SERIALIZED_LENGTH: usize = 4096; @@ -17,6 +17,7 @@ pub(super) struct EventDelivery { sdk_key: SdkKey, ingestion_url: Url, client: reqwest::Client, + context: HashMap, } #[derive(thiserror::Error, Debug)] @@ -68,6 +69,7 @@ impl From for EventDeliveryError { #[derive(Debug, Serialize)] struct IngestionRequestBody<'a> { + context: &'a HashMap, eppo_events: &'a [Event], } @@ -83,6 +85,7 @@ impl EventDelivery { sdk_key, ingestion_url, client, + context: HashMap::new(), } } @@ -121,6 +124,10 @@ impl EventDelivery { status } + pub fn attach_context(&mut self, key: String, value: ContextValue) { + self.context.insert(key, value); + } + async fn deliver_inner( &self, events: &[Event], @@ -136,6 +143,7 @@ impl EventDelivery { debug!("Delivering {} events to {}", events.len(), ingestion_url); let body = IngestionRequestBody { + context: &self.context, eppo_events: events, }; @@ -181,6 +189,12 @@ mod tests { .and(path("/")) .and(header("X-Eppo-Token", "foobar")) .and(body_json(&json!({ + "context": { + "key1": "value1", + "key2": 42.0, + "key3": true, + "key4": null, + }, "eppo_events": [{ "uuid": uuid, "timestamp": timestamp.timestamp_millis(), @@ -196,7 +210,7 @@ mod tests { .mount(&mock_server) .await; - let client = EventDelivery::new( + let mut delivery = EventDelivery::new( reqwest::Client::new(), SdkKey::new("foobar".into()), Url::parse(mock_server.uri().as_str()).unwrap(), @@ -212,10 +226,35 @@ mod tests { }), }; - let result = client.deliver(vec![event.clone()]).await; + delivery.attach_context("key1".to_string(), ContextValue::String("value1".into())); + delivery.attach_context("key2".to_string(), ContextValue::Number(42.0)); + delivery.attach_context("key3".to_string(), ContextValue::Boolean(true)); + delivery.attach_context("key4".to_string(), ContextValue::Null); + + let result = delivery.deliver(vec![event.clone()]).await; assert_eq!(result, DeliveryStatus::success(vec![event])); mock_server.verify().await; } + + #[test] + fn test_attach_context_valid_values() { + let mut delivery = EventDelivery::new( + reqwest::Client::new(), + SdkKey::new("foobar".into()), + Url::parse("http://example.com").unwrap(), + ); + + delivery.attach_context("key1".to_string(), ContextValue::String("value1".into())); + delivery.attach_context("key2".to_string(), ContextValue::Number(42.0)); + delivery.attach_context("key3".to_string(), ContextValue::Boolean(true)); + delivery.attach_context("key4".to_string(), ContextValue::Null); + let ctx = delivery.context; + assert_eq!(ctx.len(), 4); + assert_eq!(ctx["key1"], ContextValue::String("value1".into())); + assert_eq!(ctx["key2"], ContextValue::Number(42.0)); + assert_eq!(ctx["key3"], ContextValue::Boolean(true)); + assert_eq!(ctx["key4"], ContextValue::Null); + } } diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index 84f89a08..e5115158 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -1,6 +1,6 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; use url::Url; use uuid::Uuid; @@ -10,7 +10,7 @@ use super::{ auto_flusher, batcher, delivery::{self, DeliveryConfig}, event_delivery::EventDelivery, - BatchedMessage, Event, + BatchedMessage, ContextValue, Event, }; #[derive(Debug, Clone)] @@ -60,17 +60,28 @@ impl EventIngestionConfig { /// A handle to Event Ingestion subsystem. pub struct EventIngestion { tx: mpsc::Sender>, + context_sender: mpsc::Sender<(String, ContextValue)>, } impl EventIngestion { /// Starts the event ingestion subsystem on the given background runtime. pub fn spawn(runtime: &BackgroundRuntime, config: &EventIngestionConfig) -> EventIngestion { - let event_delivery = EventDelivery::new( + let event_delivery = Arc::new(Mutex::new(EventDelivery::new( reqwest::Client::new(), config.sdk_key.clone(), config.ingestion_url.clone(), - ); - + ))); + + let event_delivery_clone = Arc::clone(&event_delivery); + let (context_sender, mut context_rx) = mpsc::channel::<(String, ContextValue)>(100); + runtime.spawn_untracked(async move { + while let Some((key, value)) = context_rx.recv().await { + let mut event_delivery = event_delivery_clone.lock().await; + event_delivery.attach_context(key, value) + } + }); + + let event_delivery_clone = Arc::clone(&event_delivery); let (input, flusher_uplink) = mpsc::channel(config.max_queue_size); let (flusher_downlink, batcher_uplink) = mpsc::channel(1); let (batcher_downlink, delivery_uplink) = mpsc::channel(1); @@ -89,7 +100,7 @@ impl EventIngestion { runtime.spawn_untracked(delivery::delivery( delivery_uplink, delivery_status_tx.clone(), - event_delivery, + event_delivery_clone, DeliveryConfig { max_retries: config.max_retries, base_retry_delay: config.base_retry_delay, @@ -100,7 +111,10 @@ impl EventIngestion { // For now, nobody is interested in delivery statuses. let _ = delivery_status_rx; - EventIngestion { tx: input } + EventIngestion { + tx: input, + context_sender, + } } pub fn track(&self, event_type: String, payload: serde_json::Value) { @@ -114,11 +128,27 @@ impl EventIngestion { self.track_event(event); } + /// Attaches a context to be included with all events dispatched by the EventDispatcher. + /// The context is delivered as a top-level object in the ingestion request payload. + /// An existing key can be removed by providing a `null` value. + /// Calling this method with same key multiple times causes only the last value to be used for the + /// given key. + /// + /// @param key - The context entry key. + /// @param value - The context entry value, must be a string, number, boolean, or null. If value is + /// an object or an array, will throw an ArgumentError. + pub fn attach_context(&self, key: String, value: ContextValue) { + let result = self.context_sender.try_send((key, value)); + if let Err(err) = result { + log::warn!(target: "eppo", "Failed to send context update to worker: {}", err); + } + } + fn track_event(&self, event: Event) { let result = self.tx.try_send(BatchedMessage::singleton(event)); if let Err(err) = result { - log::warn!(target: "eppo", "failed to submit event to event ingestion: {}", err); + log::warn!(target: "eppo", "Failed to submit event to event ingestion: {}", err); } } } @@ -148,6 +178,7 @@ mod tests { Mock::given(method("POST")) .and(path("/")) .and(body_json(&json!({ + "context": {}, "eppo_events": [event.clone()], }))) .and(header("x-eppo-token", "test-sdk-key")) @@ -172,7 +203,7 @@ mod tests { let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path("/")) - .and(body_json(&json!({"eppo_events": [event] }))) + .and(body_json(&json!({"context": {}, "eppo_events": [event] }))) .and(header("x-eppo-token", "test-sdk-key")) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "failed_events": [event.uuid], @@ -186,6 +217,47 @@ mod tests { mock_server.verify().await; } + #[tokio::test] + async fn test_attach_context() { + init(); + + let event = new_test_event(); + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/")) + .and(body_json(&json!({ + "context": { + "string": "value", + "number": 42.0, + "boolean": true, + "null": null, + }, + "eppo_events": [event.clone()], + }))) + .and(header("x-eppo-token", "test-sdk-key")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "failed_events": [] + }))) + .expect(1) + .mount(&mock_server) + .await; + + let batch_size = 1; + let config = new_test_event_config(Url::parse(&mock_server.uri()).unwrap(), batch_size); + let runtime = BackgroundRuntime::new(tokio::runtime::Handle::current()); + let event_ingestion = config.spawn(&runtime); + event_ingestion.attach_context("string".to_string(), ContextValue::String("value".into())); + event_ingestion.attach_context("number".to_string(), ContextValue::Number(42.0)); + event_ingestion.attach_context("boolean".to_string(), ContextValue::Boolean(true)); + event_ingestion.attach_context("null".to_string(), ContextValue::Null); + event_ingestion.track_event(event); + // wait some time for the async task to finish + // TODO: use flush instead of sleeping + tokio::time::sleep(Duration::from_millis(100)).await; + + mock_server.verify().await; + } + fn new_test_event() -> Event { Event { uuid: Uuid::new_v4(), diff --git a/eppo_core/src/event_ingestion/mod.rs b/eppo_core/src/event_ingestion/mod.rs index ecb201e0..bfadc7c9 100644 --- a/eppo_core/src/event_ingestion/mod.rs +++ b/eppo_core/src/event_ingestion/mod.rs @@ -1,6 +1,7 @@ mod auto_flusher; mod batched_message; mod batcher; +mod context; mod delivery; mod event; mod event_delivery; @@ -9,4 +10,5 @@ mod event_ingestion; use batched_message::BatchedMessage; use event::Event; +pub use context::ContextValue; pub use event_ingestion::{EventIngestion, EventIngestionConfig}; diff --git a/eppo_core/src/lib.rs b/eppo_core/src/lib.rs index 2a9faa8f..85c4d688 100644 --- a/eppo_core/src/lib.rs +++ b/eppo_core/src/lib.rs @@ -80,5 +80,6 @@ pub use attributes::{ pub use configuration::Configuration; pub use error::{Error, EvaluationError, Result}; #[cfg(feature = "event_ingestion")] +pub use event_ingestion::ContextValue; pub use sdk_key::SdkKey; pub use sdk_metadata::SdkMetadata; diff --git a/ruby-sdk/ext/eppo_client/src/client.rs b/ruby-sdk/ext/eppo_client/src/client.rs index 919edc47..26264838 100644 --- a/ruby-sdk/ext/eppo_client/src/client.rs +++ b/ruby-sdk/ext/eppo_client/src/client.rs @@ -9,7 +9,7 @@ use eppo_core::{ }, configuration_store::ConfigurationStore, eval::{Evaluator, EvaluatorConfig}, - event_ingestion::{EventIngestion, EventIngestionConfig}, + event_ingestion::{ContextValue, EventIngestion, EventIngestionConfig}, ufc::VariationType, Attributes, ContextAttributes, SdkKey, }; @@ -67,7 +67,7 @@ pub struct Client { // world. background_thread: RefCell>, configuration_poller: Option, - event_ingestion: Option, + event_ingestion: RefCell>, } impl Client { @@ -127,7 +127,7 @@ impl Client { evaluator, background_thread: RefCell::new(Some(background_thread)), configuration_poller, - event_ingestion, + event_ingestion: RefCell::new(event_ingestion), } } @@ -255,8 +255,33 @@ impl Client { } } + pub fn set_context(&self, key: String, value: Value) -> Result<()> { + let mut binding = self.event_ingestion.borrow_mut(); + let Some(event_ingestion) = binding.as_mut() else { + // Event ingestion is disabled, do nothing. + return Ok(()); + }; + let value: serde_json::Value = serde_magnus::deserialize(value).map_err(|err| { + Error::new( + exception::runtime_error(), + format!("Unexpected value: {}", err), + ) + })?; + + let context_value = ContextValue::try_from_json(value).map_err(|err| { + Error::new( + exception::runtime_error(), + format!("Unexpected value for context value: {}", err), + ) + })?; + + event_ingestion.attach_context(key, context_value); + + Ok(()) + } pub fn track(&self, event_type: String, payload: Value) -> Result<()> { - let Some(event_ingestion) = &self.event_ingestion else { + let binding = self.event_ingestion.borrow(); + let Some(event_ingestion) = binding.as_ref() else { // Event ingestion is disabled, do nothing. return Ok(()); }; diff --git a/ruby-sdk/ext/eppo_client/src/lib.rs b/ruby-sdk/ext/eppo_client/src/lib.rs index 34bdf71e..17f0e339 100644 --- a/ruby-sdk/ext/eppo_client/src/lib.rs +++ b/ruby-sdk/ext/eppo_client/src/lib.rs @@ -29,6 +29,7 @@ fn init(ruby: &Ruby) -> Result<(), Error> { method!(Client::get_bandit_action_details, 5), )?; core_client.define_method("track", method!(Client::track, 2))?; + core_client.define_method("set_context", method!(Client::set_context, 2))?; core_client.define_method("configuration", method!(Client::get_configuration, 0))?; core_client.define_method("configuration=", method!(Client::set_configuration, 1))?; core_client.define_method("shutdown", method!(Client::shutdown, 0))?; diff --git a/ruby-sdk/lib/eppo_client/client.rb b/ruby-sdk/lib/eppo_client/client.rb index af5f7b09..c165b68f 100644 --- a/ruby-sdk/lib/eppo_client/client.rb +++ b/ruby-sdk/lib/eppo_client/client.rb @@ -44,11 +44,17 @@ def shutdown end # Unstable - # Enqueues an arbitrary event. Events must have a type and a payload + # Enqueues an arbitrary event. Events must have a type and a payload. def unstable_track(event_type, payload) @core.track(event_type, payload) end + # Unstable + # Sets an arbitrary context key/value pair to be sent with all events. + def unstable_set_context(key, value) + @core.set_context(key, value) + end + def get_string_assignment(flag_key, subject_key, subject_attributes, default_value) get_assignment_inner(flag_key, subject_key, subject_attributes, "STRING", default_value) end