Skip to content

Commit

Permalink
chore(VRL decoder): add 'message' semantic meaning (#20296)
Browse files Browse the repository at this point in the history
* chore(VRL decoder): add semantic meaning for message

* changelog fragment

* also udpate legacy namespace
  • Loading branch information
pront authored Apr 12, 2024
1 parent 153919d commit 808368f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a "message" semantic meaning to the VRL decoder for all log namespaces.
44 changes: 41 additions & 3 deletions lib/codecs/src/decoding/format/vrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use bytes::Bytes;
use derivative::Derivative;
use smallvec::{smallvec, SmallVec};
use vector_config_macros::configurable_component;
use vector_core::config::{DataType, LogNamespace};
use vector_core::config::{log_schema, DataType, LogNamespace};
use vector_core::event::{Event, TargetEvents, VrlTarget};
use vector_core::schema::meaning;
use vector_core::{compile_vrl, schema};
use vrl::compiler::state::ExternalEnv;
use vrl::compiler::{runtime::Runtime, CompileConfig, Program, TimeZone, TypeState};
use vrl::diagnostic::Formatter;
use vrl::path::OwnedTargetPath;
use vrl::value::Kind;

/// Config used to build a `VrlDeserializer`.
Expand Down Expand Up @@ -78,10 +80,21 @@ impl VrlDeserializerConfig {
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match log_namespace {
LogNamespace::Legacy => {
schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
let definition =
schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
if let Some(message_key) = log_schema().message_key() {
definition.with_event_field(
message_key,
Kind::bytes().or_undefined(),
Some(meaning::MESSAGE),
)
} else {
definition
}
}
LogNamespace::Vector => {
schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
.with_meaning(OwnedTargetPath::event_root(), "message")
}
}
}
Expand Down Expand Up @@ -138,9 +151,10 @@ mod tests {
use super::*;
use chrono::{DateTime, Utc};
use indoc::indoc;
use vrl::btreemap;
use vector_core::schema::Definition;
use vrl::path::OwnedTargetPath;
use vrl::value::Value;
use vrl::{btreemap, owned_value_path};

fn make_decoder(source: &str) -> VrlDeserializer {
VrlDeserializerConfig {
Expand Down Expand Up @@ -317,4 +331,28 @@ mod tests {
.to_string();
assert!(error.contains("aborted"));
}

#[test]
fn output_schema_definition_legacy_namespace() {
let legacy_definition =
VrlDeserializerConfig::default().schema_definition(LogNamespace::Legacy);
let expected_definition = Definition::empty_legacy_namespace()
.unknown_fields(Kind::any())
.with_event_field(
&owned_value_path!("message"),
Kind::bytes().or_undefined(),
Some(meaning::MESSAGE),
);
assert_eq!(legacy_definition, expected_definition);
}

#[test]
fn output_schema_definition_vector_namespace() {
let vector_definition =
VrlDeserializerConfig::default().schema_definition(LogNamespace::Vector);
let expected_definition =
Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), "message");
assert_eq!(vector_definition, expected_definition);
}
}

0 comments on commit 808368f

Please sign in to comment.