diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 1a29bb5e02b4e..80e897fbb0e1b 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -458,6 +458,7 @@ timestamped typesense tzdata ubuntu +unchunked upstreaminfo useragents usergroups diff --git a/Cargo.lock b/Cargo.lock index 5259d73832987..204cbc4fac3db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2190,6 +2190,7 @@ dependencies = [ "ordered-float 4.4.0", "prost 0.12.6", "prost-reflect", + "rand 0.8.5", "regex", "rstest", "serde", @@ -2201,6 +2202,7 @@ dependencies = [ "tokio", "tokio-util", "tracing 0.1.40", + "tracing-test", "uuid", "vector-common", "vector-config", @@ -4437,7 +4439,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing 0.1.40", @@ -7243,7 +7245,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes 1.8.0", "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -7276,7 +7278,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.12.1", "proc-macro2 1.0.89", "quote 1.0.37", "syn 2.0.85", @@ -7289,7 +7291,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.13.0", "proc-macro2 1.0.89", "quote 1.0.37", "syn 2.0.85", @@ -10041,6 +10043,7 @@ dependencies = [ "serde", "serde_json", "sharded-slab", + "smallvec", "thread_local", "tracing 0.1.40", "tracing-core 0.1.32", @@ -10048,6 +10051,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core 0.1.32", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote 1.0.37", + "syn 2.0.85", +] + [[package]] name = "tracing-tower" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cb3f4a80dbd7d..bfac237d896ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,8 +149,10 @@ prost = { version = "0.12", default-features = false, features = ["std"] } prost-build = { version = "0.12", default-features = false } prost-reflect = { version = "0.14", features = ["serde"], default-features = false } prost-types = { version = "0.12", default-features = false } +rand = { version = "0.8.5", default-features = false, features = ["small_rng"] } serde_json = { version = "1.0.132", default-features = false, features = ["raw_value", "std"] } serde = { version = "1.0.213", default-features = false, features = ["alloc", "derive", "rc"] } +tokio = { version = "1.39.2", default-features = false, features = ["full"] } toml = { version = "0.8.19", default-features = false, features = ["display", "parse"] } tonic = { version = "0.11", default-features = false, features = ["transport", "codegen", "prost", "tls", "tls-roots", "gzip"] } tonic-build = { version = "0.11", default-features = false, features = 
["transport", "prost"] } @@ -337,7 +339,7 @@ paste = "1.0.15" percent-encoding = { version = "2.3.1", default-features = false } postgres-openssl = { version = "0.5.0", default-features = false, features = ["runtime"], optional = true } pulsar = { version = "6.3.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true } -rand = { version = "0.8.5", default-features = false, features = ["small_rng"] } +rand.workspace = true rand_distr = { version = "0.4.3", default-features = false } rdkafka = { version = "0.35.0", default-features = false, features = ["curl-static", "tokio", "libz", "ssl", "zstd"], optional = true } redis = { version = "0.24.0", default-features = false, features = ["connection-manager", "tokio-comp", "tokio-native-tls-comp"], optional = true } diff --git a/changelog.d/20769-chunked_gelf_decoding.feature.md b/changelog.d/20769-chunked_gelf_decoding.feature.md new file mode 100644 index 0000000000000..2dc8e4cc15547 --- /dev/null +++ b/changelog.d/20769-chunked_gelf_decoding.feature.md @@ -0,0 +1,5 @@ +Allows for chunked gelf decoding in message-based sources, such as UDP sockets or unix datagram sockets. Implementation is based on [Graylog's documentation](https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html#GELFviaUDP). + +This framing method can be configured via the `framing.method = "chunked_gelf"` option in the source configuration. + +authors: jorgehermo9 diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 691e2fd91c675..00cc2b3b610e7 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -22,6 +22,7 @@ memchr = { version = "2", default-features = false } ordered-float = { version = "4.4.0", default-features = false } prost.workspace = true prost-reflect.workspace = true +rand.workspace = true regex = { version = "1.11.0", default-features = false, features = ["std", "perf"] } serde.workspace = true serde_json.workspace = true @@ -29,6 +30,7 @@ smallvec = { version = "1", default-features = false, features = ["union"] } snafu = { version = "0.7.5", default-features = false, features = ["futures"] } syslog_loose = { version = "0.21", default-features = false, optional = true } tokio-util = { version = "0.7", default-features = false, features = ["codec"] } +tokio.workspace = true tracing = { version = "0.1", default-features = false } vrl.workspace = true vector-common = { path = "../vector-common", default-features = false } @@ -43,8 +45,9 @@ indoc = { version = "2", default-features = false } tokio = { version = "1", features = ["test-util"] } similar-asserts = "1.6.0" vector-core = { path = "../vector-core", default-features = false, features = ["vrl", "test"] } -uuid.workspace = true rstest = "0.23.0" +tracing-test = "0.2.5" +uuid.workspace = true vrl.workspace = true [features] diff --git a/lib/codecs/src/decoding/framing/chunked_gelf.rs b/lib/codecs/src/decoding/framing/chunked_gelf.rs new file mode 100644 index 0000000000000..131ff116eb9b1 --- /dev/null +++ b/lib/codecs/src/decoding/framing/chunked_gelf.rs @@ -0,0 +1,881 @@ +use crate::{BytesDecoder, StreamDecodingError}; + +use super::{BoxedFramingError, FramingError}; +use bytes::{Buf, Bytes, BytesMut}; +use derivative::Derivative; +use snafu::{ensure, Snafu}; +use std::any::Any; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio; +use tokio::task::JoinHandle; +use tokio_util::codec::Decoder; +use tracing::{debug, warn}; +use vector_config::configurable_component; 
+ +const GELF_MAGIC: &[u8] = &[0x1e, 0x0f]; +const GELF_MAX_TOTAL_CHUNKS: u8 = 128; +const DEFAULT_TIMEOUT_SECS: f64 = 5.0; + +const fn default_timeout_secs() -> f64 { + DEFAULT_TIMEOUT_SECS +} + +/// Config used to build a `ChunkedGelfDecoder`. +#[configurable_component] +#[derive(Debug, Clone, Default)] +pub struct ChunkedGelfDecoderConfig { + /// Options for the chunked GELF decoder. + #[serde(default)] + pub chunked_gelf: ChunkedGelfDecoderOptions, +} + +impl ChunkedGelfDecoderConfig { + /// Build the `ChunkedGelfDecoder` from this configuration. + pub fn build(&self) -> ChunkedGelfDecoder { + ChunkedGelfDecoder::new( + self.chunked_gelf.timeout_secs, + self.chunked_gelf.pending_messages_limit, + self.chunked_gelf.max_chunk_length, + self.chunked_gelf.max_message_length, + ) + } +} + +/// Options for building a `ChunkedGelfDecoder`. +#[configurable_component] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] +pub struct ChunkedGelfDecoderOptions { + /// The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + /// decoder drops all the received chunks of the timed-out message. + #[serde(default = "default_timeout_secs")] + #[derivative(Default(value = "default_timeout_secs()"))] + pub timeout_secs: f64, + + /// The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + /// dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + /// If this option is not set, the decoder does not limit the number of pending messages and the memory usage + /// of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] + pub pending_messages_limit: Option<usize>, + + /// The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + /// be dropped. If this option is not set, the decoder does not limit the length of chunks and + /// the per-chunk memory is unbounded. + /// + /// This limit only takes the chunk's payload into account; the GELF header bytes are excluded from the calculation. + #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] + pub max_chunk_length: Option<usize>, + + /// The maximum length of a single GELF message, in bytes. Messages longer than this length will + /// be dropped. If this option is not set, the decoder does not limit the length of messages and + /// the per-message memory is unbounded. + /// + /// Note that a message can be composed of multiple chunks and this limit is applied to the whole + /// message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + /// This option is useful for limiting the memory usage of the decoder's chunk buffer. + /// + /// This limit only takes the message's payload into account; the GELF header bytes are excluded from the calculation. + /// The message's payload is the concatenation of all the chunks' payloads.
+ #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] + pub max_message_length: Option<usize>, +} + +#[derive(Debug)] +struct MessageState { + total_chunks: u8, + chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize], + chunks_bitmap: u128, + current_message_length: usize, + timeout_task: JoinHandle<()>, +} + +impl MessageState { + pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self { + Self { + total_chunks, + chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize], + chunks_bitmap: 0, + current_message_length: 0, + timeout_task, + } + } + + fn is_chunk_present(&self, sequence_number: u8) -> bool { + let chunk_bitmap_id = 1 << sequence_number; + self.chunks_bitmap & chunk_bitmap_id != 0 + } + + fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) { + let chunk_bitmap_id = 1 << sequence_number; + self.chunks_bitmap |= chunk_bitmap_id; + self.current_message_length += chunk.remaining(); + self.chunks[sequence_number as usize] = chunk; + } + + fn is_complete(&self) -> bool { + self.chunks_bitmap.count_ones() == self.total_chunks as u32 + } + + fn current_message_length(&self) -> usize { + self.current_message_length + } + + fn retrieve_message(&self) -> Option<Bytes> { + if self.is_complete() { + self.timeout_task.abort(); + let chunks = &self.chunks[0..self.total_chunks as usize]; + let mut message = BytesMut::new(); + for chunk in chunks { + message.extend_from_slice(chunk); + } + Some(message.freeze()) + } else { + None + } + } +} + +#[derive(Debug, Snafu, PartialEq, Eq)] +pub enum ChunkedGelfDecoderError { + #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))] + InvalidChunkHeader { header: Bytes }, + #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."))] + InvalidTotalChunks { + message_id: u64, + sequence_number: u8, + total_chunks: u8, + }, + #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"))] + InvalidSequenceNumber { + message_id: u64, + sequence_number: u8, + total_chunks: u8, + }, + #[snafu(display("Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"))] + PendingMessagesLimitReached { + message_id: u64, + sequence_number: u8, + pending_messages_limit: usize, + }, + #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value is {original_total_chunks} and received total chunks value is {received_total_chunks}"))] + TotalChunksMismatch { + message_id: u64, + sequence_number: u8, + original_total_chunks: u8, + received_total_chunks: u8, + }, + #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has exceeded the maximum chunk length: got {chunk_length} bytes and max chunk length is {max_chunk_length} bytes"))] + MaxChunkLengthExceeded { + message_id: u64, + sequence_number: u8, + chunk_length: usize, + max_chunk_length: usize, + }, + #[snafu(display("Message with id {message_id} has exceeded the maximum message length: got {message_length} bytes and max message length is {max_message_length} bytes.
Discarding all buffered chunks of that message"))] + MaxMessageLengthExceeded { + message_id: u64, + sequence_number: u8, + message_length: usize, + max_message_length: usize, + }, +} + +impl StreamDecodingError for ChunkedGelfDecoderError { + fn can_continue(&self) -> bool { + true + } +} + +impl FramingError for ChunkedGelfDecoderError { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + +/// A codec for handling GELF messages that may be chunked. The implementation is based on [Graylog's GELF documentation](https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html#GELFviaUDP) +/// and [Graylog's go-gelf library](https://github.com/Graylog2/go-gelf/blob/v1/gelf/reader.go). +#[derive(Debug, Clone)] +pub struct ChunkedGelfDecoder { + // We have to use this decoder to read all the bytes from the buffer first, rather than letting + // tokio buffer the reads, as tokio's FramedRead will not always call the decode method with the + // whole message (see https://docs.rs/tokio-util/latest/src/tokio_util/codec/framed_impl.rs.html#26). + // This limitation is due to the fact that the GELF format does not specify the length of the + // message, so we have to read all the bytes of the message (datagram) at once. + bytes_decoder: BytesDecoder, + state: Arc<Mutex<HashMap<u64, MessageState>>>, + timeout: Duration, + pending_messages_limit: Option<usize>, + max_chunk_length: Option<usize>, + max_message_length: Option<usize>, +} + +impl ChunkedGelfDecoder { + /// Creates a new `ChunkedGelfDecoder`. + pub fn new( + timeout_secs: f64, + pending_messages_limit: Option<usize>, + max_chunk_length: Option<usize>, + max_message_length: Option<usize>, + ) -> Self { + Self { + bytes_decoder: BytesDecoder::new(), + state: Arc::new(Mutex::new(HashMap::new())), + timeout: Duration::from_secs_f64(timeout_secs), + pending_messages_limit, + max_chunk_length, + max_message_length, + } + } + + /// Decodes a GELF chunk. + pub fn decode_chunk( + &mut self, + mut chunk: Bytes, + ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> { + // Encoding scheme: + // + // +------------+-----------------+--------------+----------------------+ + // | Message id | Sequence number | Total chunks | Chunk payload | + // +------------+-----------------+--------------+----------------------+ + // | 64 bits | 8 bits | 8 bits | remaining bits | + // +------------+-----------------+--------------+----------------------+ + // + // As this codec is oriented for UDP, the chunks (datagrams) are not guaranteed to be received in order, + // nor to be received at all. So, we have to store the chunks in a buffer (state field) until we receive + // all the chunks of a message. When we receive all the chunks of a message, we can concatenate them + // and return the complete payload.
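+ //
+ // Worked example (hypothetical values): a chunk arriving, after `decode_message`
+ // has stripped the 0x1e 0x0f magic bytes, as
+ // 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x2a | 0x01 | 0x02 | b"bar"
+ // parses as message_id = 42, sequence_number = 1 and total_chunks = 2, with the
+ // remaining bytes (b"bar") being this chunk's payload.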
+ + // We need 10 bytes to read the message id, sequence number and total chunks + ensure!( + chunk.remaining() >= 10, + InvalidChunkHeaderSnafu { header: chunk } + ); + + let message_id = chunk.get_u64(); + let sequence_number = chunk.get_u8(); + let total_chunks = chunk.get_u8(); + + ensure!( + total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS, + InvalidTotalChunksSnafu { + message_id, + sequence_number, + total_chunks + } + ); + + ensure!( + sequence_number < total_chunks, + InvalidSequenceNumberSnafu { + message_id, + sequence_number, + total_chunks + } + ); + + if let Some(max_chunk_length) = self.max_chunk_length { + let chunk_length = chunk.remaining(); + ensure!( + chunk_length <= max_chunk_length, + MaxChunkLengthExceededSnafu { + message_id, + sequence_number, + chunk_length, + max_chunk_length + } + ); + } + + let mut state_lock = self.state.lock().expect("poisoned lock"); + + if let Some(pending_messages_limit) = self.pending_messages_limit { + ensure!( + state_lock.len() < pending_messages_limit, + PendingMessagesLimitReachedSnafu { + message_id, + sequence_number, + pending_messages_limit + } + ); + } + + let message_state = state_lock.entry(message_id).or_insert_with(|| { + // We need to spawn a task that will clear the message state after a certain time, + // otherwise we will have a memory leak due to messages that never complete + let state = Arc::clone(&self.state); + let timeout = self.timeout; + let timeout_handle = tokio::spawn(async move { + tokio::time::sleep(timeout).await; + let mut state_lock = state.lock().expect("poisoned lock"); + if state_lock.remove(&message_id).is_some() { + warn!( + message_id = message_id, + timeout_secs = timeout.as_secs_f64(), + internal_log_rate_limit = true, + "Message was not fully received within the timeout window. Discarding it." + ); + } + }); + MessageState::new(total_chunks, timeout_handle) + }); + + ensure!( + message_state.total_chunks == total_chunks, + TotalChunksMismatchSnafu { + message_id, + sequence_number, + original_total_chunks: message_state.total_chunks, + received_total_chunks: total_chunks + } + ); + + if message_state.is_chunk_present(sequence_number) { + debug!( + message_id = message_id, + sequence_number = sequence_number, + internal_log_rate_limit = true, + "Received a duplicate chunk. Ignoring it." + ); + return Ok(None); + } + + message_state.add_chunk(sequence_number, chunk); + + if let Some(max_message_length) = self.max_message_length { + let message_length = message_state.current_message_length(); + if message_length > max_message_length { + state_lock.remove(&message_id); + return Err(ChunkedGelfDecoderError::MaxMessageLengthExceeded { + message_id, + sequence_number, + message_length, + max_message_length, + }); + } + } + + if let Some(message) = message_state.retrieve_message() { + state_lock.remove(&message_id); + Ok(Some(message)) + } else { + Ok(None) + } + } + + /// Decodes a GELF message that may be chunked or not. The source bytes are expected to be + /// datagram-based (or message-based), so they must not contain multiple GELF messages + /// delimited by '\0', as they would be in a stream-based protocol.
pub fn decode_message( + &mut self, + mut src: Bytes, + ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> { + if src.starts_with(GELF_MAGIC) { + src.advance(2); + self.decode_chunk(src) + } else { + Ok(Some(src)) + } + } +} + +impl Default for ChunkedGelfDecoder { + fn default() -> Self { + Self::new(DEFAULT_TIMEOUT_SECS, None, None, None) + } +} + +impl Decoder for ChunkedGelfDecoder { + type Item = Bytes; + + type Error = BoxedFramingError; + + fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> { + if src.is_empty() { + return Ok(None); + } + + Ok(self + .bytes_decoder + .decode(src)? + .and_then(|frame| self.decode_message(frame).transpose()) + .transpose()?) + } + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + if buf.is_empty() { + return Ok(None); + } + + Ok(self + .bytes_decoder + .decode_eof(buf)? + .and_then(|frame| self.decode_message(frame).transpose()) + .transpose()?) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use bytes::{BufMut, BytesMut}; + use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; + use rstest::{fixture, rstest}; + use tracing_test::traced_test; + + fn create_chunk( + message_id: u64, + sequence_number: u8, + total_chunks: u8, + payload: &str, + ) -> BytesMut { + let mut chunk = BytesMut::new(); + chunk.put_slice(GELF_MAGIC); + chunk.put_u64(message_id); + chunk.put_u8(sequence_number); + chunk.put_u8(total_chunks); + chunk.extend_from_slice(payload.as_bytes()); + chunk + } + + #[fixture] + fn unchunked_message() -> (BytesMut, String) { + let payload = "foo"; + (BytesMut::from(payload), payload.to_string()) + } + + #[fixture] + fn two_chunks_message() -> ([BytesMut; 2], String) { + let message_id = 1u64; + let total_chunks = 2u8; + + let first_sequence_number = 0u8; + let first_payload = "foo"; + let first_chunk = create_chunk( + message_id, + first_sequence_number, + total_chunks, + first_payload, + ); + + let second_sequence_number = 1u8; + let second_payload = "bar"; + let second_chunk = create_chunk( + message_id, + second_sequence_number, + total_chunks, + second_payload, + ); + + ( + [first_chunk, second_chunk], + format!("{first_payload}{second_payload}"), + ) + } + + #[fixture] + fn three_chunks_message() -> ([BytesMut; 3], String) { + let message_id = 2u64; + let total_chunks = 3u8; + + let first_sequence_number = 0u8; + let first_payload = "foo"; + let first_chunk = create_chunk( + message_id, + first_sequence_number, + total_chunks, + first_payload, + ); + + let second_sequence_number = 1u8; + let second_payload = "bar"; + let second_chunk = create_chunk( + message_id, + second_sequence_number, + total_chunks, + second_payload, + ); + + let third_sequence_number = 2u8; + let third_payload = "baz"; + let third_chunk = create_chunk( + message_id, + third_sequence_number, + total_chunks, + third_payload, + ); + + ( + [first_chunk, second_chunk, third_chunk], + format!("{first_payload}{second_payload}{third_payload}"), + ) + } + + fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError { + error + .as_any() + .downcast_ref::<ChunkedGelfDecoderError>() + .expect("Expected ChunkedGelfDecoderError to be downcasted") + } + + #[rstest] + #[tokio::test] + async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) { + let (mut chunks, expected_message) = two_chunks_message; + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut chunks[0]).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut chunks[1]).unwrap(); + assert_eq!(frame,
Some(Bytes::from(expected_message))); + } + + #[rstest] + #[tokio::test] + async fn decode_unchunked(unchunked_message: (BytesMut, String)) { + let (mut message, expected_message) = unchunked_message; + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut message).unwrap(); + assert_eq!(frame, Some(Bytes::from(expected_message))); + } + + #[rstest] + #[tokio::test] + async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) { + let (mut chunks, expected_message) = two_chunks_message; + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut chunks[1]).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut chunks[0]).unwrap(); + assert_eq!(frame, Some(Bytes::from(expected_message))); + } + + #[rstest] + #[tokio::test] + async fn decode_unordered_messages( + two_chunks_message: ([BytesMut; 2], String), + three_chunks_message: ([BytesMut; 3], String), + ) { + let (mut two_chunks, two_chunks_expected) = two_chunks_message; + let (mut three_chunks, three_chunks_expected) = three_chunks_message; + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap(); + assert_eq!(frame, Some(Bytes::from(two_chunks_expected))); + + let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap(); + assert_eq!(frame, Some(Bytes::from(three_chunks_expected))); + } + + #[rstest] + #[tokio::test] + async fn decode_mixed_chunked_and_unchunked_messages( + unchunked_message: (BytesMut, String), + two_chunks_message: ([BytesMut; 2], String), + ) { + let (mut unchunked_message, expected_unchunked_message) = unchunked_message; + let (mut chunks, expected_chunked_message) = two_chunks_message; + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut chunks[1]).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut unchunked_message).unwrap(); + assert_eq!(frame, Some(Bytes::from(expected_unchunked_message))); + + let frame = decoder.decode_eof(&mut chunks[0]).unwrap(); + assert_eq!(frame, Some(Bytes::from(expected_chunked_message))); + } + + #[tokio::test] + async fn decode_shuffled_messages() { + let mut rng = SmallRng::seed_from_u64(420); + let total_chunks = 100u8; + let first_message_id = 1u64; + let first_payload = "first payload"; + let second_message_id = 2u64; + let second_payload = "second payload"; + let first_message_chunks = (0..total_chunks).map(|sequence_number| { + create_chunk( + first_message_id, + sequence_number, + total_chunks, + first_payload, + ) + }); + let second_message_chunks = (0..total_chunks).map(|sequence_number| { + create_chunk( + second_message_id, + sequence_number, + total_chunks, + second_payload, + ) + }); + let expected_first_message = first_payload.repeat(total_chunks as usize); + let expected_second_message = second_payload.repeat(total_chunks as usize); + let mut merged_chunks = first_message_chunks + .chain(second_message_chunks) + .collect::<Vec<_>>(); + merged_chunks.shuffle(&mut rng); + let mut decoder = ChunkedGelfDecoder::default(); + + let mut count = 0; + let first_retrieved_message = loop { + assert!(count < 2 * total_chunks as usize); + if let Some(message) =
decoder.decode_eof(&mut merged_chunks[count]).unwrap() { + break message; + } else { + count += 1; + } + }; + let second_retrieved_message = loop { + assert!(count < 2 * total_chunks as usize); + if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() { + break message; + } else { + count += 1 + } + }; + + assert_eq!(second_retrieved_message, expected_first_message); + assert_eq!(first_retrieved_message, expected_second_message); + } + + #[rstest] + #[tokio::test(start_paused = true)] + #[traced_test] + async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) { + let (mut chunks, _) = two_chunks_message; + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut chunks[0]).unwrap(); + assert!(frame.is_none()); + assert!(!decoder.state.lock().unwrap().is_empty()); + + // The message state should be cleared after a certain time + tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await; + assert!(decoder.state.lock().unwrap().is_empty()); + assert!(logs_contain( + "Message was not fully received within the timeout window. Discarding it." + )); + + let frame = decoder.decode_eof(&mut chunks[1]).unwrap(); + assert!(frame.is_none()); + + tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await; + assert!(decoder.state.lock().unwrap().is_empty()); + assert!(logs_contain( + "Message was not fully received within the timeout window. Discarding it" + )); + } + + #[tokio::test] + async fn decode_empty_input() { + let mut src = BytesMut::new(); + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut src).unwrap(); + assert!(frame.is_none()); + } + + #[tokio::test] + async fn decode_chunk_with_invalid_header() { + let mut src = BytesMut::new(); + src.extend_from_slice(GELF_MAGIC); + // Invalid chunk header with less than 10 bytes + let invalid_chunk = [0x12, 0x34]; + src.extend_from_slice(&invalid_chunk); + let mut decoder = ChunkedGelfDecoder::default(); + let frame = decoder.decode_eof(&mut src); + + let error = frame.unwrap_err(); + let downcasted_error = downcast_framing_error(&error); + assert_eq!( + *downcasted_error, + ChunkedGelfDecoderError::InvalidChunkHeader { + header: Bytes::from_static(&[0x12, 0x34]) + } + ); + } + + #[tokio::test] + async fn decode_chunk_with_invalid_total_chunks() { + let message_id = 1u64; + let sequence_number = 1u8; + let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1; + let payload = "foo"; + let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, payload); + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut chunk); + let error = frame.unwrap_err(); + let downcasted_error = downcast_framing_error(&error); + assert_eq!( + *downcasted_error, + ChunkedGelfDecoderError::InvalidTotalChunks { + message_id: 1, + sequence_number: 1, + total_chunks: 129, + } + ); + } + + #[tokio::test] + async fn decode_chunk_with_invalid_sequence_number() { + let message_id = 1u64; + let total_chunks = 2u8; + let invalid_sequence_number = total_chunks + 1; + let payload = "foo"; + let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, payload); + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut chunk); + let error = frame.unwrap_err(); + let downcasted_error = downcast_framing_error(&error); + assert_eq!( + *downcasted_error, + ChunkedGelfDecoderError::InvalidSequenceNumber { + message_id: 1, + sequence_number: 3, + 
total_chunks: 2, + }, + ); + } + + #[rstest] + #[tokio::test] + async fn decode_reached_pending_messages_limit( + two_chunks_message: ([BytesMut; 2], String), + three_chunks_message: ([BytesMut; 3], String), + ) { + let (mut two_chunks, _) = two_chunks_message; + let (mut three_chunks, _) = three_chunks_message; + let mut decoder = ChunkedGelfDecoder::new(DEFAULT_TIMEOUT_SECS, Some(1), None, None); + + let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap(); + assert!(frame.is_none()); + assert!(decoder.state.lock().unwrap().len() == 1); + + let frame = decoder.decode_eof(&mut three_chunks[0]); + let error = frame.unwrap_err(); + let downcasted_error = downcast_framing_error(&error); + assert_eq!( + *downcasted_error, + ChunkedGelfDecoderError::PendingMessagesLimitReached { + message_id: 2u64, + sequence_number: 0u8, + pending_messages_limit: 1, + } + ); + assert!(decoder.state.lock().unwrap().len() == 1); + } + + #[rstest] + #[tokio::test] + async fn decode_chunk_with_different_total_chunks() { + let message_id = 1u64; + let sequence_number = 0u8; + let total_chunks = 2u8; + let payload = "foo"; + let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, payload); + let mut second_chunk = + create_chunk(message_id, sequence_number + 1, total_chunks + 1, payload); + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut first_chunk).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut second_chunk); + let error = frame.unwrap_err(); + let downcasted_error = downcast_framing_error(&error); + assert_eq!( + *downcasted_error, + ChunkedGelfDecoderError::TotalChunksMismatch { + message_id: 1, + sequence_number: 1, + original_total_chunks: 2, + received_total_chunks: 3, + } + ); + } + + #[rstest] + #[tokio::test] + async fn decode_chunk_greater_than_max_chunk_length() { + let message_id = 1u64; + let sequence_number = 0u8; + let total_chunks = 1u8; + let payload = "foo"; + let mut chunk = create_chunk(message_id, sequence_number, total_chunks, payload); + let mut decoder = ChunkedGelfDecoder::new(DEFAULT_TIMEOUT_SECS, None, Some(1), None); + + let frame = decoder.decode_eof(&mut chunk); + let error = frame.unwrap_err(); + let downcasted_error = downcast_framing_error(&error); + assert_eq!( + *downcasted_error, + ChunkedGelfDecoderError::MaxChunkLengthExceeded { + message_id: 1, + sequence_number: 0, + chunk_length: 3, + max_chunk_length: 1, + } + ); + assert_eq!(decoder.state.lock().unwrap().len(), 0); + } + + #[rstest] + #[tokio::test] + async fn decode_message_greater_than_max_message_length( + two_chunks_message: ([BytesMut; 2], String), + ) { + let (mut chunks, _) = two_chunks_message; + let mut decoder = ChunkedGelfDecoder::new(DEFAULT_TIMEOUT_SECS, None, None, Some(5)); + + let frame = decoder.decode_eof(&mut chunks[0]).unwrap(); + assert!(frame.is_none()); + let frame = decoder.decode_eof(&mut chunks[1]); + let error = frame.unwrap_err(); + let downcasted_error = downcast_framing_error(&error); + assert_eq!( + *downcasted_error, + ChunkedGelfDecoderError::MaxMessageLengthExceeded { + message_id: 1, + sequence_number: 1, + message_length: 6, + max_message_length: 5, + } + ); + assert_eq!(decoder.state.lock().unwrap().len(), 0); + } + + #[rstest] + #[tokio::test] + #[traced_test] + async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) { + let (mut chunks, _) = two_chunks_message; + let mut decoder = ChunkedGelfDecoder::default(); + + let frame = decoder.decode_eof(&mut 
chunks[0].clone()).unwrap(); + assert!(frame.is_none()); + + let frame = decoder.decode_eof(&mut chunks[0]).unwrap(); + assert!(frame.is_none()); + assert!(logs_contain("Received a duplicate chunk. Ignoring it.")); + } +} diff --git a/lib/codecs/src/decoding/framing/mod.rs b/lib/codecs/src/decoding/framing/mod.rs index f991b969575a6..86740a50129f2 100644 --- a/lib/codecs/src/decoding/framing/mod.rs +++ b/lib/codecs/src/decoding/framing/mod.rs @@ -5,16 +5,18 @@ mod bytes; mod character_delimited; +mod chunked_gelf; mod length_delimited; mod newline_delimited; mod octet_counting; -use std::fmt::Debug; +use std::{any::Any, fmt::Debug}; use ::bytes::Bytes; pub use character_delimited::{ CharacterDelimitedDecoder, CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, }; +pub use chunked_gelf::{ChunkedGelfDecoder, ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions}; use dyn_clone::DynClone; pub use length_delimited::{LengthDelimitedDecoder, LengthDelimitedDecoderConfig}; pub use newline_delimited::{ @@ -34,23 +36,32 @@ use super::StreamDecodingError; /// It requires conformance to `TcpError` so that we can determine whether the /// error is recoverable or if trying to continue will lead to hanging up the /// TCP source indefinitely. -pub trait FramingError: std::error::Error + StreamDecodingError + Send + Sync {} +pub trait FramingError: std::error::Error + StreamDecodingError + Send + Sync + Any { + /// Coerces the error to a `dyn Any`. + /// This is useful for downcasting the error to a concrete type. + fn as_any(&self) -> &dyn Any; +} impl std::error::Error for BoxedFramingError {} -impl FramingError for std::io::Error {} - -impl FramingError for LinesCodecError {} +impl FramingError for std::io::Error { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} -impl From<std::io::Error> for BoxedFramingError { - fn from(error: std::io::Error) -> Self { - Box::new(error) +impl FramingError for LinesCodecError { + fn as_any(&self) -> &dyn Any { + self as &dyn Any } } -impl From<LinesCodecError> for BoxedFramingError { - fn from(error: LinesCodecError) -> Self { - Box::new(error) +impl<T> From<T> for BoxedFramingError +where + T: FramingError + 'static, +{ + fn from(value: T) -> Self { + Box::new(value) + } } diff --git a/lib/codecs/src/decoding/mod.rs b/lib/codecs/src/decoding/mod.rs index 9590fb21da0ec..76f8f6466228e 100644 --- a/lib/codecs/src/decoding/mod.rs +++ b/lib/codecs/src/decoding/mod.rs @@ -20,10 +20,11 @@ pub use format::{ pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions}; pub use framing::{ BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder, - CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, FramingError, - LengthDelimitedDecoder, LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, - NewlineDelimitedDecoderConfig, NewlineDelimitedDecoderOptions, OctetCountingDecoder, - OctetCountingDecoderConfig, OctetCountingDecoderOptions, + CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder, + ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder, + LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig, + NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig, + OctetCountingDecoderOptions, }; use smallvec::SmallVec; use std::fmt::Debug; @@ -99,6 +100,11 @@ pub enum FramingConfig { /// /// [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1
OctetCounting(OctetCountingDecoderConfig), + + /// Byte frames which are [chunked GELF][chunked_gelf] messages. + /// + /// [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + ChunkedGelf(ChunkedGelfDecoderConfig), } impl From<BytesDecoderConfig> for FramingConfig { @@ -131,6 +137,12 @@ impl From<OctetCountingDecoderConfig> for FramingConfig { } } +impl From<ChunkedGelfDecoderConfig> for FramingConfig { + fn from(config: ChunkedGelfDecoderConfig) -> Self { + Self::ChunkedGelf(config) + } +} + impl FramingConfig { /// Build the `Framer` from this configuration. pub fn build(&self) -> Framer { @@ -140,6 +152,7 @@ impl FramingConfig { FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()), FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()), FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()), + FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()), } } } @@ -159,6 +172,8 @@ pub enum Framer { OctetCounting(OctetCountingDecoder), /// Uses an opaque `Framer` implementation for framing. Boxed(BoxedFramer), + /// Uses a `ChunkedGelfDecoder` for framing. + ChunkedGelf(ChunkedGelfDecoder), } impl tokio_util::codec::Decoder for Framer { @@ -173,6 +188,7 @@ impl tokio_util::codec::Decoder for Framer { Framer::NewlineDelimited(framer) => framer.decode(src), Framer::OctetCounting(framer) => framer.decode(src), Framer::Boxed(framer) => framer.decode(src), + Framer::ChunkedGelf(framer) => framer.decode(src), } } @@ -184,6 +200,7 @@ Framer::NewlineDelimited(framer) => framer.decode_eof(src), Framer::OctetCounting(framer) => framer.decode_eof(src), Framer::Boxed(framer) => framer.decode_eof(src), + Framer::ChunkedGelf(framer) => framer.decode_eof(src), } } } @@ -361,6 +378,14 @@ impl DeserializerConfig { } } + /// Returns an appropriate default framing config for the given deserializer with message-based inputs. + pub fn default_message_based_framing(&self) -> FramingConfig { + match self { + DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()), + _ => FramingConfig::Bytes, + } + } + /// Return the type of event build by this deserializer. pub fn output_type(&self) -> DataType { match self { diff --git a/lib/codecs/src/encoding/format/native_json.rs b/lib/codecs/src/encoding/format/native_json.rs index 90464bfcc0631..60d43725d8017 100644 --- a/lib/codecs/src/encoding/format/native_json.rs +++ b/lib/codecs/src/encoding/format/native_json.rs @@ -98,7 +98,9 @@ mod tests { serializer .encode(histogram_event.clone(), &mut bytes) .unwrap(); - let json = serializer.to_json_value(histogram_event).unwrap(); - assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap()); + assert_eq!( + bytes.freeze(), + serde_json::to_string(&histogram_event).unwrap() + ); } } diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index d184932633507..f61f82d5d2701 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -198,6 +198,8 @@ fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer { // TODO: There's no equivalent octet counting framer for encoding... although // there's no particular reason that would make it hard to write.
decoding::FramingConfig::OctetCounting(_) => todo!(), + // TODO: chunked GELF is not supported yet in encoding + decoding::FramingConfig::ChunkedGelf(_) => todo!(), }; framing_config.build() diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index e45b879d6c965..175c1f218ae3c 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -10,7 +10,6 @@ use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path}; use vrl::value::{kind::Collection, Kind}; #[cfg(unix)] -use crate::serde::default_framing_message_based; use crate::{ codecs::DecodingConfig, config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput}, @@ -152,12 +151,12 @@ impl SourceConfig for SocketConfig { } Mode::Udp(config) => { let log_namespace = cx.log_namespace(config.log_namespace); - let decoder = DecodingConfig::new( - config.framing().clone(), - config.decoding().clone(), - log_namespace, - ) - .build()?; + let decoding = config.decoding().clone(); + let framing = config + .framing() + .clone() + .unwrap_or_else(|| decoding.default_message_based_framing()); + let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?; Ok(udp::udp( config, decoder, @@ -169,15 +168,12 @@ impl SourceConfig for SocketConfig { #[cfg(unix)] Mode::UnixDatagram(config) => { let log_namespace = cx.log_namespace(config.log_namespace); - let decoder = DecodingConfig::new( - config - .framing - .clone() - .unwrap_or_else(default_framing_message_based), - config.decoding.clone(), - log_namespace, - ) - .build()?; + let decoding = config.decoding.clone(); + let framing = config + .framing + .clone() + .unwrap_or_else(|| decoding.default_message_based_framing()); + let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?; unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace) } @@ -334,17 +330,19 @@ mod test { use bytes::{BufMut, Bytes, BytesMut}; use futures::{stream, StreamExt}; + use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; + use serde_json::json; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; use tokio::{ task::JoinHandle, time::{timeout, Duration, Instant}, }; - use vector_lib::codecs::NewlineDelimitedDecoderConfig; #[cfg(unix)] use vector_lib::codecs::{ decoding::CharacterDelimitedDecoderOptions, CharacterDelimitedDecoderConfig, }; + use vector_lib::codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig}; use vector_lib::event::EventContainer; use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; use vrl::value::ObjectMap; @@ -383,6 +381,53 @@ mod test { SourceSender, }; + fn get_gelf_payload(message: &str) -> String { + serde_json::to_string(&json!({ + "version": "1.1", + "host": "example.org", + "short_message": message, + "timestamp": 1234567890.123, + "level": 6, + "_foo": "bar", + })) + .unwrap() + } + + fn create_gelf_chunk( + message_id: u64, + sequence_number: u8, + total_chunks: u8, + payload: &[u8], + ) -> Bytes { + const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f]; + let mut chunk = BytesMut::new(); + chunk.put_slice(&GELF_MAGIC); + chunk.put_u64(message_id); + chunk.put_u8(sequence_number); + chunk.put_u8(total_chunks); + chunk.put(payload); + chunk.freeze() + } + + fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> { + let message_id = rand::random(); + let payload = get_gelf_payload(short_message); + let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>(); + let total_chunks = payload_chunks.len(); + assert!(total_chunks <=
128, "too many gelf chunks"); + + let mut chunks = payload_chunks + .into_iter() + .enumerate() + .map(|(i, payload_chunk)| { + create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk) + }) + .collect::>(); + // Shuffle the chunks to simulate out-of-order delivery + chunks.shuffle(rng); + chunks + } + #[test] fn generate_config() { crate::test_util::test_generate_config::(); @@ -855,20 +900,24 @@ mod test { //////// UDP TESTS //////// fn send_lines_udp(addr: SocketAddr, lines: impl IntoIterator) -> SocketAddr { + send_packets_udp(addr, lines.into_iter().map(|line| line.into())) + } + + fn send_packets_udp(addr: SocketAddr, packets: impl IntoIterator) -> SocketAddr { let bind = next_addr(); let socket = UdpSocket::bind(bind) .map_err(|error| panic!("{:}", error)) .ok() .unwrap(); - for line in lines { + for packet in packets { assert_eq!( socket - .send_to(line.as_bytes(), addr) + .send_to(&packet, addr) .map_err(|error| panic!("{:}", error)) .ok() .unwrap(), - line.as_bytes().len() + packet.len() ); // Space things out slightly to try to avoid dropped packets thread::sleep(Duration::from_millis(1)); @@ -1059,10 +1108,12 @@ mod test { let address = next_addr(); let mut config = UdpConfig::from_address(address.into()); config.max_length = 10; - config.framing = CharacterDelimitedDecoderConfig { - character_delimited: CharacterDelimitedDecoderOptions::new(b',', None), - } - .into(); + config.framing = Some( + CharacterDelimitedDecoderConfig { + character_delimited: CharacterDelimitedDecoderOptions::new(b',', None), + } + .into(), + ); let address = init_udp_with_config(tx, config).await; send_lines_udp( @@ -1083,6 +1134,40 @@ mod test { .await; } + #[tokio::test] + async fn udp_decodes_chunked_gelf_messages() { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { + let (tx, rx) = SourceSender::new_test(); + let address = next_addr(); + let mut config = UdpConfig::from_address(address.into()); + config.decoding = GelfDeserializerConfig::default().into(); + let address = init_udp_with_config(tx, config).await; + let seed = 42; + let mut rng = SmallRng::seed_from_u64(seed); + let max_size = 300; + let big_message = "This is a very large message".repeat(500); + let another_big_message = "This is another very large message".repeat(500); + let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng); + let mut another_chunks = + get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng); + chunks.append(&mut another_chunks); + chunks.shuffle(&mut rng); + + send_packets_udp(address, chunks); + + let events = collect_n(rx, 2).await; + assert_eq!( + events[1].as_log()[log_schema().message_key().unwrap().to_string()], + big_message.into() + ); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + another_big_message.into() + ); + }) + .await; + } + #[tokio::test] async fn udp_it_includes_host() { assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { @@ -1220,11 +1305,35 @@ mod test { } ////////////// UNIX TEST LIBS ////////////// + #[cfg(unix)] async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf { - let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test"); + init_unix_inner(sender, stream, use_vector_namespace, None).await + } + + #[cfg(unix)] + async fn init_unix_with_config( + sender: SourceSender, + stream: bool, + use_vector_namespace: bool, + config: UnixConfig, + ) -> PathBuf { + init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await + } 
+ + #[cfg(unix)] + async fn init_unix_inner( + sender: SourceSender, + stream: bool, + use_vector_namespace: bool, + config: Option<UnixConfig>, + ) -> PathBuf { + let mut config = config.unwrap_or_else(|| { + UnixConfig::new(tempfile::tempdir().unwrap().into_path().join("unix_test")) + }); + + let in_path = config.path.clone(); - let mut config = UnixConfig::new(in_path.clone()); if use_vector_namespace { config.log_namespace = Some(true); } @@ -1234,6 +1343,7 @@ mod test { } else { Mode::UnixDatagram(config) }; + let server = SocketConfig { mode } .build(SourceContext::new_test(sender, None)) .await @@ -1323,11 +1433,17 @@ mod test { ////////////// UNIX DATAGRAM TESTS ////////////// #[cfg(unix)] async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) { + let packets = lines.iter().map(|line| Bytes::from(line.to_string())); + send_packets_unix_datagram(path, packets).await; + } + + #[cfg(unix)] + async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) { let socket = UnixDatagram::unbound().unwrap(); socket.connect(path).unwrap(); - for line in lines { - socket.send(line.as_bytes()).await.unwrap(); + for packet in packets { + socket.send(&packet).await.unwrap(); } socket.shutdown(std::net::Shutdown::Both).unwrap(); } @@ -1399,6 +1515,41 @@ mod test { assert_eq!(dgram, bytes); } + #[cfg(unix)] + #[tokio::test] + async fn unix_datagram_chunked_gelf_messages() { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { + let (tx, rx) = SourceSender::new_test(); + let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test"); + let mut config = UnixConfig::new(in_path.clone()); + config.decoding = GelfDeserializerConfig::default().into(); + let path = init_unix_with_config(tx, false, false, config).await; + let seed = 42; + let mut rng = SmallRng::seed_from_u64(seed); + let max_size = 20; + let big_message = "This is a very large message".repeat(5); + let another_big_message = "This is another very large message".repeat(5); + let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng); + let mut another_chunks = + get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng); + chunks.append(&mut another_chunks); + chunks.shuffle(&mut rng); + + send_packets_unix_datagram(path, chunks).await; + + let events = collect_n(rx, 2).await; + assert_eq!( + events[1].as_log()[log_schema().message_key().unwrap().to_string()], + big_message.into() + ); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + another_big_message.into() + ); + }) + .await; + } + #[cfg(unix)] #[tokio::test] async fn unix_datagram_message_with_vector_namespace() { diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index 5fb646ab82a95..a1b39c2fbcd33 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -23,7 +23,7 @@ use crate::{ SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError, }, net, - serde::{default_decoding, default_framing_message_based}, + serde::default_decoding, shutdown::ShutdownSignal, sources::{ socket::SocketConfig, @@ -74,12 +74,11 @@ pub struct UdpConfig { receive_buffer_bytes: Option<usize>, #[configurable(derived)] - #[serde(default = "default_framing_message_based")] - pub(super) framing: FramingConfig, + pub(super) framing: Option<FramingConfig>, #[configurable(derived)] #[serde(default = "default_decoding")] - decoding: DeserializerConfig, + pub(super) decoding: DeserializerConfig, /// The namespace to use for logs. This overrides the global setting.
#[serde(default)] @@ -104,7 +103,7 @@ impl UdpConfig { &self.port_key } - pub(super) const fn framing(&self) -> &FramingConfig { + pub(super) const fn framing(&self) -> &Option<FramingConfig> { &self.framing } @@ -123,7 +122,7 @@ impl UdpConfig { host_key: None, port_key: default_port_key(), receive_buffer_bytes: None, - framing: default_framing_message_based(), + framing: None, decoding: default_decoding(), log_namespace: None, } @@ -168,7 +167,6 @@ pub(super) fn udp( let bytes_received = register!(BytesReceived::from(Protocol::UDP)); info!(message = "Listening.", address = %config.address); - // We add 1 to the max_length in order to determine if the received data has been truncated. let mut buf = BytesMut::with_capacity(max_length + 1); loop { @@ -199,10 +197,8 @@ pub(super) fn udp( }; bytes_received.emit(ByteSize(byte_size)); - let payload = buf.split_to(byte_size); let truncated = byte_size == max_length + 1; - let mut stream = FramedRead::new(payload.as_ref(), decoder.clone()).peekable(); while let Some(result) = stream.next().await { diff --git a/website/cue/reference/components/sources/base/amqp.cue b/website/cue/reference/components/sources/base/amqp.cue index d12ef5c2ae03d..efd8675348c6e 100644 --- a/website/cue/reference/components/sources/base/amqp.cue +++ b/website/cue/reference/components/sources/base/amqp.cue @@ -330,6 +330,58 @@ base: components: sources: amqp: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes the chunk's payload into account; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful for limiting the memory usage of the decoder's chunk buffer. + + This limit only takes the message's payload into account; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message.
+ """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -365,8 +417,13 @@ base: components: sources: amqp: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are chunked GELF messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue index 6df9221791960..ef112080255c4 100644 --- a/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue @@ -328,6 +328,58 @@ base: components: sources: aws_kinesis_firehose: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit takes only into account the chunk's payload and the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoders's chunk buffer. + + This limit takes only into account the message's payload and the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. 
If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -363,8 +415,13 @@ base: components: sources: aws_kinesis_firehose: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/aws_s3.cue b/website/cue/reference/components/sources/base/aws_s3.cue index f47ddf8a87eac..feffca6a37193 100644 --- a/website/cue/reference/components/sources/base/aws_s3.cue +++ b/website/cue/reference/components/sources/base/aws_s3.cue @@ -428,6 +428,58 @@ base: components: sources: aws_s3: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior.
+ """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -463,8 +515,13 @@ base: components: sources: aws_s3: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are chunked GELF messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/aws_sqs.cue b/website/cue/reference/components/sources/base/aws_sqs.cue index bc5dc30aab9a8..28a58f46e7aef 100644 --- a/website/cue/reference/components/sources/base/aws_sqs.cue +++ b/website/cue/reference/components/sources/base/aws_sqs.cue @@ -432,6 +432,58 @@ base: components: sources: aws_sqs: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit takes only into account the chunk's payload and the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoders's chunk buffer. + + This limit takes only into account the message's payload and the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. 
This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -467,8 +519,13 @@ base: components: sources: aws_sqs: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/datadog_agent.cue b/website/cue/reference/components/sources/base/datadog_agent.cue index 4877e76a20756..64dd7f7fb232f 100644 --- a/website/cue/reference/components/sources/base/datadog_agent.cue +++ b/website/cue/reference/components/sources/base/datadog_agent.cue @@ -325,6 +325,58 @@ base: components: sources: datadog_agent: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded.
+ If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -360,8 +412,13 @@ base: components: sources: datadog_agent: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/demo_logs.cue b/website/cue/reference/components/sources/base/demo_logs.cue index 99c0f3c62ade3..4660526ecef0a 100644 --- a/website/cue/reference/components/sources/base/demo_logs.cue +++ b/website/cue/reference/components/sources/base/demo_logs.cue @@ -321,6 +321,58 @@ base: components: sources: demo_logs: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages.
If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -356,8 +408,13 @@ base: components: sources: demo_logs: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/exec.cue b/website/cue/reference/components/sources/base/exec.cue index eb35bb8826222..61857b7d28a87 100644 --- a/website/cue/reference/components/sources/base/exec.cue +++ b/website/cue/reference/components/sources/base/exec.cue @@ -309,6 +309,58 @@ base: components: sources: exec: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads.
+ """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -342,8 +394,13 @@ base: components: sources: exec: configuration: { type: string: enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are chunked GELF messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/file_descriptor.cue b/website/cue/reference/components/sources/base/file_descriptor.cue index 84941688d4cac..b26a2af12b18c 100644 --- a/website/cue/reference/components/sources/base/file_descriptor.cue +++ b/website/cue/reference/components/sources/base/file_descriptor.cue @@ -287,6 +287,58 @@ base: components: sources: file_descriptor: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit takes only into account the chunk's payload and the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoders's chunk buffer. 
+ + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -320,8 +372,13 @@ base: components: sources: file_descriptor: configuration: { type: string: enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/gcp_pubsub.cue b/website/cue/reference/components/sources/base/gcp_pubsub.cue index 15059be769f5d..cd02efc81f8f7 100644 --- a/website/cue/reference/components/sources/base/gcp_pubsub.cue +++ b/website/cue/reference/components/sources/base/gcp_pubsub.cue @@ -364,6 +364,58 @@ base: components: sources: gcp_pubsub: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`.
+ This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -399,8 +451,13 @@ base: components: sources: gcp_pubsub: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/heroku_logs.cue b/website/cue/reference/components/sources/base/heroku_logs.cue index 1d556a2dd0392..ff45fbc204286 100644 --- a/website/cue/reference/components/sources/base/heroku_logs.cue +++ b/website/cue/reference/components/sources/base/heroku_logs.cue @@ -322,6 +322,58 @@ base: components: sources: heroku_logs: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks.
This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -357,8 +409,13 @@ base: components: sources: heroku_logs: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/http.cue b/website/cue/reference/components/sources/base/http.cue index 380be1998be98..fcc7a7396b61a 100644 --- a/website/cue/reference/components/sources/base/http.cue +++ b/website/cue/reference/components/sources/base/http.cue @@ -337,6 +337,58 @@ base: components: sources: http: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded.
+ + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -370,8 +422,13 @@ base: components: sources: http: configuration: { type: string: enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/http_client.cue b/website/cue/reference/components/sources/base/http_client.cue index edda8ad20cfd8..8282ea348a16e 100644 --- a/website/cue/reference/components/sources/base/http_client.cue +++ b/website/cue/reference/components/sources/base/http_client.cue @@ -325,6 +325,58 @@ base: components: sources: http_client: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped.
If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -360,8 +412,13 @@ base: components: sources: http_client: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/http_server.cue b/website/cue/reference/components/sources/base/http_server.cue index 1db251b62a950..38e9658082e2b 100644 --- a/website/cue/reference/components/sources/base/http_server.cue +++ b/website/cue/reference/components/sources/base/http_server.cue @@ -337,6 +337,58 @@ base: components: sources: http_server: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes.
Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -370,8 +422,13 @@ base: components: sources: http_server: configuration: { type: string: enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/kafka.cue b/website/cue/reference/components/sources/base/kafka.cue index 19dc44e6711bd..a23a8a2ec9ae0 100644 --- a/website/cue/reference/components/sources/base/kafka.cue +++ b/website/cue/reference/components/sources/base/kafka.cue @@ -358,6 +358,58 @@ base: components: sources: kafka: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation.
+ """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoders's chunk buffer. + + This limit takes only into account the message's payload and the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -393,8 +445,13 @@ base: components: sources: kafka: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are chunked GELF messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/nats.cue b/website/cue/reference/components/sources/base/nats.cue index 5c1346875842f..6e1c0e5fa8685 100644 --- a/website/cue/reference/components/sources/base/nats.cue +++ b/website/cue/reference/components/sources/base/nats.cue @@ -377,6 +377,58 @@ base: components: sources: nats: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. 
+ + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -412,8 +464,13 @@ base: components: sources: nats: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/pulsar.cue b/website/cue/reference/components/sources/base/pulsar.cue index 7eb0cd6795116..d3d5b5f03b1ed 100644 --- a/website/cue/reference/components/sources/base/pulsar.cue +++ b/website/cue/reference/components/sources/base/pulsar.cue @@ -388,6 +388,58 @@ base: components: sources: pulsar: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped.
If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -423,8 +475,13 @@ base: components: sources: pulsar: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/redis.cue b/website/cue/reference/components/sources/base/redis.cue index 05a73fbc1538e..f4d5599370b8b 100644 --- a/website/cue/reference/components/sources/base/redis.cue +++ b/website/cue/reference/components/sources/base/redis.cue @@ -295,6 +295,58 @@ base: components: sources: redis: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder." + relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes.
Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -330,8 +382,13 @@ base: components: sources: redis: configuration: { enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format. diff --git a/website/cue/reference/components/sources/base/socket.cue b/website/cue/reference/components/sources/base/socket.cue index 5416a40d78184..c11046ac2a13f 100644 --- a/website/cue/reference/components/sources/base/socket.cue +++ b/website/cue/reference/components/sources/base/socket.cue @@ -297,6 +297,58 @@ base: components: sources: socket: configuration: { } } } + chunked_gelf: { + description: "Options for the chunked GELF decoder."
+ relevant_when: "method = \"chunked_gelf\"" + required: false + type: object: options: { + max_chunk_length: { + description: """ + The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of chunks and + the per-chunk memory is unbounded. + + This limit only takes into account the chunk's payload; the GELF header bytes are excluded from the calculation. + """ + required: false + type: uint: {} + } + max_message_length: { + description: """ + The maximum length of a single GELF message, in bytes. Messages longer than this length will + be dropped. If this option is not set, the decoder does not limit the length of messages and + the per-message memory is unbounded. + + Note that a message can be composed of multiple chunks and this limit is applied to the whole + message, not to individual chunks. This length should always be greater than the `max_chunk_length`. + This option is useful to limit the memory usage of the decoder's chunk buffer. + + This limit only takes into account the message's payload; the GELF header bytes are excluded from the calculation. + The message's payload is the concatenation of all the chunks' payloads. + """ + required: false + type: uint: {} + } + pending_messages_limit: { + description: """ + The maximum number of pending incomplete messages. If this limit is reached, the decoder starts + dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded. + If this option is not set, the decoder does not limit the number of pending messages and the memory usage + of its messages buffer can grow unbounded. This matches Graylog Server's behavior. + """ + required: false + type: uint: {} + } + timeout_secs: { + description: """ + The timeout, in seconds, for a message to be fully received. If the timeout is reached, the + decoder drops all the received chunks of the timed-out message. + """ + required: false + type: float: default: 5.0 + } + } + } length_delimited: { description: "Options for the length delimited decoder." relevant_when: "method = \"length_delimited\"" @@ -330,8 +382,13 @@ base: components: sources: socket: configuration: { type: string: enum: { bytes: "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)." character_delimited: "Byte frames which are delimited by a chosen character." - length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." - newline_delimited: "Byte frames which are delimited by a newline character." + chunked_gelf: """ + Byte frames which are [chunked GELF][chunked_gelf] messages. + + [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html + """ + length_delimited: "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length." + newline_delimited: "Byte frames which are delimited by a newline character." octet_counting: """ Byte frames according to the [octet counting][octet_counting] format.
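Since the socket source above is the primary consumer of this framer (chunked GELF is UDP-oriented), here is a minimal configuration sketch of how the documented options fit together. This is illustrative only: the component name, address, and limit values are assumptions, while the option names mirror the generated reference above and the `framing.method = "chunked_gelf"` / `framing.chunked_gelf.*` paths follow Vector's usual TOML layout for framing options.

```toml
# Hypothetical example: a UDP socket source reassembling chunked GELF datagrams.
[sources.gelf_udp]
type    = "socket"
mode    = "udp"
address = "0.0.0.0:12201"

[sources.gelf_udp.framing]
method = "chunked_gelf"

[sources.gelf_udp.framing.chunked_gelf]
timeout_secs           = 5.0    # drop a message's chunks if it is not completed in time
max_chunk_length       = 8192   # per-chunk payload bytes; GELF header excluded
max_message_length     = 131072 # whole reassembled payload; should exceed max_chunk_length
pending_messages_limit = 1000   # bound the buffer of incomplete messages

[sources.gelf_udp.decoding]
codec = "gelf" # framing is typically paired with the GELF deserializer
```

All four limits are optional; when unset, chunk and message sizes and the number of pending messages are unbounded, matching Graylog Server's behavior as described in the reference above.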
diff --git a/website/cue/reference/components/sources/base/stdin.cue b/website/cue/reference/components/sources/base/stdin.cue
index 3079d25a3c91b..bc706702a5aef 100644
--- a/website/cue/reference/components/sources/base/stdin.cue
+++ b/website/cue/reference/components/sources/base/stdin.cue
@@ -280,6 +280,58 @@ base: components: sources: stdin: configuration: {
 			}
 		}
 	}
+	chunked_gelf: {
+		description:   "Options for the chunked GELF decoder."
+		relevant_when: "method = \"chunked_gelf\""
+		required:      false
+		type: object: options: {
+			max_chunk_length: {
+				description: """
+					The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will
+					be dropped. If this option is not set, the decoder does not limit the length of chunks and
+					the per-chunk memory is unbounded.
+
+					This limit only takes the chunk's payload into account; the GELF header bytes are excluded
+					from the calculation.
+					"""
+				required: false
+				type: uint: {}
+			}
+			max_message_length: {
+				description: """
+					The maximum length of a single GELF message, in bytes. Messages longer than this length will
+					be dropped. If this option is not set, the decoder does not limit the length of messages and
+					the per-message memory is unbounded.
+
+					Note that a message can be composed of multiple chunks, and this limit is applied to the whole
+					message, not to individual chunks. This length should always be greater than `max_chunk_length`.
+					This option is useful for limiting the memory usage of the decoder's chunk buffer.
+
+					This limit only takes the message's payload into account; the GELF header bytes are excluded
+					from the calculation. The message's payload is the concatenation of all the chunks' payloads.
+					"""
+				required: false
+				type: uint: {}
+			}
+			pending_messages_limit: {
+				description: """
+					The maximum number of pending, incomplete messages. If this limit is reached, the decoder starts
+					dropping chunks of new messages, ensuring that the memory usage of the decoder's state is bounded.
+					If this option is not set, the decoder does not limit the number of pending messages, and the
+					memory usage of its message buffer can grow unbounded. This matches Graylog Server's behavior.
+					"""
+				required: false
+				type: uint: {}
+			}
+			timeout_secs: {
+				description: """
+					The timeout, in seconds, for a message to be fully received. If the timeout is reached, the
+					decoder drops all the received chunks of the timed-out message.
+					"""
+				required: false
+				type: float: default: 5.0
+			}
+		}
+	}
 	length_delimited: {
 		description:   "Options for the length delimited decoder."
 		relevant_when: "method = \"length_delimited\""
@@ -313,8 +365,13 @@ base: components: sources: stdin: configuration: {
 			type: string: enum: {
 				bytes:               "Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments)."
 				character_delimited: "Byte frames which are delimited by a chosen character."
-				length_delimited:    "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length."
-				newline_delimited:   "Byte frames which are delimited by a newline character."
+				chunked_gelf: """
+					Byte frames which are [chunked GELF][chunked_gelf] messages.
+
+					[chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html
+					"""
+				length_delimited:  "Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length."
+				newline_delimited: "Byte frames which are delimited by a newline character."
 				octet_counting: """
 					Byte frames according to the [octet counting][octet_counting] format.
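As the option descriptions note, these limits exist to bound memory. When both `max_message_length` and `pending_messages_limit` are set, the worst-case payload memory held by the decoder's buffer is roughly their product. A back-of-the-envelope Rust sketch, reusing the illustrative values from the TOML sketch above:

fn main() {
    // Illustrative values matching the earlier TOML sketch.
    let max_message_length: u64 = 1_048_576; // 1 MiB of payload per message
    let pending_messages_limit: u64 = 1_000; // incomplete messages kept at once

    // Worst case: every pending slot holds a nearly complete message.
    // Header bytes are excluded from the limits, so the true footprint
    // is slightly higher than this bound.
    let bound = max_message_length * pending_messages_limit;
    println!("worst-case buffered payload: {bound} bytes (~1 GiB)");
}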