Skip to content

Commit

Permalink
Bugfix/fix interceptor/webrtc unit test memory leak (webrtc-rs#629)
Browse files Browse the repository at this point in the history
* Fix MockStream cyclic dependency to itself

- it often binds itself into Interceptor and owns the returned sender/writer. The returned sender/writer might contain the Arc<> to MockStream itself. Thus, the cyclic dependency is formed. We create an internal struct to avoid this.

* Fix cyclic dependency between PeerConnectionInternal and StatsInterceptor

- PeerConnectionInternal should not own StatsInterceptor. Make it Weak<>

* Fix cyclic dependency for dtls_transport

- IceTransport holds OnConnectionStateChangeFn, which hods DtlsTransport and DtlsTransport holds IceTransport. In the callback we should use Weak<>

* Fix self reference test case in data_channel_test.rs

* Fix formatting
  • Loading branch information
mutexd authored and tubzby committed Nov 26, 2024
1 parent 8e6e597 commit aad7f38
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 230 deletions.
72 changes: 38 additions & 34 deletions interceptor/src/mock/mock_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ pub struct MockStream {
rtcp_writer: Mutex<Option<Arc<dyn RTCPWriter + Send + Sync>>>,
rtp_writer: Mutex<Option<Arc<dyn RTPWriter + Send + Sync>>>,

internal: Arc<MockStreamInternal>,
}

struct MockStreamInternal {
rtcp_out_modified_tx: mpsc::Sender<RTCPPackets>,
rtp_out_modified_tx: mpsc::Sender<rtp::packet::Packet>,
rtcp_in_rx: Mutex<mpsc::Receiver<RTCPPackets>>,
Expand Down Expand Up @@ -46,44 +50,44 @@ impl MockStream {

let stream = Arc::new(MockStream {
interceptor: Arc::clone(&interceptor),

rtcp_writer: Mutex::new(None),
rtp_writer: Mutex::new(None),

rtcp_in_tx: Mutex::new(Some(rtcp_in_tx)),
rtp_in_tx: Mutex::new(Some(rtp_in_tx)),
rtcp_in_rx: Mutex::new(rtcp_in_rx),
rtp_in_rx: Mutex::new(rtp_in_rx),

rtcp_out_modified_tx,
rtp_out_modified_tx,
rtcp_out_modified_rx: Mutex::new(rtcp_out_modified_rx),
rtp_out_modified_rx: Mutex::new(rtp_out_modified_rx),

rtcp_in_modified_rx: Mutex::new(rtcp_in_modified_rx),
rtp_in_modified_rx: Mutex::new(rtp_in_modified_rx),
internal: Arc::new(MockStreamInternal {
rtcp_in_tx: Mutex::new(Some(rtcp_in_tx)),
rtp_in_tx: Mutex::new(Some(rtp_in_tx)),
rtcp_in_rx: Mutex::new(rtcp_in_rx),
rtp_in_rx: Mutex::new(rtp_in_rx),

rtcp_out_modified_tx,
rtp_out_modified_tx,
rtcp_out_modified_rx: Mutex::new(rtcp_out_modified_rx),
rtp_out_modified_rx: Mutex::new(rtp_out_modified_rx),

rtcp_in_modified_rx: Mutex::new(rtcp_in_modified_rx),
rtp_in_modified_rx: Mutex::new(rtp_in_modified_rx),
}),
});

let rtcp_writer = interceptor
.bind_rtcp_writer(Arc::clone(&stream) as Arc<dyn RTCPWriter + Send + Sync>)
.bind_rtcp_writer(Arc::clone(&stream.internal) as Arc<dyn RTCPWriter + Send + Sync>)
.await;
{
let mut rw = stream.rtcp_writer.lock().await;
*rw = Some(rtcp_writer);
*rw = Some(Arc::clone(&rtcp_writer));
}
let rtp_writer = interceptor
.bind_local_stream(
info,
Arc::clone(&stream) as Arc<dyn RTPWriter + Send + Sync>,
Arc::clone(&stream.internal) as Arc<dyn RTPWriter + Send + Sync>,
)
.await;
{
let mut rw = stream.rtp_writer.lock().await;
*rw = Some(rtp_writer);
*rw = Some(Arc::clone(&rtp_writer));
}

let rtcp_reader = interceptor
.bind_rtcp_reader(Arc::clone(&stream) as Arc<dyn RTCPReader + Send + Sync>)
.bind_rtcp_reader(Arc::clone(&stream.internal) as Arc<dyn RTCPReader + Send + Sync>)
.await;
tokio::spawn(async move {
let mut buf = vec![0u8; 1500];
Expand All @@ -104,7 +108,7 @@ impl MockStream {
let rtp_reader = interceptor
.bind_remote_stream(
info,
Arc::clone(&stream) as Arc<dyn RTPReader + Send + Sync>,
Arc::clone(&stream.internal) as Arc<dyn RTPReader + Send + Sync>,
)
.await;
tokio::spawn(async move {
Expand Down Expand Up @@ -153,23 +157,23 @@ impl MockStream {

/// receive_rtcp schedules a new rtcp batch, so it can be read be the stream
pub async fn receive_rtcp(&self, pkts: Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>) {
let rtcp_in_tx = self.rtcp_in_tx.lock().await;
let rtcp_in_tx = self.internal.rtcp_in_tx.lock().await;
if let Some(tx) = &*rtcp_in_tx {
let _ = tx.send(pkts).await;
}
}

/// receive_rtp schedules a rtp packet, so it can be read be the stream
pub async fn receive_rtp(&self, pkt: rtp::packet::Packet) {
let rtp_in_tx = self.rtp_in_tx.lock().await;
let rtp_in_tx = self.internal.rtp_in_tx.lock().await;
if let Some(tx) = &*rtp_in_tx {
let _ = tx.send(pkt).await;
}
}

/// written_rtcp returns a channel containing the rtcp batches written, modified by the interceptor
pub async fn written_rtcp(&self) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await;
let mut rtcp_out_modified_rx = self.internal.rtcp_out_modified_rx.lock().await;
rtcp_out_modified_rx.recv().await
}

Expand All @@ -180,7 +184,7 @@ impl MockStream {
&self,
) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
let mut last = None;
let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await;
let mut rtcp_out_modified_rx = self.internal.rtcp_out_modified_rx.lock().await;

while let Ok(v) = rtcp_out_modified_rx.try_recv() {
last = Some(v);
Expand All @@ -191,40 +195,40 @@ impl MockStream {

/// written_rtp returns a channel containing rtp packets written, modified by the interceptor
pub async fn written_rtp(&self) -> Option<rtp::packet::Packet> {
let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await;
let mut rtp_out_modified_rx = self.internal.rtp_out_modified_rx.lock().await;
rtp_out_modified_rx.recv().await
}

/// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor
pub async fn read_rtcp(
&self,
) -> Option<Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>> {
let mut rtcp_in_modified_rx = self.rtcp_in_modified_rx.lock().await;
let mut rtcp_in_modified_rx = self.internal.rtcp_in_modified_rx.lock().await;
rtcp_in_modified_rx.recv().await
}

/// read_rtp returns a channel containing the rtp packets read, modified by the interceptor
pub async fn read_rtp(&self) -> Option<Result<rtp::packet::Packet>> {
let mut rtp_in_modified_rx = self.rtp_in_modified_rx.lock().await;
let mut rtp_in_modified_rx = self.internal.rtp_in_modified_rx.lock().await;
rtp_in_modified_rx.recv().await
}

/// close closes the stream and the underlying interceptor
/// close closes the stream
pub async fn close(&self) -> Result<()> {
{
let mut rtcp_in_tx = self.rtcp_in_tx.lock().await;
let mut rtcp_in_tx = self.internal.rtcp_in_tx.lock().await;
rtcp_in_tx.take();
}
{
let mut rtp_in_tx = self.rtp_in_tx.lock().await;
let mut rtp_in_tx = self.internal.rtp_in_tx.lock().await;
rtp_in_tx.take();
}
self.interceptor.close().await
}
}

#[async_trait]
impl RTCPWriter for MockStream {
impl RTCPWriter for MockStreamInternal {
async fn write(
&self,
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
Expand All @@ -237,7 +241,7 @@ impl RTCPWriter for MockStream {
}

#[async_trait]
impl RTCPReader for MockStream {
impl RTCPReader for MockStreamInternal {
async fn read(
&self,
buf: &mut [u8],
Expand All @@ -260,15 +264,15 @@ impl RTCPReader for MockStream {
}

#[async_trait]
impl RTPWriter for MockStream {
impl RTPWriter for MockStreamInternal {
async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result<usize> {
let _ = self.rtp_out_modified_tx.send(pkt.clone()).await;
Ok(0)
}
}

#[async_trait]
impl RTPReader for MockStream {
impl RTPReader for MockStreamInternal {
async fn read(
&self,
buf: &mut [u8],
Expand Down
26 changes: 17 additions & 9 deletions webrtc/src/data_channel/data_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ async fn test_data_channel_send_before_signaling() -> Result<()> {
return Box::pin(async {});
}
Box::pin(async move {
let d2 = Arc::clone(&d);
let d2 = Arc::downgrade(&d);
d.on_message(Box::new(move |_: DataChannelMessage| {
let d3 = Arc::clone(&d2);
let d3 = d2.clone();
Box::pin(async move {
let result = d3.send(&Bytes::from(b"Pong".to_vec())).await;
let result = d3
.upgrade()
.unwrap()
.send(&Bytes::from(b"Pong".to_vec()))
.await;
assert!(result.is_ok(), "Failed to send string on data channel");
})
}));
Expand All @@ -218,11 +222,11 @@ async fn test_data_channel_send_before_signaling() -> Result<()> {

assert!(dc.ordered(), "Ordered should be set to true");

let dc2 = Arc::clone(&dc);
let dc2 = Arc::downgrade(&dc);
dc.on_open(Box::new(move || {
let dc3 = Arc::clone(&dc2);
let dc3 = dc2.clone();
Box::pin(async move {
let result = dc3.send_text("Ping".to_owned()).await;
let result = dc3.upgrade().unwrap().send_text("Ping".to_owned()).await;
assert!(result.is_ok(), "Failed to send string on data channel");
})
}));
Expand Down Expand Up @@ -258,12 +262,16 @@ async fn test_data_channel_send_after_connected() -> Result<()> {
return Box::pin(async {});
}
Box::pin(async move {
let d2 = Arc::clone(&d);
let d2 = Arc::downgrade(&d);
d.on_message(Box::new(move |_: DataChannelMessage| {
let d3 = Arc::clone(&d2);
let d3 = d2.clone();

Box::pin(async move {
let result = d3.send(&Bytes::from(b"Pong".to_vec())).await;
let result = d3
.upgrade()
.unwrap()
.send(&Bytes::from(b"Pong".to_vec()))
.await;
assert!(result.is_ok(), "Failed to send string on data channel");
})
}));
Expand Down
10 changes: 7 additions & 3 deletions webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,13 @@ impl RTCPeerConnection {
};

let weak_interceptor = Arc::downgrade(&interceptor);
let (internal, configuration) =
PeerConnectionInternal::new(api, weak_interceptor, stats_interceptor, configuration)
.await?;
let (internal, configuration) = PeerConnectionInternal::new(
api,
weak_interceptor,
Arc::downgrade(&stats_interceptor),
configuration,
)
.await?;
let internal_rtcp_writer = Arc::clone(&internal) as Arc<dyn RTCPWriter + Send + Sync>;
let interceptor_rtcp_writer = interceptor.bind_rtcp_writer(internal_rtcp_writer).await;

Expand Down
Loading

0 comments on commit aad7f38

Please sign in to comment.