Skip to content

Commit

Permalink
chore(splunk hec sink): json encoding should get message first and th…
Browse files Browse the repository at this point in the history
…en parse
  • Loading branch information
pront committed Apr 12, 2024
1 parent fae2ebf commit bf3fcea
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 39 deletions.
2 changes: 2 additions & 0 deletions changelog.d/splunk_hec_sink_encode_message_only.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The Splunk HEC sink now uses the `message` semantic meaning to look up the event's message value and encodes only
that value. Note that if the lookup yields no value, an event with an empty message will be published.
24 changes: 15 additions & 9 deletions src/sinks/splunk_hec/logs/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub enum HecEvent<'a> {
Text(Cow<'a, str>),
}

/// See https://docs.splunk.com/Documentation/Splunk/9.2.1/Data/FormateventsforHTTPEventCollector#Event_data.
#[derive(Serialize, Debug)]
pub struct HecData<'a> {
#[serde(flatten)]
Expand Down Expand Up @@ -86,16 +87,21 @@ impl Encoder<Vec<HecProcessedEvent>> for HecLogsEncoder {
}
EndpointTarget::Event => {
let serializer = encoder.serializer();
let hec_event = if serializer.supports_json() {
HecEvent::Json(
serializer
.to_json_value(event)
.map_err(|error| emit!(SplunkEventEncodeError { error }))
.ok()?,
)
let hec_event = if let Some(message) = event.as_log().get_message() {
if serializer.supports_json() {
let message_event = Event::from(LogEvent::from(message.clone()));
HecEvent::Json(
serializer
.to_json_value(message_event)
.map_err(|error| emit!(SplunkEventEncodeError { error }))
.ok()?,
)
} else {
encoder.encode(event, &mut bytes).ok()?;
HecEvent::Text(String::from_utf8_lossy(&bytes))
}
} else {
encoder.encode(event, &mut bytes).ok()?;
HecEvent::Text(String::from_utf8_lossy(&bytes))
HecEvent::Text("".into())
};

let mut hec_data = HecData::new(
Expand Down
112 changes: 82 additions & 30 deletions src/sinks/splunk_hec/logs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vector_lib::{
};
use vrl::path::OwnedTargetPath;
use vrl::value::Kind;
use vrl::{event_path, metadata_path, owned_value_path};
use vrl::{metadata_path, owned_value_path};

use super::sink::{HecLogsProcessedEventMetadata, HecProcessedEvent};
use crate::sinks::util::processed_event::ProcessedEvent;
Expand Down Expand Up @@ -45,7 +45,7 @@ struct HecEventJson {

#[derive(Deserialize, Debug)]
struct HecEventText {
time: f64,
time: Option<f64>,
event: String,
fields: BTreeMap<String, String>,
source: Option<String>,
Expand All @@ -71,17 +71,25 @@ fn get_processed_event_timestamp(
timestamp_key: Option<OptionalTargetPath>,
auto_extract_timestamp: bool,
) -> HecProcessedEvent {
let mut event = Event::Log(LogEvent::from("hello world"));
// JSON object as event data
let mut event = Event::Log(LogEvent::from(btreemap! {
"msg" => "hello world",
"event_sourcetype" => "test_sourcetype",
"event_source" => "test_source",
"event_index" => "test_index",
"host_key" => "test_host",
"event_field1" => "test_value1",
"event_field2" => "test_value2",
"key" => "value",
"int_val" => 123,
}));
// This must exist because it is checked at runtime
event.as_mut_log().insert(metadata_path!("vector"), "");
let definition = Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE);
event
.as_mut_log()
.insert("event_sourcetype", "test_sourcetype");
event.as_mut_log().insert("event_source", "test_source");
event.as_mut_log().insert("event_index", "test_index");
event.as_mut_log().insert("host_key", "test_host");
event.as_mut_log().insert("event_field1", "test_value1");
event.as_mut_log().insert("event_field2", "test_value2");
event.as_mut_log().insert("key", "value");
event.as_mut_log().insert("int_val", 123);
.metadata_mut()
.set_schema_definition(&Arc::new(definition));

if let Some(OptionalTargetPath {
path: Some(ts_path),
Expand Down Expand Up @@ -172,17 +180,21 @@ fn splunk_encode_log_event_json() {
let processed_event = get_processed_event();
let hec_data =
get_encoded_event::<HecEventJson>(JsonSerializerConfig::default().into(), processed_event);
let event = hec_data.event;
let hec_event = hec_data.event;

assert_eq!(event.get("key").unwrap(), &serde_json::Value::from("value"));
assert_eq!(event.get("int_val").unwrap(), &serde_json::Value::from(123));
assert_eq!(
event
.get(&log_schema().message_key().unwrap().to_string())
.unwrap(),
hec_event.get("key").unwrap(),
&serde_json::Value::from("value")
);
assert_eq!(
hec_event.get("int_val").unwrap(),
&serde_json::Value::from(123)
);
assert_eq!(
hec_event.get("msg").unwrap(),
&serde_json::Value::from("hello world")
);
assert!(event
assert!(hec_event
.get(log_schema().timestamp_key().unwrap().to_string().as_str())
.is_none());

Expand All @@ -195,7 +207,7 @@ fn splunk_encode_log_event_json() {

assert_eq!(hec_data.time, Some(1638366107.111));
assert_eq!(
event.get("ts_nanos_key").unwrap(),
hec_event.get("ts_nanos_key").unwrap(),
&serde_json::Value::from(456123)
);
}
Expand All @@ -206,7 +218,19 @@ fn splunk_encode_log_event_text() {
let hec_data =
get_encoded_event::<HecEventText>(TextSerializerConfig::default().into(), processed_event);

assert_eq!(hec_data.event.as_str(), "hello world");
assert_eq!(
hec_data.event.as_str(),
"{\"event_field1\":\"test_value1\",
\"event_field2\":\"test_value2\",
\"event_index\":\"test_index\",
\"event_source\":\"test_source\",
\"event_sourcetype\":\"test_sourcetype\",
\"host_key\":\"test_host\",
\"int_val\":123,
\"key\":\"value\",
\"msg\":\"hello world\",
\"ts_nanos_key\":456123}"
);

assert_eq!(hec_data.source, Some("test_source".to_string()));
assert_eq!(hec_data.sourcetype, Some("test_sourcetype".to_string()));
Expand All @@ -215,7 +239,34 @@ fn splunk_encode_log_event_text() {

assert_eq!(hec_data.fields.get("event_field1").unwrap(), "test_value1");

assert_eq!(hec_data.time, 1638366107.111);
assert_eq!(hec_data.time, Some(1638366107.111));
}

/// When the `message` semantic meaning resolves to no value, the sink
/// publishes an event whose text payload is the empty string.
#[test]
fn splunk_encode_log_event_message_none() {
    // Schema definition whose `message` meaning points at the event root;
    // the log itself is empty, so the meaning lookup yields nothing.
    let definition = Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE);
    let metadata = EventMetadata::default().with_schema_definition(&Arc::new(definition));

    // Minimal sink configuration targeting the `event` endpoint.
    let data = super::sink::HecLogData {
        sourcetype: None,
        source: None,
        index: None,
        host_key: None,
        indexed_fields: &[],
        timestamp_nanos_key: None,
        timestamp_key: None,
        endpoint_target: EndpointTarget::Event,
        auto_extract_timestamp: false,
    };
    let processed_event = process_log(Event::Log(LogEvent::new_with_metadata(metadata)), &data);

    let encoded =
        get_encoded_event::<HecEventText>(TextSerializerConfig::default().into(), processed_event);

    // No message value found -> encoder falls back to an empty event body.
    assert_eq!(encoded.event.as_str(), "");
}

#[tokio::test]
Expand Down Expand Up @@ -344,14 +395,20 @@ fn splunk_encode_log_event_semantic_meanings() {
&owned_value_path!("timestamp"),
Kind::timestamp(),
Some(meaning::TIMESTAMP),
),
)
.with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
));

let mut log = LogEvent::new_with_metadata(metadata);
log.insert(event_path!("message"), "the_message");

log.insert(
&OwnedTargetPath::event_root(),
Value::from(btreemap! {"foo" => "bar"}),
);

// insert an arbitrary metadata field such that the log becomes Vector namespaced
log.insert(metadata_path!("vector", "foo"), "bar");
assert!(log.namespace() == LogNamespace::Vector);

let og_time = Utc::now();

Expand All @@ -364,12 +421,8 @@ fn splunk_encode_log_event_semantic_meanings() {
Value::Timestamp(og_time),
);

assert!(log.namespace() == LogNamespace::Vector);

let event = Event::Log(log);

let processed_event = process_log(
event,
Event::Log(log),
&super::sink::HecLogData {
sourcetype: None,
source: None,
Expand All @@ -385,7 +438,6 @@ fn splunk_encode_log_event_semantic_meanings() {

let hec_data =
get_encoded_event::<HecEventJson>(JsonSerializerConfig::default().into(), processed_event);

assert_eq!(hec_data.time, Some(expected_time));

assert_eq!(hec_data.host, Some("roast".to_string()));
Expand Down

0 comments on commit bf3fcea

Please sign in to comment.