From a8563a3dd11c31c876a1ff681b51e7a86684e1f5 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Tue, 31 Dec 2024 17:16:18 +0100 Subject: [PATCH] feat: multicast udp socket support --- lib/vector-config/src/stdlib.rs | 27 ++++++++++++++++++++- src/sources/socket/udp.rs | 43 +++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/lib/vector-config/src/stdlib.rs b/lib/vector-config/src/stdlib.rs index ad3ae47aac32d..add05f87fd9ea 100644 --- a/lib/vector-config/src/stdlib.rs +++ b/lib/vector-config/src/stdlib.rs @@ -2,7 +2,7 @@ use std::{ cell::RefCell, collections::{BTreeMap, BTreeSet, HashMap, HashSet}, hash::Hash, - net::SocketAddr, + net::{Ipv4Addr, SocketAddr}, num::{ NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64, NonZeroU8, NonZeroUsize, @@ -402,6 +402,31 @@ impl ToValue for SocketAddr { } } +impl Configurable for Ipv4Addr { + fn referenceable_name() -> Option<&'static str> { + Some("stdlib::Ipv4Addr") + } + + fn metadata() -> Metadata { + let mut metadata = Metadata::default(); + metadata.set_description("An IPv4 address."); + metadata + } + + fn generate_schema(_: &RefCell) -> Result { + // TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`, + // but we eventually should have validation since the format for the possible permutations + // is well-known and can be easily codified. + Ok(generate_string_schema()) + } +} + +impl ToValue for Ipv4Addr { + fn to_value(&self) -> Value { + Value::String(self.to_string()) + } +} + impl Configurable for PathBuf { fn referenceable_name() -> Option<&'static str> { Some("stdlib::PathBuf") diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index a1b39c2fbcd33..ef3b27dfda5a0 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -1,3 +1,5 @@ +use std::net::{Ipv4Addr, SocketAddr}; + use super::default_host_key; use bytes::BytesMut; use chrono::Utc; @@ -41,6 +43,17 @@ pub struct UdpConfig { #[configurable(derived)] address: SocketListenAddr, + /// TODO: document this. + /// TODO: The join multicast method should fail if the address is not SocketListenAddr::SocketAddr. + /// multicast wont work with systemd{N} fd sockets. + /// Also, if the address is IPv4, the multicast address should be IPv4 too. + /// TODO: should we support a list of groups or a single group? The `join_multicast` method supports + /// just one group per call. + /// TODO: document that we use `IPv4Addr` and not `SocketAddr` for the multicast groups because + /// the `join_multicast_v6` is not supported due to the need of using an interface index.s + #[serde(default)] + multicast_groups: Vec, + /// The maximum buffer size of incoming messages. /// /// Messages larger than this are truncated. @@ -118,6 +131,7 @@ impl UdpConfig { pub fn from_address(address: SocketListenAddr) -> Self { Self { address, + multicast_groups: Vec::new(), max_length: default_max_length(), host_key: None, port_key: default_port_key(), @@ -152,6 +166,35 @@ pub(super) fn udp( }) })?; + if !config.multicast_groups.is_empty() { + let listen_addr = match config.address() { + SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr, + SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => { + todo!("handle this error, IPv6 multicast is not supported") + } + // TODO: if we need to support systemd{N} fd sockets, we should use the + // `UdpSocket::local_addr` method to get the address of the socket. + // that method can fail and I wonder if the user sets `IP_ADDR_ANY` (`0.0.0.0`) in the config, + // the `UdpSocket::local_addr` would return the real interface address that the + // socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address + // for the multicast group join that the user has set in the config. + // if systemd{N} fd sockets are required to work too, we should investigate on this. + SocketListenAddr::SystemdFd(_) => todo!("handle this error"), + }; + for group_addr in config.multicast_groups { + socket + .join_multicast_v4(group_addr, *listen_addr.ip()) + .map_err(|error| { + // TODO: is this considered a `SocketBindError`? or should we create a new error for this case? + emit!(SocketBindError { + mode: SocketMode::Udp, + error, + }) + })?; + // TODO: add debug (or info) logs here to inform the user that the socket has joined the multicast group. + } + } + if let Some(receive_buffer_bytes) = config.receive_buffer_bytes { if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) { warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);