Skip to content

Commit

Permalink
transport.fbs: Make flags: SocketFlags required
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc committed Jan 2, 2024
1 parent 395dcd0 commit 722fe2e
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 19 deletions.
39 changes: 38 additions & 1 deletion node/src/tests/test-PlainTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as

if (!IS_WINDOWS)
{
test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () =>
test('two transports binding to the same IP:port with udpReusePort flag succeed', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;
Expand Down Expand Up @@ -357,6 +357,43 @@ if (!IS_WINDOWS)
transport1?.close();
transport2?.close();
}, 2000);

test('two transports binding to the same IP:port without udpReusePort flag fails', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: false }
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: false }
}
});
}).rejects.toThrow();

transport1?.close();
transport2?.close();
}, 2000);
}

test('plainTransport.getStats() succeeds', async () =>
Expand Down
16 changes: 10 additions & 6 deletions rust/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,29 +83,33 @@ impl ListenInfo {
ip: self.ip.to_string(),
announced_ip: self.announced_ip.map(|ip| ip.to_string()),
port: self.port.unwrap_or(0),
flags: self.flags.map(|flags| Box::new(flags.to_fbs())),
flags: Box::new(self.flags.unwrap_or_default().to_fbs()),
send_buffer_size: self.send_buffer_size.unwrap_or(0),
recv_buffer_size: self.recv_buffer_size.unwrap_or(0),
}
}
}

/// UDP/TCP socket flags.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[derive(
Default, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize,
)]
#[serde(rename_all = "camelCase")]
pub struct SocketFlags {
/// Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
pub ipv6_only: bool,
/// Defaults to false.
pub ipv6_only: Option<bool>,
/// Make different transports bind to the same ip and port (only for UDP).
/// Useful for multicast scenarios with plain transport. Use with caution.
pub udp_reuse_port: bool,
/// Defaults to false.
pub udp_reuse_port: Option<bool>,
}

impl SocketFlags {
pub(crate) fn to_fbs(self) -> transport::SocketFlags {
transport::SocketFlags {
ipv6_only: self.ipv6_only,
udp_reuse_port: self.udp_reuse_port,
ipv6_only: self.ipv6_only.unwrap_or_else(false),
udp_reuse_port: self.udp_reuse_port.unwrap_or_else(Some(false)),
}
}
}
Expand Down
98 changes: 97 additions & 1 deletion rust/tests/integration/plain_transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use futures_lite::future;
use hash_hasher::HashedSet;
use mediasoup::data_structures::{AppData, ListenInfo, Protocol, SctpState, TransportTuple};
use mediasoup::data_structures::{
AppData, ListenInfo, Protocol, SctpState, SocketFlags, TransportTuple,
};
use mediasoup::plain_transport::{PlainTransportOptions, PlainTransportRemoteParameters};
use mediasoup::prelude::*;
use mediasoup::router::{Router, RouterOptions};
Expand Down Expand Up @@ -420,6 +422,100 @@ fn create_non_bindable_ip() {
});
}

#[cfg(not(target_os = "windows"))]
#[test]
fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succeed() {
future::block_on(async move {
let (_worker, router) = init().await;

let multicast_ip = "224.0.0.1".parse().unwrap();
let port = pick_unused_port().unwrap();

// Transport 1.
let _ = router
.create_plain_transport({
PlainTransportOptions::new(ListenInfo {
protocol: Protocol::Udp,
ip: multicast_ip,
announced_ip: None,
port: Some(port),
flags: Some(SocketFlags {
udp_reuse_port: true,
}),
send_buffer_size: None,
recv_buffer_size: None,
})
})
.await
.expect("Failed to create first Plain transport");

// Transport 2.
let _ = router
.create_plain_transport({
PlainTransportOptions::new(ListenInfo {
protocol: Protocol::Udp,
ip: multicast_ip,
announced_ip: None,
port: Some(port),
flags: Some(SocketFlags {
udp_reuse_port: true,
}),
send_buffer_size: None,
recv_buffer_size: None,
})
})
.await
.expect("Failed to create second Plain transport");
});
}

#[cfg(not(target_os = "windows"))]
#[test]
fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fails() {
future::block_on(async move {
let (_worker, router) = init().await;

let multicast_ip = "224.0.0.1".parse().unwrap();
let port = pick_unused_port().unwrap();

// Transport 1.
let _ = router
.create_plain_transport({
PlainTransportOptions::new(ListenInfo {
protocol: Protocol::Udp,
ip: multicast_ip,
announced_ip: None,
port: Some(port),
flags: Some(SocketFlags {
udp_reuse_port: false,
}),
send_buffer_size: None,
recv_buffer_size: None,
})
})
.await
.expect("Failed to create first Plain transport");

// Transport 2.
assert!(matches!(
router
.create_plain_transport(PlainTransportOptions::new(ListenInfo {
protocol: Protocol::Udp,
ip: multicast_ip,
announced_ip: None,
port: Some(port),
flags: Some(SocketFlags {
udp_reuse_port: false,
}),
send_buffer_size: None,
recv_buffer_size: None,
}))
.await,
Err(RequestError::Response { .. }),
));
});
}

#[test]
fn get_stats_succeeds() {
future::block_on(async move {
Expand Down
2 changes: 1 addition & 1 deletion worker/fbs/transport.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ table ListenInfo {
ip: string (required);
announced_ip: string;
port: uint16 = 0;
flags: SocketFlags;
flags: SocketFlags (required);
send_buffer_size: uint32 = 0;
recv_buffer_size: uint32 = 0;
}
Expand Down
10 changes: 0 additions & 10 deletions worker/src/RTC/PlainTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,6 @@ namespace RTC
this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str());
}

if (flatbuffers::IsFieldPresent(
options->listenInfo(), FBS::Transport::ListenInfo::VT_FLAGS))
{
MS_DUMP("---- PRESENT !!!!!!");
}
else
{
MS_DUMP("---- NOT PRESENT !!!!!!");
}

this->listenInfo.port = options->listenInfo()->port();
this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize();
this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize();
Expand Down

0 comments on commit 722fe2e

Please sign in to comment.