Skip to content

Commit

Permalink
handle keyed and flattened values separately
Browse files Browse the repository at this point in the history
  • Loading branch information
mladedav committed Sep 4, 2024
1 parent 9eee25c commit d57d623
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 78 deletions.
5 changes: 3 additions & 2 deletions src/fmt/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::names::{
THREAD_NAME,
TIMESTAMP,
};
use crate::layer::{JsonLayer, SchemaKey};
use crate::layer::{FlatSchemaKey, JsonLayer};

/// A [`Layer`] that logs JSON formatted representations of `tracing` events.
///
Expand Down Expand Up @@ -341,7 +341,8 @@ where
self.inner.remove_field(FIELDS);
self.inner.with_flattened_event();
} else {
self.inner.remove_field_inner(&SchemaKey::FlattenedEvent);
self.inner
.remove_flattened_field(&FlatSchemaKey::FlattenedEvent);
self.inner.with_event(FIELDS);
}
self
Expand Down
71 changes: 30 additions & 41 deletions src/layer/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,43 +91,21 @@ where
let mut serialized_anything = false;
let mut serialized_anything_serde = false;

for (key, value) in &self.schema {
for (SchemaKey::Static(key), value) in &self.keyed_values {
let Some(value) = resolve_json_value(value, &event_ref) else {
continue;
};

match value {
MaybeCached::Serde(value) => {
match key {
SchemaKey::Static(key) => {
if serialized_anything && !serialized_anything_serde {
writer.inner_mut().push(',');
}
serialized_anything = true;
serialized_anything_serde = true;
serializer.serialize_entry(key, &value)?;
},
SchemaKey::Uuid(_) | SchemaKey::FlattenedEvent => {
let map = value.as_object().unwrap();
if !map.is_empty() {
if serialized_anything && !serialized_anything_serde {
writer.inner_mut().push(',');
}
serialized_anything = true;
serialized_anything_serde = true;
for (key, value) in map {
serializer.serialize_entry(key, value)?;
}
}
},
if serialized_anything && !serialized_anything_serde {
writer.inner_mut().push(',');
}
serialized_anything = true;
serialized_anything_serde = true;
serializer.serialize_entry(key, &value)?;
},
MaybeCached::Cached(Cached::Raw(raw)) => {
let SchemaKey::Static(key) = key else {
panic!(
"[json-subscriber] provided raw cached value has invalid key: \
{key:?}"
);
};
debug_assert!(
serde_json::to_value(&*raw).is_ok(),
"[json-subscriber] provided cached value is not valid json: {raw}",
Expand All @@ -143,12 +121,6 @@ where
writer.push_str(&raw);
},
MaybeCached::Cached(Cached::Array(arr)) => {
let SchemaKey::Static(key) = key else {
panic!(
"[json-subscriber] provided raw cached array has invalid key: \
{key:?}"
);
};
let mut writer = writer.inner_mut();
if serialized_anything {
writer.push(',');
Expand All @@ -173,12 +145,6 @@ where
writer.push(']');
},
MaybeCached::Raw(raw_fun) => {
let SchemaKey::Static(key) = key else {
panic!(
"[json-subscriber] provided raw value factory has invalid key: \
{key:?}"
);
};
let mut writer = writer.inner_mut();
let rollback_position = writer.len();
if serialized_anything {
Expand Down Expand Up @@ -209,6 +175,29 @@ where
}
}

for value in self.flattened_values.values() {
let Some(value) = resolve_json_value(value, &event_ref) else {
continue;
};

match value {
MaybeCached::Serde(value) => {
let map = value.as_object().unwrap();
if !map.is_empty() {
if serialized_anything && !serialized_anything_serde {
writer.inner_mut().push(',');
}
serialized_anything = true;
serialized_anything_serde = true;
for (key, value) in map {
serializer.serialize_entry(key, value)?;
}
}
},
MaybeCached::Cached(_) | MaybeCached::Raw(_) => todo!(),
}
}

serializer.end()
};

Expand Down
82 changes: 47 additions & 35 deletions src/layer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,22 @@ use crate::{
pub struct JsonLayer<S: for<'lookup> LookupSpan<'lookup> = Registry, W = fn() -> io::Stdout> {
make_writer: W,
log_internal_errors: bool,
schema: BTreeMap<SchemaKey, JsonValue<S>>,
keyed_values: BTreeMap<SchemaKey, JsonValue<S>>,
flattened_values: BTreeMap<FlatSchemaKey, JsonValue<S>>,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum SchemaKey {
Static(Cow<'static, str>),
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum FlatSchemaKey {
Uuid(Uuid),
FlattenedEvent,
}

impl SchemaKey {
impl FlatSchemaKey {
fn new_uuid() -> Self {
Self::Uuid(uuid::Uuid::new_v4())
}
Expand Down Expand Up @@ -213,7 +218,8 @@ where
JsonLayer::<S, W> {
make_writer,
log_internal_errors: false,
schema: BTreeMap::new(),
keyed_values: BTreeMap::new(),
flattened_values: BTreeMap::new(),
}
}
}
Expand Down Expand Up @@ -244,7 +250,8 @@ where
JsonLayer {
make_writer,
log_internal_errors: self.log_internal_errors,
schema: self.schema,
keyed_values: self.keyed_values,
flattened_values: self.flattened_values,
}
}

Expand Down Expand Up @@ -306,7 +313,8 @@ where
JsonLayer {
make_writer: TestWriter::default(),
log_internal_errors: self.log_internal_errors,
schema: self.schema,
keyed_values: self.keyed_values,
flattened_values: self.flattened_values,
}
}

Expand Down Expand Up @@ -350,7 +358,8 @@ where
JsonLayer {
make_writer: f(self.make_writer),
log_internal_errors: self.log_internal_errors,
schema: self.schema,
keyed_values: self.keyed_values,
flattened_values: self.flattened_values,
}
}

Expand All @@ -373,7 +382,7 @@ where
/// # fn get_hostname() -> &'static str { "localhost" }
/// ```
pub fn add_static_field(&mut self, key: impl Into<String>, value: serde_json::Value) {
self.schema
self.keyed_values
.insert(SchemaKey::from(key.into()), JsonValue::Serde(value));
}

Expand All @@ -395,19 +404,19 @@ where
/// # tracing_subscriber::registry().with(layer);
/// ```
pub fn remove_field(&mut self, key: impl Into<String>) {
self.remove_field_inner(&SchemaKey::from(key.into()));
self.keyed_values.remove(&SchemaKey::from(key.into()));
}

pub(crate) fn remove_field_inner(&mut self, key: &SchemaKey) {
self.schema.remove(key);
pub(crate) fn remove_flattened_field(&mut self, key: &FlatSchemaKey) {
self.flattened_values.remove(key);
}

pub fn add_dynamic_field<Fun, Res>(&mut self, key: impl Into<String>, mapper: Fun)
where
for<'a> Fun: Fn(&'a Event<'_>, &Context<'_, S>) -> Option<Res> + Send + Sync + 'a,
Res: serde::Serialize,
{
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicFromEvent(Box::new(move |event| {
serde_json::to_value(mapper(event.event(), event.context())).ok()
Expand All @@ -420,8 +429,8 @@ where
for<'a> Fun: Fn(&'a Event<'_>, &Context<'_, S>) -> Res + Send + Sync + 'a,
Res: IntoIterator<Item = (String, serde_json::Value)>,
{
self.schema.insert(
SchemaKey::new_uuid(),
self.flattened_values.insert(
FlatSchemaKey::new_uuid(),
JsonValue::DynamicFromEvent(Box::new(move |event| {
serde_json::to_value(
mapper(event.event(), event.context())
Expand All @@ -438,7 +447,7 @@ where
for<'a> Fun: Fn(&'a SpanRef<'_, S>) -> Option<Res> + Send + Sync + 'a,
Res: serde::Serialize,
{
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicFromSpan(Box::new(move |span| {
serde_json::to_value(mapper(span)).ok()
Expand Down Expand Up @@ -545,7 +554,7 @@ where
for<'a> Fun: Fn(&'a Ext) -> Option<&'a Res> + Send + Sync + 'a,
Res: serde::Serialize,
{
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicFromSpan(Box::new(move |span| {
let extensions = span.extensions();
Expand Down Expand Up @@ -608,7 +617,7 @@ where
for<'a> Fun: Fn(&'a Ext) -> Option<Res> + Send + Sync + 'a,
Res: serde::Serialize,
{
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicFromSpan(Box::new(move |span| {
let extensions = span.extensions();
Expand All @@ -625,26 +634,29 @@ where
/// clash with other defined fields. If they clash, invalid JSON with multiple fields with the
/// same key may be generated.
pub fn with_event(&mut self, key: impl Into<String>) -> &mut Self {
self.with_event_inner(SchemaKey::from(key.into()))
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicFromEvent(Box::new(move |event| {
serde_json::to_value(event.field_map()).ok()
})),
);
self
}

pub fn with_flattened_event(&mut self) -> &mut Self {
self.with_event_inner(SchemaKey::FlattenedEvent)
}

fn with_event_inner(&mut self, key: SchemaKey) -> &mut Self {
self.schema.insert(
key,
self.flattened_values.insert(
FlatSchemaKey::FlattenedEvent,
JsonValue::DynamicFromEvent(Box::new(move |event| {
serde_json::to_value(event.field_map()).ok()
})),
);
self
}

/// Sets whether or not the log line will include the current span in formatted events. If set
/// to true, it will be printed with the key `span`.
pub fn with_current_span(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicCachedFromSpan(Box::new(move |span| {
span.extensions()
Expand All @@ -658,7 +670,7 @@ where
/// Sets whether or not the formatter will include a list (from root to leaf) of all currently
/// entered spans in formatted events. If set to true, it will be printed with the key `spans`.
pub fn with_span_list(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicCachedFromSpan(Box::new(|span| {
Some(Cached::Array(
Expand All @@ -682,7 +694,7 @@ where
///
/// This overrides any previous calls to [`with_span_list`](Self::with_span_list).
pub(crate) fn with_flattened_span_fields(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicFromSpan(Box::new(|span| {
let fields =
Expand Down Expand Up @@ -720,7 +732,7 @@ where
key: impl Into<String>,
timer: T,
) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicFromEvent(Box::new(move |_| {
let mut timestamp = String::with_capacity(32);
Expand All @@ -733,7 +745,7 @@ where

/// Sets whether or not an event's target is displayed. It will use the `target` key if so.
pub fn with_target(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicRawFromEvent(Box::new(|event, writer| {
write_escaped(writer, event.metadata().target())
Expand All @@ -748,7 +760,7 @@ where
///
/// [file]: tracing_core::Metadata::file
pub fn with_file(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicRawFromEvent(Box::new(|event, writer| {
event
Expand All @@ -765,7 +777,7 @@ where
///
/// [line]: tracing_core::Metadata::line
pub fn with_line_number(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicRawFromEvent(Box::new(|event, writer| {
event
Expand All @@ -779,7 +791,7 @@ where

/// Sets whether or not an event's level is displayed. It will use the `level` key if so.
pub fn with_level(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicRawFromEvent(Box::new(|event, writer| {
write_escaped(writer, event.metadata().level().as_str())
Expand All @@ -793,7 +805,7 @@ where
///
/// [name]: std::thread#naming-threads
pub fn with_thread_names(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicRawFromEvent(Box::new(|_event, writer| {
std::thread::current()
Expand All @@ -809,7 +821,7 @@ where
///
/// [thread ID]: std::thread::ThreadId
pub fn with_thread_ids(&mut self, key: impl Into<String>) -> &mut Self {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from(key.into()),
JsonValue::DynamicRawFromEvent(Box::new(|_event, writer| {
use std::fmt::Write;
Expand All @@ -834,7 +846,7 @@ where
use tracing_opentelemetry::OtelData;

if display_opentelemetry_ids {
self.schema.insert(
self.keyed_values.insert(
SchemaKey::from("openTelemetry"),
JsonValue::DynamicFromSpan(Box::new(|span| {
span.extensions().get::<OtelData>().and_then(|otel_data| {
Expand All @@ -855,7 +867,7 @@ where
})),
);
} else {
self.schema.remove(&SchemaKey::from("openTelemetry"));
self.keyed_values.remove(&SchemaKey::from("openTelemetry"));
}

self
Expand Down

0 comments on commit d57d623

Please sign in to comment.