Skip to content

Commit

Permalink
Migrate LogSchema::message_key to new lookup code
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Jul 19, 2023
1 parent b00727e commit a08a393
Show file tree
Hide file tree
Showing 53 changed files with 644 additions and 383 deletions.
2 changes: 1 addition & 1 deletion benches/codecs/character_delimited_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn decoding(c: &mut Criterion) {
.map(|ml| CharacterDelimitedDecoder::new_with_max_length(b'a', ml))
.unwrap_or(CharacterDelimitedDecoder::new(b'a')),
);
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
let deserializer = Deserializer::Bytes(BytesDeserializer {});
let decoder = vector::codecs::Decoder::new(framer, deserializer);

(Box::new(decoder), param.input.clone())
Expand Down
2 changes: 1 addition & 1 deletion benches/codecs/newline_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn decoding(c: &mut Criterion) {
.map(|ml| NewlineDelimitedDecoder::new_with_max_length(ml))
.unwrap_or(NewlineDelimitedDecoder::new()),
);
let deserializer = Deserializer::Bytes(BytesDeserializer::new());
let deserializer = Deserializer::Bytes(BytesDeserializer {});
let decoder = vector::codecs::Decoder::new(framer, deserializer);

(Box::new(decoder), param.input.clone())
Expand Down
37 changes: 13 additions & 24 deletions lib/codecs/src/decoding/format/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use bytes::Bytes;
use lookup::lookup_v2::parse_value_path;
use lookup::OwnedTargetPath;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
Expand All @@ -9,6 +8,7 @@ use vector_core::{
event::{Event, LogEvent},
schema,
};
use vrl::path::PathPrefix;
use vrl::value::Kind;

use super::Deserializer;
Expand All @@ -25,7 +25,7 @@ impl BytesDeserializerConfig {

/// Build the `BytesDeserializer` from this configuration.
pub fn build(&self) -> BytesDeserializer {
BytesDeserializer::new()
BytesDeserializer {}
}

/// Return the type of event build by this deserializer.
Expand All @@ -37,7 +37,7 @@ impl BytesDeserializerConfig {
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match log_namespace {
LogNamespace::Legacy => schema::Definition::empty_legacy_namespace().with_event_field(
&parse_value_path(log_schema().message_key()).expect("valid message key"),
log_schema().message_key().expect("valid message key"),
Kind::bytes(),
Some("message"),
),
Expand All @@ -54,32 +54,18 @@ impl BytesDeserializerConfig {
/// This deserializer can be considered as the no-op action for input where no
/// further decoding has been specified.
#[derive(Debug, Clone)]
pub struct BytesDeserializer {
// Only used with the "Legacy" namespace. The "Vector" namespace decodes the data at the root of the event.
log_schema_message_key: &'static str,
}

impl Default for BytesDeserializer {
fn default() -> Self {
Self::new()
}
}
pub struct BytesDeserializer {}

impl BytesDeserializer {
/// Creates a new `BytesDeserializer`.
pub fn new() -> Self {
Self {
log_schema_message_key: log_schema().message_key(),
}
}

/// Deserializes the given bytes, which will always produce a single `LogEvent`.
pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
match log_namespace {
LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
LogNamespace::Legacy => {
let mut log = LogEvent::default();
log.insert(self.log_schema_message_key, bytes);
if let Some(message_key) = log_schema().message_key() {
log.insert((PathPrefix::Event, message_key), bytes);
}
log
}
}
Expand Down Expand Up @@ -107,15 +93,18 @@ mod tests {
#[test]
fn deserialize_bytes_legacy_namespace() {
let input = Bytes::from("foo");
let deserializer = BytesDeserializer::new();
let deserializer = BytesDeserializer {};

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
let mut events = events.into_iter();

{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log[log_schema().message_key()], "foo".into());
assert_eq!(
log[log_schema().message_key().unwrap().to_string()],
"foo".into()
);
}

assert_eq!(events.next(), None);
Expand All @@ -124,7 +113,7 @@ mod tests {
#[test]
fn deserialize_bytes_vector_namespace() {
let input = Bytes::from("foo");
let deserializer = BytesDeserializer::new();
let deserializer = BytesDeserializer {};

let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
assert_eq!(events.len(), 1);
Expand Down
4 changes: 2 additions & 2 deletions lib/codecs/src/decoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ mod tests {
Some(&Value::Bytes(Bytes::from_static(b"example.org")))
);
assert_eq!(
log.get(log_schema().message_key()),
log.get((PathPrefix::Event, log_schema().message_key().unwrap())),
Some(&Value::Bytes(Bytes::from_static(
b"A short message that helps you identify what is going on"
)))
Expand Down Expand Up @@ -348,7 +348,7 @@ mod tests {
let events = deserialize_gelf_input(&input).unwrap();
assert_eq!(events.len(), 1);
let log = events[0].as_log();
assert!(log.contains(log_schema().message_key()));
assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap())));
}

// filter out id
Expand Down
16 changes: 10 additions & 6 deletions lib/codecs/src/decoding/format/syslog.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use bytes::Bytes;
use chrono::{DateTime, Datelike, Utc};
use derivative::Derivative;
use lookup::lookup_v2::parse_value_path;
use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use smallvec::{smallvec, SmallVec};
use std::borrow::Cow;
Expand Down Expand Up @@ -71,7 +70,7 @@ impl SyslogDeserializerConfig {
// The `message` field is always defined. If parsing fails, the entire body becomes the
// message.
.with_event_field(
&parse_value_path(log_schema().message_key()).expect("valid message key"),
log_schema().message_key().expect("valid message key"),
Kind::bytes(),
Some("message"),
);
Expand Down Expand Up @@ -429,7 +428,9 @@ fn insert_fields_from_syslog(
) {
match log_namespace {
LogNamespace::Legacy => {
log.insert(event_path!(log_schema().message_key()), parsed.msg);
if let Some(message_key) = log_schema().message_key() {
log.insert((PathPrefix::Event, message_key), parsed.msg);
}
}
LogNamespace::Vector => {
log.insert(event_path!("message"), parsed.msg);
Expand Down Expand Up @@ -500,7 +501,10 @@ mod tests {

let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].as_log()[log_schema().message_key()], "MSG".into());
assert_eq!(
events[0].as_log()[log_schema().message_key().unwrap().to_string()],
"MSG".into()
);
assert!(
events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()
);
Expand All @@ -522,8 +526,8 @@ mod tests {

fn init() {
let mut schema = LogSchema::default();
schema.set_message_key("legacy_message".to_string());
schema.set_message_key("legacy_timestamp".to_string());
schema.set_message_key(Some(owned_value_path!("legacy_message")));
schema.set_message_key(Some(owned_value_path!("legacy_timestamp")));
init_log_schema(schema, false);
}
}
17 changes: 10 additions & 7 deletions lib/codecs/src/encoding/format/gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vector_core::{
event::Value,
schema,
};
use vrl::path::PathPrefix;

/// On GELF encoding behavior:
/// Graylog has a relaxed parsing. They are much more lenient than the spec would
Expand Down Expand Up @@ -138,13 +139,15 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent>
err_missing_field(HOST)?;
}

let message_key = log_schema().message_key();
if !log.contains(SHORT_MESSAGE) {
// rename the log_schema().message_key() to SHORT_MESSAGE
if log.contains(message_key) {
log.rename_key(message_key, SHORT_MESSAGE);
} else {
err_missing_field(SHORT_MESSAGE)?;
if let Some(message_key) = log_schema().message_key() {
// rename the log_schema().message_key() to SHORT_MESSAGE
let target_path = (PathPrefix::Event, message_key);
if log.contains(target_path) {
log.rename_key(target_path, SHORT_MESSAGE);
} else {
err_missing_field(SHORT_MESSAGE)?;
}
}
}
Ok(log)
Expand Down Expand Up @@ -329,7 +332,7 @@ mod tests {
let event_fields = btreemap! {
VERSION => "1.1",
HOST => "example.org",
log_schema().message_key() => "Some message",
log_schema().message_key().unwrap().to_string() => "Some message",
};

let jsn = do_serialize(true, event_fields).unwrap();
Expand Down
27 changes: 16 additions & 11 deletions lib/codecs/src/encoding/format/raw_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vector_core::{
event::Event,
schema,
};
use vrl::path::PathPrefix;
use vrl::value::Kind;

/// Config used to build a `RawMessageSerializer`.
Expand All @@ -30,7 +31,11 @@ impl RawMessageSerializerConfig {

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
if let Some(message_key) = log_schema().message_key() {
schema::Requirement::empty().required_meaning(message_key.to_string(), Kind::any())
} else {
schema::Requirement::empty()
}
}
}

Expand All @@ -49,16 +54,16 @@ impl Encoder<Event> for RawMessageSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let message_key = log_schema().message_key();

let log = event.as_log();

if let Some(bytes) = log
.get_by_meaning(message_key)
.or_else(|| log.get(message_key))
.map(|value| value.coerce_to_bytes())
{
buffer.put(bytes);
if let Some(message_key) = log_schema().message_key() {
let log = event.as_log();

if let Some(bytes) = log
.get_by_meaning(message_key.to_string().as_str())
.or_else(|| log.get((PathPrefix::Event, message_key)))
.map(|value| value.coerce_to_bytes())
{
buffer.put(bytes);
}
}

Ok(())
Expand Down
23 changes: 14 additions & 9 deletions lib/codecs/src/encoding/format/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use vector_core::{
event::Event,
schema,
};
use vrl::path::PathPrefix;
use vrl::value::Kind;

use crate::MetricTagValues;
Expand Down Expand Up @@ -42,7 +43,11 @@ impl TextSerializerConfig {

/// The schema required by the serializer.
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any())
if let Some(message_key) = log_schema().message_key() {
schema::Requirement::empty().required_meaning(message_key.to_string(), Kind::any())
} else {
schema::Requirement::empty()
}
}
}

Expand All @@ -67,16 +72,16 @@ impl Encoder<Event> for TextSerializer {
type Error = vector_common::Error;

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let message_key = log_schema().message_key();

match event {
Event::Log(log) => {
if let Some(bytes) = log
.get_by_meaning(message_key)
.or_else(|| log.get(message_key))
.map(|value| value.coerce_to_bytes())
{
buffer.put(bytes);
if let Some(message_key) = log_schema().message_key() {
if let Some(bytes) = log
.get_by_meaning(message_key.to_string().as_str())
.or(log.get((PathPrefix::Event, message_key)))
.map(|value| value.coerce_to_bytes())
{
buffer.put(bytes);
}
}
}
Event::Metric(mut metric) => {
Expand Down
5 changes: 4 additions & 1 deletion lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vector_core::{
config::{log_schema, LegacyKey, LogNamespace},
event::{Event, LogEvent},
};
use vrl::path::PathPrefix;
use vrl::value::Value;

use super::proto::{
Expand Down Expand Up @@ -94,7 +95,9 @@ impl ResourceLog {
LogNamespace::Legacy => {
let mut log = LogEvent::default();
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
log.insert(log_schema().message_key(), v);
if let Some(message_key) = log_schema().message_key() {
log.insert((PathPrefix::Event, message_key), v);
}
}
log
}
Expand Down
Loading

0 comments on commit a08a393

Please sign in to comment.