diff --git a/webrtc/src/mux/mod.rs b/webrtc/src/mux/mod.rs index c4b265f08..d5bea2e65 100644 --- a/webrtc/src/mux/mod.rs +++ b/webrtc/src/mux/mod.rs @@ -14,6 +14,7 @@ use util::{Buffer, Conn}; use crate::error::Result; use crate::mux::endpoint::Endpoint; use crate::mux::mux_func::MatchFunc; +use crate::util::Error; /// mux multiplexes packets on a single socket (RFC7983) @@ -64,8 +65,6 @@ impl Mux { let id = self.id.fetch_add(1, Ordering::SeqCst); // Set a maximum size of the buffer in bytes. - // NOTE: We actually won't get anywhere close to this limit. - // SRTP will constantly read from the endpoint and drop packets if it's full. let e = Arc::new(Endpoint { id, buffer: Buffer::new(0, MAX_BUFFER_SIZE), @@ -135,7 +134,14 @@ impl Mux { } if let Some(ep) = endpoint { - ep.buffer.write(buf).await?; + match ep.buffer.write(buf).await { + // Expected when bytes are received faster than the endpoint can process them + Err(Error::ErrBufferFull) => { + log::info!("mux: endpoint buffer is full, dropping packet") + } + Ok(_) => (), + Err(e) => return Err(crate::Error::Util(e)), + } } else if !buf.is_empty() { log::warn!( "Warning: mux: no endpoint for packet starting with {}", diff --git a/webrtc/src/mux/mux_test.rs b/webrtc/src/mux/mux_test.rs index d0782ed52..36e665100 100644 --- a/webrtc/src/mux/mux_test.rs +++ b/webrtc/src/mux/mux_test.rs @@ -6,26 +6,10 @@ use async_trait::async_trait; use util::conn::conn_pipe::pipe; use super::*; -use crate::mux::mux_func::match_all; +use crate::mux::mux_func::{match_all, match_srtp}; const TEST_PIPE_BUFFER_SIZE: usize = 8192; -async fn pipe_memory() -> (Arc, impl Conn) { - // In memory pipe - let (ca, cb) = pipe(); - - let mut m = Mux::new(Config { - conn: Arc::new(ca), - buffer_size: TEST_PIPE_BUFFER_SIZE, - }); - - let e = m.new_endpoint(Box::new(match_all)).await; - m.remove_endpoint(&e).await; - let e = m.new_endpoint(Box::new(match_all)).await; - - (e, cb) -} - #[tokio::test] async fn test_no_endpoints() -> crate::error::Result<()> { // In memory pipe @@ -129,3 +113,25 @@ async fn test_non_fatal_read() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_non_fatal_dispatch() -> Result<()> { + let (ca, cb) = pipe(); + + let mut m = Mux::new(Config { + conn: Arc::new(ca), + buffer_size: TEST_PIPE_BUFFER_SIZE, + }); + + let e = m.new_endpoint(Box::new(match_srtp)).await; + e.buffer.set_limit_size(1).await; + + for _ in 0..25 { + let srtp_packet = [128, 1, 2, 3, 4].to_vec(); + cb.send(&srtp_packet).await?; + } + + m.close().await; + + Ok(()) +}