diff --git a/.github/workflows/mediasoup-node.yaml b/.github/workflows/mediasoup-node.yaml index 5e61e6bc1a..0618603bc7 100644 --- a/.github/workflows/mediasoup-node.yaml +++ b/.github/workflows/mediasoup-node.yaml @@ -28,6 +28,7 @@ jobs: env: MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true' + MEDIASOUP_LOCAL_DEV: 'true' steps: - name: Checkout diff --git a/.github/workflows/mediasoup-worker-prebuild.yaml b/.github/workflows/mediasoup-worker-prebuild.yaml index b399fbe89d..1291c95adc 100644 --- a/.github/workflows/mediasoup-worker-prebuild.yaml +++ b/.github/workflows/mediasoup-worker-prebuild.yaml @@ -32,6 +32,7 @@ jobs: CC: ${{ matrix.build.cc }} CXX: ${{ matrix.build.cxx }} MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true' + MEDIASOUP_LOCAL_DEV: 'true' steps: - name: Checkout diff --git a/.github/workflows/mediasoup-worker.yaml b/.github/workflows/mediasoup-worker.yaml index 5405601601..6192e75475 100644 --- a/.github/workflows/mediasoup-worker.yaml +++ b/.github/workflows/mediasoup-worker.yaml @@ -44,6 +44,7 @@ jobs: CC: ${{ matrix.build.cc }} CXX: ${{ matrix.build.cxx }} MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true' + MEDIASOUP_LOCAL_DEV: 'true' steps: - name: Checkout diff --git a/CHANGELOG.md b/CHANGELOG.md index f24bb46fc8..0f01d306c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### NEXT * Avoid modification of user input data ([PR #1285](https://github.com/versatica/mediasoup/pull/1285)). +* `ListenInfo`: Add transport socket flags ([PR #1291](https://github.com/versatica/mediasoup/pull/1291)). ### 3.13.13 diff --git a/node/src/Router.ts b/node/src/Router.ts index 9b836f67ad..9de04034d5 100644 --- a/node/src/Router.ts +++ b/node/src/Router.ts @@ -7,7 +7,8 @@ import { Transport, TransportListenInfo, TransportListenIp, - TransportProtocol + TransportProtocol, + TransportSocketFlags } from './Transport'; import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport'; import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport'; @@ -570,6 +571,7 @@ export class Router listenInfo.ip, listenInfo.announcedIp, listenInfo.port, + socketFlagsToFbs(listenInfo.flags), listenInfo.sendBufferSize, listenInfo.recvBufferSize )); @@ -749,6 +751,7 @@ export class Router listenInfo!.ip, listenInfo!.announcedIp, listenInfo!.port, + socketFlagsToFbs(listenInfo!.flags), listenInfo!.sendBufferSize, listenInfo!.recvBufferSize ), @@ -759,6 +762,7 @@ export class Router rtcpListenInfo.ip, rtcpListenInfo.announcedIp, rtcpListenInfo.port, + socketFlagsToFbs(rtcpListenInfo.flags), rtcpListenInfo.sendBufferSize, rtcpListenInfo.recvBufferSize ) : undefined, @@ -897,6 +901,7 @@ export class Router listenInfo!.ip, listenInfo!.announcedIp, listenInfo!.port, + socketFlagsToFbs(listenInfo!.flags), listenInfo!.sendBufferSize, listenInfo!.recvBufferSize ), @@ -1619,3 +1624,13 @@ export function parseRouterDumpResponse( mapDataConsumerIdDataProducerId : parseStringStringVector(binary, 'mapDataConsumerIdDataProducerId') }; } + +export function socketFlagsToFbs( + flags: TransportSocketFlags = {} +): FbsTransport.SocketFlagsT +{ + return new FbsTransport.SocketFlagsT( + Boolean(flags.ipv6Only), + Boolean(flags.udpReusePort) + ); +} diff --git a/node/src/Transport.ts b/node/src/Transport.ts index 7ae79d39df..d04f863927 100644 --- a/node/src/Transport.ts +++ b/node/src/Transport.ts @@ -73,6 +73,11 @@ export type TransportListenInfo = */ port?: number; + /** + * Socket flags. + */ + flags?: TransportSocketFlags; + /** * Send buffer size (bytes). */ @@ -107,6 +112,22 @@ export type TransportListenIp = */ export type TransportProtocol = 'udp' | 'tcp'; +/** + * UDP/TCP socket flags. + */ +export type TransportSocketFlags = +{ + /** + * Disable dual-stack support so only IPv6 is used (only if ip is IPv6). + */ + ipv6Only?: boolean; + /** + * Make different transports bind to the same ip and port (only for UDP). + * Useful for multicast scenarios with plain transport. Use with caution. + */ + udpReusePort?: boolean; +}; + export type TransportTuple = { localIp: string; diff --git a/node/src/Worker.ts b/node/src/Worker.ts index 44cee98b4b..56a6d7c813 100644 --- a/node/src/Worker.ts +++ b/node/src/Worker.ts @@ -6,7 +6,7 @@ import { Logger } from './Logger'; import { EnhancedEventEmitter } from './EnhancedEventEmitter'; import * as ortc from './ortc'; import { Channel } from './Channel'; -import { Router, RouterOptions } from './Router'; +import { Router, RouterOptions, socketFlagsToFbs } from './Router'; import { WebRtcServer, WebRtcServerOptions } from './WebRtcServer'; import { RtpCodecCapability } from './RtpParameters'; import { AppData } from './types'; @@ -699,6 +699,7 @@ export class Worker listenInfo.ip, listenInfo.announcedIp, listenInfo.port, + socketFlagsToFbs(listenInfo.flags), listenInfo.sendBufferSize, listenInfo.recvBufferSize) ); diff --git a/node/src/tests/test-PlainTransport.ts b/node/src/tests/test-PlainTransport.ts index 3e75c50586..554d07444a 100644 --- a/node/src/tests/test-PlainTransport.ts +++ b/node/src/tests/test-PlainTransport.ts @@ -1,7 +1,10 @@ +import * as os from 'node:os'; // @ts-ignore import * as pickPort from 'pick-port'; import * as mediasoup from '../'; +const IS_WINDOWS = os.platform() === 'win32'; + let worker: mediasoup.types.Worker; let router: mediasoup.types.Router; let transport: mediasoup.types.PlainTransport; @@ -316,6 +319,83 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as .toThrow(Error); }, 2000); +if (!IS_WINDOWS) +{ + test('two transports binding to the same IP:port with udpReusePort flag succeed', async () => + { + let transport1: mediasoup.types.PlainTransport | undefined; + let transport2: mediasoup.types.PlainTransport | undefined; + + await expect(async () => + { + const multicastIp = '224.0.0.1'; + const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 }); + + transport1 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : { udpReusePort: true } + } + }); + + transport2 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : { udpReusePort: true } + } + }); + }).not.toThrow(); + + transport1?.close(); + transport2?.close(); + }, 2000); + + test('two transports binding to the same IP:port without udpReusePort flag fails', async () => + { + let transport1: mediasoup.types.PlainTransport | undefined; + let transport2: mediasoup.types.PlainTransport | undefined; + + await expect(async () => + { + const multicastIp = '224.0.0.1'; + const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 }); + + transport1 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : { udpReusePort: false } + } + }); + + transport2 = await router.createPlainTransport( + { + listenInfo : + { + protocol : 'udp', + ip : multicastIp, + port : port, + flags : { udpReusePort: false } + } + }); + }).rejects.toThrow(); + + transport1?.close(); + transport2?.close(); + }, 2000); +} + test('plainTransport.getStats() succeeds', async () => { const data = await transport.getStats(); diff --git a/npm-scripts.mjs b/npm-scripts.mjs index 6dab36a258..a734c35e33 100644 --- a/npm-scripts.mjs +++ b/npm-scripts.mjs @@ -1,7 +1,7 @@ -import process from 'node:process'; -import os from 'node:os'; -import fs from 'node:fs'; -import path from 'node:path'; +import * as process from 'node:process'; +import * as os from 'node:os'; +import * as fs from 'node:fs'; +import * as path from 'node:path'; import { execSync } from 'node:child_process'; import fetch from 'node-fetch'; import tar from 'tar'; diff --git a/rust/benches/producer.rs b/rust/benches/producer.rs index a6a5e87c64..c9b98870dc 100644 --- a/rust/benches/producer.rs +++ b/rust/benches/producer.rs @@ -67,6 +67,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/examples/echo.rs b/rust/examples/echo.rs index fd8027d83c..c64ed714d4 100644 --- a/rust/examples/echo.rs +++ b/rust/examples/echo.rs @@ -187,6 +187,7 @@ impl EchoConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/examples/multiopus.rs b/rust/examples/multiopus.rs index 341c9d8a0c..a4f2f7be02 100644 --- a/rust/examples/multiopus.rs +++ b/rust/examples/multiopus.rs @@ -150,6 +150,7 @@ impl EchoConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -235,6 +236,7 @@ impl EchoConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), diff --git a/rust/examples/svc-simulcast.rs b/rust/examples/svc-simulcast.rs index 3cfd318b9b..959d7ce056 100644 --- a/rust/examples/svc-simulcast.rs +++ b/rust/examples/svc-simulcast.rs @@ -207,6 +207,7 @@ impl SvcSimulcastConnection { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/examples/videoroom.rs b/rust/examples/videoroom.rs index ef818d8be2..ff7a6a0794 100644 --- a/rust/examples/videoroom.rs +++ b/rust/examples/videoroom.rs @@ -501,6 +501,7 @@ mod participant { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/data_structures.rs b/rust/src/data_structures.rs index b4d41a6167..07fb5247af 100644 --- a/rust/src/data_structures.rs +++ b/rust/src/data_structures.rs @@ -62,6 +62,9 @@ pub struct ListenInfo { /// Listening port. #[serde(skip_serializing_if = "Option::is_none")] pub port: Option, + /// Socket flags. + #[serde(skip_serializing_if = "Option::is_none")] + pub flags: Option, /// Send buffer size (bytes). #[serde(skip_serializing_if = "Option::is_none")] pub send_buffer_size: Option, @@ -80,12 +83,37 @@ impl ListenInfo { ip: self.ip.to_string(), announced_ip: self.announced_ip.map(|ip| ip.to_string()), port: self.port.unwrap_or(0), + flags: Box::new(self.flags.unwrap_or_default().to_fbs()), send_buffer_size: self.send_buffer_size.unwrap_or(0), recv_buffer_size: self.recv_buffer_size.unwrap_or(0), } } } +/// UDP/TCP socket flags. +#[derive( + Default, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize, +)] +#[serde(rename_all = "camelCase")] +pub struct SocketFlags { + /// Disable dual-stack support so only IPv6 is used (only if ip is IPv6). + /// Defaults to false. + pub ipv6_only: bool, + /// Make different transports bind to the same ip and port (only for UDP). + /// Useful for multicast scenarios with plain transport. Use with caution. + /// Defaults to false. + pub udp_reuse_port: bool, +} + +impl SocketFlags { + pub(crate) fn to_fbs(self) -> transport::SocketFlags { + transport::SocketFlags { + ipv6_only: self.ipv6_only, + udp_reuse_port: self.udp_reuse_port, + } + } +} + /// ICE role. #[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] diff --git a/rust/src/router.rs b/rust/src/router.rs index c2d09a0228..931593932c 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -147,6 +147,7 @@ impl PipeToRouterOptions { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -609,6 +610,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -696,6 +698,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// })) @@ -761,6 +764,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// })) @@ -971,6 +975,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -1013,6 +1018,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -1198,6 +1204,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, @@ -1229,6 +1236,7 @@ impl Router { /// ip: IpAddr::V4(Ipv4Addr::LOCALHOST), /// announced_ip: Some("9.9.9.1".parse().unwrap()), /// port: None, + /// flags: None, /// send_buffer_size: None, /// recv_buffer_size: None, /// }, diff --git a/rust/src/router/consumer/tests.rs b/rust/src/router/consumer/tests.rs index 8f4fe2a74a..94a5b2f9b6 100644 --- a/rust/src/router/consumer/tests.rs +++ b/rust/src/router/consumer/tests.rs @@ -87,6 +87,7 @@ async fn init() -> (Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/data_consumer/tests.rs b/rust/src/router/data_consumer/tests.rs index 7ef9a80588..64a2dd884f 100644 --- a/rust/src/router/data_consumer/tests.rs +++ b/rust/src/router/data_consumer/tests.rs @@ -41,6 +41,7 @@ async fn init() -> (Router, DataProducer) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -74,6 +75,7 @@ fn data_producer_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -127,6 +129,7 @@ fn transport_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/src/router/data_producer/tests.rs b/rust/src/router/data_producer/tests.rs index 3bbb34db02..52b585b162 100644 --- a/rust/src/router/data_producer/tests.rs +++ b/rust/src/router/data_producer/tests.rs @@ -41,6 +41,7 @@ async fn init() -> (Router, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/pipe_transport/tests.rs b/rust/src/router/pipe_transport/tests.rs index 54a1f22a48..4e57984bcf 100644 --- a/rust/src/router/pipe_transport/tests.rs +++ b/rust/src/router/pipe_transport/tests.rs @@ -103,6 +103,7 @@ async fn init() -> (Router, Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/plain_transport.rs b/rust/src/router/plain_transport.rs index 8988b6d37b..33debb2fbf 100644 --- a/rust/src/router/plain_transport.rs +++ b/rust/src/router/plain_transport.rs @@ -837,8 +837,8 @@ impl PlainTransport { /// # async fn f( /// # plain_transport: mediasoup::plain_transport::PlainTransport, /// # ) -> Result<(), Box> { - /// // Calling connect() on a PlainTransport created with comedia unset, rtcpMux - /// // set and enableSrtp enabled. + /// // Calling connect() on a PlainTransport created with comedia unset, + /// // rtcp_mux set and enableSrtp enabled. /// plain_transport /// .connect(PlainTransportRemoteParameters { /// ip: Some("1.2.3.4".parse().unwrap()), diff --git a/rust/src/router/plain_transport/tests.rs b/rust/src/router/plain_transport/tests.rs index fcff706c5f..a0f0af8d16 100644 --- a/rust/src/router/plain_transport/tests.rs +++ b/rust/src/router/plain_transport/tests.rs @@ -42,6 +42,7 @@ fn router_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/src/router/producer/tests.rs b/rust/src/router/producer/tests.rs index 92533c3c32..21e18c21d7 100644 --- a/rust/src/router/producer/tests.rs +++ b/rust/src/router/producer/tests.rs @@ -72,6 +72,7 @@ async fn init() -> (Router, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/src/router/webrtc_transport/tests.rs b/rust/src/router/webrtc_transport/tests.rs index 42d54678ee..fe8a81de43 100644 --- a/rust/src/router/webrtc_transport/tests.rs +++ b/rust/src/router/webrtc_transport/tests.rs @@ -57,6 +57,7 @@ fn create_with_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -65,6 +66,7 @@ fn create_with_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -233,6 +235,7 @@ fn router_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -274,6 +277,7 @@ fn webrtc_server_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -282,6 +286,7 @@ fn webrtc_server_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/src/webrtc_server/tests.rs b/rust/src/webrtc_server/tests.rs index a07c32c648..95f257bc77 100644 --- a/rust/src/webrtc_server/tests.rs +++ b/rust/src/webrtc_server/tests.rs @@ -38,6 +38,7 @@ fn worker_close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, diff --git a/rust/tests/integration/consumer.rs b/rust/tests/integration/consumer.rs index 8a4746365b..54338b00a2 100644 --- a/rust/tests/integration/consumer.rs +++ b/rust/tests/integration/consumer.rs @@ -352,6 +352,7 @@ async fn init() -> ( ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/data_consumer.rs b/rust/tests/integration/data_consumer.rs index d18e21d540..8ae3ed3cf1 100644 --- a/rust/tests/integration/data_consumer.rs +++ b/rust/tests/integration/data_consumer.rs @@ -68,6 +68,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, DataProducer) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -99,6 +100,7 @@ fn consume_data_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -208,6 +210,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -572,6 +575,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/data_producer.rs b/rust/tests/integration/data_producer.rs index e753ff5eb5..31516da931 100644 --- a/rust/tests/integration/data_producer.rs +++ b/rust/tests/integration/data_producer.rs @@ -54,6 +54,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, PlainTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -72,6 +73,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, PlainTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/multiopus.rs b/rust/tests/integration/multiopus.rs index 650b450cd0..f05170401e 100644 --- a/rust/tests/integration/multiopus.rs +++ b/rust/tests/integration/multiopus.rs @@ -135,6 +135,7 @@ async fn init() -> (Router, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/pipe_transport.rs b/rust/tests/integration/pipe_transport.rs index 242c37a483..32a8170e96 100644 --- a/rust/tests/integration/pipe_transport.rs +++ b/rust/tests/integration/pipe_transport.rs @@ -261,6 +261,7 @@ async fn init() -> ( ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -605,6 +606,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -639,6 +641,7 @@ fn create_with_fixed_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }) @@ -662,6 +665,7 @@ fn create_with_enable_rtx_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -774,6 +778,7 @@ fn create_with_enable_srtp_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -829,6 +834,7 @@ fn create_with_invalid_srtp_parameters_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })) @@ -1155,6 +1161,7 @@ fn pipe_to_router_called_twice_generates_single_pair() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/plain_transport.rs b/rust/tests/integration/plain_transport.rs index c968c395b5..5fc334fca6 100644 --- a/rust/tests/integration/plain_transport.rs +++ b/rust/tests/integration/plain_transport.rs @@ -1,5 +1,7 @@ use futures_lite::future; use hash_hasher::HashedSet; +#[cfg(not(target_os = "windows"))] +use mediasoup::data_structures::SocketFlags; use mediasoup::data_structures::{AppData, ListenInfo, Protocol, SctpState, TransportTuple}; use mediasoup::plain_transport::{PlainTransportOptions, PlainTransportRemoteParameters}; use mediasoup::prelude::*; @@ -94,6 +96,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -132,6 +135,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -205,6 +209,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -215,6 +220,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(rtcp_port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -282,6 +288,7 @@ fn create_with_fixed_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }) @@ -305,6 +312,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -338,6 +346,7 @@ fn create_enable_srtp_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -403,6 +412,7 @@ fn create_non_bindable_ip() { ip: "8.8.8.8".parse().unwrap(), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })) @@ -412,6 +422,105 @@ fn create_non_bindable_ip() { }); } +#[cfg(not(target_os = "windows"))] +#[test] +fn create_two_transports_binding_to_same_ip_port_with_udp_reuse_port_flag_succeed() { + future::block_on(async move { + let (_worker, router) = init().await; + + let multicast_ip = "224.0.0.1".parse().unwrap(); + let port = pick_unused_port().unwrap(); + + let transport1 = router + .create_plain_transport({ + PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + ipv6_only: false, + udp_reuse_port: true, + }), + send_buffer_size: None, + recv_buffer_size: None, + }) + }) + .await + .expect("Failed to create first Plain transport"); + + let transport2 = router + .create_plain_transport({ + PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + ipv6_only: false, + udp_reuse_port: true, + }), + send_buffer_size: None, + recv_buffer_size: None, + }) + }) + .await + .expect("Failed to create second Plain transport"); + + assert_eq!(transport1.tuple().local_port(), port); + assert_eq!(transport2.tuple().local_port(), port); + }); +} + +#[cfg(not(target_os = "windows"))] +#[test] +fn create_two_transports_binding_to_same_ip_port_without_udp_reuse_port_flag_fails() { + future::block_on(async move { + let (_worker, router) = init().await; + + let multicast_ip = "224.0.0.1".parse().unwrap(); + let port = pick_unused_port().unwrap(); + + let transport1 = router + .create_plain_transport({ + PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + ipv6_only: false, + udp_reuse_port: false, + }), + send_buffer_size: None, + recv_buffer_size: None, + }) + }) + .await + .expect("Failed to create first Plain transport"); + + assert!(matches!( + router + .create_plain_transport(PlainTransportOptions::new(ListenInfo { + protocol: Protocol::Udp, + ip: multicast_ip, + announced_ip: None, + port: Some(port), + flags: Some(SocketFlags { + ipv6_only: false, + udp_reuse_port: false, + }), + send_buffer_size: None, + recv_buffer_size: None, + })) + .await, + Err(RequestError::Response { .. }), + )); + + assert_eq!(transport1.tuple().local_port(), port); + }); +} + #[test] fn get_stats_succeeds() { future::block_on(async move { @@ -424,6 +533,7 @@ fn get_stats_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -481,6 +591,7 @@ fn connect_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -556,6 +667,7 @@ fn connect_wrong_arguments() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -596,6 +708,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("4.4.4.4".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/producer.rs b/rust/tests/integration/producer.rs index 7b05c34083..909d5cbac4 100644 --- a/rust/tests/integration/producer.rs +++ b/rust/tests/integration/producer.rs @@ -207,6 +207,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, WebRtcTransport) { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); diff --git a/rust/tests/integration/smoke.rs b/rust/tests/integration/smoke.rs index 2f1a46f658..7008ad5e83 100644 --- a/rust/tests/integration/smoke.rs +++ b/rust/tests/integration/smoke.rs @@ -88,6 +88,7 @@ fn smoke() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, })); @@ -274,6 +275,7 @@ fn smoke() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); diff --git a/rust/tests/integration/webrtc_server.rs b/rust/tests/integration/webrtc_server.rs index f02cb765be..76b25a8dc7 100644 --- a/rust/tests/integration/webrtc_server.rs +++ b/rust/tests/integration/webrtc_server.rs @@ -64,6 +64,7 @@ fn create_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -72,6 +73,7 @@ fn create_webrtc_server_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -158,6 +160,7 @@ fn create_webrtc_server_without_specifying_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -166,6 +169,7 @@ fn create_webrtc_server_without_specifying_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -239,6 +243,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -247,6 +252,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), announced_ip: None, port: Some(port2), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -270,6 +276,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -278,6 +285,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), announced_ip: Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }); @@ -301,6 +309,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -315,6 +324,7 @@ fn unavailable_infos_fails() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port1), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -343,6 +353,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, }, diff --git a/rust/tests/integration/webrtc_transport.rs b/rust/tests/integration/webrtc_transport.rs index e8c45b6761..9eec456aa4 100644 --- a/rust/tests/integration/webrtc_transport.rs +++ b/rust/tests/integration/webrtc_transport.rs @@ -100,6 +100,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -137,6 +138,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -145,6 +147,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED), announced_ip: Some("9.9.9.2".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -153,6 +156,7 @@ fn create_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }, @@ -263,6 +267,7 @@ fn create_with_fixed_port_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: Some(port), + flags: None, send_buffer_size: None, recv_buffer_size: None, })) @@ -286,6 +291,7 @@ fn weak() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -316,6 +322,7 @@ fn create_non_bindable_ip() { ip: "8.8.8.8".parse().unwrap(), announced_ip: None, port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, },) @@ -338,6 +345,7 @@ fn get_stats_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -389,6 +397,7 @@ fn connect_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -438,6 +447,7 @@ fn set_max_incoming_bitrate_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -470,6 +480,7 @@ fn set_max_outgoing_bitrate_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -502,6 +513,7 @@ fn set_min_outgoing_bitrate_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -534,6 +546,7 @@ fn set_max_outgoing_bitrate_fails_if_value_is_lower_than_current_min_limit() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -571,6 +584,7 @@ fn set_min_outgoing_bitrate_fails_if_value_is_higher_than_current_max_limit() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -608,6 +622,7 @@ fn restart_ice_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -643,6 +658,7 @@ fn enable_trace_event_succeeds() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), @@ -718,6 +734,7 @@ fn close_event() { ip: IpAddr::V4(Ipv4Addr::LOCALHOST), announced_ip: Some("9.9.9.1".parse().unwrap()), port: None, + flags: None, send_buffer_size: None, recv_buffer_size: None, }), diff --git a/worker/fbs/transport.fbs b/worker/fbs/transport.fbs index d2a77d4ae9..001661b89f 100644 --- a/worker/fbs/transport.fbs +++ b/worker/fbs/transport.fbs @@ -13,11 +13,17 @@ enum Protocol: uint8 { TCP } +table SocketFlags { + ipv6_only: bool = false; + udp_reuse_port: bool = false; +} + table ListenInfo { - protocol: FBS.Transport.Protocol = UDP; + protocol: Protocol = UDP; ip: string (required); announced_ip: string; port: uint16 = 0; + flags: SocketFlags (required); send_buffer_size: uint32 = 0; recv_buffer_size: uint32 = 0; } @@ -85,7 +91,7 @@ table Tuple { local_port: uint16; remote_ip: string; remote_port: uint16; - protocol: FBS.Transport.Protocol = UDP; + protocol: Protocol = UDP; } table RtpListener { diff --git a/worker/include/Logger.hpp b/worker/include/Logger.hpp index 53c16f32a4..e3c22bb71f 100644 --- a/worker/include/Logger.hpp +++ b/worker/include/Logger.hpp @@ -126,6 +126,19 @@ ((value & 0x02) ? '1' : '0'), \ ((value & 0x01) ? '1' : '0') +// Usage: +// MS_DEBUG_DEV("Leading text "MS_UINT8_TO_BINARY_PATTERN, MS_UINT8_TO_BINARY(value)); +#define MS_UINT8_TO_BINARY_PATTERN "%c%c%c%c%c%c%c%c" +#define MS_UINT8_TO_BINARY(value) \ + ((value & 0x80) ? '1' : '0'), \ + ((value & 0x40) ? '1' : '0'), \ + ((value & 0x20) ? '1' : '0'), \ + ((value & 0x10) ? '1' : '0'), \ + ((value & 0x08) ? '1' : '0'), \ + ((value & 0x04) ? '1' : '0'), \ + ((value & 0x02) ? '1' : '0'), \ + ((value & 0x01) ? '1' : '0') + class Logger { public: diff --git a/worker/include/RTC/PortManager.hpp b/worker/include/RTC/PortManager.hpp index 4300548385..17294f24ca 100644 --- a/worker/include/RTC/PortManager.hpp +++ b/worker/include/RTC/PortManager.hpp @@ -3,6 +3,7 @@ #include "common.hpp" #include "Settings.hpp" +#include "RTC/Transport.hpp" #include #include #include @@ -20,21 +21,21 @@ namespace RTC }; public: - static uv_udp_t* BindUdp(std::string& ip) + static uv_udp_t* BindUdp(std::string& ip, RTC::Transport::SocketFlags& flags) { - return reinterpret_cast(Bind(Transport::UDP, ip)); + return reinterpret_cast(Bind(Transport::UDP, ip, flags)); } - static uv_udp_t* BindUdp(std::string& ip, uint16_t port) + static uv_udp_t* BindUdp(std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) { - return reinterpret_cast(Bind(Transport::UDP, ip, port)); + return reinterpret_cast(Bind(Transport::UDP, ip, port, flags)); } - static uv_tcp_t* BindTcp(std::string& ip) + static uv_tcp_t* BindTcp(std::string& ip, RTC::Transport::SocketFlags& flags) { - return reinterpret_cast(Bind(Transport::TCP, ip)); + return reinterpret_cast(Bind(Transport::TCP, ip, flags)); } - static uv_tcp_t* BindTcp(std::string& ip, uint16_t port) + static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) { - return reinterpret_cast(Bind(Transport::TCP, ip, port)); + return reinterpret_cast(Bind(Transport::TCP, ip, port, flags)); } static void UnbindUdp(std::string& ip, uint16_t port) { @@ -46,10 +47,12 @@ namespace RTC } private: - static uv_handle_t* Bind(Transport transport, std::string& ip); - static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port); + static uv_handle_t* Bind(Transport transport, std::string& ip, RTC::Transport::SocketFlags& flags); + static uv_handle_t* Bind( + Transport transport, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags); static void Unbind(Transport transport, std::string& ip, uint16_t port); static std::vector& GetPorts(Transport transport, const std::string& ip); + static uint8_t ConvertSocketFlags(RTC::Transport::SocketFlags& flags); private: thread_local static absl::flat_hash_map> mapUdpIpPorts; diff --git a/worker/include/RTC/TcpServer.hpp b/worker/include/RTC/TcpServer.hpp index 89e03ca6b8..6d1aeb7376 100644 --- a/worker/include/RTC/TcpServer.hpp +++ b/worker/include/RTC/TcpServer.hpp @@ -3,6 +3,7 @@ #include "common.hpp" #include "RTC/TcpConnection.hpp" +#include "RTC/Transport.hpp" #include "handles/TcpConnectionHandle.hpp" #include "handles/TcpServerHandle.hpp" #include @@ -23,9 +24,17 @@ namespace RTC }; public: - TcpServer(Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip); TcpServer( - Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port); + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + RTC::Transport::SocketFlags& flags); + TcpServer( + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + uint16_t port, + RTC::Transport::SocketFlags& flags); ~TcpServer() override; /* Pure virtual methods inherited from ::TcpServerHandle. */ diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index 61bdd3ec65..95814a4920 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -119,11 +119,18 @@ namespace RTC }; public: + struct SocketFlags + { + bool ipv6Only{ false }; + bool udpReusePort{ false }; + }; + struct ListenInfo { std::string ip; std::string announcedIp; uint16_t port{ 0u }; + SocketFlags flags; uint32_t sendBufferSize{ 0u }; uint32_t recvBufferSize{ 0u }; }; diff --git a/worker/include/RTC/UdpSocket.hpp b/worker/include/RTC/UdpSocket.hpp index 46e90f3edd..ad9bde10d6 100644 --- a/worker/include/RTC/UdpSocket.hpp +++ b/worker/include/RTC/UdpSocket.hpp @@ -2,6 +2,7 @@ #define MS_RTC_UDP_SOCKET_HPP #include "common.hpp" +#include "RTC/Transport.hpp" #include "handles/UdpSocketHandle.hpp" #include @@ -21,8 +22,8 @@ namespace RTC }; public: - UdpSocket(Listener* listener, std::string& ip); - UdpSocket(Listener* listener, std::string& ip, uint16_t port); + UdpSocket(Listener* listener, std::string& ip, RTC::Transport::SocketFlags& flags); + UdpSocket(Listener* listener, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags); ~UdpSocket() override; /* Pure virtual methods inherited from ::UdpSocketHandle. */ diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index 950697b111..8706f5a60b 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -50,19 +50,11 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } - this->listenInfo.port = options->listenInfo()->port(); - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE)) - { - this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); - } - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE)) - { - this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); - } + this->listenInfo.port = options->listenInfo()->port(); + this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); + this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); + this->listenInfo.flags.ipv6Only = options->listenInfo()->flags()->ipv6Only(); + this->listenInfo.flags.udpReusePort = options->listenInfo()->flags()->udpReusePort(); this->rtx = options->enableRtx(); @@ -77,11 +69,12 @@ namespace RTC // This may throw. if (this->listenInfo.port != 0) { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port); + this->udpSocket = new RTC::UdpSocket( + this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags); } else { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip); + this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.flags); } if (this->listenInfo.sendBufferSize != 0) diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index 305e67516f..672fe54cc2 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -52,19 +52,11 @@ namespace RTC this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str()); } - this->listenInfo.port = options->listenInfo()->port(); - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE)) - { - this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); - } - - if (flatbuffers::IsFieldPresent( - options->listenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE)) - { - this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); - } + this->listenInfo.port = options->listenInfo()->port(); + this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize(); + this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize(); + this->listenInfo.flags.ipv6Only = options->listenInfo()->flags()->ipv6Only(); + this->listenInfo.flags.udpReusePort = options->listenInfo()->flags()->udpReusePort(); this->rtcpMux = options->rtcpMux(); this->comedia = options->comedia(); @@ -90,19 +82,11 @@ namespace RTC this->rtcpListenInfo.announcedIp.assign(options->rtcpListenInfo()->announcedIp()->str()); } - this->rtcpListenInfo.port = options->rtcpListenInfo()->port(); - - if (flatbuffers::IsFieldPresent( - options->rtcpListenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE)) - { - this->rtcpListenInfo.sendBufferSize = options->rtcpListenInfo()->sendBufferSize(); - } - - if (flatbuffers::IsFieldPresent( - options->rtcpListenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE)) - { - this->rtcpListenInfo.recvBufferSize = options->rtcpListenInfo()->recvBufferSize(); - } + this->rtcpListenInfo.port = options->rtcpListenInfo()->port(); + this->rtcpListenInfo.sendBufferSize = options->rtcpListenInfo()->sendBufferSize(); + this->rtcpListenInfo.recvBufferSize = options->rtcpListenInfo()->recvBufferSize(); + this->rtcpListenInfo.flags.ipv6Only = options->rtcpListenInfo()->flags()->ipv6Only(); + this->rtcpListenInfo.flags.udpReusePort = options->rtcpListenInfo()->flags()->udpReusePort(); } // If rtcpListenInfo is not given, just clone listenInfo. else @@ -160,11 +144,12 @@ namespace RTC // This may throw. if (this->listenInfo.port != 0) { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port); + this->udpSocket = new RTC::UdpSocket( + this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags); } else { - this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip); + this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.flags); } if (this->listenInfo.sendBufferSize != 0) @@ -184,12 +169,13 @@ namespace RTC // This may throw. if (this->rtcpListenInfo.port != 0) { - this->rtcpUdpSocket = - new RTC::UdpSocket(this, this->rtcpListenInfo.ip, this->rtcpListenInfo.port); + this->rtcpUdpSocket = new RTC::UdpSocket( + this, this->rtcpListenInfo.ip, this->rtcpListenInfo.port, this->rtcpListenInfo.flags); } else { - this->rtcpUdpSocket = new RTC::UdpSocket(this, this->rtcpListenInfo.ip); + this->rtcpUdpSocket = + new RTC::UdpSocket(this, this->rtcpListenInfo.ip, this->rtcpListenInfo.flags); } if (this->rtcpListenInfo.sendBufferSize != 0) diff --git a/worker/src/RTC/PortManager.cpp b/worker/src/RTC/PortManager.cpp index 0d19d686f3..051a318d45 100644 --- a/worker/src/RTC/PortManager.cpp +++ b/worker/src/RTC/PortManager.cpp @@ -38,7 +38,7 @@ namespace RTC /* Class methods. */ - uv_handle_t* PortManager::Bind(Transport transport, std::string& ip) + uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, RTC::Transport::SocketFlags& flags) { MS_TRACE(); @@ -49,23 +49,29 @@ namespace RTC const int family = Utils::IP::GetFamily(ip); struct sockaddr_storage bindAddr; // NOLINT(cppcoreguidelines-pro-type-member-init) size_t portIdx; - int flags{ 0 }; std::vector& ports = PortManager::GetPorts(transport, ip); size_t attempt{ 0u }; const size_t numAttempts = ports.size(); uv_handle_t* uvHandle{ nullptr }; uint16_t port; std::string transportStr; + uint8_t bitFlags = ConvertSocketFlags(flags); switch (transport) { case Transport::UDP: + { transportStr.assign("udp"); + break; + } case Transport::TCP: + { transportStr.assign("tcp"); + break; + } } switch (family) @@ -91,9 +97,6 @@ namespace RTC MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err)); } - // Don't also bind into IPv4 when listening in IPv6. - flags |= UV_UDP_IPV6ONLY; - break; } @@ -160,12 +163,18 @@ namespace RTC switch (family) { case AF_INET: + { (reinterpret_cast(&bindAddr))->sin_port = htons(port); + break; + } case AF_INET6: + { (reinterpret_cast(&bindAddr))->sin6_port = htons(port); + break; + } } // Try to bind on it. @@ -220,7 +229,7 @@ namespace RTC err = uv_udp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -242,7 +251,7 @@ namespace RTC err = uv_tcp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -356,7 +365,8 @@ namespace RTC return static_cast(uvHandle); } - uv_handle_t* PortManager::Bind(Transport transport, std::string& ip, uint16_t port) + uv_handle_t* PortManager::Bind( + Transport transport, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) { MS_TRACE(); @@ -366,19 +376,25 @@ namespace RTC int err; const int family = Utils::IP::GetFamily(ip); struct sockaddr_storage bindAddr; // NOLINT(cppcoreguidelines-pro-type-member-init) - int flags{ 0 }; uv_handle_t* uvHandle{ nullptr }; std::string transportStr; + uint8_t bitFlags = ConvertSocketFlags(flags); switch (transport) { case Transport::UDP: + { transportStr.assign("udp"); + break; + } case Transport::TCP: + { transportStr.assign("tcp"); + break; + } } switch (family) @@ -404,9 +420,6 @@ namespace RTC MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err)); } - // Don't also bind into IPv4 when listening in IPv6. - flags |= UV_UDP_IPV6ONLY; - break; } @@ -421,27 +434,39 @@ namespace RTC switch (family) { case AF_INET: + { (reinterpret_cast(&bindAddr))->sin_port = htons(port); + break; + } case AF_INET6: + { (reinterpret_cast(&bindAddr))->sin6_port = htons(port); + break; + } } // Try to bind on it. switch (transport) { case Transport::UDP: + { uvHandle = reinterpret_cast(new uv_udp_t()); err = uv_udp_init_ex( DepLibUV::GetLoop(), reinterpret_cast(uvHandle), UV_UDP_RECVMMSG); + break; + } case Transport::TCP: + { uvHandle = reinterpret_cast(new uv_tcp_t()); err = uv_tcp_init(DepLibUV::GetLoop(), reinterpret_cast(uvHandle)); + break; + } } if (err != 0) @@ -475,7 +500,7 @@ namespace RTC err = uv_udp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -498,7 +523,7 @@ namespace RTC err = uv_tcp_bind( reinterpret_cast(uvHandle), reinterpret_cast(&bindAddr), - flags); + bitFlags); if (err != 0) { @@ -666,4 +691,24 @@ namespace RTC return emptyPorts; } + + uint8_t PortManager::ConvertSocketFlags(RTC::Transport::SocketFlags& flags) + { + MS_TRACE(); + + uint8_t bitFlags{ 0b00000000 }; + + if (flags.ipv6Only) + { + bitFlags |= UV_UDP_IPV6ONLY; + bitFlags |= UV_TCP_IPV6ONLY; // Same flag number but anyway. + } + + if (flags.udpReusePort) + { + bitFlags |= UV_UDP_REUSEADDR; + } + + return bitFlags; + } } // namespace RTC diff --git a/worker/src/RTC/TcpServer.cpp b/worker/src/RTC/TcpServer.cpp index ddd1d2da82..0cef5aa800 100644 --- a/worker/src/RTC/TcpServer.cpp +++ b/worker/src/RTC/TcpServer.cpp @@ -10,19 +10,27 @@ namespace RTC { /* Instance methods. */ - TcpServer::TcpServer(Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip) + TcpServer::TcpServer( + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + RTC::Transport::SocketFlags& flags) : // This may throw. - ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip)), listener(listener), + ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, flags)), listener(listener), connListener(connListener) { MS_TRACE(); } TcpServer::TcpServer( - Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port) + Listener* listener, + RTC::TcpConnection::Listener* connListener, + std::string& ip, + uint16_t port, + RTC::Transport::SocketFlags& flags) : // This may throw. - ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, port)), listener(listener), - connListener(connListener), fixedPort(true) + ::TcpServerHandle::TcpServerHandle(RTC::PortManager::BindTcp(ip, port, flags)), + listener(listener), connListener(connListener), fixedPort(true) { MS_TRACE(); } diff --git a/worker/src/RTC/UdpSocket.cpp b/worker/src/RTC/UdpSocket.cpp index 6c47ab4671..b599620f25 100644 --- a/worker/src/RTC/UdpSocket.cpp +++ b/worker/src/RTC/UdpSocket.cpp @@ -10,16 +10,17 @@ namespace RTC { /* Instance methods. */ - UdpSocket::UdpSocket(Listener* listener, std::string& ip) + UdpSocket::UdpSocket(Listener* listener, std::string& ip, RTC::Transport::SocketFlags& flags) : // This may throw. - ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip)), listener(listener) + ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, flags)), listener(listener) { MS_TRACE(); } - UdpSocket::UdpSocket(Listener* listener, std::string& ip, uint16_t port) + UdpSocket::UdpSocket( + Listener* listener, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags) : // This may throw. - ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, port)), listener(listener), + ::UdpSocketHandle::UdpSocketHandle(PortManager::BindUdp(ip, port, flags)), listener(listener), fixedPort(true) { MS_TRACE(); diff --git a/worker/src/RTC/WebRtcServer.cpp b/worker/src/RTC/WebRtcServer.cpp index 8fe9f760a3..a1f234c9db 100644 --- a/worker/src/RTC/WebRtcServer.cpp +++ b/worker/src/RTC/WebRtcServer.cpp @@ -60,6 +60,11 @@ namespace RTC announcedIp = listenInfo->announcedIp()->str(); } + RTC::Transport::SocketFlags flags; + + flags.ipv6Only = listenInfo->flags()->ipv6Only(); + flags.udpReusePort = listenInfo->flags()->udpReusePort(); + if (listenInfo->protocol() == FBS::Transport::Protocol::UDP) { // This may throw. @@ -67,11 +72,11 @@ namespace RTC if (listenInfo->port() != 0) { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port()); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), flags); } else { - udpSocket = new RTC::UdpSocket(this, ip); + udpSocket = new RTC::UdpSocket(this, ip, flags); } this->udpSocketOrTcpServers.emplace_back(udpSocket, nullptr, announcedIp); @@ -99,11 +104,11 @@ namespace RTC if (listenInfo->port() != 0) { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port()); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), flags); } else { - tcpServer = new RTC::TcpServer(this, this, ip); + tcpServer = new RTC::TcpServer(this, this, ip, flags); } this->udpSocketOrTcpServers.emplace_back(nullptr, tcpServer, announcedIp); diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index 19e33d5f6d..6eb1f9717d 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -62,6 +62,11 @@ namespace RTC announcedIp = listenInfo->announcedIp()->str(); } + RTC::Transport::SocketFlags flags; + + flags.ipv6Only = listenInfo->flags()->ipv6Only(); + flags.udpReusePort = listenInfo->flags()->udpReusePort(); + const uint16_t iceLocalPreference = IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement; const uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference); @@ -72,11 +77,11 @@ namespace RTC if (listenInfo->port() != 0) { - udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port()); + udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), flags); } else { - udpSocket = new RTC::UdpSocket(this, ip); + udpSocket = new RTC::UdpSocket(this, ip, flags); } this->udpSockets[udpSocket] = announcedIp; @@ -114,11 +119,11 @@ namespace RTC if (listenInfo->port() != 0) { - tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port()); + tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), flags); } else { - tcpServer = new RTC::TcpServer(this, this, ip); + tcpServer = new RTC::TcpServer(this, this, ip, flags); } this->tcpServers[tcpServer] = announcedIp;