diff --git a/changelog.d/splunk_hec_sink_encode_message_only.breaking.md b/changelog.d/splunk_hec_sink_encode_message_only.breaking.md
new file mode 100644
index 0000000000000..c973057c7c605
--- /dev/null
+++ b/changelog.d/splunk_hec_sink_encode_message_only.breaking.md
@@ -0,0 +1,2 @@
+The Splunk HEC sink now uses the `message` meaning to retrieve the relevant value and encodes only that value.
+Note that if the retrieved value is `None`, an event with an empty message will be published.
diff --git a/src/sinks/splunk_hec/logs/encoder.rs b/src/sinks/splunk_hec/logs/encoder.rs
index 9f8faa841fde6..01f8c70897adf 100644
--- a/src/sinks/splunk_hec/logs/encoder.rs
+++ b/src/sinks/splunk_hec/logs/encoder.rs
@@ -22,6 +22,7 @@ pub enum HecEvent<'a> {
     Text(Cow<'a, str>),
 }

+/// See https://docs.splunk.com/Documentation/Splunk/9.2.1/Data/FormateventsforHTTPEventCollector#Event_data.
 #[derive(Serialize, Debug)]
 pub struct HecData<'a> {
     #[serde(flatten)]
@@ -86,16 +87,21 @@ impl Encoder<Vec<HecProcessedEvent>> for HecLogsEncoder {
             }
             EndpointTarget::Event => {
                 let serializer = encoder.serializer();
-                let hec_event = if serializer.supports_json() {
-                    HecEvent::Json(
-                        serializer
-                            .to_json_value(event)
-                            .map_err(|error| emit!(SplunkEventEncodeError { error }))
-                            .ok()?,
-                    )
+                let hec_event = if let Some(message) = event.as_log().get_message() {
+                    if serializer.supports_json() {
+                        let message_event = Event::from(LogEvent::from(message.clone()));
+                        HecEvent::Json(
+                            serializer
+                                .to_json_value(message_event)
+                                .map_err(|error| emit!(SplunkEventEncodeError { error }))
+                                .ok()?,
+                        )
+                    } else {
+                        encoder.encode(event, &mut bytes).ok()?;
+                        HecEvent::Text(String::from_utf8_lossy(&bytes))
+                    }
                 } else {
-                    encoder.encode(event, &mut bytes).ok()?;
-                    HecEvent::Text(String::from_utf8_lossy(&bytes))
+                    HecEvent::Text("".into())
                 };

                 let mut hec_data = HecData::new(
diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs
index 4589bd9ad6f24..173b52b4d120c 100644
--- a/src/sinks/splunk_hec/logs/tests.rs
+++ b/src/sinks/splunk_hec/logs/tests.rs
@@ -14,7 +14,7 @@ use vector_lib::{
 };
 use vrl::path::OwnedTargetPath;
 use vrl::value::Kind;
-use vrl::{event_path, metadata_path, owned_value_path};
+use vrl::{metadata_path, owned_value_path};

 use super::sink::{HecLogsProcessedEventMetadata, HecProcessedEvent};
 use crate::sinks::util::processed_event::ProcessedEvent;
@@ -45,7 +45,7 @@ struct HecEventJson {

 #[derive(Deserialize, Debug)]
 struct HecEventText {
-    time: f64,
+    time: Option<f64>,
     event: String,
     fields: BTreeMap<String, String>,
     source: Option<String>,
@@ -71,17 +71,25 @@ fn get_processed_event_timestamp(
     timestamp_key: Option<OptionalTargetPath>,
     auto_extract_timestamp: bool,
 ) -> HecProcessedEvent {
-    let mut event = Event::Log(LogEvent::from("hello world"));
+    // JSON object as event data
+    let mut event = Event::Log(LogEvent::from(btreemap! {
+        "msg" => "hello world",
+        "event_sourcetype" => "test_sourcetype",
+        "event_source" => "test_source",
+        "event_index" => "test_index",
+        "host_key" => "test_host",
+        "event_field1" => "test_value1",
+        "event_field2" => "test_value2",
+        "key" => "value",
+        "int_val" => 123,
+    }));
+    // This must exist because it is checked at runtime
+    event.as_mut_log().insert(metadata_path!("vector"), "");
+    let definition = Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Vector])
+        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE);
     event
-        .as_mut_log()
-        .insert("event_sourcetype", "test_sourcetype");
-    event.as_mut_log().insert("event_source", "test_source");
-    event.as_mut_log().insert("event_index", "test_index");
-    event.as_mut_log().insert("host_key", "test_host");
-    event.as_mut_log().insert("event_field1", "test_value1");
-    event.as_mut_log().insert("event_field2", "test_value2");
-    event.as_mut_log().insert("key", "value");
-    event.as_mut_log().insert("int_val", 123);
+        .metadata_mut()
+        .set_schema_definition(&Arc::new(definition));

     if let Some(OptionalTargetPath {
         path: Some(ts_path),
@@ -172,17 +180,21 @@ fn splunk_encode_log_event_json() {
     let processed_event = get_processed_event();
     let hec_data =
         get_encoded_event::<HecEventJson>(JsonSerializerConfig::default().into(), processed_event);
-    let event = hec_data.event;
+    let hec_event = hec_data.event;

-    assert_eq!(event.get("key").unwrap(), &serde_json::Value::from("value"));
-    assert_eq!(event.get("int_val").unwrap(), &serde_json::Value::from(123));
     assert_eq!(
-        event
-            .get(&log_schema().message_key().unwrap().to_string())
-            .unwrap(),
+        hec_event.get("key").unwrap(),
+        &serde_json::Value::from("value")
+    );
+    assert_eq!(
+        hec_event.get("int_val").unwrap(),
+        &serde_json::Value::from(123)
+    );
+    assert_eq!(
+        hec_event.get("msg").unwrap(),
         &serde_json::Value::from("hello world")
     );
-    assert!(event
+    assert!(hec_event
         .get(log_schema().timestamp_key().unwrap().to_string().as_str())
         .is_none());

@@ -195,7 +207,7 @@ fn splunk_encode_log_event_json() {
     assert_eq!(hec_data.time, Some(1638366107.111));

     assert_eq!(
-        event.get("ts_nanos_key").unwrap(),
+        hec_event.get("ts_nanos_key").unwrap(),
         &serde_json::Value::from(456123)
     );
 }
@@ -206,7 +218,19 @@ fn splunk_encode_log_event_text() {
     let hec_data =
         get_encoded_event::<HecEventText>(TextSerializerConfig::default().into(), processed_event);

-    assert_eq!(hec_data.event.as_str(), "hello world");
+    assert_eq!(
+        hec_data.event.as_str(),
+        "{\"event_field1\":\"test_value1\",
+        \"event_field2\":\"test_value2\",
+        \"event_index\":\"test_index\",
+        \"event_source\":\"test_source\",
+        \"event_sourcetype\":\"test_sourcetype\",
+        \"host_key\":\"test_host\",
+        \"int_val\":123,
+        \"key\":\"value\",
+        \"msg\":\"hello world\",
+        \"ts_nanos_key\":456123}"
+    );

     assert_eq!(hec_data.source, Some("test_source".to_string()));
     assert_eq!(hec_data.sourcetype, Some("test_sourcetype".to_string()));
@@ -215,7 +239,34 @@ fn splunk_encode_log_event_text() {

     assert_eq!(hec_data.fields.get("event_field1").unwrap(), "test_value1");

-    assert_eq!(hec_data.time, 1638366107.111);
+    assert_eq!(hec_data.time, Some(1638366107.111));
+}
+
+#[test]
+fn splunk_encode_log_event_message_none() {
+    let metadata = EventMetadata::default().with_schema_definition(&Arc::new(
+        Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
+            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
+    ));
+
+    let processed_event = process_log(
+        Event::Log(LogEvent::new_with_metadata(metadata)),
+        &super::sink::HecLogData {
+            sourcetype: None,
+            source: None,
+            index: None,
+            host_key: None,
+            indexed_fields: &[],
+            timestamp_nanos_key: None,
+            timestamp_key: None,
+            endpoint_target: EndpointTarget::Event,
+            auto_extract_timestamp: false,
+        },
+    );
+
+    let hec_data =
+        get_encoded_event::<HecEventText>(TextSerializerConfig::default().into(), processed_event);
+    assert_eq!(hec_data.event.as_str(), "");
 }

 #[tokio::test]
@@ -344,14 +395,20 @@ fn splunk_encode_log_event_semantic_meanings() {
                 &owned_value_path!("timestamp"),
                 Kind::timestamp(),
                 Some(meaning::TIMESTAMP),
-            ),
+            )
+            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
     ));

     let mut log = LogEvent::new_with_metadata(metadata);
-    log.insert(event_path!("message"), "the_message");
+
+    log.insert(
+        &OwnedTargetPath::event_root(),
+        Value::from(btreemap! {"foo" => "bar"}),
+    );

     // insert an arbitrary metadata field such that the log becomes Vector namespaced
     log.insert(metadata_path!("vector", "foo"), "bar");
+    assert!(log.namespace() == LogNamespace::Vector);

     let og_time = Utc::now();

@@ -364,12 +421,8 @@ fn splunk_encode_log_event_semantic_meanings() {
         Value::Timestamp(og_time),
     );

-    assert!(log.namespace() == LogNamespace::Vector);
-
-    let event = Event::Log(log);
-
     let processed_event = process_log(
-        event,
+        Event::Log(log),
         &super::sink::HecLogData {
             sourcetype: None,
             source: None,
@@ -385,7 +438,6 @@ fn splunk_encode_log_event_semantic_meanings() {
     let hec_data =
         get_encoded_event::<HecEventJson>(JsonSerializerConfig::default().into(), processed_event);

-    assert_eq!(hec_data.time, Some(expected_time));
     assert_eq!(hec_data.host, Some("roast".to_string()));
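For reviewers who want to reason about the new behavior outside of Vector, the branching added in `encoder.rs` boils down to the sketch below. This is a minimal standalone illustration under stated assumptions, not Vector code: `MiniEvent`, `HecPayload`, and `encode_hec_event` are hypothetical stand-ins for `Event`, `HecEvent`, and the serializer branch, and `serde_json` stands in for the JSON serializer.

```rust
use serde_json::{json, Value};

/// Hypothetical stand-in for `HecEvent`: either a JSON value or plain text.
#[derive(Debug, PartialEq)]
enum HecPayload {
    Json(Value),
    Text(String),
}

/// Hypothetical stand-in for a log event whose `message` meaning may be absent.
struct MiniEvent {
    message: Option<Value>,
}

/// Mirrors the new branching: when the `message` meaning resolves to a value,
/// only that value is encoded (JSON if the serializer supports it, otherwise
/// its text rendering); when it is absent, an empty-message event is produced.
fn encode_hec_event(event: &MiniEvent, supports_json: bool) -> HecPayload {
    match &event.message {
        Some(message) if supports_json => HecPayload::Json(message.clone()),
        Some(message) => HecPayload::Text(message.to_string()),
        None => HecPayload::Text(String::new()),
    }
}

fn main() {
    let with_message = MiniEvent {
        message: Some(json!({"msg": "hello world", "key": "value"})),
    };
    let without_message = MiniEvent { message: None };

    // JSON path: only the message object is forwarded, not the whole event.
    assert_eq!(
        encode_hec_event(&with_message, true),
        HecPayload::Json(json!({"msg": "hello world", "key": "value"}))
    );
    // No message: an event is still emitted, but with an empty message,
    // matching the breaking-change note in the changelog entry above.
    assert_eq!(
        encode_hec_event(&without_message, true),
        HecPayload::Text(String::new())
    );
}
```

The second assertion is the case exercised by the new `splunk_encode_log_event_message_none` test: a missing message does not drop the event, it just empties its payload.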