From 9f569400897af121e0b29126aa4f37a6f750cc22 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Mon, 10 Feb 2025 10:45:39 -0800 Subject: [PATCH 01/11] feat(events): Allow attaching context to event dispatching --- .../src/event_ingestion/event_delivery.rs | 77 ++++++++++++++++++- .../src/event_ingestion/event_ingestion.rs | 31 ++++++-- ruby-sdk/ext/eppo_client/src/client.rs | 33 +++++++- ruby-sdk/ext/eppo_client/src/lib.rs | 1 + 4 files changed, 130 insertions(+), 12 deletions(-) diff --git a/eppo_core/src/event_ingestion/event_delivery.rs b/eppo_core/src/event_ingestion/event_delivery.rs index 3c1f5ba9..4041771c 100644 --- a/eppo_core/src/event_ingestion/event_delivery.rs +++ b/eppo_core/src/event_ingestion/event_delivery.rs @@ -1,8 +1,9 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use log::debug; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; +use serde_json::Value; use url::Url; use uuid::Uuid; @@ -17,6 +18,13 @@ pub(super) struct EventDelivery { sdk_key: SdkKey, ingestion_url: Url, client: reqwest::Client, + context: HashMap, +} + +#[derive(thiserror::Error, Debug, Clone, PartialEq)] +pub enum ContextError { + #[error("JSON value cannot be an object or an array")] + InvalidContextValueType, } #[derive(thiserror::Error, Debug)] @@ -68,6 +76,7 @@ impl From for EventDeliveryError { #[derive(Debug, Serialize)] struct IngestionRequestBody<'a> { + context: &'a HashMap, eppo_events: &'a [Event], } @@ -83,6 +92,7 @@ impl EventDelivery { sdk_key, ingestion_url, client, + context: HashMap::new(), } } @@ -121,6 +131,19 @@ impl EventDelivery { status } + pub fn attach_context(&mut self, key: String, value: Value) -> Result<(), ContextError> { + // ensure value is valid not object or array + return match value { + Value::Object(_) | Value::Array(_) => { + Err(ContextError::InvalidContextValueType) + } + _ => { + self.context.insert(key, value); + Ok(()) + } + } + } + async fn deliver_inner( &self, events: &[Event], @@ -136,6 +159,7 @@ impl EventDelivery { debug!("Delivering {} events to {}", events.len(), ingestion_url); let body = IngestionRequestBody { + context: &self.context, eppo_events: events, }; @@ -181,6 +205,12 @@ mod tests { .and(path("/")) .and(header("X-Eppo-Token", "foobar")) .and(body_json(&json!({ + "context": { + "key1": "value1", + "key2": 42, + "key3": true, + "key4": null, + }, "eppo_events": [{ "uuid": uuid, "timestamp": timestamp.timestamp_millis(), @@ -196,7 +226,7 @@ mod tests { .mount(&mock_server) .await; - let client = EventDelivery::new( + let mut delivery = EventDelivery::new( reqwest::Client::new(), SdkKey::new("foobar".into()), Url::parse(mock_server.uri().as_str()).unwrap(), @@ -212,10 +242,51 @@ mod tests { }), }; - let result = client.deliver(vec![event.clone()]).await; + delivery.attach_context("key1".to_string(), json!("value1")).unwrap(); + delivery.attach_context("key2".to_string(), json!(42)).unwrap(); + delivery.attach_context("key3".to_string(), json!(true)).unwrap(); + delivery.attach_context("key4".to_string(), json!(null)).unwrap(); + + let result = delivery.deliver(vec![event.clone()]).await; assert_eq!(result, DeliveryStatus::success(vec![event])); mock_server.verify().await; } + + #[test] + fn test_attach_context_valid_values() { + let mut delivery = EventDelivery::new( + reqwest::Client::new(), + SdkKey::new("foobar".into()), + Url::parse("http://example.com").unwrap(), + ); + assert!(delivery.attach_context("key1".to_string(), json!("value1")).is_ok()); + assert!(delivery.attach_context("key2".to_string(), json!(42)).is_ok()); + assert!(delivery.attach_context("key3".to_string(), json!(true)).is_ok()); + assert!(delivery.attach_context("key4".to_string(), json!(null)).is_ok()); + assert_eq!(delivery.context.len(), 4); + assert_eq!(delivery.context.get("key1").unwrap(), &json!("value1")); + assert_eq!(delivery.context.get("key2").unwrap(), &json!(42)); + assert_eq!(delivery.context.get("key3").unwrap(), &json!(true)); + assert_eq!(delivery.context.get("key4").unwrap(), &json!(null)); + } + + #[test] + fn test_attach_context_invalid_values() { + let mut delivery = EventDelivery::new( + reqwest::Client::new(), + SdkKey::new("foobar".into()), + Url::parse("http://example.com").unwrap(), + ); + assert_eq!( + delivery.attach_context("key1".to_string(), json!({"foo": "bar"})), + Err(ContextError::InvalidContextValueType) + ); + assert_eq!( + delivery.attach_context("key2".to_string(), json!([1, 2, 3])), + Err(ContextError::InvalidContextValueType) + ); + assert_eq!(delivery.context.len(), 0); + } } diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index 84f89a08..1e6b2e86 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -1,5 +1,6 @@ -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; +use serde_json::Value; use tokio::sync::mpsc; use url::Url; use uuid::Uuid; @@ -9,7 +10,7 @@ use crate::{background::BackgroundRuntime, sdk_key::SdkKey}; use super::{ auto_flusher, batcher, delivery::{self, DeliveryConfig}, - event_delivery::EventDelivery, + event_delivery::{ContextError, EventDelivery}, BatchedMessage, Event, }; @@ -60,6 +61,7 @@ impl EventIngestionConfig { /// A handle to Event Ingestion subsystem. pub struct EventIngestion { tx: mpsc::Sender>, + event_delivery: EventDelivery, } impl EventIngestion { @@ -89,7 +91,7 @@ impl EventIngestion { runtime.spawn_untracked(delivery::delivery( delivery_uplink, delivery_status_tx.clone(), - event_delivery, + event_delivery.clone(), DeliveryConfig { max_retries: config.max_retries, base_retry_delay: config.base_retry_delay, @@ -100,7 +102,10 @@ impl EventIngestion { // For now, nobody is interested in delivery statuses. let _ = delivery_status_rx; - EventIngestion { tx: input } + EventIngestion { + tx: input, + event_delivery, + } } pub fn track(&self, event_type: String, payload: serde_json::Value) { @@ -114,6 +119,19 @@ impl EventIngestion { self.track_event(event); } + /// Attaches a context to be included with all events dispatched by the EventDispatcher. + /// The context is delivered as a top-level object in the ingestion request payload. + /// An existing key can be removed by providing a `null` value. + /// Calling this method with same key multiple times causes only the last value to be used for the + /// given key. + /// + /// @param key - The context entry key. + /// @param value - The context entry value, must be a string, number, boolean, or null. If value is + /// an object or an array, will throw an ArgumentError. + pub fn attach_context(&mut self, key: String, value: Value) -> Result<(), ContextError> { + self.event_delivery.attach_context(key, value) + } + fn track_event(&self, event: Event) { let result = self.tx.try_send(BatchedMessage::singleton(event)); @@ -148,6 +166,7 @@ mod tests { Mock::given(method("POST")) .and(path("/")) .and(body_json(&json!({ + "context": {}, "eppo_events": [event.clone()], }))) .and(header("x-eppo-token", "test-sdk-key")) @@ -172,7 +191,7 @@ mod tests { let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path("/")) - .and(body_json(&json!({"eppo_events": [event] }))) + .and(body_json(&json!({"context": {}, "eppo_events": [event] }))) .and(header("x-eppo-token", "test-sdk-key")) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "failed_events": [event.uuid], @@ -222,4 +241,4 @@ mod tests { // TODO: use flush instead of sleeping tokio::time::sleep(Duration::from_millis(100)).await; } -} +} \ No newline at end of file diff --git a/ruby-sdk/ext/eppo_client/src/client.rs b/ruby-sdk/ext/eppo_client/src/client.rs index 919edc47..3b81e4d2 100644 --- a/ruby-sdk/ext/eppo_client/src/client.rs +++ b/ruby-sdk/ext/eppo_client/src/client.rs @@ -55,6 +55,15 @@ impl TryConvert for Config { } } +#[magnus::wrap(class = "EventIngestion")] +struct MutEventIngestion(RefCell); + +impl MutEventIngestion { + pub fn new(event_ingestion: EventIngestion) -> Self { + Self(RefCell::new(event_ingestion)) + } +} + #[magnus::wrap(class = "EppoClient::Core::Client")] pub struct Client { configuration_store: Arc, @@ -67,9 +76,10 @@ pub struct Client { // world. background_thread: RefCell>, configuration_poller: Option, - event_ingestion: Option, + event_ingestion: Option, } + impl Client { pub fn new(config: Config) -> Client { // Initialize logger @@ -120,7 +130,7 @@ impl Client { let event_ingestion = config .event_ingestion_config - .map(|config| config.spawn(background_thread.runtime())); + .map(|config| MutEventIngestion::new(config.spawn(background_thread.runtime()))); Client { configuration_store, @@ -255,6 +265,23 @@ impl Client { } } + pub fn set_context(&self, key: String, value: Value) -> Result<()> { + let Some(event_ingestion) = &self.event_ingestion else { + // Event ingestion is disabled, do nothing. + return Ok(()); + }; + let value: serde_json::Value = serde_magnus::deserialize(value).map_err(|err| { + Error::new( + exception::runtime_error(), + format!("Unexpected value for payload: {err}"), + ) + })?; + + event_ingestion.0.borrow_mut().attach_context(key, value); + + Ok(()) + } + pub fn track(&self, event_type: String, payload: Value) -> Result<()> { let Some(event_ingestion) = &self.event_ingestion else { // Event ingestion is disabled, do nothing. @@ -268,7 +295,7 @@ impl Client { ) })?; - event_ingestion.track(event_type, payload); + event_ingestion.0.borrow().track(event_type, payload); Ok(()) } diff --git a/ruby-sdk/ext/eppo_client/src/lib.rs b/ruby-sdk/ext/eppo_client/src/lib.rs index 34bdf71e..17f0e339 100644 --- a/ruby-sdk/ext/eppo_client/src/lib.rs +++ b/ruby-sdk/ext/eppo_client/src/lib.rs @@ -29,6 +29,7 @@ fn init(ruby: &Ruby) -> Result<(), Error> { method!(Client::get_bandit_action_details, 5), )?; core_client.define_method("track", method!(Client::track, 2))?; + core_client.define_method("set_context", method!(Client::set_context, 2))?; core_client.define_method("configuration", method!(Client::get_configuration, 0))?; core_client.define_method("configuration=", method!(Client::set_configuration, 1))?; core_client.define_method("shutdown", method!(Client::shutdown, 0))?; From 89be230b1f77cdef9e85a684dacb91810c787932 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Mon, 10 Feb 2025 10:50:14 -0800 Subject: [PATCH 02/11] add ruby api --- .changeset/shy-countries-press.md | 6 +++ eppo_core/src/event_ingestion/event.rs | 2 +- .../src/event_ingestion/event_delivery.rs | 38 +++++++++++++------ .../src/event_ingestion/event_ingestion.rs | 2 +- ruby-sdk/ext/eppo_client/src/client.rs | 1 - ruby-sdk/lib/eppo_client/client.rb | 8 +++- 6 files changed, 41 insertions(+), 16 deletions(-) create mode 100644 .changeset/shy-countries-press.md diff --git a/.changeset/shy-countries-press.md b/.changeset/shy-countries-press.md new file mode 100644 index 00000000..89238a5e --- /dev/null +++ b/.changeset/shy-countries-press.md @@ -0,0 +1,6 @@ +--- +"eppo_core": minor +"ruby-sdk": minor +--- + +[Unstable] Add ability for attaching context to events dispatched diff --git a/eppo_core/src/event_ingestion/event.rs b/eppo_core/src/event_ingestion/event.rs index edfb77b4..12724e9d 100644 --- a/eppo_core/src/event_ingestion/event.rs +++ b/eppo_core/src/event_ingestion/event.rs @@ -15,4 +15,4 @@ pub(super) struct Event { pub event_type: String, pub payload: serde_json::Value, -} \ No newline at end of file +} diff --git a/eppo_core/src/event_ingestion/event_delivery.rs b/eppo_core/src/event_ingestion/event_delivery.rs index 4041771c..90ffd568 100644 --- a/eppo_core/src/event_ingestion/event_delivery.rs +++ b/eppo_core/src/event_ingestion/event_delivery.rs @@ -134,14 +134,12 @@ impl EventDelivery { pub fn attach_context(&mut self, key: String, value: Value) -> Result<(), ContextError> { // ensure value is valid not object or array return match value { - Value::Object(_) | Value::Array(_) => { - Err(ContextError::InvalidContextValueType) - } + Value::Object(_) | Value::Array(_) => Err(ContextError::InvalidContextValueType), _ => { self.context.insert(key, value); Ok(()) } - } + }; } async fn deliver_inner( @@ -242,10 +240,18 @@ mod tests { }), }; - delivery.attach_context("key1".to_string(), json!("value1")).unwrap(); - delivery.attach_context("key2".to_string(), json!(42)).unwrap(); - delivery.attach_context("key3".to_string(), json!(true)).unwrap(); - delivery.attach_context("key4".to_string(), json!(null)).unwrap(); + delivery + .attach_context("key1".to_string(), json!("value1")) + .unwrap(); + delivery + .attach_context("key2".to_string(), json!(42)) + .unwrap(); + delivery + .attach_context("key3".to_string(), json!(true)) + .unwrap(); + delivery + .attach_context("key4".to_string(), json!(null)) + .unwrap(); let result = delivery.deliver(vec![event.clone()]).await; @@ -261,10 +267,18 @@ mod tests { SdkKey::new("foobar".into()), Url::parse("http://example.com").unwrap(), ); - assert!(delivery.attach_context("key1".to_string(), json!("value1")).is_ok()); - assert!(delivery.attach_context("key2".to_string(), json!(42)).is_ok()); - assert!(delivery.attach_context("key3".to_string(), json!(true)).is_ok()); - assert!(delivery.attach_context("key4".to_string(), json!(null)).is_ok()); + assert!(delivery + .attach_context("key1".to_string(), json!("value1")) + .is_ok()); + assert!(delivery + .attach_context("key2".to_string(), json!(42)) + .is_ok()); + assert!(delivery + .attach_context("key3".to_string(), json!(true)) + .is_ok()); + assert!(delivery + .attach_context("key4".to_string(), json!(null)) + .is_ok()); assert_eq!(delivery.context.len(), 4); assert_eq!(delivery.context.get("key1").unwrap(), &json!("value1")); assert_eq!(delivery.context.get("key2").unwrap(), &json!(42)); diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index 1e6b2e86..29e18dc2 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -241,4 +241,4 @@ mod tests { // TODO: use flush instead of sleeping tokio::time::sleep(Duration::from_millis(100)).await; } -} \ No newline at end of file +} diff --git a/ruby-sdk/ext/eppo_client/src/client.rs b/ruby-sdk/ext/eppo_client/src/client.rs index 3b81e4d2..ddafdced 100644 --- a/ruby-sdk/ext/eppo_client/src/client.rs +++ b/ruby-sdk/ext/eppo_client/src/client.rs @@ -79,7 +79,6 @@ pub struct Client { event_ingestion: Option, } - impl Client { pub fn new(config: Config) -> Client { // Initialize logger diff --git a/ruby-sdk/lib/eppo_client/client.rb b/ruby-sdk/lib/eppo_client/client.rb index af5f7b09..c165b68f 100644 --- a/ruby-sdk/lib/eppo_client/client.rb +++ b/ruby-sdk/lib/eppo_client/client.rb @@ -44,11 +44,17 @@ def shutdown end # Unstable - # Enqueues an arbitrary event. Events must have a type and a payload + # Enqueues an arbitrary event. Events must have a type and a payload. def unstable_track(event_type, payload) @core.track(event_type, payload) end + # Unstable + # Sets an arbitrary context key/value pair to be sent with all events. + def unstable_set_context(key, value) + @core.set_context(key, value) + end + def get_string_assignment(flag_key, subject_key, subject_attributes, default_value) get_assignment_inner(flag_key, subject_key, subject_attributes, "STRING", default_value) end From a50e24064ce18b1782d410156584bdbac03393a8 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 09:53:08 -0800 Subject: [PATCH 03/11] replace MutEventIngestion with RefCell --- .../src/event_ingestion/event_ingestion.rs | 6 +++-- ruby-sdk/ext/eppo_client/src/client.rs | 26 +++++++------------ 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index 29e18dc2..74c077ca 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -128,8 +128,10 @@ impl EventIngestion { /// @param key - The context entry key. /// @param value - The context entry value, must be a string, number, boolean, or null. If value is /// an object or an array, will throw an ArgumentError. - pub fn attach_context(&mut self, key: String, value: Value) -> Result<(), ContextError> { - self.event_delivery.attach_context(key, value) + pub fn attach_context(&mut self, key: String, value: Value) { + if let Err(err) = self.event_delivery.attach_context(key, value) { + log::warn!(target: "eppo", "failed to attach context: {}", err); + } } fn track_event(&self, event: Event) { diff --git a/ruby-sdk/ext/eppo_client/src/client.rs b/ruby-sdk/ext/eppo_client/src/client.rs index ddafdced..71c6bb28 100644 --- a/ruby-sdk/ext/eppo_client/src/client.rs +++ b/ruby-sdk/ext/eppo_client/src/client.rs @@ -55,15 +55,6 @@ impl TryConvert for Config { } } -#[magnus::wrap(class = "EventIngestion")] -struct MutEventIngestion(RefCell); - -impl MutEventIngestion { - pub fn new(event_ingestion: EventIngestion) -> Self { - Self(RefCell::new(event_ingestion)) - } -} - #[magnus::wrap(class = "EppoClient::Core::Client")] pub struct Client { configuration_store: Arc, @@ -76,7 +67,7 @@ pub struct Client { // world. background_thread: RefCell>, configuration_poller: Option, - event_ingestion: Option, + event_ingestion: RefCell>, } impl Client { @@ -129,14 +120,14 @@ impl Client { let event_ingestion = config .event_ingestion_config - .map(|config| MutEventIngestion::new(config.spawn(background_thread.runtime()))); + .map(|config| config.spawn(background_thread.runtime())); Client { configuration_store, evaluator, background_thread: RefCell::new(Some(background_thread)), configuration_poller, - event_ingestion, + event_ingestion: RefCell::new(event_ingestion), } } @@ -265,7 +256,8 @@ impl Client { } pub fn set_context(&self, key: String, value: Value) -> Result<()> { - let Some(event_ingestion) = &self.event_ingestion else { + let mut binding = self.event_ingestion.borrow_mut(); + let Some(event_ingestion) = binding.as_mut() else { // Event ingestion is disabled, do nothing. return Ok(()); }; @@ -276,13 +268,13 @@ impl Client { ) })?; - event_ingestion.0.borrow_mut().attach_context(key, value); + event_ingestion.attach_context(key, value); Ok(()) } - pub fn track(&self, event_type: String, payload: Value) -> Result<()> { - let Some(event_ingestion) = &self.event_ingestion else { + let binding = self.event_ingestion.borrow(); + let Some(event_ingestion) = binding.as_ref() else { // Event ingestion is disabled, do nothing. return Ok(()); }; @@ -294,7 +286,7 @@ impl Client { ) })?; - event_ingestion.0.borrow().track(event_type, payload); + event_ingestion.track(event_type, payload); Ok(()) } From 5a4d26e358b676886847af5c3cffa17e0c78d58f Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 10:39:49 -0800 Subject: [PATCH 04/11] fix sharing context between ingestion and delivery --- eppo_core/src/event_ingestion/delivery.rs | 9 ++- .../src/event_ingestion/event_delivery.rs | 15 ++-- .../src/event_ingestion/event_ingestion.rs | 79 ++++++++++++++++--- 3 files changed, 79 insertions(+), 24 deletions(-) diff --git a/eppo_core/src/event_ingestion/delivery.rs b/eppo_core/src/event_ingestion/delivery.rs index ec937019..700f1118 100644 --- a/eppo_core/src/event_ingestion/delivery.rs +++ b/eppo_core/src/event_ingestion/delivery.rs @@ -1,7 +1,7 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use exponential_backoff::Backoff; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; use super::{event::Event, event_delivery::EventDelivery, BatchedMessage}; @@ -56,7 +56,7 @@ pub(super) struct DeliveryConfig { pub(super) async fn delivery( mut uplink: mpsc::Receiver>, delivery_status: mpsc::Sender, - event_delivery: EventDelivery, + event_delivery: Arc>, config: DeliveryConfig, ) -> Option<()> { // We use this unbounded channel to loop back messages that need retrying. @@ -72,7 +72,8 @@ pub(super) async fn delivery( let BatchedMessage { batch, flush } = msg; - let mut result = event_delivery.deliver(batch).await; + let delivery = event_delivery.lock().await; + let mut result = delivery.deliver(batch).await; if attempts >= config.max_retries { // Exceeded max retries -> promote retriable errors to permanent ones. diff --git a/eppo_core/src/event_ingestion/event_delivery.rs b/eppo_core/src/event_ingestion/event_delivery.rs index 90ffd568..4ae6ec1a 100644 --- a/eppo_core/src/event_ingestion/event_delivery.rs +++ b/eppo_core/src/event_ingestion/event_delivery.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::{collections::{HashMap, HashSet}, sync::Mutex}; use log::debug; use reqwest::StatusCode; @@ -132,7 +132,7 @@ impl EventDelivery { } pub fn attach_context(&mut self, key: String, value: Value) -> Result<(), ContextError> { - // ensure value is valid not object or array + // ensure value is valid (not object or array) return match value { Value::Object(_) | Value::Array(_) => Err(ContextError::InvalidContextValueType), _ => { @@ -279,11 +279,12 @@ mod tests { assert!(delivery .attach_context("key4".to_string(), json!(null)) .is_ok()); - assert_eq!(delivery.context.len(), 4); - assert_eq!(delivery.context.get("key1").unwrap(), &json!("value1")); - assert_eq!(delivery.context.get("key2").unwrap(), &json!(42)); - assert_eq!(delivery.context.get("key3").unwrap(), &json!(true)); - assert_eq!(delivery.context.get("key4").unwrap(), &json!(null)); + let ctx = delivery.context; + assert_eq!(ctx.len(), 4); + assert_eq!(ctx.get("key1").unwrap(), &json!("value1")); + assert_eq!(ctx.get("key2").unwrap(), &json!(42)); + assert_eq!(ctx.get("key3").unwrap(), &json!(true)); + assert_eq!(ctx.get("key4").unwrap(), &json!(null)); } #[test] diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index 74c077ca..f98886d7 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -1,7 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{sync::Arc, time::Duration}; use serde_json::Value; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; use url::Url; use uuid::Uuid; @@ -10,7 +10,7 @@ use crate::{background::BackgroundRuntime, sdk_key::SdkKey}; use super::{ auto_flusher, batcher, delivery::{self, DeliveryConfig}, - event_delivery::{ContextError, EventDelivery}, + event_delivery::EventDelivery, BatchedMessage, Event, }; @@ -61,18 +61,30 @@ impl EventIngestionConfig { /// A handle to Event Ingestion subsystem. pub struct EventIngestion { tx: mpsc::Sender>, - event_delivery: EventDelivery, + context_sender: mpsc::Sender<(String, Value)> } impl EventIngestion { /// Starts the event ingestion subsystem on the given background runtime. pub fn spawn(runtime: &BackgroundRuntime, config: &EventIngestionConfig) -> EventIngestion { - let event_delivery = EventDelivery::new( + let event_delivery = Arc::new(Mutex::new(EventDelivery::new( reqwest::Client::new(), config.sdk_key.clone(), config.ingestion_url.clone(), - ); - + ))); + + let event_delivery_clone = Arc::clone(&event_delivery); + let (context_sender, mut context_rx) = mpsc::channel::<(String, Value)>(100); + runtime.spawn_untracked(async move { + while let Some((key, value)) = context_rx.recv().await { + let mut event_delivery = event_delivery_clone.lock().await; + if let Err(err) = event_delivery.attach_context(key, value) { + log::warn!(target: "eppo", "Failed to attach context: {}", err); + } + } + }); + + let event_delivery_clone = Arc::clone(&event_delivery); let (input, flusher_uplink) = mpsc::channel(config.max_queue_size); let (flusher_downlink, batcher_uplink) = mpsc::channel(1); let (batcher_downlink, delivery_uplink) = mpsc::channel(1); @@ -91,7 +103,7 @@ impl EventIngestion { runtime.spawn_untracked(delivery::delivery( delivery_uplink, delivery_status_tx.clone(), - event_delivery.clone(), + event_delivery_clone, DeliveryConfig { max_retries: config.max_retries, base_retry_delay: config.base_retry_delay, @@ -104,7 +116,7 @@ impl EventIngestion { EventIngestion { tx: input, - event_delivery, + context_sender, } } @@ -128,9 +140,9 @@ impl EventIngestion { /// @param key - The context entry key. /// @param value - The context entry value, must be a string, number, boolean, or null. If value is /// an object or an array, will throw an ArgumentError. - pub fn attach_context(&mut self, key: String, value: Value) { - if let Err(err) = self.event_delivery.attach_context(key, value) { - log::warn!(target: "eppo", "failed to attach context: {}", err); + pub fn attach_context(&self, key: String, value: Value) { + if self.context_sender.try_send((key, value)).is_err() { + log::warn!(target: "eppo", "Failed to send context update to worker"); } } @@ -138,7 +150,7 @@ impl EventIngestion { let result = self.tx.try_send(BatchedMessage::singleton(event)); if let Err(err) = result { - log::warn!(target: "eppo", "failed to submit event to event ingestion: {}", err); + log::warn!(target: "eppo", "Failed to submit event to event ingestion: {}", err); } } } @@ -207,6 +219,47 @@ mod tests { mock_server.verify().await; } + #[tokio::test] + async fn test_attach_context() { + init(); + + let event = new_test_event(); + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/")) + .and(body_json(&json!({ + "context": { + "string": "value", + "number": 42, + "boolean": true, + "null": null, + }, + "eppo_events": [event.clone()], + }))) + .and(header("x-eppo-token", "test-sdk-key")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "failed_events": [] + }))) + .expect(1) + .mount(&mock_server) + .await; + + let batch_size = 1; + let config = new_test_event_config(Url::parse(&mock_server.uri()).unwrap(), batch_size); + let runtime = BackgroundRuntime::new(tokio::runtime::Handle::current()); + let event_ingestion = config.spawn(&runtime); + event_ingestion.attach_context("string".to_string(), json!("value")); + event_ingestion.attach_context("number".to_string(), json!(42)); + event_ingestion.attach_context("boolean".to_string(), json!(true)); + event_ingestion.attach_context("null".to_string(), json!(null)); + event_ingestion.track_event(event); + // wait some time for the async task to finish + // TODO: use flush instead of sleeping + tokio::time::sleep(Duration::from_millis(100)).await; + + mock_server.verify().await; + } + fn new_test_event() -> Event { Event { uuid: Uuid::new_v4(), From 3e70d02ac2335f1a84f481b012b385f64a853f28 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 10:40:02 -0800 Subject: [PATCH 05/11] fmt --- eppo_core/src/event_ingestion/event_delivery.rs | 5 ++++- eppo_core/src/event_ingestion/event_ingestion.rs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/eppo_core/src/event_ingestion/event_delivery.rs b/eppo_core/src/event_ingestion/event_delivery.rs index 4ae6ec1a..8f06b4fc 100644 --- a/eppo_core/src/event_ingestion/event_delivery.rs +++ b/eppo_core/src/event_ingestion/event_delivery.rs @@ -1,4 +1,7 @@ -use std::{collections::{HashMap, HashSet}, sync::Mutex}; +use std::{ + collections::{HashMap, HashSet}, + sync::Mutex, +}; use log::debug; use reqwest::StatusCode; diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index f98886d7..98c28154 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -61,7 +61,7 @@ impl EventIngestionConfig { /// A handle to Event Ingestion subsystem. pub struct EventIngestion { tx: mpsc::Sender>, - context_sender: mpsc::Sender<(String, Value)> + context_sender: mpsc::Sender<(String, Value)>, } impl EventIngestion { From 28acc7f67cc85354a3262025623ed856fe06d733 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 10:40:51 -0800 Subject: [PATCH 06/11] patch release --- .changeset/shy-countries-press.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.changeset/shy-countries-press.md b/.changeset/shy-countries-press.md index 89238a5e..96da7047 100644 --- a/.changeset/shy-countries-press.md +++ b/.changeset/shy-countries-press.md @@ -1,6 +1,6 @@ --- -"eppo_core": minor -"ruby-sdk": minor +"eppo_core": patch +"ruby-sdk": patch --- [Unstable] Add ability for attaching context to events dispatched From 04c4c3320cf4ccaaa2f5c5f5c1f4b2a305b9cb76 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 10:44:24 -0800 Subject: [PATCH 07/11] better error handling --- eppo_core/src/event_ingestion/event_ingestion.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index 98c28154..612dc262 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -141,8 +141,9 @@ impl EventIngestion { /// @param value - The context entry value, must be a string, number, boolean, or null. If value is /// an object or an array, will throw an ArgumentError. pub fn attach_context(&self, key: String, value: Value) { - if self.context_sender.try_send((key, value)).is_err() { - log::warn!(target: "eppo", "Failed to send context update to worker"); + let result = self.context_sender.try_send((key, value)); + if let Err(err) = result { + log::warn!(target: "eppo", "Failed to send context update to worker: {}", err); } } From 33ce145604c35682bff234eb4eec912fa5d876fb Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 13:33:46 -0800 Subject: [PATCH 08/11] switch to context value enum --- eppo_core/src/event_ingestion/context.rs | 73 +++++++++++++++ .../src/event_ingestion/event_delivery.rs | 89 +++++-------------- .../src/event_ingestion/event_ingestion.rs | 23 +++-- eppo_core/src/event_ingestion/mod.rs | 2 + eppo_core/src/lib.rs | 1 + ruby-sdk/ext/eppo_client/src/client.rs | 13 ++- 6 files changed, 116 insertions(+), 85 deletions(-) create mode 100644 eppo_core/src/event_ingestion/context.rs diff --git a/eppo_core/src/event_ingestion/context.rs b/eppo_core/src/event_ingestion/context.rs new file mode 100644 index 00000000..3f547294 --- /dev/null +++ b/eppo_core/src/event_ingestion/context.rs @@ -0,0 +1,73 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::Str; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, derive_more::From)] +#[serde(untagged)] +pub enum ContextValue { + String(Str), + Number(f64), + Boolean(bool), + Null, +} + +#[derive(thiserror::Error, Debug, Clone, PartialEq)] +pub enum ContextError { + #[error("JSON value cannot be an object or an array")] + InvalidContextValueType, +} + +impl ContextValue { + pub fn try_from_json(value: Value) -> Result { + match value { + Value::String(s) => Ok(ContextValue::String(s.into())), + Value::Number(n) => Ok(ContextValue::Number(n.as_f64().unwrap())), // Safe unwrap since it's always f64 or i64 + Value::Bool(b) => Ok(ContextValue::Boolean(b)), + Value::Null => Ok(ContextValue::Null), + _ => Err(ContextError::InvalidContextValueType), + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use super::*; + use serde_json::json; + + #[test] + fn test_serialization() { + #[derive(Debug, Serialize)] + struct IngestionRequestBody<'a> { + context: &'a BTreeMap, + } + + let mut context = BTreeMap::new(); + context.insert("key1".to_string(), ContextValue::String("value1".into())); + context.insert("key2".to_string(), ContextValue::Number(42.0)); + context.insert("key3".to_string(), ContextValue::Boolean(true)); + context.insert("key4".to_string(), ContextValue::Null); + + let body = IngestionRequestBody { context: &context }; + + let json = serde_json::to_string(&body).unwrap(); + assert_eq!( + json, + "{\"context\":{\"key1\":\"value1\",\"key2\":42.0,\"key3\":true,\"key4\":null}}" + ); + } + + #[test] + fn test_context_invalid_values() { + assert_eq!( + ContextValue::try_from_json(json!({"foo": "bar"})), + Err(ContextError::InvalidContextValueType) + ); + assert_eq!( + ContextValue::try_from_json(json!([1, 2, 3])), + Err(ContextError::InvalidContextValueType) + ); + } +} diff --git a/eppo_core/src/event_ingestion/event_delivery.rs b/eppo_core/src/event_ingestion/event_delivery.rs index 8f06b4fc..1f77bbfc 100644 --- a/eppo_core/src/event_ingestion/event_delivery.rs +++ b/eppo_core/src/event_ingestion/event_delivery.rs @@ -1,18 +1,14 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::Mutex, -}; +use std::collections::{HashMap, HashSet}; use log::debug; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use serde_json::Value; use url::Url; use uuid::Uuid; use crate::sdk_key::SdkKey; -use super::{delivery::DeliveryStatus, event::Event}; +use super::{delivery::DeliveryStatus, event::Event, ContextValue}; const MAX_EVENT_SERIALIZED_LENGTH: usize = 4096; @@ -21,13 +17,7 @@ pub(super) struct EventDelivery { sdk_key: SdkKey, ingestion_url: Url, client: reqwest::Client, - context: HashMap, -} - -#[derive(thiserror::Error, Debug, Clone, PartialEq)] -pub enum ContextError { - #[error("JSON value cannot be an object or an array")] - InvalidContextValueType, + context: HashMap, } #[derive(thiserror::Error, Debug)] @@ -79,7 +69,7 @@ impl From for EventDeliveryError { #[derive(Debug, Serialize)] struct IngestionRequestBody<'a> { - context: &'a HashMap, + context: &'a HashMap, eppo_events: &'a [Event], } @@ -134,15 +124,9 @@ impl EventDelivery { status } - pub fn attach_context(&mut self, key: String, value: Value) -> Result<(), ContextError> { + pub fn attach_context(&mut self, key: String, value: ContextValue) { // ensure value is valid (not object or array) - return match value { - Value::Object(_) | Value::Array(_) => Err(ContextError::InvalidContextValueType), - _ => { - self.context.insert(key, value); - Ok(()) - } - }; + self.context.insert(key, value); } async fn deliver_inner( @@ -208,7 +192,7 @@ mod tests { .and(body_json(&json!({ "context": { "key1": "value1", - "key2": 42, + "key2": 42.0, "key3": true, "key4": null, }, @@ -243,18 +227,10 @@ mod tests { }), }; - delivery - .attach_context("key1".to_string(), json!("value1")) - .unwrap(); - delivery - .attach_context("key2".to_string(), json!(42)) - .unwrap(); - delivery - .attach_context("key3".to_string(), json!(true)) - .unwrap(); - delivery - .attach_context("key4".to_string(), json!(null)) - .unwrap(); + delivery.attach_context("key1".to_string(), ContextValue::String("value1".into())); + delivery.attach_context("key2".to_string(), ContextValue::Number(42.0)); + delivery.attach_context("key3".to_string(), ContextValue::Boolean(true)); + delivery.attach_context("key4".to_string(), ContextValue::Null); let result = delivery.deliver(vec![event.clone()]).await; @@ -270,41 +246,16 @@ mod tests { SdkKey::new("foobar".into()), Url::parse("http://example.com").unwrap(), ); - assert!(delivery - .attach_context("key1".to_string(), json!("value1")) - .is_ok()); - assert!(delivery - .attach_context("key2".to_string(), json!(42)) - .is_ok()); - assert!(delivery - .attach_context("key3".to_string(), json!(true)) - .is_ok()); - assert!(delivery - .attach_context("key4".to_string(), json!(null)) - .is_ok()); + + delivery.attach_context("key1".to_string(), ContextValue::String("value1".into())); + delivery.attach_context("key2".to_string(), ContextValue::Number(42.0)); + delivery.attach_context("key3".to_string(), ContextValue::Boolean(true)); + delivery.attach_context("key4".to_string(), ContextValue::Null); let ctx = delivery.context; assert_eq!(ctx.len(), 4); - assert_eq!(ctx.get("key1").unwrap(), &json!("value1")); - assert_eq!(ctx.get("key2").unwrap(), &json!(42)); - assert_eq!(ctx.get("key3").unwrap(), &json!(true)); - assert_eq!(ctx.get("key4").unwrap(), &json!(null)); - } - - #[test] - fn test_attach_context_invalid_values() { - let mut delivery = EventDelivery::new( - reqwest::Client::new(), - SdkKey::new("foobar".into()), - Url::parse("http://example.com").unwrap(), - ); - assert_eq!( - delivery.attach_context("key1".to_string(), json!({"foo": "bar"})), - Err(ContextError::InvalidContextValueType) - ); - assert_eq!( - delivery.attach_context("key2".to_string(), json!([1, 2, 3])), - Err(ContextError::InvalidContextValueType) - ); - assert_eq!(delivery.context.len(), 0); + assert_eq!(ctx["key1"], ContextValue::String("value1".into())); + assert_eq!(ctx["key2"], ContextValue::Number(42.0)); + assert_eq!(ctx["key3"], ContextValue::Boolean(true)); + assert_eq!(ctx["key4"], ContextValue::Null); } } diff --git a/eppo_core/src/event_ingestion/event_ingestion.rs b/eppo_core/src/event_ingestion/event_ingestion.rs index 612dc262..e5115158 100644 --- a/eppo_core/src/event_ingestion/event_ingestion.rs +++ b/eppo_core/src/event_ingestion/event_ingestion.rs @@ -1,6 +1,5 @@ use std::{sync::Arc, time::Duration}; -use serde_json::Value; use tokio::sync::{mpsc, Mutex}; use url::Url; use uuid::Uuid; @@ -11,7 +10,7 @@ use super::{ auto_flusher, batcher, delivery::{self, DeliveryConfig}, event_delivery::EventDelivery, - BatchedMessage, Event, + BatchedMessage, ContextValue, Event, }; #[derive(Debug, Clone)] @@ -61,7 +60,7 @@ impl EventIngestionConfig { /// A handle to Event Ingestion subsystem. pub struct EventIngestion { tx: mpsc::Sender>, - context_sender: mpsc::Sender<(String, Value)>, + context_sender: mpsc::Sender<(String, ContextValue)>, } impl EventIngestion { @@ -74,13 +73,11 @@ impl EventIngestion { ))); let event_delivery_clone = Arc::clone(&event_delivery); - let (context_sender, mut context_rx) = mpsc::channel::<(String, Value)>(100); + let (context_sender, mut context_rx) = mpsc::channel::<(String, ContextValue)>(100); runtime.spawn_untracked(async move { while let Some((key, value)) = context_rx.recv().await { let mut event_delivery = event_delivery_clone.lock().await; - if let Err(err) = event_delivery.attach_context(key, value) { - log::warn!(target: "eppo", "Failed to attach context: {}", err); - } + event_delivery.attach_context(key, value) } }); @@ -140,7 +137,7 @@ impl EventIngestion { /// @param key - The context entry key. /// @param value - The context entry value, must be a string, number, boolean, or null. If value is /// an object or an array, will throw an ArgumentError. - pub fn attach_context(&self, key: String, value: Value) { + pub fn attach_context(&self, key: String, value: ContextValue) { let result = self.context_sender.try_send((key, value)); if let Err(err) = result { log::warn!(target: "eppo", "Failed to send context update to worker: {}", err); @@ -231,7 +228,7 @@ mod tests { .and(body_json(&json!({ "context": { "string": "value", - "number": 42, + "number": 42.0, "boolean": true, "null": null, }, @@ -249,10 +246,10 @@ mod tests { let config = new_test_event_config(Url::parse(&mock_server.uri()).unwrap(), batch_size); let runtime = BackgroundRuntime::new(tokio::runtime::Handle::current()); let event_ingestion = config.spawn(&runtime); - event_ingestion.attach_context("string".to_string(), json!("value")); - event_ingestion.attach_context("number".to_string(), json!(42)); - event_ingestion.attach_context("boolean".to_string(), json!(true)); - event_ingestion.attach_context("null".to_string(), json!(null)); + event_ingestion.attach_context("string".to_string(), ContextValue::String("value".into())); + event_ingestion.attach_context("number".to_string(), ContextValue::Number(42.0)); + event_ingestion.attach_context("boolean".to_string(), ContextValue::Boolean(true)); + event_ingestion.attach_context("null".to_string(), ContextValue::Null); event_ingestion.track_event(event); // wait some time for the async task to finish // TODO: use flush instead of sleeping diff --git a/eppo_core/src/event_ingestion/mod.rs b/eppo_core/src/event_ingestion/mod.rs index ecb201e0..bfadc7c9 100644 --- a/eppo_core/src/event_ingestion/mod.rs +++ b/eppo_core/src/event_ingestion/mod.rs @@ -1,6 +1,7 @@ mod auto_flusher; mod batched_message; mod batcher; +mod context; mod delivery; mod event; mod event_delivery; @@ -9,4 +10,5 @@ mod event_ingestion; use batched_message::BatchedMessage; use event::Event; +pub use context::ContextValue; pub use event_ingestion::{EventIngestion, EventIngestionConfig}; diff --git a/eppo_core/src/lib.rs b/eppo_core/src/lib.rs index 2a9faa8f..38680815 100644 --- a/eppo_core/src/lib.rs +++ b/eppo_core/src/lib.rs @@ -79,6 +79,7 @@ pub use attributes::{ }; pub use configuration::Configuration; pub use error::{Error, EvaluationError, Result}; +pub use event_ingestion::ContextValue; #[cfg(feature = "event_ingestion")] pub use sdk_key::SdkKey; pub use sdk_metadata::SdkMetadata; diff --git a/ruby-sdk/ext/eppo_client/src/client.rs b/ruby-sdk/ext/eppo_client/src/client.rs index 71c6bb28..26264838 100644 --- a/ruby-sdk/ext/eppo_client/src/client.rs +++ b/ruby-sdk/ext/eppo_client/src/client.rs @@ -9,7 +9,7 @@ use eppo_core::{ }, configuration_store::ConfigurationStore, eval::{Evaluator, EvaluatorConfig}, - event_ingestion::{EventIngestion, EventIngestionConfig}, + event_ingestion::{ContextValue, EventIngestion, EventIngestionConfig}, ufc::VariationType, Attributes, ContextAttributes, SdkKey, }; @@ -264,11 +264,18 @@ impl Client { let value: serde_json::Value = serde_magnus::deserialize(value).map_err(|err| { Error::new( exception::runtime_error(), - format!("Unexpected value for payload: {err}"), + format!("Unexpected value: {}", err), + ) + })?; + + let context_value = ContextValue::try_from_json(value).map_err(|err| { + Error::new( + exception::runtime_error(), + format!("Unexpected value for context value: {}", err), ) })?; - event_ingestion.attach_context(key, value); + event_ingestion.attach_context(key, context_value); Ok(()) } From 0fdc926f6837dbb2ac264e708576c08281b923ce Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 13:35:41 -0800 Subject: [PATCH 09/11] fix build --- eppo_core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eppo_core/src/lib.rs b/eppo_core/src/lib.rs index 38680815..85c4d688 100644 --- a/eppo_core/src/lib.rs +++ b/eppo_core/src/lib.rs @@ -79,7 +79,7 @@ pub use attributes::{ }; pub use configuration::Configuration; pub use error::{Error, EvaluationError, Result}; -pub use event_ingestion::ContextValue; #[cfg(feature = "event_ingestion")] +pub use event_ingestion::ContextValue; pub use sdk_key::SdkKey; pub use sdk_metadata::SdkMetadata; From 4a6d1ab15a369543088363849a01bdc39b5684d7 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Tue, 11 Feb 2025 13:37:11 -0800 Subject: [PATCH 10/11] remove unneeded company --- eppo_core/src/event_ingestion/event_delivery.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/eppo_core/src/event_ingestion/event_delivery.rs b/eppo_core/src/event_ingestion/event_delivery.rs index 1f77bbfc..5fd58e94 100644 --- a/eppo_core/src/event_ingestion/event_delivery.rs +++ b/eppo_core/src/event_ingestion/event_delivery.rs @@ -125,7 +125,6 @@ impl EventDelivery { } pub fn attach_context(&mut self, key: String, value: ContextValue) { - // ensure value is valid (not object or array) self.context.insert(key, value); } From 1e17deadd300aa1ae7946906a6f2a4e18816e064 Mon Sep 17 00:00:00 2001 From: Felipe Lima Date: Fri, 14 Feb 2025 15:08:24 -0800 Subject: [PATCH 11/11] Update eppo_core/src/event_ingestion/delivery.rs Co-authored-by: Oleksii Shmalko --- eppo_core/src/event_ingestion/delivery.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/eppo_core/src/event_ingestion/delivery.rs b/eppo_core/src/event_ingestion/delivery.rs index 700f1118..06aa5c24 100644 --- a/eppo_core/src/event_ingestion/delivery.rs +++ b/eppo_core/src/event_ingestion/delivery.rs @@ -72,8 +72,10 @@ pub(super) async fn delivery( let BatchedMessage { batch, flush } = msg; - let delivery = event_delivery.lock().await; - let mut result = delivery.deliver(batch).await; + let mut result = { + let delivery = event_delivery.lock().await; + delivery.deliver(batch).await + }; if attempts >= config.max_retries { // Exceeded max retries -> promote retriable errors to permanent ones.