Skip to content

Commit

Permalink
feat: multicast udp socket support
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgehermo9 committed Dec 31, 2024
1 parent 029a2ff commit a8563a3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
27 changes: 26 additions & 1 deletion lib/vector-config/src/stdlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
hash::Hash,
net::SocketAddr,
net::{Ipv4Addr, SocketAddr},
num::{
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64,
NonZeroU8, NonZeroUsize,
Expand Down Expand Up @@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::Ipv4Addr")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("An IPv4 address.");
metadata
}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`,
// but we eventually should have validation since the format for the possible permutations
// is well-known and can be easily codified.
Ok(generate_string_schema())
}
}

impl ToValue for Ipv4Addr {
fn to_value(&self) -> Value {
Value::String(self.to_string())
}
}

impl Configurable for PathBuf {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::PathBuf")
Expand Down
43 changes: 43 additions & 0 deletions src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::{Ipv4Addr, SocketAddr};

use super::default_host_key;
use bytes::BytesMut;
use chrono::Utc;
Expand Down Expand Up @@ -41,6 +43,17 @@ pub struct UdpConfig {
#[configurable(derived)]
address: SocketListenAddr,

/// TODO: document this.
/// TODO: The join multicast method should fail if the address is not SocketListenAddr::SocketAddr.
/// multicast wont work with systemd{N} fd sockets.
/// Also, if the address is IPv4, the multicast address should be IPv4 too.
/// TODO: should we support a list of groups or a single group? The `join_multicast` method supports
/// just one group per call.
/// TODO: document that we use `IPv4Addr` and not `SocketAddr` for the multicast groups because
/// the `join_multicast_v6` is not supported due to the need of using an interface index.s
#[serde(default)]
multicast_groups: Vec<Ipv4Addr>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
Expand Down Expand Up @@ -118,6 +131,7 @@ impl UdpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
multicast_groups: Vec::new(),
max_length: default_max_length(),
host_key: None,
port_key: default_port_key(),
Expand Down Expand Up @@ -152,6 +166,35 @@ pub(super) fn udp(
})
})?;

if !config.multicast_groups.is_empty() {
let listen_addr = match config.address() {
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
todo!("handle this error, IPv6 multicast is not supported")
}
// TODO: if we need to support systemd{N} fd sockets, we should use the
// `UdpSocket::local_addr` method to get the address of the socket.
// that method can fail and I wonder if the user sets `IP_ADDR_ANY` (`0.0.0.0`) in the config,
// the `UdpSocket::local_addr` would return the real interface address that the
// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address
// for the multicast group join that the user has set in the config.
// if systemd{N} fd sockets are required to work too, we should investigate on this.
SocketListenAddr::SystemdFd(_) => todo!("handle this error"),
};
for group_addr in config.multicast_groups {
socket
.join_multicast_v4(group_addr, *listen_addr.ip())
.map_err(|error| {
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case?
emit!(SocketBindError {
mode: SocketMode::Udp,
error,
})
})?;
// TODO: add debug (or info) logs here to inform the user that the socket has joined the multicast group.
}
}

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
Expand Down

0 comments on commit a8563a3

Please sign in to comment.