diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 70aebaeb08..80c51c236d 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -2,6 +2,9 @@ name: Bench on: workflow_call: workflow_dispatch: + schedule: + # Run at 1 AM each day, so there is a `main`-branch baseline in the cache. + - cron: '0 1 * * *' env: CARGO_PROFILE_BENCH_BUILD_OVERRIDE_DEBUG: true CARGO_PROFILE_RELEASE_DEBUG: true @@ -181,7 +184,7 @@ jobs: echo "### Benchmark results" echo } > results.md - SHA=$(cat target/criterion/baseline-sha.txt) + SHA=$(cat target/criterion/baseline-sha.txt || true) if [ -n "$SHA" ]; then { echo "Performance differences relative to $SHA." diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 10085ffda6..9dc8ff2b7f 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -113,7 +113,7 @@ jobs: - name: Run tests and determine coverage run: | # shellcheck disable=SC2086 - cargo +${{ matrix.rust-toolchain }} llvm-cov nextest $BUILD_TYPE --all-targets --features ci --no-fail-fast --lcov --output-path lcov.info + cargo +${{ matrix.rust-toolchain }} llvm-cov nextest $BUILD_TYPE --features ci --no-fail-fast --lcov --output-path lcov.info cargo +${{ matrix.rust-toolchain }} bench --features bench --no-run - name: Run client/server transfer @@ -122,6 +122,8 @@ jobs: cargo +${{ matrix.rust-toolchain }} build $BUILD_TYPE --bin neqo-client --bin neqo-server "target/$BUILD_DIR/neqo-server" "$HOST:4433" & PID=$! + # Give the server time to start. + sleep 1 "target/$BUILD_DIR/neqo-client" --output-dir . "https://$HOST:4433/$SIZE" kill $PID [ "$(wc -c <"$SIZE")" -eq "$SIZE" ] || exit 1 @@ -146,7 +148,7 @@ jobs: # respective default features only. Can reveal warnings otherwise # hidden given that a plain cargo clippy combines all features of the # workspace. See e.g. https://github.com/mozilla/neqo/pull/1695. - cargo +${{ matrix.rust-toolchain }} hack clippy --all-targets -- -D warnings || ${{ matrix.rust-toolchain == 'nightly' }} + cargo +${{ matrix.rust-toolchain }} hack clippy --all-targets --feature-powerset --exclude-features gecko -- -D warnings || ${{ matrix.rust-toolchain == 'nightly' }} if: success() || failure() - name: Check rustdoc links diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index d36d2ecdca..04210e00db 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -25,6 +25,7 @@ workspace = true [dependencies] # neqo-bin is not used in Firefox, so we can be liberal with dependency versions clap = { version = "4.4", default-features = false, features = ["std", "color", "help", "usage", "error-context", "suggestions", "derive"] } +clap-verbosity-flag = { version = "2.2", default-features = false } futures = { version = "0.3", default-features = false, features = ["alloc"] } hex = { version = "0.4", default-features = false, features = ["std"] } log = { version = "0.4", default-features = false } diff --git a/neqo-bin/src/bin/client/http09.rs b/neqo-bin/src/bin/client/http09.rs index 6d9a26fec2..372a112853 100644 --- a/neqo-bin/src/bin/client/http09.rs +++ b/neqo-bin/src/bin/client/http09.rs @@ -17,7 +17,7 @@ use std::{ time::Instant, }; -use neqo_common::{event::Provider, Datagram}; +use neqo_common::{event::Provider, qdebug, qinfo, qwarn, Datagram}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, @@ -50,13 +50,13 @@ impl<'a> super::Handler for Handler<'a> { self.read(client, stream_id)?; } ConnectionEvent::SendStreamWritable { stream_id } => { - println!("stream {stream_id} writable"); + qdebug!("stream {stream_id} writable"); } ConnectionEvent::SendStreamComplete { stream_id } => { - println!("stream {stream_id} complete"); + qdebug!("stream {stream_id} complete"); } ConnectionEvent::SendStreamCreatable { stream_type } => { - println!("stream {stream_type:?} creatable"); + qdebug!("stream {stream_type:?} creatable"); if stream_type == StreamType::BiDi { self.download_urls(client); } @@ -64,7 +64,7 @@ impl<'a> super::Handler for Handler<'a> { ConnectionEvent::StateChange( State::WaitInitial | State::Handshaking | State::Connected, ) => { - println!("{event:?}"); + qdebug!("{event:?}"); self.download_urls(client); } ConnectionEvent::StateChange(State::Confirmed) => { @@ -74,7 +74,7 @@ impl<'a> super::Handler for Handler<'a> { self.token = Some(token); } _ => { - println!("Unhandled event {event:?}"); + qwarn!("Unhandled event {event:?}"); } } } @@ -153,6 +153,10 @@ impl super::Client for Connection { fn is_closed(&self) -> bool { matches!(self.state(), State::Closed(..)) } + + fn stats(&self) -> neqo_transport::Stats { + self.stats() + } } impl<'b> Handler<'b> { @@ -183,7 +187,7 @@ impl<'b> Handler<'b> { fn download_next(&mut self, client: &mut Connection) -> bool { if self.key_update.needed() { - println!("Deferring requests until after first key update"); + qdebug!("Deferring requests until after first key update"); return false; } let url = self @@ -192,7 +196,7 @@ impl<'b> Handler<'b> { .expect("download_next called with empty queue"); match client.stream_create(StreamType::BiDi) { Ok(client_stream_id) => { - println!("Created stream {client_stream_id} for {url}"); + qinfo!("Created stream {client_stream_id} for {url}"); let req = format!("GET {}\r\n", url.path()); _ = client .stream_send(client_stream_id, req.as_bytes()) @@ -203,7 +207,7 @@ impl<'b> Handler<'b> { true } Err(e @ (Error::StreamLimitError | Error::ConnectionState)) => { - println!("Cannot create stream {e:?}"); + qwarn!("Cannot create stream {e:?}"); self.url_queue.push_front(url); false } @@ -231,9 +235,9 @@ impl<'b> Handler<'b> { if let Some(out_file) = maybe_out_file { out_file.write_all(&data[..sz])?; } else if !output_read_data { - println!("READ[{stream_id}]: {sz} bytes"); + qdebug!("READ[{stream_id}]: {sz} bytes"); } else { - println!( + qdebug!( "READ[{}]: {}", stream_id, String::from_utf8(data.clone()).unwrap() @@ -248,7 +252,7 @@ impl<'b> Handler<'b> { fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> { match self.streams.get_mut(&stream_id) { None => { - println!("Data on unexpected stream: {stream_id}"); + qwarn!("Data on unexpected stream: {stream_id}"); return Ok(()); } Some(maybe_out_file) => { @@ -263,7 +267,7 @@ impl<'b> Handler<'b> { if let Some(mut out_file) = maybe_out_file.take() { out_file.flush()?; } else { - println!(""); + qinfo!(""); } self.streams.remove(&stream_id); self.download_urls(client); diff --git a/neqo-bin/src/bin/client/http3.rs b/neqo-bin/src/bin/client/http3.rs index 07cc0e4cde..e9f5e406a5 100644 --- a/neqo-bin/src/bin/client/http3.rs +++ b/neqo-bin/src/bin/client/http3.rs @@ -18,7 +18,7 @@ use std::{ time::Instant, }; -use neqo_common::{event::Provider, hex, Datagram, Header}; +use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram, Header}; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; use neqo_transport::{ @@ -125,6 +125,10 @@ impl super::Client for Http3Client { { self.close(now, app_error, msg); } + + fn stats(&self) -> neqo_transport::Stats { + self.transport_stats() + } } impl<'a> super::Handler for Handler<'a> { @@ -145,7 +149,7 @@ impl<'a> super::Handler for Handler<'a> { if let Some(handler) = self.url_handler.stream_handler(stream_id) { handler.process_header_ready(stream_id, fin, headers); } else { - println!("Data on unexpected stream: {stream_id}"); + qwarn!("Data on unexpected stream: {stream_id}"); } if fin { self.url_handler.on_stream_fin(client, stream_id); @@ -155,7 +159,7 @@ impl<'a> super::Handler for Handler<'a> { let mut stream_done = false; match self.url_handler.stream_handler(stream_id) { None => { - println!("Data on unexpected stream: {stream_id}"); + qwarn!("Data on unexpected stream: {stream_id}"); } Some(handler) => loop { let mut data = vec![0; 4096]; @@ -189,7 +193,7 @@ impl<'a> super::Handler for Handler<'a> { Http3ClientEvent::DataWritable { stream_id } => { match self.url_handler.stream_handler(stream_id) { None => { - println!("Data on unexpected stream: {stream_id}"); + qwarn!("Data on unexpected stream: {stream_id}"); } Some(handler) => { handler.process_data_writable(client, stream_id); @@ -202,7 +206,7 @@ impl<'a> super::Handler for Handler<'a> { } Http3ClientEvent::ResumptionToken(t) => self.token = Some(t), _ => { - println!("Unhandled event {event:?}"); + qwarn!("Unhandled event {event:?}"); } } } @@ -275,7 +279,7 @@ struct DownloadStreamHandler { impl StreamHandler for DownloadStreamHandler { fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
) { if self.out_file.is_none() { - println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); + qdebug!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); } } @@ -293,18 +297,18 @@ impl StreamHandler for DownloadStreamHandler { } return Ok(true); } else if !output_read_data { - println!("READ[{stream_id}]: {sz} bytes"); + qdebug!("READ[{stream_id}]: {sz} bytes"); } else if let Ok(txt) = String::from_utf8(data.clone()) { - println!("READ[{stream_id}]: {txt}"); + qdebug!("READ[{stream_id}]: {txt}"); } else { - println!("READ[{}]: 0x{}", stream_id, hex(&data)); + qdebug!("READ[{}]: 0x{}", stream_id, hex(&data)); } if fin { if let Some(mut out_file) = self.out_file.take() { out_file.flush()?; } else { - println!(""); + qdebug!(""); } } @@ -323,7 +327,7 @@ struct UploadStreamHandler { impl StreamHandler for UploadStreamHandler { fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
) { - println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); + qdebug!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); } fn process_data_readable( @@ -339,7 +343,7 @@ impl StreamHandler for UploadStreamHandler { let parsed: usize = trimmed_txt.parse().unwrap(); if parsed == self.data.len() { let upload_time = Instant::now().duration_since(self.start); - println!("Stream ID: {stream_id:?}, Upload time: {upload_time:?}"); + qinfo!("Stream ID: {stream_id:?}, Upload time: {upload_time:?}"); } } else { panic!("Unexpected data [{}]: 0x{}", stream_id, hex(&data)); @@ -407,7 +411,7 @@ impl<'a> UrlHandler<'a> { Priority::default(), ) { Ok(client_stream_id) => { - println!("Successfully created stream id {client_stream_id} for {url}"); + qdebug!("Successfully created stream id {client_stream_id} for {url}"); let handler: Box = StreamHandlerType::make_handler( &self.handler_type, diff --git a/neqo-bin/src/bin/client/main.rs b/neqo-bin/src/bin/client/main.rs index 7b1a5928a6..63aa12db13 100644 --- a/neqo-bin/src/bin/client/main.rs +++ b/neqo-bin/src/bin/client/main.rs @@ -22,7 +22,7 @@ use futures::{ FutureExt, TryFutureExt, }; use neqo_bin::udp; -use neqo_common::{self as common, qdebug, qinfo, qlog::NeqoQlog, Datagram, Role}; +use neqo_common::{self as common, qdebug, qerror, qinfo, qlog::NeqoQlog, qwarn, Datagram, Role}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, init, Cipher, ResumptionToken, @@ -103,7 +103,7 @@ impl KeyUpdateState { _ => return Err(e), } } else { - println!("Keys updated"); + qerror!("Keys updated"); self.0 = false; } } @@ -119,6 +119,9 @@ impl KeyUpdateState { #[command(author, version, about, long_about = None)] #[allow(clippy::struct_excessive_bools)] // Not a good use of that lint. pub struct Args { + #[command(flatten)] + verbose: clap_verbosity_flag::Verbosity, + #[command(flatten)] shared: neqo_bin::SharedArgs, @@ -179,6 +182,10 @@ pub struct Args { /// The request size that will be used for upload test. #[arg(name = "upload-size", long, default_value = "100")] upload_size: usize, + + /// Print connection stats after close. + #[arg(name = "stats", long)] + stats: bool, } impl Args { @@ -207,7 +214,7 @@ impl Args { "http3" => { if let Some(testcase) = &self.test { if testcase.as_str() != "upload" { - eprintln!("Unsupported test case: {testcase}"); + qerror!("Unsupported test case: {testcase}"); exit(127) } @@ -219,7 +226,7 @@ impl Args { } "zerortt" | "resumption" => { if self.urls.len() < 2 { - eprintln!("Warning: resumption tests won't work without >1 URL"); + qerror!("Warning: resumption tests won't work without >1 URL"); exit(127); } self.shared.use_old_http = true; @@ -268,11 +275,11 @@ fn get_output_file( out_path.push(url_path); if all_paths.contains(&out_path) { - eprintln!("duplicate path {}", out_path.display()); + qerror!("duplicate path {}", out_path.display()); return None; } - eprintln!("Saving {url} to {out_path:?}"); + qinfo!("Saving {url} to {out_path:?}"); if let Some(parent) = out_path.parent() { create_dir_all(parent).ok()?; @@ -327,6 +334,7 @@ trait Client { where S: AsRef + Display; fn is_closed(&self) -> bool; + fn stats(&self) -> neqo_transport::Stats; } struct Runner<'a, H: Handler> { @@ -361,6 +369,9 @@ impl<'a, H: Handler> Runner<'a, H> { self.process(None).await?; if self.client.is_closed() { + if self.args.stats { + qinfo!("{:?}", self.client.stats()); + } return Ok(self.handler.take_token()); } @@ -390,7 +401,7 @@ impl<'a, H: Handler> Runner<'a, H> { self.socket.send(dgram)?; } Output::Callback(new_timeout) => { - qinfo!("Setting timeout of {:?}", new_timeout); + qdebug!("Setting timeout of {:?}", new_timeout); self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); break; } @@ -436,11 +447,12 @@ fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { #[tokio::main] async fn main() -> Res<()> { - init(); - let mut args = Args::parse(); + neqo_common::log::init(Some(args.verbose.log_level_filter())); args.update_for_tests(); + init(); + let urls_by_origin = args .urls .clone() @@ -453,14 +465,14 @@ async fn main() -> Res<()> { .filter_map(|(origin, urls)| match origin { Origin::Tuple(_scheme, h, p) => Some(((h, p), urls)), Origin::Opaque(x) => { - eprintln!("Opaque origin {x:?}"); + qwarn!("Opaque origin {x:?}"); None } }); for ((host, port), mut urls) in urls_by_origin { if args.resume && urls.len() < 2 { - eprintln!("Resumption to {host} cannot work without at least 2 URLs."); + qerror!("Resumption to {host} cannot work without at least 2 URLs."); exit(127); } @@ -471,7 +483,7 @@ async fn main() -> Res<()> { ) }); let Some(remote_addr) = remote_addr else { - eprintln!("No compatible address found for: {host}"); + qerror!("No compatible address found for: {host}"); exit(1); }; @@ -482,7 +494,7 @@ async fn main() -> Res<()> { let mut socket = udp::Socket::bind(local_addr)?; let real_local = socket.local_addr().unwrap(); - println!( + qinfo!( "{} Client connecting: {:?} -> {:?}", if args.shared.use_old_http { "H9" } else { "H3" }, real_local, diff --git a/neqo-bin/src/bin/server/main.rs b/neqo-bin/src/bin/server/main.rs index f694cf98c1..753794d6f6 100644 --- a/neqo-bin/src/bin/server/main.rs +++ b/neqo-bin/src/bin/server/main.rs @@ -25,7 +25,7 @@ use futures::{ FutureExt, }; use neqo_bin::udp; -use neqo_common::{hex, qinfo, qwarn, Datagram, Header}; +use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, generate_ech_keys, init_db, random, AntiReplay, Cipher, @@ -89,6 +89,9 @@ impl std::error::Error for ServerError {} #[derive(Debug, Parser)] #[command(author, version, about, long_about = None)] struct Args { + #[command(flatten)] + verbose: clap_verbosity_flag::Verbosity, + #[command(flatten)] shared: neqo_bin::SharedArgs, @@ -166,17 +169,17 @@ fn qns_read_response(filename: &str) -> Option> { OpenOptions::new() .read(true) .open(&file_path) - .map_err(|_e| eprintln!("Could not open {}", file_path.display())) + .map_err(|_e| qerror!("Could not open {}", file_path.display())) .ok() .and_then(|mut f| { let mut data = Vec::new(); match f.read_to_end(&mut data) { Ok(sz) => { - println!("{} bytes read from {}", sz, file_path.display()); + qinfo!("{} bytes read from {}", sz, file_path.display()); Some(data) } Err(e) => { - eprintln!("Error reading data: {e:?}"); + qerror!("Error reading data: {e:?}"); None } } @@ -312,7 +315,7 @@ impl HttpServer for SimpleServer { headers, fin, } => { - println!("Headers (request={stream} fin={fin}): {headers:?}"); + qdebug!("Headers (request={stream} fin={fin}): {headers:?}"); let post = if let Some(method) = headers.iter().find(|&h| h.name() == ":method") { @@ -428,7 +431,7 @@ impl ServersRunner { pub fn new(args: Args) -> Result { let hosts = args.listen_addresses(); if hosts.is_empty() { - eprintln!("No valid hosts defined"); + qerror!("No valid hosts defined"); return Err(io::Error::new(io::ErrorKind::InvalidInput, "No hosts")); } let sockets = hosts @@ -436,7 +439,7 @@ impl ServersRunner { .map(|host| { let socket = udp::Socket::bind(host)?; let local_addr = socket.local_addr()?; - println!("Server waiting for connection on: {local_addr:?}"); + qinfo!("Server waiting for connection on: {local_addr:?}"); Ok((host, socket)) }) @@ -479,7 +482,7 @@ impl ServersRunner { } if args.ech { let cfg = svr.enable_ech(); - println!("ECHConfigList: {}", hex(cfg)); + qinfo!("ECHConfigList: {}", hex(cfg)); } svr } @@ -507,7 +510,7 @@ impl ServersRunner { socket.send(dgram)?; } Output::Callback(new_timeout) => { - qinfo!("Setting timeout of {:?}", new_timeout); + qdebug!("Setting timeout of {:?}", new_timeout); self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); break; } @@ -573,6 +576,7 @@ async fn main() -> Result<(), io::Error> { const HQ_INTEROP: &str = "hq-interop"; let mut args = Args::parse(); + neqo_common::log::init(Some(args.verbose.log_level_filter())); assert!(!args.key.is_empty(), "Need at least one key"); init_db(args.db.clone()); diff --git a/neqo-bin/src/bin/server/old_https.rs b/neqo-bin/src/bin/server/old_https.rs index a159029007..505a16578f 100644 --- a/neqo-bin/src/bin/server/old_https.rs +++ b/neqo-bin/src/bin/server/old_https.rs @@ -8,7 +8,7 @@ use std::{ cell::RefCell, collections::HashMap, fmt::Display, path::PathBuf, rc::Rc, time::Instant, }; -use neqo_common::{event::Provider, hex, qdebug, Datagram}; +use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram}; use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay, Cipher}; use neqo_http3::Error; use neqo_transport::{ @@ -149,7 +149,7 @@ impl Http09Server { } Some(path) => { let path = path.as_str(); - eprintln!("Path = '{path}'"); + qdebug!("Path = '{path}'"); if args.shared.qns_test.is_some() { qns_read_response(path) } else { @@ -164,7 +164,7 @@ impl Http09Server { fn stream_writable(&mut self, stream_id: StreamId, conn: &mut ActiveConnectionRef) { match self.write_state.get_mut(&stream_id) { None => { - eprintln!("Unknown stream {stream_id}, ignoring event"); + qwarn!("Unknown stream {stream_id}, ignoring event"); } Some(stream_state) => { stream_state.writable = true; @@ -177,7 +177,7 @@ impl Http09Server { *offset += sent; self.server.add_to_waiting(conn); if *offset == data.len() { - eprintln!("Sent {sent} on {stream_id}, closing"); + qinfo!("Sent {sent} on {stream_id}, closing"); conn.borrow_mut().stream_close_send(stream_id).unwrap(); self.write_state.remove(&stream_id); } else { @@ -222,7 +222,7 @@ impl HttpServer for Http09Server { ConnectionEvent::StateChange(_) | ConnectionEvent::SendStreamCreatable { .. } | ConnectionEvent::SendStreamComplete { .. } => (), - e => eprintln!("unhandled event {e:?}"), + e => qwarn!("unhandled event {e:?}"), } } } diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index 89eaa53890..069d67b834 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -21,6 +21,7 @@ qlog = { version = "0.12", default-features = false } time = { version = "0.3", default-features = false, features = ["formatting"] } [dev-dependencies] +criterion = { version = "0.5", default-features = false, features = ["html_reports"] } test-fixture = { path = "../test-fixture" } [features] @@ -33,3 +34,7 @@ features = ["timeapi"] [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false + +[[bench]] +name = "timer" +harness = false diff --git a/neqo-common/benches/timer.rs b/neqo-common/benches/timer.rs new file mode 100644 index 0000000000..5ac8019db4 --- /dev/null +++ b/neqo-common/benches/timer.rs @@ -0,0 +1,39 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::time::{Duration, Instant}; + +use criterion::{criterion_group, criterion_main, Criterion}; +use neqo_common::timer::Timer; +use test_fixture::now; + +fn benchmark_timer(c: &mut Criterion) { + c.bench_function("drain a timer quickly", |b| { + b.iter_batched_ref( + make_timer, + |(_now, timer)| { + while let Some(t) = timer.next_time() { + assert!(timer.take_next(t).is_some()); + } + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +fn make_timer() -> (Instant, Timer<()>) { + const TIMES: &[u64] = &[1, 2, 3, 5, 8, 13, 21, 34]; + + let now = now(); + let mut timer = Timer::new(now, Duration::from_millis(777), 100); + for &t in TIMES { + timer.add(now + Duration::from_secs(t), ()); + } + (now, timer) +} + +criterion_group!(benches, benchmark_timer); +criterion_main!(benches); diff --git a/neqo-common/src/log.rs b/neqo-common/src/log.rs index c5b89be8a6..04028a26bd 100644 --- a/neqo-common/src/log.rs +++ b/neqo-common/src/log.rs @@ -50,7 +50,7 @@ fn since_start() -> Duration { START_TIME.get_or_init(Instant::now).elapsed() } -pub fn init() { +pub fn init(level_filter: Option) { static INIT_ONCE: Once = Once::new(); if ::log::STATIC_MAX_LEVEL == ::log::LevelFilter::Off { @@ -59,6 +59,9 @@ pub fn init() { INIT_ONCE.call_once(|| { let mut builder = Builder::from_env("RUST_LOG"); + if let Some(filter) = level_filter { + builder.filter_level(filter); + } builder.format(|buf, record| { let elapsed = since_start(); writeln!( @@ -71,9 +74,9 @@ pub fn init() { ) }); if let Err(e) = builder.try_init() { - do_log!(::log::Level::Info, "Logging initialization error {:?}", e); + do_log!(::log::Level::Warn, "Logging initialization error {:?}", e); } else { - do_log!(::log::Level::Info, "Logging initialized"); + do_log!(::log::Level::Debug, "Logging initialized"); } }); } @@ -81,32 +84,32 @@ pub fn init() { #[macro_export] macro_rules! log_invoke { ($lvl:expr, $ctx:expr, $($arg:tt)*) => ( { - ::neqo_common::log::init(); + ::neqo_common::log::init(None); ::neqo_common::do_log!($lvl, "[{}] {}", $ctx, format!($($arg)*)); } ) } #[macro_export] macro_rules! qerror { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Error, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } ); } #[macro_export] macro_rules! qwarn { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Warn, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } ); } #[macro_export] macro_rules! qinfo { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Info, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } ); } #[macro_export] macro_rules! qdebug { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Debug, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } ); } #[macro_export] macro_rules! qtrace { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Trace, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } ); } diff --git a/neqo-common/src/timer.rs b/neqo-common/src/timer.rs index a413252e08..3feddb2226 100644 --- a/neqo-common/src/timer.rs +++ b/neqo-common/src/timer.rs @@ -5,6 +5,7 @@ // except according to those terms. use std::{ + collections::VecDeque, mem, time::{Duration, Instant}, }; @@ -27,7 +28,7 @@ impl TimerItem { /// points). Time is relative, the wheel has an origin time and it is unable to represent times that /// are more than `granularity * capacity` past that time. pub struct Timer { - items: Vec>>, + items: Vec>>, now: Instant, granularity: Duration, cursor: usize, @@ -55,9 +56,14 @@ impl Timer { /// Return a reference to the time of the next entry. #[must_use] pub fn next_time(&self) -> Option { - for i in 0..self.items.len() { - let idx = self.bucket(i); - if let Some(t) = self.items[idx].first() { + let idx = self.bucket(0); + for i in idx..self.items.len() { + if let Some(t) = self.items[i].front() { + return Some(t.time); + } + } + for i in 0..idx { + if let Some(t) = self.items[i].front() { return Some(t.time); } } @@ -145,6 +151,9 @@ impl Timer { /// Given knowledge of the time an item was added, remove it. /// This requires use of a predicate that identifies matching items. + /// + /// # Panics + /// Impossible, I think. pub fn remove(&mut self, time: Instant, mut selector: F) -> Option where F: FnMut(&T) -> bool, @@ -167,7 +176,7 @@ impl Timer { break; } if selector(&self.items[bucket][i].item) { - return Some(self.items[bucket].remove(i).item); + return Some(self.items[bucket].remove(i).unwrap().item); } } // ... then forwards. @@ -176,7 +185,7 @@ impl Timer { break; } if selector(&self.items[bucket][i].item) { - return Some(self.items[bucket].remove(i).item); + return Some(self.items[bucket].remove(i).unwrap().item); } } None @@ -185,10 +194,25 @@ impl Timer { /// Take the next item, unless there are no items with /// a timeout in the past relative to `until`. pub fn take_next(&mut self, until: Instant) -> Option { - for i in 0..self.items.len() { - let idx = self.bucket(i); - if !self.items[idx].is_empty() && self.items[idx][0].time <= until { - return Some(self.items[idx].remove(0).item); + fn maybe_take(v: &mut VecDeque>, until: Instant) -> Option { + if !v.is_empty() && v[0].time <= until { + Some(v.pop_front().unwrap().item) + } else { + None + } + } + + let idx = self.bucket(0); + for i in idx..self.items.len() { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; + } + } + for i in 0..idx { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; } } None @@ -201,7 +225,7 @@ impl Timer { if until >= self.now + self.span() { // Drain everything, so a clean sweep. let mut empty_items = Vec::with_capacity(self.items.len()); - empty_items.resize_with(self.items.len(), Vec::default); + empty_items.resize_with(self.items.len(), VecDeque::default); let mut items = mem::replace(&mut self.items, empty_items); self.now = until; self.cursor = 0; diff --git a/neqo-crypto/src/aead_fuzzing.rs b/neqo-crypto/src/aead_fuzzing.rs index 4e5a6de07f..1f3bfb14bd 100644 --- a/neqo-crypto/src/aead_fuzzing.rs +++ b/neqo-crypto/src/aead_fuzzing.rs @@ -20,6 +20,7 @@ pub struct FuzzingAead { } impl FuzzingAead { + #[allow(clippy::missing_errors_doc)] pub fn new( fuzzing: bool, version: Version, @@ -44,6 +45,7 @@ impl FuzzingAead { } } + #[allow(clippy::missing_errors_doc)] pub fn encrypt<'a>( &self, count: u64, @@ -61,6 +63,7 @@ impl FuzzingAead { Ok(&output[..l + 16]) } + #[allow(clippy::missing_errors_doc)] pub fn decrypt<'a>( &self, count: u64, diff --git a/neqo-crypto/src/agent.rs b/neqo-crypto/src/agent.rs index 82a6dacd48..90085cb759 100644 --- a/neqo-crypto/src/agent.rs +++ b/neqo-crypto/src/agent.rs @@ -670,7 +670,7 @@ impl SecretAgent { let info = self.capture_error(SecretAgentInfo::new(self.fd))?; HandshakeState::Complete(info) }; - qinfo!([self], "state -> {:?}", self.state); + qdebug!([self], "state -> {:?}", self.state); Ok(()) } @@ -898,7 +898,7 @@ impl Client { let len = usize::try_from(len).unwrap(); let mut v = Vec::with_capacity(len); v.extend_from_slice(null_safe_slice(token, len)); - qinfo!( + qdebug!( [format!("{fd:p}")], "Got resumption token {}", hex_snip_middle(&v) diff --git a/neqo-crypto/src/lib.rs b/neqo-crypto/src/lib.rs index 2ec1b4a3ea..45f61f6127 100644 --- a/neqo-crypto/src/lib.rs +++ b/neqo-crypto/src/lib.rs @@ -9,7 +9,7 @@ mod aead; #[cfg(feature = "fuzzing")] -mod aead_fuzzing; +pub mod aead_fuzzing; pub mod agent; mod agentio; mod auth; diff --git a/neqo-http3/src/connection.rs b/neqo-http3/src/connection.rs index 287ea2c2af..cfa78df787 100644 --- a/neqo-http3/src/connection.rs +++ b/neqo-http3/src/connection.rs @@ -354,7 +354,7 @@ impl Http3Connection { /// This function creates and initializes, i.e. send stream type, the control and qpack /// streams. fn initialize_http3_connection(&mut self, conn: &mut Connection) -> Res<()> { - qinfo!([self], "Initialize the http3 connection."); + qdebug!([self], "Initialize the http3 connection."); self.control_stream_local.create(conn)?; self.send_settings(); @@ -704,7 +704,7 @@ impl Http3Connection { ); } NewStreamType::Decoder => { - qinfo!([self], "A new remote qpack encoder stream {}", stream_id); + qdebug!([self], "A new remote qpack encoder stream {}", stream_id); self.check_stream_exists(Http3StreamType::Decoder)?; self.recv_streams.insert( stream_id, @@ -715,7 +715,7 @@ impl Http3Connection { ); } NewStreamType::Encoder => { - qinfo!([self], "A new remote qpack decoder stream {}", stream_id); + qdebug!([self], "A new remote qpack decoder stream {}", stream_id); self.check_stream_exists(Http3StreamType::Encoder)?; self.recv_streams.insert( stream_id, @@ -766,7 +766,7 @@ impl Http3Connection { /// This is called when an application closes the connection. pub fn close(&mut self, error: AppError) { - qinfo!([self], "Close connection error {:?}.", error); + qdebug!([self], "Close connection error {:?}.", error); self.state = Http3State::Closing(ConnectionError::Application(error)); if (!self.send_streams.is_empty() || !self.recv_streams.is_empty()) && (error == 0) { qwarn!("close(0) called when streams still active"); @@ -952,7 +952,7 @@ impl Http3Connection { stream_id: StreamId, buf: &mut [u8], ) -> Res<(usize, bool)> { - qinfo!([self], "read_data from stream {}.", stream_id); + qdebug!([self], "read_data from stream {}.", stream_id); let res = self .recv_streams .get_mut(&stream_id) @@ -1091,7 +1091,7 @@ impl Http3Connection { /// This is called when an application wants to close the sending side of a stream. pub fn stream_close_send(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> { - qinfo!([self], "Close the sending side for stream {}.", stream_id); + qdebug!([self], "Close the sending side for stream {}.", stream_id); debug_assert!(self.state.active()); let send_stream = self .send_streams @@ -1402,7 +1402,7 @@ impl Http3Connection { /// `PriorityUpdateRequestPush` which handling is specific to the client and server, we must /// give them to the specific client/server handler. fn handle_control_frame(&mut self, f: HFrame) -> Res> { - qinfo!([self], "Handle a control frame {:?}", f); + qdebug!([self], "Handle a control frame {:?}", f); if !matches!(f, HFrame::Settings { .. }) && !matches!( self.settings_state, @@ -1433,7 +1433,7 @@ impl Http3Connection { } fn handle_settings(&mut self, new_settings: HSettings) -> Res<()> { - qinfo!([self], "Handle SETTINGS frame."); + qdebug!([self], "Handle SETTINGS frame."); match &self.settings_state { Http3RemoteSettingsState::NotReceived => { self.set_qpack_settings(&new_settings)?; diff --git a/neqo-http3/src/connection_client.rs b/neqo-http3/src/connection_client.rs index 52572a760d..836816b337 100644 --- a/neqo-http3/src/connection_client.rs +++ b/neqo-http3/src/connection_client.rs @@ -590,7 +590,7 @@ impl Http3Client { /// /// An error will be return if stream does not exist. pub fn stream_close_send(&mut self, stream_id: StreamId) -> Res<()> { - qinfo!([self], "Close sending side stream={}.", stream_id); + qdebug!([self], "Close sending side stream={}.", stream_id); self.base_handler .stream_close_send(&mut self.conn, stream_id) } @@ -652,7 +652,7 @@ impl Http3Client { stream_id: StreamId, buf: &mut [u8], ) -> Res<(usize, bool)> { - qinfo!([self], "read_data from stream {}.", stream_id); + qdebug!([self], "read_data from stream {}.", stream_id); let res = self.base_handler.read_data(&mut self.conn, stream_id, buf); if let Err(e) = &res { if e.connection_error() { diff --git a/neqo-http3/src/connection_server.rs b/neqo-http3/src/connection_server.rs index 097209a226..dcf759f177 100644 --- a/neqo-http3/src/connection_server.rs +++ b/neqo-http3/src/connection_server.rs @@ -98,7 +98,7 @@ impl Http3ServerHandler { /// /// An error will be returned if stream does not exist. pub fn stream_close_send(&mut self, stream_id: StreamId, conn: &mut Connection) -> Res<()> { - qinfo!([self], "Close sending side stream={}.", stream_id); + qdebug!([self], "Close sending side stream={}.", stream_id); self.base_handler.stream_close_send(conn, stream_id)?; self.base_handler.stream_has_pending_data(stream_id); self.needs_processing = true; @@ -408,7 +408,7 @@ impl Http3ServerHandler { stream_id: StreamId, buf: &mut [u8], ) -> Res<(usize, bool)> { - qinfo!([self], "read_data from stream {}.", stream_id); + qdebug!([self], "read_data from stream {}.", stream_id); let res = self.base_handler.read_data(conn, stream_id, buf); if let Err(e) = &res { if e.connection_error() { diff --git a/neqo-http3/src/recv_message.rs b/neqo-http3/src/recv_message.rs index be58b7e47c..55970849ef 100644 --- a/neqo-http3/src/recv_message.rs +++ b/neqo-http3/src/recv_message.rs @@ -271,7 +271,7 @@ impl RecvMessage { } (None, false) => break Ok(()), (Some(frame), fin) => { - qinfo!( + qdebug!( [self], "A new frame has been received: {:?}; state={:?} fin={}", frame, diff --git a/neqo-http3/src/send_message.rs b/neqo-http3/src/send_message.rs index c50e3e056a..15965c44f6 100644 --- a/neqo-http3/src/send_message.rs +++ b/neqo-http3/src/send_message.rs @@ -6,7 +6,7 @@ use std::{cell::RefCell, cmp::min, fmt::Debug, rc::Rc}; -use neqo_common::{qdebug, qinfo, qtrace, Encoder, Header, MessageType}; +use neqo_common::{qdebug, qtrace, Encoder, Header, MessageType}; use neqo_qpack::encoder::QPackEncoder; use neqo_transport::{Connection, StreamId}; @@ -119,7 +119,7 @@ impl SendMessage { encoder: Rc>, conn_events: Box, ) -> Self { - qinfo!("Create a request stream_id={}", stream_id); + qdebug!("Create a request stream_id={}", stream_id); Self { state: MessageState::WaitingForHeaders, message_type, @@ -193,7 +193,7 @@ impl SendStream for SendMessage { min(buf.len(), available - 9) }; - qinfo!( + qdebug!( [self], "send_request_body: available={} to_send={}.", available, diff --git a/neqo-http3/src/server_events.rs b/neqo-http3/src/server_events.rs index a85ece0bfb..214a48c757 100644 --- a/neqo-http3/src/server_events.rs +++ b/neqo-http3/src/server_events.rs @@ -13,7 +13,7 @@ use std::{ rc::Rc, }; -use neqo_common::{qdebug, qinfo, Encoder, Header}; +use neqo_common::{qdebug, Encoder, Header}; use neqo_transport::{ server::ActiveConnectionRef, AppError, Connection, DatagramTracking, StreamId, StreamType, }; @@ -189,7 +189,7 @@ impl Http3OrWebTransportStream { /// /// It may return `InvalidStreamId` if a stream does not exist anymore. pub fn send_data(&mut self, data: &[u8]) -> Res { - qinfo!([self], "Set new response."); + qdebug!([self], "Set new response."); self.stream_handler.send_data(data) } @@ -199,7 +199,7 @@ impl Http3OrWebTransportStream { /// /// It may return `InvalidStreamId` if a stream does not exist anymore. pub fn stream_close_send(&mut self) -> Res<()> { - qinfo!([self], "Set new response."); + qdebug!([self], "Set new response."); self.stream_handler.stream_close_send() } } @@ -270,7 +270,7 @@ impl WebTransportRequest { /// /// It may return `InvalidStreamId` if a stream does not exist anymore. pub fn response(&mut self, accept: &WebTransportSessionAcceptAction) -> Res<()> { - qinfo!([self], "Set a response for a WebTransport session."); + qdebug!([self], "Set a response for a WebTransport session."); self.stream_handler .handler .borrow_mut() diff --git a/neqo-transport/benches/range_tracker.rs b/neqo-transport/benches/range_tracker.rs index c2f78f4874..ee611cf4ea 100644 --- a/neqo-transport/benches/range_tracker.rs +++ b/neqo-transport/benches/range_tracker.rs @@ -11,30 +11,32 @@ const CHUNK: u64 = 1000; const END: u64 = 100_000; fn build_coalesce(len: u64) -> RangeTracker { let mut used = RangeTracker::default(); - used.mark_acked(0, CHUNK as usize); - used.mark_sent(CHUNK, END as usize); + let chunk = usize::try_from(CHUNK).expect("should fit"); + used.mark_acked(0, chunk); + used.mark_sent(CHUNK, usize::try_from(END).expect("should fit")); // leave a gap or it will coalesce here for i in 2..=len { // These do not get immediately coalesced when marking since they're not at the end or start - used.mark_acked(i * CHUNK, CHUNK as usize); + used.mark_acked(i * CHUNK, chunk); } used } fn coalesce(c: &mut Criterion, count: u64) { + let chunk = usize::try_from(CHUNK).expect("should fit"); c.bench_function( &format!("coalesce_acked_from_zero {count}+1 entries"), |b| { b.iter_batched_ref( || build_coalesce(count), |used| { - used.mark_acked(CHUNK, CHUNK as usize); + used.mark_acked(CHUNK, chunk); let tail = (count + 1) * CHUNK; - used.mark_sent(tail, CHUNK as usize); - used.mark_acked(tail, CHUNK as usize); + used.mark_sent(tail, chunk); + used.mark_acked(tail, chunk); }, criterion::BatchSize::SmallInput, - ) + ); }, ); } diff --git a/neqo-transport/benches/rx_stream_orderer.rs b/neqo-transport/benches/rx_stream_orderer.rs index 0a1e763e97..d58e11ee86 100644 --- a/neqo-transport/benches/rx_stream_orderer.rs +++ b/neqo-transport/benches/rx_stream_orderer.rs @@ -11,14 +11,14 @@ fn rx_stream_orderer() { let mut rx = RxStreamOrderer::new(); let data: &[u8] = &[0; 1337]; - for i in 0..100000 { + for i in 0..100_000 { rx.inbound_frame(i * 1337, data); } } fn criterion_benchmark(c: &mut Criterion) { c.bench_function("RxStreamOrderer::inbound_frame()", |b| { - b.iter(rx_stream_orderer) + b.iter(rx_stream_orderer); }); } diff --git a/neqo-transport/benches/transfer.rs b/neqo-transport/benches/transfer.rs index b13075a4ff..32959f6cb5 100644 --- a/neqo-transport/benches/transfer.rs +++ b/neqo-transport/benches/transfer.rs @@ -20,9 +20,10 @@ const ZERO: Duration = Duration::from_millis(0); const JITTER: Duration = Duration::from_millis(10); const TRANSFER_AMOUNT: usize = 1 << 22; // 4Mbyte -fn benchmark_transfer(c: &mut Criterion, label: &str, seed: Option>) { +fn benchmark_transfer(c: &mut Criterion, label: &str, seed: &Option>) { let mut group = c.benchmark_group("transfer"); group.throughput(Throughput::Bytes(u64::try_from(TRANSFER_AMOUNT).unwrap())); + group.noise_threshold(0.03); group.bench_function(label, |b| { b.iter_batched( || { @@ -44,7 +45,7 @@ fn benchmark_transfer(c: &mut Criterion, label: &str, seed: Option CongestionControl for ClassicCongestionControl { let mut is_app_limited = true; let mut new_acked = 0; for pkt in acked_pkts { - qinfo!( + qdebug!( "packet_acked this={:p}, pn={}, ps={}, ignored={}, lost={}, rtt_est={:?}", self, pkt.pn, @@ -198,7 +198,7 @@ impl CongestionControl for ClassicCongestionControl { if is_app_limited { self.cc_algorithm.on_app_limited(); - qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); + qdebug!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); return; } @@ -208,7 +208,7 @@ impl CongestionControl for ClassicCongestionControl { let increase = min(self.ssthresh - self.congestion_window, self.acked_bytes); self.congestion_window += increase; self.acked_bytes -= increase; - qinfo!([self], "slow start += {}", increase); + qdebug!([self], "slow start += {}", increase); if self.congestion_window == self.ssthresh { // This doesn't look like it is necessary, but it can happen // after persistent congestion. @@ -249,7 +249,7 @@ impl CongestionControl for ClassicCongestionControl { QlogMetric::BytesInFlight(self.bytes_in_flight), ], ); - qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); + qdebug!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); } /// Update congestion controller state based on lost packets. @@ -265,7 +265,7 @@ impl CongestionControl for ClassicCongestionControl { } for pkt in lost_packets.iter().filter(|pkt| pkt.cc_in_flight()) { - qinfo!( + qdebug!( "packet_lost this={:p}, pn={}, ps={}", self, pkt.pn, @@ -286,7 +286,7 @@ impl CongestionControl for ClassicCongestionControl { pto, lost_packets, ); - qinfo!( + qdebug!( "on_packets_lost this={:p}, bytes_in_flight={}, cwnd={}, state={:?}", self, self.bytes_in_flight, @@ -335,7 +335,7 @@ impl CongestionControl for ClassicCongestionControl { } self.bytes_in_flight += pkt.size; - qinfo!( + qdebug!( "packet_sent this={:p}, pn={}, ps={}", self, pkt.pn, @@ -498,7 +498,7 @@ impl ClassicCongestionControl { self.congestion_window = max(cwnd, CWND_MIN); self.acked_bytes = acked_bytes; self.ssthresh = self.congestion_window; - qinfo!( + qdebug!( [self], "Cong event -> recovery; cwnd {}, ssthresh {}", self.congestion_window, diff --git a/neqo-transport/src/connection/dump.rs b/neqo-transport/src/connection/dump.rs index 8a4f34dbb8..34ac58f55e 100644 --- a/neqo-transport/src/connection/dump.rs +++ b/neqo-transport/src/connection/dump.rs @@ -38,7 +38,8 @@ pub fn dump_packet( s.push_str(" [broken]..."); break; }; - if let Some(x) = f.dump() { + let x = f.dump(); + if !x.is_empty() { write!(&mut s, "\n {} {}", dir, &x).unwrap(); } } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index c81a3727c6..75c3490cba 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -461,7 +461,7 @@ impl Connection { } /// # Errors - /// When the operation fails. + /// When the operation fails. pub fn client_enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> { self.crypto.client_enable_ech(ech_config_list) } @@ -778,7 +778,7 @@ impl Connection { }); enc.encode(extra); let records = s.send_ticket(now, enc.as_ref())?; - qinfo!([self], "send session ticket {}", hex(&enc)); + qdebug!([self], "send session ticket {}", hex(&enc)); self.crypto.buffer_records(records)?; } else { unreachable!(); @@ -824,7 +824,7 @@ impl Connection { /// the connection to fail. However, if no packets have been /// exchanged, it's not OK. pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) { - qinfo!([self], "Authenticated {:?}", status); + qdebug!([self], "Authenticated {:?}", status); self.crypto.tls.authenticated(status); let res = self.handshake(now, self.version, PacketNumberSpace::Handshake, None); self.absorb_error(now, res); @@ -1154,7 +1154,7 @@ impl Connection { fn discard_keys(&mut self, space: PacketNumberSpace, now: Instant) { if self.crypto.discard(space) { - qinfo!([self], "Drop packet number space {}", space); + qdebug!([self], "Drop packet number space {}", space); let primary = self.paths.primary(); self.loss_recovery.discard(&primary, space, now); self.acks.drop_space(space); @@ -1560,24 +1560,8 @@ impl Connection { let mut ack_eliciting = false; let mut probing = true; let mut d = Decoder::from(&packet[..]); - let mut consecutive_padding = 0; while d.remaining() > 0 { - let mut f = Frame::decode(&mut d)?; - - // Skip padding - while f == Frame::Padding && d.remaining() > 0 { - consecutive_padding += 1; - f = Frame::decode(&mut d)?; - } - if consecutive_padding > 0 { - qdebug!( - [self], - "PADDING frame repeated {} times", - consecutive_padding - ); - consecutive_padding = 0; - } - + let f = Frame::decode(&mut d)?; ack_eliciting |= f.ack_eliciting(); probing &= f.path_probing(); let t = f.get_type(); @@ -2323,7 +2307,7 @@ impl Connection { } if encoder.is_empty() { - qinfo!("TX blocked, profile={:?} ", profile); + qdebug!("TX blocked, profile={:?} ", profile); Ok(SendOption::No(profile.paced())) } else { // Perform additional padding for Initial packets as necessary. @@ -2367,7 +2351,7 @@ impl Connection { } fn client_start(&mut self, now: Instant) -> Res<()> { - qinfo!([self], "client_start"); + qdebug!([self], "client_start"); debug_assert_eq!(self.role, Role::Client); qlog::client_connection_started(&mut self.qlog, &self.paths.primary()); qlog::client_version_information_initiated(&mut self.qlog, self.conn_params.get_versions()); @@ -2599,7 +2583,7 @@ impl Connection { fn confirm_version(&mut self, v: Version) { if self.version != v { - qinfo!([self], "Compatible upgrade {:?} ==> {:?}", self.version, v); + qdebug!([self], "Compatible upgrade {:?} ==> {:?}", self.version, v); } self.crypto.confirm_version(v); self.version = v; @@ -2694,9 +2678,8 @@ impl Connection { .input_frame(&frame, &mut self.stats.borrow_mut().frame_rx); } match frame { - Frame::Padding => { - // Note: This counts contiguous padding as a single frame. - self.stats.borrow_mut().frame_rx.padding += 1; + Frame::Padding(length) => { + self.stats.borrow_mut().frame_rx.padding += usize::from(length); } Frame::Ping => { // If we get a PING and there are outstanding CRYPTO frames, @@ -2899,7 +2882,7 @@ impl Connection { R: IntoIterator> + Debug, R::IntoIter: ExactSizeIterator, { - qinfo!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges); + qdebug!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges); let (acked_packets, lost_packets) = self.loss_recovery.on_ack_received( &self.paths.primary(), @@ -2953,7 +2936,7 @@ impl Connection { } fn set_connected(&mut self, now: Instant) -> Res<()> { - qinfo!([self], "TLS connection complete"); + qdebug!([self], "TLS connection complete"); if self.crypto.tls.info().map(SecretAgentInfo::alpn).is_none() { qwarn!([self], "No ALPN. Closing connection."); // 120 = no_application_protocol @@ -2996,7 +2979,7 @@ impl Connection { fn set_state(&mut self, state: State) { if state > self.state { - qinfo!([self], "State change from {:?} -> {:?}", self.state, state); + qdebug!([self], "State change from {:?} -> {:?}", self.state, state); self.state = state.clone(); if self.state.closed() { self.streams.clear_streams(); diff --git a/neqo-transport/src/connection/tests/fuzzing.rs b/neqo-transport/src/connection/tests/fuzzing.rs index 9924c06fa4..b12100f8ad 100644 --- a/neqo-transport/src/connection/tests/fuzzing.rs +++ b/neqo-transport/src/connection/tests/fuzzing.rs @@ -6,7 +6,7 @@ #![cfg(feature = "fuzzing")] -use neqo_crypto::FIXED_TAG_FUZZING; +use neqo_crypto::aead_fuzzing::FIXED_TAG_FUZZING; use test_fixture::now; use super::{connect_force_idle, default_client, default_server}; diff --git a/neqo-transport/src/connection/tests/handshake.rs b/neqo-transport/src/connection/tests/handshake.rs index af0352ce90..4b2a18642f 100644 --- a/neqo-transport/src/connection/tests/handshake.rs +++ b/neqo-transport/src/connection/tests/handshake.rs @@ -16,9 +16,10 @@ use neqo_common::{event::Provider, qdebug, Datagram}; use neqo_crypto::{ constants::TLS_CHACHA20_POLY1305_SHA256, generate_ech_keys, AuthenticationStatus, }; +#[cfg(not(feature = "fuzzing"))] +use test_fixture::datagram; use test_fixture::{ - assertions, assertions::assert_coalesced_0rtt, datagram, fixture_init, now, split_datagram, - DEFAULT_ADDR, + assertions, assertions::assert_coalesced_0rtt, fixture_init, now, split_datagram, DEFAULT_ADDR, }; use super::{ @@ -458,7 +459,7 @@ fn coalesce_05rtt() { assert_eq!(client.stats().dropped_rx, 0); // No Initial padding. assert_eq!(client.stats().packets_rx, 4); assert_eq!(client.stats().saved_datagrams, 1); - assert_eq!(client.stats().frame_rx.padding, 1); // Padding uses frames. + assert!(client.stats().frame_rx.padding > 0); // Padding uses frames. // Allow the handshake to complete. now += RTT / 2; diff --git a/neqo-transport/src/crypto.rs b/neqo-transport/src/crypto.rs index 9840eaa1e1..acc02172d5 100644 --- a/neqo-transport/src/crypto.rs +++ b/neqo-transport/src/crypto.rs @@ -317,7 +317,7 @@ impl Crypto { } pub fn acked(&mut self, token: &CryptoRecoveryToken) { - qinfo!( + qdebug!( "Acked crypto frame space={} offset={} length={}", token.space, token.offset, @@ -367,7 +367,7 @@ impl Crypto { }); enc.encode_vvec(new_token.unwrap_or(&[])); enc.encode(t.as_ref()); - qinfo!("resumption token {}", hex_snip_middle(enc.as_ref())); + qdebug!("resumption token {}", hex_snip_middle(enc.as_ref())); Some(ResumptionToken::new(enc.into(), t.expiration_time())) } else { None @@ -433,7 +433,7 @@ impl CryptoDxState { cipher: Cipher, fuzzing: bool, ) -> Self { - qinfo!( + qdebug!( "Making {:?} {} CryptoDxState, v={:?} cipher={}", direction, epoch, @@ -980,7 +980,7 @@ impl CryptoStates { }; for v in versions { - qinfo!( + qdebug!( [self], "Creating initial cipher state v={:?}, role={:?} dcid={}", v, diff --git a/neqo-transport/src/frame.rs b/neqo-transport/src/frame.rs index b3bb024a2c..5a86a07108 100644 --- a/neqo-transport/src/frame.rs +++ b/neqo-transport/src/frame.rs @@ -20,7 +20,7 @@ use crate::{ #[allow(clippy::module_name_repetitions)] pub type FrameType = u64; -const FRAME_TYPE_PADDING: FrameType = 0x0; +pub const FRAME_TYPE_PADDING: FrameType = 0x0; pub const FRAME_TYPE_PING: FrameType = 0x1; pub const FRAME_TYPE_ACK: FrameType = 0x2; const FRAME_TYPE_ACK_ECN: FrameType = 0x3; @@ -103,7 +103,7 @@ pub struct AckRange { #[derive(PartialEq, Eq, Debug, Clone)] pub enum Frame<'a> { - Padding, + Padding(u16), Ping, Ack { largest_acknowledged: u64, @@ -215,7 +215,7 @@ impl<'a> Frame<'a> { pub fn get_type(&self) -> FrameType { match self { - Self::Padding => FRAME_TYPE_PADDING, + Self::Padding { .. } => FRAME_TYPE_PADDING, Self::Ping => FRAME_TYPE_PING, Self::Ack { .. } => FRAME_TYPE_ACK, // We don't do ACK ECN. Self::ResetStream { .. } => FRAME_TYPE_RESET_STREAM, @@ -288,7 +288,7 @@ impl<'a> Frame<'a> { pub fn ack_eliciting(&self) -> bool { !matches!( self, - Self::Ack { .. } | Self::Padding | Self::ConnectionClose { .. } + Self::Ack { .. } | Self::Padding { .. } | Self::ConnectionClose { .. } ) } @@ -297,7 +297,7 @@ impl<'a> Frame<'a> { pub fn path_probing(&self) -> bool { matches!( self, - Self::Padding + Self::Padding { .. } | Self::NewConnectionId { .. } | Self::PathChallenge { .. } | Self::PathResponse { .. } @@ -347,36 +347,34 @@ impl<'a> Frame<'a> { Ok(acked_ranges) } - pub fn dump(&self) -> Option { + pub fn dump(&self) -> String { match self { - Self::Crypto { offset, data } => Some(format!( - "Crypto {{ offset: {}, len: {} }}", - offset, - data.len() - )), + Self::Crypto { offset, data } => { + format!("Crypto {{ offset: {}, len: {} }}", offset, data.len()) + } Self::Stream { stream_id, offset, fill, data, fin, - } => Some(format!( + } => format!( "Stream {{ stream_id: {}, offset: {}, len: {}{}, fin: {} }}", stream_id.as_u64(), offset, if *fill { ">>" } else { "" }, data.len(), fin, - )), - Self::Padding => None, - Self::Datagram { data, .. } => Some(format!("Datagram {{ len: {} }}", data.len())), - _ => Some(format!("{self:?}")), + ), + Self::Padding(length) => format!("Padding {{ len: {length} }}"), + Self::Datagram { data, .. } => format!("Datagram {{ len: {} }}", data.len()), + _ => format!("{self:?}"), } } pub fn is_allowed(&self, pt: PacketType) -> bool { match self { - Self::Padding | Self::Ping => true, + Self::Padding { .. } | Self::Ping => true, Self::Crypto { .. } | Self::Ack { .. } | Self::ConnectionClose { @@ -409,13 +407,23 @@ impl<'a> Frame<'a> { } // TODO(ekr@rtfm.com): check for minimal encoding - let t = d(dec.decode_varint())?; + let t = dv(dec)?; match t { - FRAME_TYPE_PADDING => Ok(Self::Padding), + FRAME_TYPE_PADDING => { + let mut length: u16 = 1; + while let Some(b) = dec.peek_byte() { + if u64::from(b) != FRAME_TYPE_PADDING { + break; + } + length += 1; + dec.skip(1); + } + Ok(Self::Padding(length)) + } FRAME_TYPE_PING => Ok(Self::Ping), FRAME_TYPE_RESET_STREAM => Ok(Self::ResetStream { stream_id: StreamId::from(dv(dec)?), - application_error_code: d(dec.decode_varint())?, + application_error_code: dv(dec)?, final_size: match dec.decode_varint() { Some(v) => v, _ => return Err(Error::NoMoreData), @@ -457,7 +465,7 @@ impl<'a> Frame<'a> { } FRAME_TYPE_STOP_SENDING => Ok(Self::StopSending { stream_id: StreamId::from(dv(dec)?), - application_error_code: d(dec.decode_varint())?, + application_error_code: dv(dec)?, }), FRAME_TYPE_CRYPTO => { let offset = dv(dec)?; @@ -563,7 +571,7 @@ impl<'a> Frame<'a> { Ok(Self::PathResponse { data: datav }) } FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT | FRAME_TYPE_CONNECTION_CLOSE_APPLICATION => { - let error_code = CloseError::from_type_bit(t, d(dec.decode_varint())?); + let error_code = CloseError::from_type_bit(t, dv(dec)?); let frame_type = if t == FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT { dv(dec)? } else { @@ -631,8 +639,10 @@ mod tests { #[test] fn padding() { - let f = Frame::Padding; + let f = Frame::Padding(1); just_dec(&f, "00"); + let f = Frame::Padding(2); + just_dec(&f, "0000"); } #[test] @@ -888,8 +898,8 @@ mod tests { #[test] fn test_compare() { - let f1 = Frame::Padding; - let f2 = Frame::Padding; + let f1 = Frame::Padding(1); + let f2 = Frame::Padding(1); let f3 = Frame::Crypto { offset: 0, data: &[1, 2, 3], diff --git a/neqo-transport/src/lib.rs b/neqo-transport/src/lib.rs index be482c466f..8fabbeb9a3 100644 --- a/neqo-transport/src/lib.rs +++ b/neqo-transport/src/lib.rs @@ -6,7 +6,7 @@ #![allow(clippy::module_name_repetitions)] // This lint doesn't work here. -use neqo_common::qinfo; +use neqo_common::qwarn; use neqo_crypto::Error as CryptoError; mod ackrate; @@ -165,7 +165,7 @@ impl Error { impl From for Error { fn from(err: CryptoError) -> Self { - qinfo!("Crypto operation failed {:?}", err); + qwarn!("Crypto operation failed {:?}", err); match err { CryptoError::EchRetry(config) => Self::EchRetry(config), _ => Self::CryptoError(err), diff --git a/neqo-transport/src/packet/mod.rs b/neqo-transport/src/packet/mod.rs index 8458f69779..d11b3423a4 100644 --- a/neqo-transport/src/packet/mod.rs +++ b/neqo-transport/src/packet/mod.rs @@ -18,6 +18,7 @@ use neqo_crypto::random; use crate::{ cid::{ConnectionId, ConnectionIdDecoder, ConnectionIdRef, MAX_CONNECTION_ID_LEN}, crypto::{CryptoDxState, CryptoSpace, CryptoStates}, + frame::FRAME_TYPE_PADDING, version::{Version, WireVersion}, Error, Res, }; @@ -257,7 +258,8 @@ impl PacketBuilder { /// Returns true if padding was added. pub fn pad(&mut self) -> bool { if self.padding && !self.is_long() { - self.encoder.pad_to(self.limit, 0); + self.encoder + .pad_to(self.limit, FRAME_TYPE_PADDING.try_into().unwrap()); true } else { false diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 4e8d9958ab..50e458ff36 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -216,7 +216,7 @@ impl Paths { /// to a migration from a peer, in which case the old path needs to be probed. #[must_use] fn select_primary(&mut self, path: &PathRef) -> Option { - qinfo!([path.borrow()], "set as primary path"); + qdebug!([path.borrow()], "set as primary path"); let old_path = self.primary.replace(Rc::clone(path)).map(|old| { old.borrow_mut().set_primary(false); old diff --git a/neqo-transport/src/qlog.rs b/neqo-transport/src/qlog.rs index 2572966104..a8ad986d2a 100644 --- a/neqo-transport/src/qlog.rs +++ b/neqo-transport/src/qlog.rs @@ -195,7 +195,7 @@ pub fn packet_sent( ) { qlog.add_event_with_stream(|stream| { let mut d = Decoder::from(body); - let header = PacketHeader::with_type(to_qlog_pkt_type(pt), Some(pn), None, None, None); + let header = PacketHeader::with_type(pt.into(), Some(pn), None, None, None); let raw = RawInfo { length: Some(plen as u64), payload_length: None, @@ -205,7 +205,7 @@ pub fn packet_sent( let mut frames = SmallVec::new(); while d.remaining() > 0 { if let Ok(f) = Frame::decode(&mut d) { - frames.push(frame_to_qlogframe(&f)); + frames.push(QuicFrame::from(&f)); } else { qinfo!("qlog: invalid frame"); break; @@ -231,13 +231,8 @@ pub fn packet_sent( pub fn packet_dropped(qlog: &mut NeqoQlog, public_packet: &PublicPacket) { qlog.add_event_data(|| { - let header = PacketHeader::with_type( - to_qlog_pkt_type(public_packet.packet_type()), - None, - None, - None, - None, - ); + let header = + PacketHeader::with_type(public_packet.packet_type().into(), None, None, None, None); let raw = RawInfo { length: Some(public_packet.len() as u64), payload_length: None, @@ -259,8 +254,7 @@ pub fn packet_dropped(qlog: &mut NeqoQlog, public_packet: &PublicPacket) { pub fn packets_lost(qlog: &mut NeqoQlog, pkts: &[SentPacket]) { qlog.add_event_with_stream(|stream| { for pkt in pkts { - let header = - PacketHeader::with_type(to_qlog_pkt_type(pkt.pt), Some(pkt.pn), None, None, None); + let header = PacketHeader::with_type(pkt.pt.into(), Some(pkt.pn), None, None, None); let ev_data = EventData::PacketLost(PacketLost { header: Some(header), @@ -283,7 +277,7 @@ pub fn packet_received( let mut d = Decoder::from(&payload[..]); let header = PacketHeader::with_type( - to_qlog_pkt_type(public_packet.packet_type()), + public_packet.packet_type().into(), Some(payload.pn()), None, None, @@ -299,7 +293,7 @@ pub fn packet_received( while d.remaining() > 0 { if let Ok(f) = Frame::decode(&mut d) { - frames.push(frame_to_qlogframe(&f)); + frames.push(QuicFrame::from(&f)); } else { qinfo!("qlog: invalid frame"); break; @@ -393,173 +387,180 @@ pub fn metrics_updated(qlog: &mut NeqoQlog, updated_metrics: &[QlogMetric]) { #[allow(clippy::too_many_lines)] // Yeah, but it's a nice match. #[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)] // No choice here. -fn frame_to_qlogframe(frame: &Frame) -> QuicFrame { - match frame { - Frame::Padding => QuicFrame::Padding, - Frame::Ping => QuicFrame::Ping, - Frame::Ack { - largest_acknowledged, - ack_delay, - first_ack_range, - ack_ranges, - } => { - let ranges = - Frame::decode_ack_frame(*largest_acknowledged, *first_ack_range, ack_ranges).ok(); - - let acked_ranges = ranges.map(|all| { - AckedRanges::Double( - all.into_iter() - .map(RangeInclusive::into_inner) - .collect::>(), - ) - }); - - QuicFrame::Ack { - ack_delay: Some(*ack_delay as f32 / 1000.0), - acked_ranges, - ect1: None, - ect0: None, - ce: None, +impl From<&Frame<'_>> for QuicFrame { + fn from(frame: &Frame) -> Self { + match frame { + // TODO: Add payload length to `QuicFrame::Padding` once + // https://github.com/cloudflare/quiche/pull/1745 is available via the qlog crate. + Frame::Padding { .. } => QuicFrame::Padding, + Frame::Ping => QuicFrame::Ping, + Frame::Ack { + largest_acknowledged, + ack_delay, + first_ack_range, + ack_ranges, + } => { + let ranges = + Frame::decode_ack_frame(*largest_acknowledged, *first_ack_range, ack_ranges) + .ok(); + + let acked_ranges = ranges.map(|all| { + AckedRanges::Double( + all.into_iter() + .map(RangeInclusive::into_inner) + .collect::>(), + ) + }); + + QuicFrame::Ack { + ack_delay: Some(*ack_delay as f32 / 1000.0), + acked_ranges, + ect1: None, + ect0: None, + ce: None, + } } - } - Frame::ResetStream { - stream_id, - application_error_code, - final_size, - } => QuicFrame::ResetStream { - stream_id: stream_id.as_u64(), - error_code: *application_error_code, - final_size: *final_size, - }, - Frame::StopSending { - stream_id, - application_error_code, - } => QuicFrame::StopSending { - stream_id: stream_id.as_u64(), - error_code: *application_error_code, - }, - Frame::Crypto { offset, data } => QuicFrame::Crypto { - offset: *offset, - length: data.len() as u64, - }, - Frame::NewToken { token } => QuicFrame::NewToken { - token: qlog::Token { - ty: Some(qlog::TokenType::Retry), - details: None, - raw: Some(RawInfo { - data: Some(hex(token)), - length: Some(token.len() as u64), - payload_length: None, - }), + Frame::ResetStream { + stream_id, + application_error_code, + final_size, + } => QuicFrame::ResetStream { + stream_id: stream_id.as_u64(), + error_code: *application_error_code, + final_size: *final_size, + }, + Frame::StopSending { + stream_id, + application_error_code, + } => QuicFrame::StopSending { + stream_id: stream_id.as_u64(), + error_code: *application_error_code, + }, + Frame::Crypto { offset, data } => QuicFrame::Crypto { + offset: *offset, + length: data.len() as u64, + }, + Frame::NewToken { token } => QuicFrame::NewToken { + token: qlog::Token { + ty: Some(qlog::TokenType::Retry), + details: None, + raw: Some(RawInfo { + data: Some(hex(token)), + length: Some(token.len() as u64), + payload_length: None, + }), + }, }, - }, - Frame::Stream { - fin, - stream_id, - offset, - data, - .. - } => QuicFrame::Stream { - stream_id: stream_id.as_u64(), - offset: *offset, - length: data.len() as u64, - fin: Some(*fin), - raw: None, - }, - Frame::MaxData { maximum_data } => QuicFrame::MaxData { - maximum: *maximum_data, - }, - Frame::MaxStreamData { - stream_id, - maximum_stream_data, - } => QuicFrame::MaxStreamData { - stream_id: stream_id.as_u64(), - maximum: *maximum_stream_data, - }, - Frame::MaxStreams { - stream_type, - maximum_streams, - } => QuicFrame::MaxStreams { - stream_type: match stream_type { - NeqoStreamType::BiDi => StreamType::Bidirectional, - NeqoStreamType::UniDi => StreamType::Unidirectional, + Frame::Stream { + fin, + stream_id, + offset, + data, + .. + } => QuicFrame::Stream { + stream_id: stream_id.as_u64(), + offset: *offset, + length: data.len() as u64, + fin: Some(*fin), + raw: None, }, - maximum: *maximum_streams, - }, - Frame::DataBlocked { data_limit } => QuicFrame::DataBlocked { limit: *data_limit }, - Frame::StreamDataBlocked { - stream_id, - stream_data_limit, - } => QuicFrame::StreamDataBlocked { - stream_id: stream_id.as_u64(), - limit: *stream_data_limit, - }, - Frame::StreamsBlocked { - stream_type, - stream_limit, - } => QuicFrame::StreamsBlocked { - stream_type: match stream_type { - NeqoStreamType::BiDi => StreamType::Bidirectional, - NeqoStreamType::UniDi => StreamType::Unidirectional, + Frame::MaxData { maximum_data } => QuicFrame::MaxData { + maximum: *maximum_data, }, - limit: *stream_limit, - }, - Frame::NewConnectionId { - sequence_number, - retire_prior, - connection_id, - stateless_reset_token, - } => QuicFrame::NewConnectionId { - sequence_number: *sequence_number as u32, - retire_prior_to: *retire_prior as u32, - connection_id_length: Some(connection_id.len() as u8), - connection_id: hex(connection_id), - stateless_reset_token: Some(hex(stateless_reset_token)), - }, - Frame::RetireConnectionId { sequence_number } => QuicFrame::RetireConnectionId { - sequence_number: *sequence_number as u32, - }, - Frame::PathChallenge { data } => QuicFrame::PathChallenge { - data: Some(hex(data)), - }, - Frame::PathResponse { data } => QuicFrame::PathResponse { - data: Some(hex(data)), - }, - Frame::ConnectionClose { - error_code, - frame_type, - reason_phrase, - } => QuicFrame::ConnectionClose { - error_space: match error_code { - CloseError::Transport(_) => Some(ErrorSpace::TransportError), - CloseError::Application(_) => Some(ErrorSpace::ApplicationError), + Frame::MaxStreamData { + stream_id, + maximum_stream_data, + } => QuicFrame::MaxStreamData { + stream_id: stream_id.as_u64(), + maximum: *maximum_stream_data, }, - error_code: Some(error_code.code()), - error_code_value: Some(0), - reason: Some(String::from_utf8_lossy(reason_phrase).to_string()), - trigger_frame_type: Some(*frame_type), - }, - Frame::HandshakeDone => QuicFrame::HandshakeDone, - Frame::AckFrequency { .. } => QuicFrame::Unknown { - frame_type_value: None, - raw_frame_type: frame.get_type(), - raw: None, - }, - Frame::Datagram { data, .. } => QuicFrame::Datagram { - length: data.len() as u64, - raw: None, - }, + Frame::MaxStreams { + stream_type, + maximum_streams, + } => QuicFrame::MaxStreams { + stream_type: match stream_type { + NeqoStreamType::BiDi => StreamType::Bidirectional, + NeqoStreamType::UniDi => StreamType::Unidirectional, + }, + maximum: *maximum_streams, + }, + Frame::DataBlocked { data_limit } => QuicFrame::DataBlocked { limit: *data_limit }, + Frame::StreamDataBlocked { + stream_id, + stream_data_limit, + } => QuicFrame::StreamDataBlocked { + stream_id: stream_id.as_u64(), + limit: *stream_data_limit, + }, + Frame::StreamsBlocked { + stream_type, + stream_limit, + } => QuicFrame::StreamsBlocked { + stream_type: match stream_type { + NeqoStreamType::BiDi => StreamType::Bidirectional, + NeqoStreamType::UniDi => StreamType::Unidirectional, + }, + limit: *stream_limit, + }, + Frame::NewConnectionId { + sequence_number, + retire_prior, + connection_id, + stateless_reset_token, + } => QuicFrame::NewConnectionId { + sequence_number: *sequence_number as u32, + retire_prior_to: *retire_prior as u32, + connection_id_length: Some(connection_id.len() as u8), + connection_id: hex(connection_id), + stateless_reset_token: Some(hex(stateless_reset_token)), + }, + Frame::RetireConnectionId { sequence_number } => QuicFrame::RetireConnectionId { + sequence_number: *sequence_number as u32, + }, + Frame::PathChallenge { data } => QuicFrame::PathChallenge { + data: Some(hex(data)), + }, + Frame::PathResponse { data } => QuicFrame::PathResponse { + data: Some(hex(data)), + }, + Frame::ConnectionClose { + error_code, + frame_type, + reason_phrase, + } => QuicFrame::ConnectionClose { + error_space: match error_code { + CloseError::Transport(_) => Some(ErrorSpace::TransportError), + CloseError::Application(_) => Some(ErrorSpace::ApplicationError), + }, + error_code: Some(error_code.code()), + error_code_value: Some(0), + reason: Some(String::from_utf8_lossy(reason_phrase).to_string()), + trigger_frame_type: Some(*frame_type), + }, + Frame::HandshakeDone => QuicFrame::HandshakeDone, + Frame::AckFrequency { .. } => QuicFrame::Unknown { + frame_type_value: None, + raw_frame_type: frame.get_type(), + raw: None, + }, + Frame::Datagram { data, .. } => QuicFrame::Datagram { + length: data.len() as u64, + raw: None, + }, + } } } -fn to_qlog_pkt_type(ptype: PacketType) -> qlog::events::quic::PacketType { - match ptype { - PacketType::Initial => qlog::events::quic::PacketType::Initial, - PacketType::Handshake => qlog::events::quic::PacketType::Handshake, - PacketType::ZeroRtt => qlog::events::quic::PacketType::ZeroRtt, - PacketType::Short => qlog::events::quic::PacketType::OneRtt, - PacketType::Retry => qlog::events::quic::PacketType::Retry, - PacketType::VersionNegotiation => qlog::events::quic::PacketType::VersionNegotiation, - PacketType::OtherVersion => qlog::events::quic::PacketType::Unknown, +impl From for qlog::events::quic::PacketType { + fn from(value: PacketType) -> Self { + match value { + PacketType::Initial => qlog::events::quic::PacketType::Initial, + PacketType::Handshake => qlog::events::quic::PacketType::Handshake, + PacketType::ZeroRtt => qlog::events::quic::PacketType::ZeroRtt, + PacketType::Short => qlog::events::quic::PacketType::OneRtt, + PacketType::Retry => qlog::events::quic::PacketType::Retry, + PacketType::VersionNegotiation => qlog::events::quic::PacketType::VersionNegotiation, + PacketType::OtherVersion => qlog::events::quic::PacketType::Unknown, + } } } diff --git a/neqo-transport/src/stats.rs b/neqo-transport/src/stats.rs index 9eff503dcf..0a61097010 100644 --- a/neqo-transport/src/stats.rs +++ b/neqo-transport/src/stats.rs @@ -14,7 +14,7 @@ use std::{ time::Duration, }; -use neqo_common::qinfo; +use neqo_common::qwarn; use crate::packet::PacketNumber; @@ -168,7 +168,7 @@ impl Stats { pub fn pkt_dropped(&mut self, reason: impl AsRef) { self.dropped_rx += 1; - qinfo!( + qwarn!( [self.info], "Dropped received packet: {}; Total: {}", reason.as_ref(), @@ -206,7 +206,7 @@ impl Debug for Stats { " tx: {} lost {} lateack {} ptoack {}", self.packets_tx, self.lost, self.late_ack, self.pto_ack )?; - writeln!(f, " resumed: {} ", self.resumed)?; + writeln!(f, " resumed: {}", self.resumed)?; writeln!(f, " frames rx:")?; self.frame_rx.fmt(f)?; writeln!(f, " frames tx:")?; diff --git a/test/upload_test.sh b/test/upload_test.sh index 685a6a926c..8edb55e75d 100755 --- a/test/upload_test.sh +++ b/test/upload_test.sh @@ -2,6 +2,8 @@ set -e +export RUST_LOG=neqo_transport::cc=debug + server_address=127.0.0.1 server_port=4433 upload_size=8388608