Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): multicast udp socket support #22099

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion lib/vector-config/src/stdlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
hash::Hash,
net::SocketAddr,
net::{Ipv4Addr, SocketAddr},
num::{
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64,
NonZeroU8, NonZeroUsize,
Expand Down Expand Up @@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inspired from the SocketAddr implementation a few lines above

fn referenceable_name() -> Option<&'static str> {
Some("stdlib::Ipv4Addr")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("An IPv4 address.");
metadata
}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`,
// but we eventually should have validation since the format for the possible permutations
// is well-known and can be easily codified.
Ok(generate_string_schema())
}
}

impl ToValue for Ipv4Addr {
fn to_value(&self) -> Value {
Value::String(self.to_string())
}
}

impl Configurable for PathBuf {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::PathBuf")
Expand Down
43 changes: 43 additions & 0 deletions src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::{Ipv4Addr, SocketAddr};

use super::default_host_key;
use bytes::BytesMut;
use chrono::Utc;
Expand Down Expand Up @@ -41,6 +43,17 @@ pub struct UdpConfig {
#[configurable(derived)]
address: SocketListenAddr,

/// TODO: document this.
/// TODO: The join multicast method should fail if the address is not SocketListenAddr::SocketAddr.
/// multicast wont work with systemd{N} fd sockets.
/// Also, if the address is IPv4, the multicast address should be IPv4 too.
/// TODO: should we support a list of groups or a single group? The `join_multicast` method supports
/// just one group per call.
/// TODO: document that we use `IPv4Addr` and not `SocketAddr` for the multicast groups because
/// the `join_multicast_v6` is not supported due to the need of using an interface index.s
#[serde(default)]
multicast_groups: Vec<Ipv4Addr>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
Expand Down Expand Up @@ -118,6 +131,7 @@ impl UdpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
multicast_groups: Vec::new(),
max_length: default_max_length(),
host_key: None,
port_key: default_port_key(),
Expand Down Expand Up @@ -152,6 +166,35 @@ pub(super) fn udp(
})
})?;

if !config.multicast_groups.is_empty() {
let listen_addr = match config.address() {
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
todo!("handle this error, IPv6 multicast is not supported")
}
// TODO: if we need to support systemd{N} fd sockets, we should use the
// `UdpSocket::local_addr` method to get the address of the socket.
// that method can fail and I wonder if the user sets `IP_ADDR_ANY` (`0.0.0.0`) in the config,
// the `UdpSocket::local_addr` would return the real interface address that the
// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address
// for the multicast group join that the user has set in the config.
// if systemd{N} fd sockets are required to work too, we should investigate on this.
SocketListenAddr::SystemdFd(_) => todo!("handle this error"),
};
for group_addr in config.multicast_groups {
socket
.join_multicast_v4(group_addr, *listen_addr.ip())
.map_err(|error| {
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case?
emit!(SocketBindError {
mode: SocketMode::Udp,
error,
})
})?;
// TODO: add debug (or info) logs here to inform the user that the socket has joined the multicast group.
}
}

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
Expand Down