Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed throughput issues by fixing lack of guarentee on SCTP -> DTLS packet ordering #513

Merged
merged 3 commits into from
Nov 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 25 additions & 29 deletions sctp/src/association/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use association_internal::*;
use association_stats::*;
use bytes::{Bytes, BytesMut};
use rand::random;
use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
use tokio::sync::{broadcast, mpsc, Mutex};
use util::Conn;

use crate::chunk::chunk_abort::ChunkAbort;
Expand Down Expand Up @@ -487,18 +487,6 @@ impl Association {
let done = Arc::new(AtomicBool::new(false));
let name = Arc::new(name);

let limit = {
#[cfg(test)]
{
1
}
#[cfg(not(test))]
{
8
}
};

let sem = Arc::new(Semaphore::new(limit));
while !done.load(Ordering::Relaxed) {
//log::debug!("[{}] gather_outbound begin", name);
let (packets, continue_loop) = {
Expand All @@ -507,35 +495,43 @@ impl Association {
};
//log::debug!("[{}] gather_outbound done with {}", name, packets.len());

// We schedule a new task here for a reason:
// If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread
// This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio
// Doing it this way, tokio schedules this to a new thread, this future is suspended, and the read_loop can make progress
let net_conn = Arc::clone(&net_conn);
let bytes_sent = Arc::clone(&bytes_sent);
let name2 = Arc::clone(&name);
let done2 = Arc::clone(&done);
let sem = Arc::clone(&sem);
sem.acquire().await.unwrap().forget();
tokio::task::spawn(async move {
let mut buf = BytesMut::with_capacity(16 * 1024);
for raw in packets {
buf.clear();
if let Err(err) = raw.marshal_to(&mut buf) {
log::warn!("[{}] failed to serialize a packet: {:?}", name2, err);
} else {
let mut buffer = None;
for raw in packets {
let mut buf = buffer
.take()
.unwrap_or_else(|| BytesMut::with_capacity(16 * 1024));

// We do the marshalling work in a blocking task here for a reason:
// If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread
// This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio
// Doing it this way, tokio schedules this work on a dedicated blocking thread, this future is suspended, and the read_loop can make progress
match tokio::task::spawn_blocking(move || raw.marshal_to(&mut buf).map(|_| buf))
.await
.unwrap()
{
Ok(mut buf) => {
let raw = buf.as_ref();
if let Err(err) = net_conn.send(raw.as_ref()).await {
log::warn!("[{}] failed to write packets on net_conn: {}", name2, err);
done2.store(true, Ordering::Relaxed)
} else {
bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
}

// Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take ownership.
buf.clear();
buffer = Some(buf);
}
Err(err) => {
log::warn!("[{}] failed to serialize a packet: {:?}", name2, err);
}
//log::debug!("[{}] sending {} bytes done", name, raw.len());
}
sem.add_permits(1);
});
//log::debug!("[{}] sending {} bytes done", name, raw.len());
}

if !continue_loop {
break;
Expand Down
Loading