Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(events): Allow attaching context to event dispatching #205

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions .changeset/shy-countries-press.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"eppo_core": patch
"ruby-sdk": patch
---

[Unstable] Add ability for attaching context to events dispatched
73 changes: 73 additions & 0 deletions eppo_core/src/event_ingestion/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::Str;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, derive_more::From)]
#[serde(untagged)]
pub enum ContextValue {
String(Str),
Number(f64),
Boolean(bool),
Null,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I think we want to model a potentially absent value as Option<ContextValue>. This will make sure that we don't send null fields and instead erase fields.

This also works well with deserialization because Option<> parses null, so we can get rid of try_from_json() and ContextError

}

#[derive(thiserror::Error, Debug, Clone, PartialEq)]
pub enum ContextError {
#[error("JSON value cannot be an object or an array")]
InvalidContextValueType,
}

impl ContextValue {
pub fn try_from_json(value: Value) -> Result<Self, ContextError> {
match value {
Value::String(s) => Ok(ContextValue::String(s.into())),
Value::Number(n) => Ok(ContextValue::Number(n.as_f64().unwrap())), // Safe unwrap since it's always f64 or i64
Value::Bool(b) => Ok(ContextValue::Boolean(b)),
Value::Null => Ok(ContextValue::Null),
_ => Err(ContextError::InvalidContextValueType),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use super::*;
use serde_json::json;

#[test]
fn test_serialization() {
#[derive(Debug, Serialize)]
struct IngestionRequestBody<'a> {
context: &'a BTreeMap<String, ContextValue>,
}

let mut context = BTreeMap::new();
context.insert("key1".to_string(), ContextValue::String("value1".into()));
context.insert("key2".to_string(), ContextValue::Number(42.0));
context.insert("key3".to_string(), ContextValue::Boolean(true));
context.insert("key4".to_string(), ContextValue::Null);

let body = IngestionRequestBody { context: &context };

let json = serde_json::to_string(&body).unwrap();
assert_eq!(
json,
"{\"context\":{\"key1\":\"value1\",\"key2\":42.0,\"key3\":true,\"key4\":null}}"
);
}

#[test]
fn test_context_invalid_values() {
assert_eq!(
ContextValue::try_from_json(json!({"foo": "bar"})),
Err(ContextError::InvalidContextValueType)
);
assert_eq!(
ContextValue::try_from_json(json!([1, 2, 3])),
Err(ContextError::InvalidContextValueType)
);
}
}
11 changes: 7 additions & 4 deletions eppo_core/src/event_ingestion/delivery.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use exponential_backoff::Backoff;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Mutex};

use super::{event::Event, event_delivery::EventDelivery, BatchedMessage};

Expand Down Expand Up @@ -56,7 +56,7 @@ pub(super) struct DeliveryConfig {
pub(super) async fn delivery(
mut uplink: mpsc::Receiver<BatchedMessage<Event>>,
delivery_status: mpsc::Sender<DeliveryStatus>,
event_delivery: EventDelivery,
event_delivery: Arc<Mutex<EventDelivery>>,
config: DeliveryConfig,
) -> Option<()> {
// We use this unbounded channel to loop back messages that need retrying.
Expand All @@ -72,7 +72,10 @@ pub(super) async fn delivery(

let BatchedMessage { batch, flush } = msg;

let mut result = event_delivery.deliver(batch).await;
let mut result = {
let delivery = event_delivery.lock().await;
delivery.deliver(batch).await
};

if attempts >= config.max_retries {
// Exceeded max retries -> promote retriable errors to permanent ones.
Expand Down
2 changes: 1 addition & 1 deletion eppo_core/src/event_ingestion/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ pub(super) struct Event {
pub event_type: String,

pub payload: serde_json::Value,
}
}
47 changes: 43 additions & 4 deletions eppo_core/src/event_ingestion/event_delivery.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use log::debug;
use reqwest::StatusCode;
Expand All @@ -8,7 +8,7 @@ use uuid::Uuid;

use crate::sdk_key::SdkKey;

use super::{delivery::DeliveryStatus, event::Event};
use super::{delivery::DeliveryStatus, event::Event, ContextValue};

const MAX_EVENT_SERIALIZED_LENGTH: usize = 4096;

Expand All @@ -17,6 +17,7 @@ pub(super) struct EventDelivery {
sdk_key: SdkKey,
ingestion_url: Url,
client: reqwest::Client,
context: HashMap<String, ContextValue>,
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -68,6 +69,7 @@ impl From<reqwest::Error> for EventDeliveryError {

#[derive(Debug, Serialize)]
struct IngestionRequestBody<'a> {
context: &'a HashMap<String, ContextValue>,
eppo_events: &'a [Event],
}

Expand All @@ -83,6 +85,7 @@ impl EventDelivery {
sdk_key,
ingestion_url,
client,
context: HashMap::new(),
}
}

Expand Down Expand Up @@ -121,6 +124,10 @@ impl EventDelivery {
status
}

pub fn attach_context(&mut self, key: String, value: ContextValue) {
self.context.insert(key, value);
}

async fn deliver_inner(
&self,
events: &[Event],
Expand All @@ -136,6 +143,7 @@ impl EventDelivery {
debug!("Delivering {} events to {}", events.len(), ingestion_url);

let body = IngestionRequestBody {
context: &self.context,
eppo_events: events,
};

Expand Down Expand Up @@ -181,6 +189,12 @@ mod tests {
.and(path("/"))
.and(header("X-Eppo-Token", "foobar"))
.and(body_json(&json!({
"context": {
"key1": "value1",
"key2": 42.0,
"key3": true,
"key4": null,
},
"eppo_events": [{
"uuid": uuid,
"timestamp": timestamp.timestamp_millis(),
Expand All @@ -196,7 +210,7 @@ mod tests {
.mount(&mock_server)
.await;

let client = EventDelivery::new(
let mut delivery = EventDelivery::new(
reqwest::Client::new(),
SdkKey::new("foobar".into()),
Url::parse(mock_server.uri().as_str()).unwrap(),
Expand All @@ -212,10 +226,35 @@ mod tests {
}),
};

let result = client.deliver(vec![event.clone()]).await;
delivery.attach_context("key1".to_string(), ContextValue::String("value1".into()));
delivery.attach_context("key2".to_string(), ContextValue::Number(42.0));
delivery.attach_context("key3".to_string(), ContextValue::Boolean(true));
delivery.attach_context("key4".to_string(), ContextValue::Null);

let result = delivery.deliver(vec![event.clone()]).await;

assert_eq!(result, DeliveryStatus::success(vec![event]));

mock_server.verify().await;
}

#[test]
fn test_attach_context_valid_values() {
let mut delivery = EventDelivery::new(
reqwest::Client::new(),
SdkKey::new("foobar".into()),
Url::parse("http://example.com").unwrap(),
);

delivery.attach_context("key1".to_string(), ContextValue::String("value1".into()));
delivery.attach_context("key2".to_string(), ContextValue::Number(42.0));
delivery.attach_context("key3".to_string(), ContextValue::Boolean(true));
delivery.attach_context("key4".to_string(), ContextValue::Null);
let ctx = delivery.context;
assert_eq!(ctx.len(), 4);
assert_eq!(ctx["key1"], ContextValue::String("value1".into()));
assert_eq!(ctx["key2"], ContextValue::Number(42.0));
assert_eq!(ctx["key3"], ContextValue::Boolean(true));
assert_eq!(ctx["key4"], ContextValue::Null);
}
}
Loading
Loading