Skip to content

Commit

Permalink
ListenInfo: Add transport socket flags (#1291)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc authored Jan 3, 2024
1 parent a36f2c2 commit ac2bbcb
Show file tree
Hide file tree
Showing 48 changed files with 519 additions and 101 deletions.
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:

env:
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-worker-prebuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
CC: ${{ matrix.build.cc }}
CXX: ${{ matrix.build.cxx }}
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
CC: ${{ matrix.build.cc }}
CXX: ${{ matrix.build.cxx }}
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
### NEXT

* Avoid modification of user input data ([PR #1285](https://github.com/versatica/mediasoup/pull/1285)).
* `ListenInfo`: Add transport socket flags ([PR #1291](https://github.com/versatica/mediasoup/pull/1291)).


### 3.13.13
Expand Down
17 changes: 16 additions & 1 deletion node/src/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
Transport,
TransportListenInfo,
TransportListenIp,
TransportProtocol
TransportProtocol,
TransportSocketFlags
} from './Transport';
import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport';
import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport';
Expand Down Expand Up @@ -570,6 +571,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToFbs(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize
));
Expand Down Expand Up @@ -749,6 +751,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToFbs(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand All @@ -759,6 +762,7 @@ export class Router<RouterAppData extends AppData = AppData>
rtcpListenInfo.ip,
rtcpListenInfo.announcedIp,
rtcpListenInfo.port,
socketFlagsToFbs(rtcpListenInfo.flags),
rtcpListenInfo.sendBufferSize,
rtcpListenInfo.recvBufferSize
) : undefined,
Expand Down Expand Up @@ -897,6 +901,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToFbs(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand Down Expand Up @@ -1619,3 +1624,13 @@ export function parseRouterDumpResponse(
mapDataConsumerIdDataProducerId : parseStringStringVector(binary, 'mapDataConsumerIdDataProducerId')
};
}

export function socketFlagsToFbs(
flags: TransportSocketFlags = {}
): FbsTransport.SocketFlagsT
{
return new FbsTransport.SocketFlagsT(
Boolean(flags.ipv6Only),
Boolean(flags.udpReusePort)
);
}
21 changes: 21 additions & 0 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export type TransportListenInfo =
*/
port?: number;

/**
* Socket flags.
*/
flags?: TransportSocketFlags;

/**
* Send buffer size (bytes).
*/
Expand Down Expand Up @@ -107,6 +112,22 @@ export type TransportListenIp =
*/
export type TransportProtocol = 'udp' | 'tcp';

/**
* UDP/TCP socket flags.
*/
export type TransportSocketFlags =
{
/**
* Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
*/
ipv6Only?: boolean;
/**
* Make different transports bind to the same ip and port (only for UDP).
* Useful for multicast scenarios with plain transport. Use with caution.
*/
udpReusePort?: boolean;
};

export type TransportTuple =
{
localIp: string;
Expand Down
3 changes: 2 additions & 1 deletion node/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import * as ortc from './ortc';
import { Channel } from './Channel';
import { Router, RouterOptions } from './Router';
import { Router, RouterOptions, socketFlagsToFbs } from './Router';
import { WebRtcServer, WebRtcServerOptions } from './WebRtcServer';
import { RtpCodecCapability } from './RtpParameters';
import { AppData } from './types';
Expand Down Expand Up @@ -699,6 +699,7 @@ export class Worker<WorkerAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToFbs(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize)
);
Expand Down
80 changes: 80 additions & 0 deletions node/src/tests/test-PlainTransport.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import * as os from 'node:os';
// @ts-ignore
import * as pickPort from 'pick-port';
import * as mediasoup from '../';

const IS_WINDOWS = os.platform() === 'win32';

let worker: mediasoup.types.Worker;
let router: mediasoup.types.Router;
let transport: mediasoup.types.PlainTransport;
Expand Down Expand Up @@ -316,6 +319,83 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as
.toThrow(Error);
}, 2000);

if (!IS_WINDOWS)
{
test('two transports binding to the same IP:port with udpReusePort flag succeed', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: true }
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: true }
}
});
}).not.toThrow();

transport1?.close();
transport2?.close();
}, 2000);

test('two transports binding to the same IP:port without udpReusePort flag fails', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: false }
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: false }
}
});
}).rejects.toThrow();

transport1?.close();
transport2?.close();
}, 2000);
}

test('plainTransport.getStats() succeeds', async () =>
{
const data = await transport.getStats();
Expand Down
8 changes: 4 additions & 4 deletions npm-scripts.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import process from 'node:process';
import os from 'node:os';
import fs from 'node:fs';
import path from 'node:path';
import * as process from 'node:process';
import * as os from 'node:os';
import * as fs from 'node:fs';
import * as path from 'node:path';
import { execSync } from 'node:child_process';
import fetch from 'node-fetch';
import tar from 'tar';
Expand Down
1 change: 1 addition & 0 deletions rust/benches/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, WebRtcTransport) {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
1 change: 1 addition & 0 deletions rust/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl EchoConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
2 changes: 2 additions & 0 deletions rust/examples/multiopus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl EchoConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
});
Expand Down Expand Up @@ -235,6 +236,7 @@ impl EchoConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}),
Expand Down
1 change: 1 addition & 0 deletions rust/examples/svc-simulcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl SvcSimulcastConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
1 change: 1 addition & 0 deletions rust/examples/videoroom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ mod participant {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
28 changes: 28 additions & 0 deletions rust/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub struct ListenInfo {
/// Listening port.
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
/// Socket flags.
#[serde(skip_serializing_if = "Option::is_none")]
pub flags: Option<SocketFlags>,
/// Send buffer size (bytes).
#[serde(skip_serializing_if = "Option::is_none")]
pub send_buffer_size: Option<u32>,
Expand All @@ -80,12 +83,37 @@ impl ListenInfo {
ip: self.ip.to_string(),
announced_ip: self.announced_ip.map(|ip| ip.to_string()),
port: self.port.unwrap_or(0),
flags: Box::new(self.flags.unwrap_or_default().to_fbs()),
send_buffer_size: self.send_buffer_size.unwrap_or(0),
recv_buffer_size: self.recv_buffer_size.unwrap_or(0),
}
}
}

/// UDP/TCP socket flags.
#[derive(
Default, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize,
)]
#[serde(rename_all = "camelCase")]
pub struct SocketFlags {
/// Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
/// Defaults to false.
pub ipv6_only: bool,
/// Make different transports bind to the same ip and port (only for UDP).
/// Useful for multicast scenarios with plain transport. Use with caution.
/// Defaults to false.
pub udp_reuse_port: bool,
}

impl SocketFlags {
pub(crate) fn to_fbs(self) -> transport::SocketFlags {
transport::SocketFlags {
ipv6_only: self.ipv6_only,
udp_reuse_port: self.udp_reuse_port,
}
}
}

/// ICE role.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down
Loading

0 comments on commit ac2bbcb

Please sign in to comment.