From be89907f381c267c8fd1db60e49d9b7909b29050 Mon Sep 17 00:00:00 2001 From: Stuart Hunt Date: Tue, 14 Nov 2023 16:35:59 -0500 Subject: [PATCH 1/3] Fixed lack of guarentee on SCTP -> DTLS packet ordering by removing tokio::spawn. The ordering mixup triggered SCTPs congestion control, severely limitting throughput in practice. --- sctp/src/association/mod.rs | 65 +++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/sctp/src/association/mod.rs b/sctp/src/association/mod.rs index fdf2cda4f..08baca447 100644 --- a/sctp/src/association/mod.rs +++ b/sctp/src/association/mod.rs @@ -14,7 +14,7 @@ use association_internal::*; use association_stats::*; use bytes::{Bytes, BytesMut}; use rand::random; -use tokio::sync::{broadcast, mpsc, Mutex, Semaphore}; +use tokio::sync::{broadcast, mpsc, Mutex}; use util::Conn; use crate::chunk::chunk_abort::ChunkAbort; @@ -487,18 +487,6 @@ impl Association { let done = Arc::new(AtomicBool::new(false)); let name = Arc::new(name); - let limit = { - #[cfg(test)] - { - 1 - } - #[cfg(not(test))] - { - 8 - } - }; - - let sem = Arc::new(Semaphore::new(limit)); while !done.load(Ordering::Relaxed) { //log::debug!("[{}] gather_outbound begin", name); let (packets, continue_loop) = { @@ -507,35 +495,40 @@ impl Association { }; //log::debug!("[{}] gather_outbound done with {}", name, packets.len()); - // We schedule a new task here for a reason: - // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread - // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio - // Doing it this way, tokio schedules this to a new thread, this future is suspended, and the read_loop can make progress let net_conn = Arc::clone(&net_conn); let bytes_sent = Arc::clone(&bytes_sent); let name2 = Arc::clone(&name); let done2 = Arc::clone(&done); - let sem = Arc::clone(&sem); - sem.acquire().await.unwrap().forget(); - tokio::task::spawn(async move { - let mut buf = BytesMut::with_capacity(16 * 1024); - for raw in packets { - buf.clear(); - if let Err(err) = raw.marshal_to(&mut buf) { - log::warn!("[{}] failed to serialize a packet: {:?}", name2, err); - } else { - let raw = buf.as_ref(); - if let Err(err) = net_conn.send(raw.as_ref()).await { - log::warn!("[{}] failed to write packets on net_conn: {}", name2, err); - done2.store(true, Ordering::Relaxed) - } else { - bytes_sent.fetch_add(raw.len(), Ordering::SeqCst); + let mut buffer = None; + for raw in packets { + let mut buf = buffer.take().unwrap_or_else(|| BytesMut::with_capacity(16 * 1024)); + + // We do the marshalling work in a blocking task here for a reason: + // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread + // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio + // Doing it this way, tokio schedules this work on a dedicated blocking thread, this future is suspended, and the read_loop can make progress + match tokio::task::spawn_blocking(move || { + raw.marshal_to(&mut buf).map(|_| buf) + }).await.unwrap() { + Ok(mut buf) => { + let raw = buf.as_ref(); + if let Err(err) = net_conn.send(raw.as_ref()).await { + log::warn!("[{}] failed to write packets on net_conn: {}", name2, err); + done2.store(true, Ordering::Relaxed) + } else { + bytes_sent.fetch_add(raw.len(), Ordering::SeqCst); + } + + // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take owernship. + buf.clear(); + buffer = Some(buf); + }, + Err(err) => { + log::warn!("[{}] failed to serialize a packet: {:?}", name2, err); } } - //log::debug!("[{}] sending {} bytes done", name, raw.len()); - } - sem.add_permits(1); - }); + //log::debug!("[{}] sending {} bytes done", name, raw.len()); + } if !continue_loop { break; From 64ae3734c653e1cbc900eef7f581a52a68cc8ada Mon Sep 17 00:00:00 2001 From: Stuart Hunt Date: Wed, 15 Nov 2023 12:16:14 -0500 Subject: [PATCH 2/3] Formatted --- sctp/src/association/mod.rs | 41 ++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/sctp/src/association/mod.rs b/sctp/src/association/mod.rs index 08baca447..f56b80933 100644 --- a/sctp/src/association/mod.rs +++ b/sctp/src/association/mod.rs @@ -501,32 +501,35 @@ impl Association { let done2 = Arc::clone(&done); let mut buffer = None; for raw in packets { - let mut buf = buffer.take().unwrap_or_else(|| BytesMut::with_capacity(16 * 1024)); + let mut buf = buffer + .take() + .unwrap_or_else(|| BytesMut::with_capacity(16 * 1024)); // We do the marshalling work in a blocking task here for a reason: // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio // Doing it this way, tokio schedules this work on a dedicated blocking thread, this future is suspended, and the read_loop can make progress - match tokio::task::spawn_blocking(move || { - raw.marshal_to(&mut buf).map(|_| buf) - }).await.unwrap() { - Ok(mut buf) => { - let raw = buf.as_ref(); - if let Err(err) = net_conn.send(raw.as_ref()).await { - log::warn!("[{}] failed to write packets on net_conn: {}", name2, err); - done2.store(true, Ordering::Relaxed) - } else { - bytes_sent.fetch_add(raw.len(), Ordering::SeqCst); - } - - // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take owernship. - buf.clear(); - buffer = Some(buf); - }, - Err(err) => { - log::warn!("[{}] failed to serialize a packet: {:?}", name2, err); + match tokio::task::spawn_blocking(move || raw.marshal_to(&mut buf).map(|_| buf)) + .await + .unwrap() + { + Ok(mut buf) => { + let raw = buf.as_ref(); + if let Err(err) = net_conn.send(raw.as_ref()).await { + log::warn!("[{}] failed to write packets on net_conn: {}", name2, err); + done2.store(true, Ordering::Relaxed) + } else { + bytes_sent.fetch_add(raw.len(), Ordering::SeqCst); } + + // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take owernship. + buf.clear(); + buffer = Some(buf); } + Err(err) => { + log::warn!("[{}] failed to serialize a packet: {:?}", name2, err); + } + } //log::debug!("[{}] sending {} bytes done", name, raw.len()); } From 6c7abd1cc6275d21a0c5abd88f7ed88bea689fb4 Mon Sep 17 00:00:00 2001 From: Stuart Hunt Date: Wed, 15 Nov 2023 13:01:04 -0500 Subject: [PATCH 3/3] Fixed typo --- sctp/src/association/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sctp/src/association/mod.rs b/sctp/src/association/mod.rs index f56b80933..44526a09b 100644 --- a/sctp/src/association/mod.rs +++ b/sctp/src/association/mod.rs @@ -522,7 +522,7 @@ impl Association { bytes_sent.fetch_add(raw.len(), Ordering::SeqCst); } - // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take owernship. + // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take ownership. buf.clear(); buffer = Some(buf); }