Skip to content

Commit

Permalink
refactor: marker is mdc (#90)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Jan 10, 2025
1 parent 941449e commit 416242c
Show file tree
Hide file tree
Showing 21 changed files with 382 additions and 357 deletions.
2 changes: 1 addition & 1 deletion examples/fastrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() {
.dispatch(|d| {
d.append(
logforth::append::Stderr::default()
.with_marker(logforth::marker::TraceIdMarker::default()),
.with_marker(logforth::diagnostic::FastraceDiagnostic::default()),
)
})
.apply();
Expand Down
42 changes: 39 additions & 3 deletions src/append/fastrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use jiff::Zoned;
use log::Record;

use crate::append::Append;
use crate::layout::collect_kvs;
use crate::diagnostic::Visitor;
use crate::Diagnostic;

/// An appender that adds log records to fastrace as an event associated to the current span.
///
Expand All @@ -37,16 +38,24 @@ pub struct FastraceEvent {
}

impl Append for FastraceEvent {
fn append(&self, record: &Record) -> anyhow::Result<()> {
fn append(&self, record: &Record, diagnostics: &[Diagnostic]) -> anyhow::Result<()> {
let message = format!("{}", record.args());

let mut collector = KvCollector { kv: Vec::new() };
record.key_values().visit(&mut collector)?;
for d in diagnostics {
d.visit(&mut collector);
}

fastrace::Event::add_to_local_parent(message, || {
[
(Cow::from("level"), Cow::from(record.level().as_str())),
(Cow::from("timestamp"), Cow::from(Zoned::now().to_string())),
]
.into_iter()
.chain(
collect_kvs(record.key_values())
collector
.kv
.into_iter()
.map(|(k, v)| (Cow::from(k), Cow::from(v))),
)
Expand All @@ -58,3 +67,30 @@ impl Append for FastraceEvent {
fastrace::flush();
}
}

struct KvCollector {
kv: Vec<(String, String)>,
}

impl<'kvs> log::kv::VisitSource<'kvs> for KvCollector {
fn visit_pair(
&mut self,
key: log::kv::Key<'kvs>,
value: log::kv::Value<'kvs>,
) -> Result<(), log::kv::Error> {
self.kv.push((key.to_string(), value.to_string()));
Ok(())
}
}

impl Visitor for KvCollector {
fn visit<'k, 'v, K, V>(&mut self, key: K, value: V)
where
K: Into<Cow<'k, str>>,
V: Into<Cow<'v, str>>,
{
let key = key.into().into_owned();
let value = value.into().into_owned();
self.kv.push((key, value));
}
}
34 changes: 25 additions & 9 deletions src/append/journald/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::io;
use std::io::Write;
use std::os::unix::net::UnixDatagram;

use log::Level;
use log::Record;

use crate::diagnostic::Visitor;
use crate::Append;
use crate::Diagnostic;

mod field;
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -237,19 +240,30 @@ impl<'kvs> log::kv::VisitSource<'kvs> for WriteKeyValues<'_> {
key: log::kv::Key<'kvs>,
value: log::kv::Value<'kvs>,
) -> Result<(), log::kv::Error> {
field::put_field_length_encoded(
self.0,
field::FieldName::WriteEscaped(key.as_str()),
value,
);
let key = key.as_str();
field::put_field_length_encoded(self.0, field::FieldName::WriteEscaped(key), value);
Ok(())
}
}

impl Visitor for WriteKeyValues<'_> {
fn visit<'k, 'v, K, V>(&mut self, key: K, value: V)
where
K: Into<Cow<'k, str>>,
V: Into<Cow<'v, str>>,
{
let key = key.into();
let value = value.into();
let key = key.as_ref();
let value = value.as_bytes();
field::put_field_length_encoded(self.0, field::FieldName::WriteEscaped(key), value);
}
}

impl Append for Journald {
/// Extract all fields (standard and custom) from `record`, append all `extra_fields` given
/// to this appender, and send the result to journald.
fn append(&self, record: &Record) -> anyhow::Result<()> {
fn append(&self, record: &Record, diagnostics: &[Diagnostic]) -> anyhow::Result<()> {
use field::*;

let mut buffer = vec![];
Expand Down Expand Up @@ -299,9 +313,11 @@ impl Append for Journald {
record.target().as_bytes(),
);
// Put all structured values of the record
record
.key_values()
.visit(&mut WriteKeyValues(&mut buffer))?;
let mut visitor = WriteKeyValues(&mut buffer);
record.key_values().visit(&mut visitor)?;
for d in diagnostics {
d.visit(&mut visitor);
}
// Put all extra fields of the appender
buffer.extend_from_slice(&self.extra_fields);
self.send_payload(&buffer)?;
Expand Down
4 changes: 3 additions & 1 deletion src/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
use std::fmt;

use crate::Diagnostic;

#[cfg(feature = "fastrace")]
mod fastrace;
#[cfg(all(unix, feature = "journald"))]
Expand Down Expand Up @@ -46,7 +48,7 @@ pub use self::syslog::Syslog;
/// Implementors of this trait can handle log records in custom ways.
pub trait Append: fmt::Debug + Send + Sync + 'static {
/// Dispatches a log record to the append target.
fn append(&self, record: &log::Record) -> anyhow::Result<()>;
fn append(&self, record: &log::Record, diagnostics: &[Diagnostic]) -> anyhow::Result<()>;

/// Flushes any buffered records.
fn flush(&self) {}
Expand Down
104 changes: 47 additions & 57 deletions src/append/opentelemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::logs::LoggerProvider;

use crate::append::Append;
use crate::diagnostic::Visitor;
use crate::Diagnostic;
use crate::Layout;
use crate::Marker;

/// Specifies the wire protocol to use when sending logs to OpenTelemetry.
///
Expand All @@ -56,7 +57,6 @@ pub struct OpentelemetryLogBuilder {
protocol: Protocol,
labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
layout: Option<Layout>,
marker: Option<Marker>,
}

impl OpentelemetryLogBuilder {
Expand All @@ -76,7 +76,6 @@ impl OpentelemetryLogBuilder {
protocol: Protocol::Grpc,
labels: vec![],
layout: None,
marker: None,
}
}

Expand Down Expand Up @@ -155,22 +154,6 @@ impl OpentelemetryLogBuilder {
self
}

/// Sets the marker for the logs.
///
/// # Examples
///
/// ```
/// use logforth::append::opentelemetry::OpentelemetryLogBuilder;
/// use logforth::marker::TraceIdMarker;
///
/// let builder = OpentelemetryLogBuilder::new("my_service", "http://localhost:4317");
/// builder.marker(TraceIdMarker::default());
/// ```
pub fn marker(mut self, marker: impl Into<Marker>) -> Self {
self.marker = Some(marker.into());
self
}

/// Builds the [`OpentelemetryLog`] appender.
///
/// # Examples
Expand All @@ -190,7 +173,6 @@ impl OpentelemetryLogBuilder {
protocol,
labels,
layout,
marker,
} = self;

let collector_timeout =
Expand Down Expand Up @@ -224,7 +206,6 @@ impl OpentelemetryLogBuilder {
Ok(OpentelemetryLog {
name,
layout,
marker,
logger,
provider,
})
Expand All @@ -250,61 +231,41 @@ impl OpentelemetryLogBuilder {
pub struct OpentelemetryLog {
name: String,
layout: Option<Layout>,
marker: Option<Marker>,
logger: opentelemetry_sdk::logs::Logger,
provider: LoggerProvider,
}

impl Append for OpentelemetryLog {
fn append(&self, record: &Record) -> anyhow::Result<()> {
let mut log_record_ = LogRecord::default();
log_record_.observed_timestamp = Some(SystemTime::now());
log_record_.severity_number = Some(log_level_to_otel_severity(record.level()));
log_record_.severity_text = Some(record.level().as_str());
log_record_.target = Some(record.target().to_string().into());
log_record_.body = Some(AnyValue::Bytes(Box::new(match self.layout.as_ref() {
fn append(&self, record: &Record, diagnostics: &[Diagnostic]) -> anyhow::Result<()> {
let mut log_record = LogRecord::default();
log_record.observed_timestamp = Some(SystemTime::now());
log_record.severity_number = Some(log_level_to_otel_severity(record.level()));
log_record.severity_text = Some(record.level().as_str());
log_record.target = Some(record.target().to_string().into());
log_record.body = Some(AnyValue::Bytes(Box::new(match self.layout.as_ref() {
None => record.args().to_string().into_bytes(),
Some(layout) => layout.format(record, self.marker.as_ref())?,
Some(layout) => layout.format(record, diagnostics)?,
})));

if let Some(module_path) = record.module_path() {
log_record_.add_attribute("module_path", module_path.to_string());
log_record.add_attribute("module_path", module_path.to_string());
}
if let Some(file) = record.file() {
log_record_.add_attribute("file", file.to_string());
log_record.add_attribute("file", file.to_string());
}
if let Some(line) = record.line() {
log_record_.add_attribute("line", line);
}

struct KvExtractor<'a> {
record: &'a mut LogRecord,
}

impl<'kvs> log::kv::VisitSource<'kvs> for KvExtractor<'_> {
fn visit_pair(
&mut self,
key: log::kv::Key<'kvs>,
value: log::kv::Value<'kvs>,
) -> Result<(), log::kv::Error> {
self.record
.add_attribute(key.to_string(), value.to_string());
Ok(())
}
log_record.add_attribute("line", line);
}

let mut extractor = KvExtractor {
record: &mut log_record_,
record: &mut log_record,
};
record.key_values().visit(&mut extractor).ok();

if let Some(marker) = &self.marker {
marker.mark(|key, value| {
log_record_.add_attribute(key.to_string(), value.to_string());
});
record.key_values().visit(&mut extractor)?;
for d in diagnostics {
d.visit(&mut extractor);
}

self.logger.emit(log_record_);
self.logger.emit(log_record);
Ok(())
}

Expand All @@ -329,3 +290,32 @@ fn log_level_to_otel_severity(level: log::Level) -> opentelemetry::logs::Severit
log::Level::Trace => opentelemetry::logs::Severity::Trace,
}
}

struct KvExtractor<'a> {
record: &'a mut LogRecord,
}

impl<'kvs> log::kv::VisitSource<'kvs> for KvExtractor<'_> {
fn visit_pair(
&mut self,
key: log::kv::Key<'kvs>,
value: log::kv::Value<'kvs>,
) -> Result<(), log::kv::Error> {
let key = key.to_string();
let value = value.to_string();
self.record.add_attribute(key, value);
Ok(())
}
}

impl Visitor for KvExtractor<'_> {
fn visit<'k, 'v, K, V>(&mut self, key: K, value: V)
where
K: Into<Cow<'k, str>>,
V: Into<Cow<'v, str>>,
{
let key = key.into().into_owned();
let value = value.into().into_owned();
self.record.add_attribute(key, value);
}
}
14 changes: 3 additions & 11 deletions src/append/rolling_file/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ use crate::append::rolling_file::RollingFileWriter;
use crate::append::Append;
use crate::layout::TextLayout;
use crate::non_blocking::NonBlocking;
use crate::Diagnostic;
use crate::Layout;
use crate::Marker;

/// An appender that writes log records to rolling files.
#[derive(Debug)]
pub struct RollingFile {
layout: Layout,
marker: Option<Marker>,
writer: NonBlocking<RollingFileWriter>,
}

Expand All @@ -36,7 +35,6 @@ impl RollingFile {
pub fn new(writer: NonBlocking<RollingFileWriter>) -> Self {
Self {
layout: TextLayout::default().no_color().into(),
marker: None,
writer,
}
}
Expand All @@ -46,17 +44,11 @@ impl RollingFile {
self.layout = layout.into();
self
}

/// Sets the marker used to add additional fields to log records.
pub fn with_marker(mut self, marker: impl Into<Marker>) -> Self {
self.marker = Some(marker.into());
self
}
}

impl Append for RollingFile {
fn append(&self, record: &Record) -> anyhow::Result<()> {
let mut bytes = self.layout.format(record, self.marker.as_ref())?;
fn append(&self, record: &Record, diagnostics: &[Diagnostic]) -> anyhow::Result<()> {
let mut bytes = self.layout.format(record, diagnostics)?;
bytes.push(b'\n');
self.writer.send(bytes)?;
Ok(())
Expand Down
Loading

0 comments on commit 416242c

Please sign in to comment.