diff --git a/Cargo.lock b/Cargo.lock index 19915969d2622..928a53086d011 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6077,6 +6077,57 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b" +[[package]] +name = "netlink-packet-core" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72724faf704479d67b388da142b186f916188505e7e0b26719019c525882eda4" +dependencies = [ + "anyhow", + "byteorder", + "netlink-packet-utils", +] + +[[package]] +name = "netlink-packet-sock-diag" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a495cb1de50560a7cd12fdcf023db70eec00e340df81be31cedbbfd4aadd6b76" +dependencies = [ + "anyhow", + "bitflags 1.3.2", + "byteorder", + "libc", + "netlink-packet-core", + "netlink-packet-utils", + "smallvec", +] + +[[package]] +name = "netlink-packet-utils" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ede8a08c71ad5a95cdd0e4e52facd37190977039a4704eb82a283f713747d34" +dependencies = [ + "anyhow", + "byteorder", + "paste", + "thiserror 1.0.68", +] + +[[package]] +name = "netlink-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c903aa70590cb93691bf97a767c8d1d6122d2cc9070433deb3bbf36ce8bd23" +dependencies = [ + "bytes 1.9.0", + "futures 0.3.31", + "libc", + "log", + "tokio", +] + [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -10830,6 +10881,7 @@ dependencies = [ "base64 0.22.1", "bloomy", "bollard", + "byteorder", "bytes 1.9.0", "bytesize", "chrono", @@ -10890,6 +10942,10 @@ dependencies = [ "metrics-tracing-context", "mlua", "mongodb", + "netlink-packet-core", + "netlink-packet-sock-diag", + "netlink-packet-utils", + "netlink-sys", "nix 0.26.2", "nkeys 0.4.4", "nom", diff --git a/Cargo.toml b/Cargo.toml index 
97e97495695ab..faf8f8eba5048 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -384,6 +384,7 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix" # make sure to update the external docs when the Lua version changes mlua = { version = "0.10.2", default-features = false, features = ["lua54", "send", "vendored", "macros"], optional = true } sysinfo = "0.32.1" +byteorder = "1.5.0" [target.'cfg(windows)'.dependencies] windows-service = "0.7.0" @@ -391,6 +392,12 @@ windows-service = "0.7.0" [target.'cfg(unix)'.dependencies] nix = { version = "0.26.2", default-features = false, features = ["socket", "signal"] } +[target.'cfg(target_os = "linux")'.dependencies] +netlink-packet-utils = "0.5.2" +netlink-packet-sock-diag = "0.4.2" +netlink-packet-core = "0.7.0" +netlink-sys = { version = "0.8.7", features = ["tokio_socket"] } + [build-dependencies] prost-build = { workspace = true, optional = true } tonic-build = { workspace = true, optional = true } diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 555c465625d3b..5527be948160c 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -385,6 +385,10 @@ mongodb,https://github.com/mongodb/mongo-rust-driver,Apache-2.0,"Saghm Rossi native-tls,https://github.com/sfackler/rust-native-tls,MIT OR Apache-2.0,Steven Fackler ndk-context,https://github.com/rust-windowing/android-ndk-rs,MIT OR Apache-2.0,The Rust Windowing contributors +netlink-packet-core,https://github.com/rust-netlink/netlink-packet-core,MIT,Corentin Henry +netlink-packet-sock-diag,https://github.com/rust-netlink/netlink-packet-sock-diag,MIT,"Flier Lu , Corentin Henry " +netlink-packet-utils,https://github.com/rust-netlink/netlink-packet-utils,MIT,Corentin Henry +netlink-sys,https://github.com/rust-netlink/netlink-sys,MIT,Corentin Henry nibble_vec,https://github.com/michaelsproul/rust_nibble_vec,MIT,Michael Sproul nix,https://github.com/nix-rust/nix,MIT,The nix-rust Project Developers 
nkeys,https://github.com/wasmcloud/nkeys,Apache-2.0,wasmCloud Team diff --git a/changelog.d/21972-add-tcp-collector-host-metrics.feature.md b/changelog.d/21972-add-tcp-collector-host-metrics.feature.md new file mode 100644 index 0000000000000..04eab2b91697b --- /dev/null +++ b/changelog.d/21972-add-tcp-collector-host-metrics.feature.md @@ -0,0 +1,14 @@ +The `host_metrics` source has a new collector, `tcp`. The `tcp` +collector exposes three metrics related to the TCP stack of the +system: + +* `tcp_connections_total`: The total number of TCP connections. It + includes the `state` of the connection as a tag. +* `tcp_tx_queued_bytes_total`: The sum of the number of bytes in the + send queue across all connections. +* `tcp_rx_queued_bytes_total`: The sum of the number of bytes in the + receive queue across all connections. + +This collector is enabled only on Linux systems. + +authors: aryan9600 diff --git a/license-tool.toml b/license-tool.toml index 92a78d9d30724..70e0ed9a741e5 100644 --- a/license-tool.toml +++ b/license-tool.toml @@ -11,6 +11,7 @@ # `ring` has a custom license that is mostly "ISC-style" but parts of it also fall under OpenSSL licensing. "ring-0.16.20" = { license = "ISC AND Custom" } "ring-0.17.5" = { license = "ISC AND Custom" } +"ring-0.17.8" = { license = "ISC AND Custom" } # `rustls-webpki` doesn't specify their license in the metadata, but the file contains the ISC terms. "rustls-webpki-0.100.1" = { license = "ISC" } diff --git a/src/api/schema/metrics/host.rs b/src/api/schema/metrics/host.rs index 4b0512e7f9287..6a814b1c5f311 100644 --- a/src/api/schema/metrics/host.rs +++ b/src/api/schema/metrics/host.rs @@ -259,6 +259,26 @@ impl DiskMetrics { } } +pub struct TCPMetrics(Vec); + +#[Object] +impl TCPMetrics { + /// Total TCP connections + async fn tcp_conns_total(&self) -> f64 { + filter_host_metric(&self.0, "tcp_connections_total") + } + + /// Total bytes in the send queue across all connections. 
+ async fn tcp_tx_queued_bytes_total(&self) -> f64 { + filter_host_metric(&self.0, "tcp_tx_queued_bytes_total") + } + + /// Total bytes in the receive queue across all connections. + async fn tcp_rx_queued_bytes_total(&self) -> f64 { + filter_host_metric(&self.0, "tcp_rx_queued_bytes_total") + } +} + pub struct HostMetrics(host_metrics::HostMetrics); impl HostMetrics { @@ -324,6 +344,14 @@ impl HostMetrics { self.0.disk_metrics(&mut buffer).await; DiskMetrics(buffer.metrics) } + + #[cfg(target_os = "linux")] + /// TCP metrics + async fn tcp(&self) -> TCPMetrics { + let mut buffer = self.0.buffer(); + self.0.tcp_metrics(&mut buffer).await; + TCPMetrics(buffer.metrics) + } } /// Filters a [`Vec`] by name, returning the inner `value` or 0.00 if not found diff --git a/src/sources/host_metrics/mod.rs b/src/sources/host_metrics/mod.rs index 601862d9fad25..abdc8da7e8ce3 100644 --- a/src/sources/host_metrics/mod.rs +++ b/src/sources/host_metrics/mod.rs @@ -35,6 +35,8 @@ mod filesystem; mod memory; mod network; mod process; +#[cfg(target_os = "linux")] +mod tcp; /// Collector types. #[serde_as] @@ -70,6 +72,9 @@ pub enum Collector { /// Metrics related to network utilization. Network, + + /// Metrics related to TCP connections. + TCP, } /// Filtering configuration. 
@@ -178,7 +183,7 @@ pub fn default_namespace() -> Option<String> {
     Some(String::from("host"))
 }
 
-const fn example_collectors() -> [&'static str; 8] {
+const fn example_collectors() -> [&'static str; 9] {
     [
         "cgroups",
         "cpu",
@@ -188,6 +193,7 @@ const fn example_collectors() -> [&'static str; 8] {
         "host",
         "memory",
         "network",
+        "tcp",
     ]
 }
 
@@ -206,10 +212,12 @@ fn default_collectors() -> Option<Vec<Collector>> {
     #[cfg(target_os = "linux")]
     {
         collectors.push(Collector::CGroups);
+        collectors.push(Collector::TCP);
     }
     #[cfg(not(target_os = "linux"))]
     if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
         collectors.push(Collector::CGroups);
+        collectors.push(Collector::TCP);
     }
 
     Some(collectors)
@@ -284,6 +292,9 @@ impl SourceConfig for HostMetricsConfig {
             if self.cgroups.is_some() || self.has_collector(Collector::CGroups) {
                 return Err("CGroups collector is only available on Linux systems".into());
             }
+            if self.has_collector(Collector::TCP) {
+                return Err("TCP collector is only available on Linux systems".into());
+            }
         }
 
         let mut config = self.clone();
@@ -399,6 +410,10 @@ impl HostMetrics {
         if self.config.has_collector(Collector::Network) {
             self.network_metrics(&mut buffer).await;
         }
+        #[cfg(target_os = "linux")]
+        if self.config.has_collector(Collector::TCP) {
+            self.tcp_metrics(&mut buffer).await;
+        }
 
         let metrics = buffer.metrics;
         self.events_received.emit(CountByteSize(
diff --git a/src/sources/host_metrics/tcp.rs b/src/sources/host_metrics/tcp.rs
new file mode 100644
index 0000000000000..67de25036db96
--- /dev/null
+++ b/src/sources/host_metrics/tcp.rs
@@ -0,0 +1,395 @@
+use crate::sources::host_metrics::HostMetricsScrapeDetailError;
+use byteorder::{ByteOrder, NativeEndian};
+use std::{collections::HashMap, io, path::Path};
+use vector_lib::event::MetricTags;
+
+use netlink_packet_core::{
+    NetlinkHeader, NetlinkMessage, NetlinkPayload, NLM_F_ACK, NLM_F_DUMP, NLM_F_REQUEST,
+};
+use netlink_packet_sock_diag::{
+    constants::*,
+    inet::{ExtensionFlags, InetRequest, InetResponseHeader, SocketId, StateFlags},
+    SockDiagMessage,
+};
+use netlink_sys::{
+    protocols::NETLINK_SOCK_DIAG, AsyncSocket, AsyncSocketExt, SocketAddr, TokioSocket,
+};
+use snafu::{ResultExt, Snafu};
+
+use super::HostMetrics;
+
+const PROC_IPV6_FILE: &str = "/proc/net/if_inet6";
+const TCP_CONNS_TOTAL: &str = "tcp_connections_total";
+const TCP_TX_QUEUED_BYTES_TOTAL: &str = "tcp_tx_queued_bytes_total";
+const TCP_RX_QUEUED_BYTES_TOTAL: &str = "tcp_rx_queued_bytes_total";
+const STATE: &str = "state";
+/// Size in bytes of the buffer each netlink datagram is received into.
+const NETLINK_RECV_BUFFER_SIZE: usize = 4096;
+/// Size in bytes of the length field that is read from the start of every netlink message.
+const NLMSG_LEN_FIELD_SIZE: usize = 4;
+
+impl HostMetrics {
+    /// Collects TCP socket statistics over netlink and appends them to `output`.
+    ///
+    /// Emits one `tcp_connections_total` gauge per observed connection state, tagged with
+    /// `state`, plus the `tcp_tx_queued_bytes_total` and `tcp_rx_queued_bytes_total` gauges.
+    /// On failure nothing is appended and a `HostMetricsScrapeDetailError` is emitted instead.
+    pub async fn tcp_metrics(&self, output: &mut super::MetricsBuffer) {
+        match build_tcp_stats().await {
+            Ok(stats) => {
+                output.name = "tcp";
+                for (state, count) in stats.conn_states {
+                    let tags = metric_tags! {
+                        STATE => state
+                    };
+                    output.gauge(TCP_CONNS_TOTAL, count, tags);
+                }
+
+                output.gauge(
+                    TCP_TX_QUEUED_BYTES_TOTAL,
+                    stats.tx_queued_bytes,
+                    MetricTags::default(),
+                );
+                output.gauge(
+                    TCP_RX_QUEUED_BYTES_TOTAL,
+                    stats.rx_queued_bytes,
+                    MetricTags::default(),
+                );
+            }
+            Err(error) => {
+                emit!(HostMetricsScrapeDetailError {
+                    message: "Failed to load tcp connection info.",
+                    error,
+                });
+            }
+        }
+    }
+}
+
+/// Errors raised while querying netlink for TCP socket information.
+#[derive(Debug, Snafu)]
+enum TcpError {
+    #[snafu(display("Could not open new netlink socket: {}", source))]
+    NetlinkSocket { source: io::Error },
+    #[snafu(display("Could not send netlink message: {}", source))]
+    NetlinkSend { source: io::Error },
+    #[snafu(display("Could not receive netlink message: {}", source))]
+    NetlinkRecv { source: io::Error },
+    #[snafu(display("Could not parse netlink response: {}", source))]
+    NetlinkParse {
+        source: netlink_packet_utils::DecodeError,
+    },
+    #[snafu(display("Could not recognize TCP state {state}"))]
+    InvalidTcpState { state: u8 },
+    #[snafu(display("Received an error message from netlink; code: {code}"))]
+    NetlinkMsgError { code: i32 },
+}
+
+/// TCP connection states, with discriminants equal to the raw `state` byte they convert from.
+#[repr(u8)]
+enum TcpState {
+    Established = 1,
+    SynSent = 2,
+    SynRecv = 3,
+    FinWait1 = 4,
+    FinWait2 = 5,
+    TimeWait = 6,
+    Close = 7,
+    CloseWait = 8,
+    LastAck = 9,
+    Listen = 10,
+    Closing = 11,
+}
+
+/// Renders a state as the value used for the `state` metric tag.
+impl From<TcpState> for String {
+    fn from(val: TcpState) -> Self {
+        match val {
+            TcpState::Established => "established".into(),
+            TcpState::SynSent => "syn_sent".into(),
+            TcpState::SynRecv => "syn_recv".into(),
+            TcpState::FinWait1 => "fin_wait1".into(),
+            TcpState::FinWait2 => "fin_wait2".into(),
+            TcpState::TimeWait => "time_wait".into(),
+            TcpState::Close => "close".into(),
+            TcpState::CloseWait => "close_wait".into(),
+            TcpState::LastAck => "last_ack".into(),
+            TcpState::Listen => "listen".into(),
+            TcpState::Closing => "closing".into(),
+        }
+    }
+}
+
+/// Maps the raw state byte of an inet response header onto [`TcpState`].
+impl TryFrom<u8> for TcpState {
+    type Error = TcpError;
+
+    fn try_from(value: u8) -> Result<Self, Self::Error> {
+        match value {
+            1 => Ok(TcpState::Established),
+            2 => Ok(TcpState::SynSent),
+            3 => Ok(TcpState::SynRecv),
+            4 => Ok(TcpState::FinWait1),
+            5 => Ok(TcpState::FinWait2),
+            6 => Ok(TcpState::TimeWait),
+            7 => Ok(TcpState::Close),
+            8 => Ok(TcpState::CloseWait),
+            9 => Ok(TcpState::LastAck),
+            10 => Ok(TcpState::Listen),
+            11 => Ok(TcpState::Closing),
+            _ => Err(TcpError::InvalidTcpState { state: value }),
+        }
+    }
+}
+
+/// TCP statistics aggregated over every socket header that has been parsed.
+#[derive(Debug, Default)]
+struct TcpStats {
+    conn_states: HashMap<String, f64>,
+    rx_queued_bytes: f64,
+    tx_queued_bytes: f64,
+}
+
+/// Requests a dump of all TCP sockets of `addr_family` (`AF_INET` or `AF_INET6`) over a
+/// `NETLINK_SOCK_DIAG` socket and returns the header of every socket in the reply.
+///
+/// # Errors
+///
+/// Fails if the netlink socket cannot be opened, the request cannot be sent, a reply cannot
+/// be received or decoded, or the reply carries an error code.
+async fn fetch_nl_inet_hdrs(addr_family: u8) -> Result<Vec<InetResponseHeader>, TcpError> {
+    let unicast_socket: SocketAddr = SocketAddr::new(0, 0);
+    let mut socket = TokioSocket::new(NETLINK_SOCK_DIAG).context(NetlinkSocketSnafu)?;
+
+    let mut inet_req = InetRequest {
+        family: addr_family,
+        protocol: IPPROTO_TCP,
+        extensions: ExtensionFlags::INFO,
+        states: StateFlags::all(),
+        socket_id: SocketId::new_v4(),
+    };
+    if addr_family == AF_INET6 {
+        inet_req.socket_id = SocketId::new_v6();
+    }
+
+    let mut hdr = NetlinkHeader::default();
+    hdr.flags = NLM_F_REQUEST | NLM_F_ACK | NLM_F_DUMP;
+    let mut msg = NetlinkMessage::new(hdr, SockDiagMessage::InetRequest(inet_req).into());
+    msg.finalize();
+
+    let mut buf = vec![0; msg.header.length as usize];
+    msg.serialize(&mut buf[..]);
+
+    socket
+        .send_to(&buf[..msg.buffer_len()], &unicast_socket)
+        .await
+        .context(NetlinkSendSnafu)?;
+
+    let mut receive_buffer = vec![0u8; NETLINK_RECV_BUFFER_SIZE];
+    let mut inet_resp_hdrs: Vec<InetResponseHeader> = Vec::new();
+    'outer: loop {
+        // `recv` advances the slice it is handed past the bytes it wrote, so what is left of
+        // the slice afterwards tells us how long the datagram was.
+        let mut unfilled = &mut receive_buffer[..];
+        socket.recv(&mut unfilled).await.context(NetlinkRecvSnafu)?;
+        let received = NETLINK_RECV_BUFFER_SIZE - unfilled.len();
+
+        // Only walk the bytes of this datagram: whatever lies beyond `received` is left over
+        // from an earlier, longer datagram and must not be decoded again.
+        let datagram = &receive_buffer[..received];
+        let mut offset = 0;
+        while let Some(bytes) = datagram.get(offset..) {
+            // Stop before reading the length field out of a tail too short to hold it.
+            if bytes.len() < NLMSG_LEN_FIELD_SIZE {
+                break;
+            }
+            let length = NativeEndian::read_u32(&bytes[..NLMSG_LEN_FIELD_SIZE]) as usize;
+            // A zero length would never advance `offset`; treat it as the end of the datagram.
+            if length == 0 {
+                break;
+            }
+            let rx_packet =
+                <NetlinkMessage<SockDiagMessage>>::deserialize(bytes).context(NetlinkParseSnafu)?;
+
+            match rx_packet.payload {
+                NetlinkPayload::InnerMessage(SockDiagMessage::InetResponse(response)) => {
+                    inet_resp_hdrs.push(response.header);
+                }
+                NetlinkPayload::Done(_) => {
+                    break 'outer;
+                }
+                NetlinkPayload::Error(error) => {
+                    // An error message without a code is the acknowledgement requested
+                    // through `NLM_F_ACK`, not a failure.
+                    if let Some(code) = error.code {
+                        return Err(TcpError::NetlinkMsgError { code: code.get() });
+                    }
+                }
+                _ => {}
+            }
+
+            offset += rx_packet.header.length as usize;
+        }
+    }
+
+    Ok(inet_resp_hdrs)
+}
+
+/// Folds the socket headers in `hdrs` into `tcp_stats`: one more connection for each
+/// header's state, plus its send and receive queue sizes.
+///
+/// # Errors
+///
+/// Returns [`TcpError::InvalidTcpState`] on the first header whose state is unknown; headers
+/// before it have already been added to `tcp_stats`.
+fn parse_nl_inet_hdrs(
+    hdrs: Vec<InetResponseHeader>,
+    tcp_stats: &mut TcpStats,
+) -> Result<(), TcpError> {
+    for hdr in hdrs {
+        let state: TcpState = hdr.state.try_into()?;
+        let state_str: String = state.into();
+        *tcp_stats.conn_states.entry(state_str).or_insert(0.0) += 1.0;
+        tcp_stats.tx_queued_bytes += f64::from(hdr.send_queue);
+        tcp_stats.rx_queued_bytes += f64::from(hdr.recv_queue);
+    }
+
+    Ok(())
+}
+
+/// Gathers the statistics of all IPv4 sockets and, when `is_ipv6_enabled` reports true, of
+/// all IPv6 sockets as well.
+async fn build_tcp_stats() -> Result<TcpStats, TcpError> {
+    let mut tcp_stats = TcpStats::default();
+    let resp = fetch_nl_inet_hdrs(AF_INET).await?;
+    parse_nl_inet_hdrs(resp, &mut tcp_stats)?;
+
+    if is_ipv6_enabled() {
+        let resp = fetch_nl_inet_hdrs(AF_INET6).await?;
+        parse_nl_inet_hdrs(resp, &mut tcp_stats)?;
+    }
+
+    Ok(tcp_stats)
+}
+
+/// Reports whether `/proc/net/if_inet6` exists, which is taken to mean IPv6 is enabled.
+fn is_ipv6_enabled() -> bool {
+    Path::new(PROC_IPV6_FILE).exists()
+}
+
+#[cfg(test)]
+mod tests {
+    use tokio::net::{TcpListener, TcpStream};
+
+    use netlink_packet_sock_diag::{
+        inet::{InetResponseHeader, SocketId},
+        AF_INET,
+    };
+
+    use crate::sources::host_metrics::{HostMetrics, HostMetricsConfig, MetricsBuffer};
+
+    use super::{
+        fetch_nl_inet_hdrs, parse_nl_inet_hdrs, TcpStats, STATE, TCP_CONNS_TOTAL,
+        TCP_RX_QUEUED_BYTES_TOTAL, TCP_TX_QUEUED_BYTES_TOTAL,
+    };
+
+    #[test]
+    fn parses_nl_inet_hdrs() {
+        let mut hdrs: Vec<InetResponseHeader> = Vec::new();
+        for i in 1..4 {
+            let hdr = InetResponseHeader {
+                family: 0,
+                state: i,
+                timer: None,
+                socket_id: SocketId::new_v4(),
+                recv_queue: 3,
+                send_queue: 5,
+                uid: 0,
+                inode: 0,
+            };
+            hdrs.push(hdr);
+        }
+
+        let mut tcp_stats = TcpStats::default();
+        parse_nl_inet_hdrs(hdrs, &mut tcp_stats).unwrap();
+
+        assert_eq!(tcp_stats.tx_queued_bytes, 15.0);
+        assert_eq!(tcp_stats.rx_queued_bytes, 9.0);
+        assert_eq!(tcp_stats.conn_states.len(), 3);
+        assert_eq!(*tcp_stats.conn_states.get("established").unwrap(), 1.0);
+        assert_eq!(*tcp_stats.conn_states.get("syn_sent").unwrap(), 1.0);
+        assert_eq!(*tcp_stats.conn_states.get("syn_recv").unwrap(), 1.0);
+    }
+
+    #[tokio::test]
+    async fn fetches_nl_net_hdrs() {
+        // start a TCP server
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+        tokio::spawn(async move {
+            // accept a connection
+            let (_stream, _socket) = listener.accept().await.unwrap();
+        });
+        // initiate a connection
+        let _stream = TcpStream::connect(addr).await.unwrap();
+
+        let hdrs = fetch_nl_inet_hdrs(AF_INET).await.unwrap();
+        // there should be at least two connections, one for the server and one for the client
+        assert!(hdrs.len() >= 2);
+
+        // assert that we have one connection with the server's port as the source port and
+        // one as the destination port
+        let mut source = false;
+        let mut dst = false;
+        for hdr in hdrs {
+            if hdr.socket_id.source_port == addr.port() {
+                source = true;
+            }
+            if hdr.socket_id.destination_port == addr.port() {
+                dst = true;
+            }
+        }
+        assert!(source);
+        assert!(dst);
+    }
+
+    #[tokio::test]
+    async fn generates_tcp_metrics() {
+        let _listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+        let mut buffer = MetricsBuffer::new(None);
+        HostMetrics::new(HostMetricsConfig::default())
+            .tcp_metrics(&mut buffer)
+            .await;
+        let metrics = buffer.metrics;
+
+        assert!(!metrics.is_empty());
+
+        let mut n_tx_queued_bytes_metric = 0;
+        let mut n_rx_queued_bytes_metric = 0;
+
+        for metric in metrics {
+            if metric.name() == TCP_CONNS_TOTAL {
+                let tags = metric.tags();
+                assert!(
+                    tags.is_some(),
+                    "Metric tcp_connections_total must have a tag"
+                );
+                let tags = tags.unwrap();
+                assert!(
+                    tags.contains_key(STATE),
+                    "Metric tcp_connections_total must have a state tag"
+                );
+            } else if metric.name() == TCP_TX_QUEUED_BYTES_TOTAL {
+                n_tx_queued_bytes_metric += 1;
+            } else if metric.name() == TCP_RX_QUEUED_BYTES_TOTAL {
+                n_rx_queued_bytes_metric += 1;
+            } else {
+                panic!("unrecognized metric name");
+            }
+        }
+
+        assert_eq!(n_tx_queued_bytes_metric, 1);
+        assert_eq!(n_rx_queued_bytes_metric, 1);
+    }
+}
diff --git a/website/cue/reference/components/sources/base/host_metrics.cue b/website/cue/reference/components/sources/base/host_metrics.cue
index 21a66fa6811df..35fa29c8138c9 100644
--- a/website/cue/reference/components/sources/base/host_metrics.cue
+++ b/website/cue/reference/components/sources/base/host_metrics.cue
@@ -72,7 +72,7 @@ base: components: sources: host_metrics: configuration: {
 			"""
 		required: false
 		type: array: {
-			default: ["cpu", "disk", "filesystem", "load", "host", "memory", "network", "process", "cgroups"]
+			default: ["cpu", "disk", "filesystem", "load", "host", "memory", "network", "process", "cgroups", "tcp"]
 			items: type: string: {
 				enum: {
 					cgroups: """
@@ -88,8 +88,9 @@ base: components: sources: host_metrics: configuration: {
 					memory: "Metrics related to memory utilization."
 					network: "Metrics related to network utilization."
 					process: "Metrics related to Process utilization."
+					tcp: "Metrics related to TCP connections."
} - examples: ["cgroups", "cpu", "disk", "filesystem", "load", "host", "memory", "network"] + examples: ["cgroups", "cpu", "disk", "filesystem", "load", "host", "memory", "network", "tcp"] } } } diff --git a/website/cue/reference/components/sources/host_metrics.cue b/website/cue/reference/components/sources/host_metrics.cue index e2b966eabb2ae..c92894b441bb2 100644 --- a/website/cue/reference/components/sources/host_metrics.cue +++ b/website/cue/reference/components/sources/host_metrics.cue @@ -177,6 +177,23 @@ components: sources: host_metrics: { network_transmit_packets_drop_total: _host & _network_nomac & {description: "The number of packets dropped during transmits on this interface."} network_transmit_packets_total: _host & _network_nomac & {description: "The number of packets transmitted on this interface."} + // Host tcp + tcp_connections_total: _host & _tcp_linux & _tcp_gauge & {description: "The number of TCP connections."} + tcp_tx_queued_bytes_total: _host & _tcp_linux & { + description: "The number of bytes in the send queue across all connections." + type: "gauge" + tags: _host_metrics_tags & { + collector: examples: ["tcp"] + } + } + tcp_rx_queued_bytes_total: _host & _tcp_linux & { + description: "The number of bytes in the receive queue across all connections." + type: "gauge" + tags: _host_metrics_tags & { + collector: examples: ["tcp"] + } + } + // Helpers _host: { default_namespace: "host" @@ -278,5 +295,18 @@ components: sources: host_metrics: { collector: examples: ["process"] } } + + _tcp_linux: {relevant_when: "OS is Linux"} + _tcp_gauge: { + type: "gauge" + tags: _host_metrics_tags & { + collector: examples: ["tcp"] + state: { + description: "The connection state." + required: true + examples: ["established", "time_wait"] + } + } + } } }