
feat(decoding): Implement chunked GELF decoding #20859

Open
wants to merge 67 commits into master
Conversation

@jorgehermo9 jorgehermo9 commented Jul 15, 2024

Closes #20769. This PR is kind of large (900 lines of code, 600 of which are generated docs), so if you prefer to chat via Discord (and to be more agile while merging this), I'm in the Vector community server, username @jorgehermo9.

Implementation is based on Graylog's documentation and Graylog's go-gelf library

In my local environment some tests are failing. Could you please trigger the CI so I can see whether it is a problem with my environment? If not, I can proceed to fix them.

@jorgehermo9 jorgehermo9 requested a review from a team as a code owner July 15, 2024 09:09
@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Jul 15, 2024
Cargo.toml Outdated
@@ -139,6 +139,7 @@ serde_json = { version = "1.0.120", default-features = false, features = ["raw_v
serde = { version = "1.0.204", default-features = false, features = ["alloc", "derive", "rc"] }
toml = { version = "0.8.14", default-features = false, features = ["display", "parse"] }
vrl = { version = "0.16.1", features = ["arbitrary", "cli", "test", "test_framework"] }
tokio = { version = "1.38.0", default-features = false, features = ["full"] }
Contributor Author

@jorgehermo9 jorgehermo9 Jul 15, 2024

I needed to use tokio inside the lib/codecs crate in order to implement GELF decoding timeouts with tokio tasks, so I added the dependency as a workspace one. If there is any problem with this, we can find another solution.
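The timeout idea described above can be sketched as follows. The PR uses tokio tasks; the same shape is shown here with a plain std thread so the sketch stays self-contained, and all names are hypothetical, not the PR's actual API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical sketch: once the timeout elapses, drop whatever chunks are
// still pending for this message id. In the PR this is a tokio task and the
// drop is reported to the caller as a timeout error.
fn spawn_message_timeout(
    state: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
    message_id: u64,
    timeout: Duration,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        thread::sleep(timeout);
        if state.lock().unwrap().remove(&message_id).is_some() {
            // Message never completed within the timeout; its chunks are gone.
        }
    })
}
```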

[const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize];
const DEFAULT_TIMEOUT_MILLIS: u64 = 5000;
// TODO: ask what would be an appropriate default value for this
const DEFAULT_PENDING_MESSAGES_LIMIT: usize = 1000;
Contributor Author

I don't know what value is appropriate here. Do you have any recommendation?

This limit was enforced so we have a memory-bounded decoder.

The maximum UDP packet size is 65,535 bytes... so with this limit, I think we have roughly a 65 MB memory limit for pending message storage.

However, the framing is agnostic of the transport protocol, so other protocols may not have that per-message size limit, and thus this can be "theoretically unbounded" (for example, when reading raw bytes from a file).

Should we also enforce a per-message limit?
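The pending-messages bound under discussion could look roughly like this sketch (struct and method names are illustrative, not the PR's actual API): chunks for already-known message ids are always accepted, but chunks that would create a new entry are dropped once the map is at capacity.

```rust
use std::collections::HashMap;

// Hypothetical sketch of a memory-bounded pending-chunk store.
struct PendingStore {
    pending: HashMap<u64, Vec<u8>>,
    pending_messages_limit: usize,
}

impl PendingStore {
    /// Returns false (chunk dropped) when a brand-new message id would
    /// push the store past its limit.
    fn try_insert_chunk(&mut self, message_id: u64, chunk: &[u8]) -> bool {
        if !self.pending.contains_key(&message_id)
            && self.pending.len() >= self.pending_messages_limit
        {
            return false;
        }
        self.pending
            .entry(message_id)
            .or_default()
            .extend_from_slice(chunk);
        true
    }
}
```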

Member

I was hoping the reference implementation would serve as prior art here, but https://github.com/Graylog2/graylog2-server/blob/3c7f9df250f7d58d99e9c554d9307dc1eec9fdac/graylog2-server/src/main/java/org/graylog2/inputs/codecs/GelfChunkAggregator.java seems like they have no pending message limit, just the timeout of 5 seconds as you have. I think I'd suggest having this as an option for people that do want to bound the memory, but default to unlimited to match Graylog server behavior.

Contributor Author

Addressed this in 85edb00. Feel free to resolve this thread if the change is what you expected

pub timeout_millis: u64,

/// The maximum number of pending incomplete messages. If this limit is reached, the decoder will start
/// dropping chunks of new messages. This limit ensures the memory usage of the decoder's state is bounded.
Contributor Author

@jorgehermo9 jorgehermo9 Jul 15, 2024

The bound is per total messages, but there is no per-message memory usage limit. We can theoretically have a 100GB single message and it won't be limited by this setting.

As stated before, should we include a per-message limit?

Member

I like the idea of having a configurable bound on the number of pending messages.

The chunked encoding is only used for UDP, yes? Shouldn't that provide a defacto bound on size? How can we have a 100 GB message?

Contributor Author

@jorgehermo9 jorgehermo9 Jul 27, 2024

The chunked encoding is only used for UDP, yes?

Yes, it is intended to be used only for UDP, and therefore it would be limited by the UDP packet limit of 65 KB.
Nevertheless, as chunked_gelf is a framing method, nothing prevents users from using it with other types of sources, for example, with file sources, by explicitly setting the config framing.method="chunked-gelf". Although it really does not make sense to use that framing method outside of UDP socket sources, and no one will use it that way in real environments... So maybe it is ok to leave this as it is.

Member

@jszwedko jszwedko Oct 10, 2024

We could add a max_length option. This would be consistent with other framers: https://vector.dev/docs/reference/configuration/sources/socket/#framing.newline_delimited.max_length

In chunked_gelf's case, I think we'd want to limit the length of the accumulated chunks in addition to each individual chunk.

Contributor Author

Given the nature of GELF messages, as they are just JSON, I don't think it would be fine to simply truncate the input: the JSON message would most likely be broken, and GELF deserialization would fail in nearly all cases after truncating.

Should we instead discard the whole message (including previously stored chunks) if an individual chunk reaches its defined limit or the accumulated-chunks limit is reached? I don't know if it's worth doing this, but I'm open to implementing it if you think it would be worthwhile.

Member

Ah, yes, I think we'd want to discard messages that exceed the limit (in the future we can route them to a "dead letter" output). This is consistent with the other framers (see example:

warn!(
message = "Discarding frame larger than max_length.",
buf_len = buf.len(),
max_length = self.max_length,
internal_log_rate_limit = true
);
).

I think it'd be worth it to remove a DOS risk if it isn't too much effort to add.
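The discard-on-overflow behavior agreed on above could look roughly like this sketch (function and variable names are illustrative, not the PR's actual API): when the accumulated payload would exceed the limit, all buffered chunks of the message are dropped rather than truncated.

```rust
use std::collections::HashMap;

// Hypothetical sketch: reject and fully discard a message whose accumulated
// payload would exceed `max_message_length`, since a truncated GELF payload
// could never deserialize as valid JSON anyway.
fn accumulate_or_discard(
    pending: &mut HashMap<u64, Vec<u8>>,
    message_id: u64,
    chunk: &[u8],
    max_message_length: usize,
) -> Result<(), String> {
    let buffered_len = pending.get(&message_id).map_or(0, |b| b.len());
    if buffered_len + chunk.len() > max_message_length {
        // Drop every chunk buffered so far for this message.
        pending.remove(&message_id);
        return Err(format!(
            "message {message_id} exceeded max_message_length ({max_message_length} bytes)"
        ));
    }
    pending
        .entry(message_id)
        .or_default()
        .extend_from_slice(chunk);
    Ok(())
}
```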

@jorgehermo9 jorgehermo9 requested a review from a team as a code owner October 18, 2024 15:32
&mut self,
mut chunk: Bytes,
) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
// Encoding scheme:
Contributor Author

I think it would be useful to record a metric tracking the number of currently pending messages. How should we approach this? I think this is usually done at Vector's binary level and not in inner libs, as I see a lot of metrics located at src/internal_events.

Also, it would be useful to record the number of timed out messages

Member

That would be useful telemetry. Unfortunately I don't think there is a precedent for codecs emitting metrics, so this may require a bit of scaffolding if you want to add it. I don't think we want to emit telemetry directly from the codec crate so it'd look something like exposing metrics that then the caller (Vector) can emit.

I think this could mean adding a trait method to Framer,

pub trait Framer:
tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError> + DynClone + Debug + Send + Sync
{
}
/// Default implementation for `Framer`s that implement
/// `tokio_util::codec::Decoder`.
impl<Decoder> Framer for Decoder where
Decoder: tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError>
+ Clone
+ Debug
+ Send
+ Sync
{
}
, that exposes telemetry that the caller then polls and emits; or maybe it takes a callback that is called when the codec wants to emit telemetry (example from the kafka source)
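One possible shape for the "caller polls and emits" idea is sketched below. `FramerTelemetry`, `TelemetrySource`, and the field names are all hypothetical, not the crate's actual API; a default-implemented method means stateless framers need no changes.

```rust
/// Hypothetical telemetry snapshot a stateful framer could expose; the
/// caller (Vector) would poll it and emit the matching internal events.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FramerTelemetry {
    pub pending_messages: usize,
    pub timed_out_messages: u64,
}

/// Sketch of a default-implemented trait method, so only framers that
/// actually track state (like the chunked GELF decoder) override it.
pub trait TelemetrySource {
    fn telemetry(&self) -> FramerTelemetry {
        FramerTelemetry::default()
    }
}

/// A stateless framer gets empty telemetry for free.
pub struct NoopFramer;
impl TelemetrySource for NoopFramer {}
```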

Member

And, yes, we generally try to have only internal_events actually emit metrics to centralize it all in one place.

Contributor Author

I think emitting metrics from this point would require a refactor that, in my opinion, is slightly out of scope for this PR. Maybe we can create an issue to track metrics in decoders in general.

I don't mind addressing it myself, as I find it interesting, but I prefer to narrow the scope of this PR.

}

#[tokio::test]
async fn decode_shuffled_messages() {
Contributor Author

Included this new test in 2ab9bf3.

@jorgehermo9
Contributor Author

Hi @jszwedko, thank you very much for the review. I've addressed all the comments and I'm ready for another review round.

Things left:

Member

@jszwedko jszwedko left a comment

Thanks @jorgehermo9 ! I left a couple more comments. I think this is looking pretty close.

// This limitation is due to the fact that the GELF format does not specify the length of the
// message, so we have to read all the bytes from the message (datagram)
bytes_decoder: BytesDecoder,
state: Arc<Mutex<HashMap<u64, MessageState>>>,
Member

This would be a performance optimization, but we could think about using dashmap here to reduce lock contention. As it stands, it seems like only one packet can be processed at a time due to the locking, which may limit throughput.

Contributor Author

I think the UDP socket source does not parallelize, and the whole process of reading from the socket -> framing -> decoding is entirely sequential from the UDP socket's perspective.

I may be wrong but I think

recv = socket.recv_from(&mut buf) => {
is purely sequential.

So I think the lock would only compete with the message-timeout acquisitions, but in that case, if there are a lot of concurrent timeouts, you are right that dashmap could help.

Anyway, I think it would be useful for this last case, but I'd like you to confirm the first part about the socket's sequential reads.

Contributor Author

I implemented it with dashmap in jorgehermo9@3e5f57f on another branch (https://github.com/jorgehermo9/vector/tree/feature/chunked-gelf-dashmap), and it seems that my implementation causes deadlocks and several test failures (screenshot of the failing tests omitted).

I'm concerned about the usage of the entry and remove methods, which seem to be the cause of the deadlocks. The message_state reference returned by the entry method lives until the end of the scope, but we need to call remove in some branches before returning from the decode_chunk method, so I think that's the cause of the deadlock.

I don't know how to model what I'm doing with DashMap
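The usual way out of this class of deadlock is to scope the guard so it is dropped before the map is touched again. The sketch below shows the pattern with a std `Mutex<HashMap>` (a stand-in for DashMap's shard lock, since a DashMap `Entry` guard behaves the same way); all names and the "complete" check are illustrative, not the branch's actual code.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical sketch of the guard-scoping pattern: any reference obtained
// from the map must be dropped before locking the map again to remove the
// key, otherwise the second acquisition deadlocks (or, with DashMap's
// entry guard, blocks on its own shard).
fn take_if_complete(map: &Mutex<HashMap<u64, Vec<u8>>>, id: u64) -> Option<Vec<u8>> {
    let is_complete = {
        let guard = map.lock().unwrap();
        // Stand-in for "all chunks received": here, any non-empty buffer.
        guard.get(&id).map_or(false, |chunks| !chunks.is_empty())
    }; // guard dropped here, before the second lock below
    if is_complete {
        map.lock().unwrap().remove(&id)
    } else {
        None
    }
}
```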

#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub pending_messages_limit: Option<usize>,

/// The maximum length of a single GELF chunk, in bytes. Chunks longer than this length will
Contributor Author

Please @jszwedko take a look at the max_chunk_length and max_message_length documentation. I hope it is well explained.

chunk_length: usize,
max_chunk_length: usize,
},
#[snafu(display("Message with id {message_id} has exceeded the maximum message length and it will be dropped: got {message_length} bytes and max message length is {max_message_length} bytes. Discarding all buffered chunks of that message"))]
Contributor Author

@jszwedko Note that the discarding notice to the user is carried inside this error. It will be logged outside of this framer when the MaxMessageLengthExceeded error is returned. This is a bit different from the approach you described:

warn!(
message = "Discarding frame larger than max_length.",
buf_len = buf.len(),
max_length = self.max_length,
internal_log_rate_limit = true
);
where the message is silently discarded and no error is returned to the caller.
What do you prefer: a discarding warning or an error? In my opinion, this is not really an error and fits a warning better, but since data is discarded, you said in previous comments that we should return an error in those cases.

@jorgehermo9
Contributor Author

jorgehermo9 commented Oct 25, 2024

Hi @jszwedko, thank you for your review. I addressed max_chunk_length and max_message_length in 2139ae4. I also refactored the tests' assertions a little; please take a look at that too.

I still have to answer the metric-emitting comment and the DashMap one. I think using DashMap won't hurt and it will be easy to do, but I wonder if we would really see a performance improvement.

@jorgehermo9
Contributor Author

jorgehermo9 commented Oct 25, 2024

I just noticed that I missed adding a max_message_length test. I will add it as soon as I can, but it will be very similar to the max_chunk_length one.

EDIT: addressed in f4630e9

///
/// This limit takes into account only the chunk's payload; the GELF header bytes are excluded from the calculation.
#[serde(skip_serializing_if = "vector_core::serde::is_default")]
pub max_chunk_length: Option<usize>,
Contributor Author

@jorgehermo9 jorgehermo9 Oct 26, 2024

Should we use no limit by default (None), or use the maximum data (payload) size of UDP datagrams, 65,507 bytes (65,535-byte maximum IP datagram − 20-byte IP header − 8-byte UDP header)?

(Actually, we should use 65,507 − 10, because we exclude the 10 bytes of the chunked GELF header from this chunk length count.)
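The arithmetic above, spelled out as constants (the header sizes are standard IPv4/UDP values; the 10-byte chunked GELF header comes from the GELF spec):

```rust
// Standard IPv4 and UDP header sizes plus the chunked GELF header.
const MAX_IP_DATAGRAM: usize = 65_535; // 16-bit total-length field
const IP_HEADER: usize = 20; // IPv4, no options
const UDP_HEADER: usize = 8;
const GELF_CHUNK_HEADER: usize = 10; // 2 magic + 8 message id... per spec

// Largest UDP payload a single IPv4 datagram can carry.
const MAX_UDP_PAYLOAD: usize = MAX_IP_DATAGRAM - IP_HEADER - UDP_HEADER;
// Largest chunk payload once the chunked GELF header is excluded.
const MAX_CHUNK_PAYLOAD: usize = MAX_UDP_PAYLOAD - GELF_CHUNK_HEADER;
```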

/// This limit takes into account only the message's payload; the GELF header bytes are excluded from the calculation.
/// The message's payload is the concatenation of all the chunks' payloads.
#[serde(skip_serializing_if = "vector_core::serde::is_default")]
pub max_message_length: Option<usize>,
Contributor Author

Same as above: should we use None by default, or 65,535 * 128 bytes, which is the max UDP datagram size times the maximum number of chunks?

Both of those defaults would work (theoretically) the same for the UDP socket source.


if let Some(max_chunk_length) = self.max_chunk_length {
let chunk_length = chunk.remaining();
ensure!(
Contributor Author

Should we remove the state for this chunk's message id?

I don't know if it makes sense to keep the buffered chunks if we discard one, as the message would never complete.

Contributor Author

I wonder if we should also do this for the InvalidTotalChunks error, for example, or in any other case?

Labels
domain: external docs (Anything related to Vector's external, public documentation)
domain: sources (Anything related to the Vector's sources)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Chunked GELF Decoding
3 participants