diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index 145f7ac386..e776acafdf 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -24,7 +24,7 @@ workspace = true [dependencies] # neqo-bin is not used in Firefox, so we can be liberal with dependency versions -clap = { version = "4.4", default-features = false, features = ["std", "color", "help", "usage", "error-context", "suggestions", "derive"] } +clap = { version = "4.4", default-features = false, features = ["std", "help", "usage", "error-context", "suggestions", "derive"] } clap-verbosity-flag = { version = "2.2", default-features = false } futures = { version = "0.3", default-features = false, features = ["alloc"] } hex = { version = "0.4", default-features = false, features = ["std"] } diff --git a/neqo-bin/src/lib.rs b/neqo-bin/src/lib.rs index 9b5de9e0c2..09ca9afa09 100644 --- a/neqo-bin/src/lib.rs +++ b/neqo-bin/src/lib.rs @@ -22,7 +22,7 @@ use neqo_transport::{ pub mod client; pub mod server; -mod udp; +pub mod udp; #[derive(Debug, Parser)] pub struct SharedArgs { diff --git a/neqo-bin/src/server/http09.rs b/neqo-bin/src/server/http09.rs index 64b1e1be19..22f0281359 100644 --- a/neqo-bin/src/server/http09.rs +++ b/neqo-bin/src/server/http09.rs @@ -4,16 +4,14 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{ - cell::RefCell, collections::HashMap, fmt::Display, path::PathBuf, rc::Rc, time::Instant, -}; +use std::{cell::RefCell, collections::HashMap, fmt::Display, rc::Rc, time::Instant}; use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram}; -use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay, Cipher}; +use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay}; use neqo_http3::Error; use neqo_transport::{ server::{ActiveConnectionRef, Server, ValidateAddress}, - ConnectionEvent, ConnectionIdGenerator, ConnectionParameters, Output, State, StreamId, + ConnectionEvent, ConnectionIdGenerator, Output, State, StreamId, }; use regex::Regex; @@ -29,29 +27,44 @@ pub struct HttpServer { server: Server, write_state: HashMap, read_state: HashMap>, + is_qns_test: bool, } impl HttpServer { pub fn new( - now: Instant, - certs: &[impl AsRef], - protocols: &[impl AsRef], + args: &Args, anti_replay: AntiReplay, cid_manager: Rc>, - conn_params: ConnectionParameters, ) -> Result { + let mut server = Server::new( + args.now(), + &[args.key.clone()], + &[args.shared.alpn.clone()], + anti_replay, + Box::new(AllowZeroRtt {}), + cid_manager, + args.shared.quic_parameters.get(&args.shared.alpn), + )?; + + server.set_ciphers(&args.get_ciphers()); + server.set_qlog_dir(args.shared.qlog_dir.clone()); + if args.retry { + server.set_validation(ValidateAddress::Always); + } + if args.ech { + let (sk, pk) = generate_ech_keys().expect("generate ECH keys"); + server + .enable_ech(random::<1>()[0], "public.example", &sk, &pk) + .expect("enable ECH"); + let cfg = server.ech_config(); + qinfo!("ECHConfigList: {}", hex(cfg)); + } + Ok(Self { - server: Server::new( - now, - certs, - protocols, - anti_replay, - Box::new(AllowZeroRtt {}), - cid_manager, - conn_params, - )?, + server, write_state: HashMap::new(), read_state: HashMap::new(), + is_qns_test: args.shared.qns_test.is_some(), }) } @@ -100,12 +113,7 @@ impl HttpServer { } } - fn stream_readable( - &mut self, - stream_id: StreamId, - conn: &mut ActiveConnectionRef, - args: &Args, - ) { + fn stream_readable(&mut self, stream_id: StreamId, conn: &mut ActiveConnectionRef) { if !stream_id.is_client_initiated() || !stream_id.is_bidi() { qdebug!("Stream {} not client-initiated bidi, ignoring", stream_id); return; @@ -136,7 +144,7 @@ impl HttpServer { return; }; - let re = if args.shared.qns_test.is_some() { + let re = if self.is_qns_test { Regex::new(r"GET +/(\S+)(?:\r)?\n").unwrap() } else { Regex::new(r"GET +/(\d+)(?:\r)?\n").unwrap() @@ -150,7 +158,7 @@ impl HttpServer { let resp = { let path = path.as_str(); qdebug!("Path = '{path}'"); - if args.shared.qns_test.is_some() { + if self.is_qns_test { match qns_read_response(path) { Ok(data) => Some(data), Err(e) => { @@ -199,7 +207,7 @@ impl super::HttpServer for HttpServer { self.server.process(dgram, now) } - fn process_events(&mut self, args: &Args, now: Instant) { + fn process_events(&mut self, now: Instant) { let active_conns = self.server.active_connections(); for mut acr in active_conns { loop { @@ -213,7 +221,7 @@ impl super::HttpServer for HttpServer { .insert(stream_id, HttpStreamState::default()); } ConnectionEvent::RecvStreamReadable { stream_id } => { - self.stream_readable(stream_id, &mut acr, args); + self.stream_readable(stream_id, &mut acr); } ConnectionEvent::SendStreamWritable { stream_id } => { self.stream_writable(stream_id, &mut acr); @@ -233,26 +241,6 @@ impl super::HttpServer for HttpServer { } } - fn set_qlog_dir(&mut self, dir: Option) { - self.server.set_qlog_dir(dir); - } - - fn validate_address(&mut self, v: ValidateAddress) { - self.server.set_validation(v); - } - - fn set_ciphers(&mut self, ciphers: &[Cipher]) { - self.server.set_ciphers(ciphers); - } - - fn enable_ech(&mut self) -> &[u8] { - let (sk, pk) = generate_ech_keys().expect("generate ECH keys"); - self.server - .enable_ech(random::<1>()[0], "public.example", &sk, &pk) - .expect("enable ECH"); - self.server.ech_config() - } - fn has_events(&self) -> bool { self.server.has_active_connections() } diff --git a/neqo-bin/src/server/http3.rs b/neqo-bin/src/server/http3.rs index 40a733ffb5..c309f19a29 100644 --- a/neqo-bin/src/server/http3.rs +++ b/neqo-bin/src/server/http3.rs @@ -10,13 +10,12 @@ use std::{ cmp::min, collections::HashMap, fmt::{self, Display}, - path::PathBuf, rc::Rc, time::Instant, }; -use neqo_common::{qdebug, qerror, qwarn, Datagram, Header}; -use neqo_crypto::{generate_ech_keys, random, AntiReplay, Cipher}; +use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header}; +use neqo_crypto::{generate_ech_keys, random, AntiReplay}; use neqo_http3::{ Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, }; @@ -29,6 +28,7 @@ pub struct HttpServer { /// Progress writing to each stream. remaining_data: HashMap, posts: HashMap, + is_qns_test: bool, } impl HttpServer { @@ -39,7 +39,7 @@ impl HttpServer { anti_replay: AntiReplay, cid_mgr: Rc>, ) -> Self { - let server = Http3Server::new( + let mut server = Http3Server::new( args.now(), &[args.key.clone()], &[args.shared.alpn.clone()], @@ -53,10 +53,25 @@ impl HttpServer { None, ) .expect("We cannot make a server!"); + + server.set_ciphers(&args.get_ciphers()); + server.set_qlog_dir(args.shared.qlog_dir.clone()); + if args.retry { + server.set_validation(ValidateAddress::Always); + } + if args.ech { + let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); + server + .enable_ech(random::<1>()[0], "public.example", &sk, &pk) + .unwrap(); + let cfg = server.ech_config(); + qinfo!("ECHConfigList: {}", hex(cfg)); + } Self { server, remaining_data: HashMap::new(), posts: HashMap::new(), + is_qns_test: args.shared.qns_test.is_some(), } } } @@ -72,7 +87,7 @@ impl super::HttpServer for HttpServer { self.server.process(dgram, now) } - fn process_events(&mut self, args: &Args, _now: Instant) { + fn process_events(&mut self, _now: Instant) { while let Some(event) = self.server.next_event() { match event { Http3ServerEvent::Headers { @@ -97,7 +112,7 @@ impl super::HttpServer for HttpServer { continue; }; - let mut response = if args.shared.qns_test.is_some() { + let mut response = if self.is_qns_test { match qns_read_response(path.value()) { Ok(data) => ResponseData::from(data), Err(e) => { @@ -166,26 +181,6 @@ impl super::HttpServer for HttpServer { } } - fn set_qlog_dir(&mut self, dir: Option) { - self.server.set_qlog_dir(dir); - } - - fn validate_address(&mut self, v: ValidateAddress) { - self.server.set_validation(v); - } - - fn set_ciphers(&mut self, ciphers: &[Cipher]) { - self.server.set_ciphers(ciphers); - } - - fn enable_ech(&mut self) -> &[u8] { - let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); - self.server - .enable_ech(random::<1>()[0], "public.example", &sk, &pk) - .unwrap(); - self.server.ech_config() - } - fn has_events(&self) -> bool { self.server.has_events() } diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index bc874e413d..9d35328ea1 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -21,12 +21,12 @@ use futures::{ future::{select, select_all, Either}, FutureExt, }; -use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram}; +use neqo_common::{qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, init_db, AntiReplay, Cipher, }; -use neqo_transport::{server::ValidateAddress, Output, RandomConnectionIdGenerator, Version}; +use neqo_transport::{Output, RandomConnectionIdGenerator, Version}; use tokio::time::Sleep; use crate::{udp, SharedArgs}; @@ -182,81 +182,34 @@ fn qns_read_response(filename: &str) -> Result, io::Error> { fs::read(path) } -trait HttpServer: Display { +#[allow(clippy::module_name_repetitions)] +pub trait HttpServer: Display { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; - fn process_events(&mut self, args: &Args, now: Instant); + fn process_events(&mut self, now: Instant); fn has_events(&self) -> bool; - fn set_qlog_dir(&mut self, dir: Option); - fn set_ciphers(&mut self, ciphers: &[Cipher]); - fn validate_address(&mut self, when: ValidateAddress); - fn enable_ech(&mut self) -> &[u8]; } -struct ServersRunner { - args: Args, +#[allow(clippy::module_name_repetitions)] +pub struct ServerRunner { + now: Box Instant>, server: Box, timeout: Option>>, sockets: Vec<(SocketAddr, udp::Socket)>, } -impl ServersRunner { - pub fn new(args: Args) -> Result { - let hosts = args.listen_addresses(); - if hosts.is_empty() { - qerror!("No valid hosts defined"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "No hosts")); - } - let sockets = hosts - .into_iter() - .map(|host| { - let socket = udp::Socket::bind(host)?; - let local_addr = socket.local_addr()?; - qinfo!("Server waiting for connection on: {local_addr:?}"); - - Ok((host, socket)) - }) - .collect::>()?; - let server = Self::create_server(&args); - - Ok(Self { - args, +impl ServerRunner { + #[must_use] + pub fn new( + now: Box Instant>, + server: Box, + sockets: Vec<(SocketAddr, udp::Socket)>, + ) -> Self { + Self { + now, server, timeout: None, sockets, - }) - } - - fn create_server(args: &Args) -> Box { - // Note: this is the exception to the case where we use `Args::now`. - let anti_replay = AntiReplay::new(Instant::now(), ANTI_REPLAY_WINDOW, 7, 14) - .expect("unable to setup anti-replay"); - let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); - - let mut svr: Box = if args.shared.use_old_http { - Box::new( - http09::HttpServer::new( - args.now(), - &[args.key.clone()], - &[args.shared.alpn.clone()], - anti_replay, - cid_mgr, - args.shared.quic_parameters.get(&args.shared.alpn), - ) - .expect("We cannot make a server!"), - ) - } else { - Box::new(http3::HttpServer::new(args, anti_replay, cid_mgr)) - }; - svr.set_ciphers(&args.get_ciphers()); - svr.set_qlog_dir(args.shared.qlog_dir.clone()); - if args.retry { - svr.validate_address(ValidateAddress::Always); } - if args.ech { - let cfg = svr.enable_ech(); - qinfo!("ECHConfigList: {}", hex(cfg)); - } - svr } /// Tries to find a socket, but then just falls back to sending from the first. @@ -275,7 +228,7 @@ impl ServersRunner { async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { loop { - match self.server.process(dgram.take(), self.args.now()) { + match self.server.process(dgram.take(), (self.now)()) { Output::Datagram(dgram) => { let socket = self.find_socket(dgram.source()); socket.writable().await?; @@ -313,9 +266,9 @@ impl ServersRunner { select(sockets_ready, timeout_ready).await.factor_first().0 } - async fn run(&mut self) -> Res<()> { + pub async fn run(mut self) -> Res<()> { loop { - self.server.process_events(&self.args, self.args.now()); + self.server.process_events((self.now)()); self.process(None).await?; @@ -402,6 +355,36 @@ pub async fn server(mut args: Args) -> Res<()> { } } - let mut servers_runner = ServersRunner::new(args)?; - servers_runner.run().await + let hosts = args.listen_addresses(); + if hosts.is_empty() { + qerror!("No valid hosts defined"); + Err(io::Error::new(io::ErrorKind::InvalidInput, "No hosts"))?; + } + let sockets = hosts + .into_iter() + .map(|host| { + let socket = udp::Socket::bind(host)?; + let local_addr = socket.local_addr()?; + qinfo!("Server waiting for connection on: {local_addr:?}"); + + Ok((host, socket)) + }) + .collect::>()?; + + // Note: this is the exception to the case where we use `Args::now`. + let anti_replay = AntiReplay::new(Instant::now(), ANTI_REPLAY_WINDOW, 7, 14) + .expect("unable to setup anti-replay"); + let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); + + let server: Box = if args.shared.use_old_http { + Box::new( + http09::HttpServer::new(&args, anti_replay, cid_mgr).expect("We cannot make a server!"), + ) + } else { + Box::new(http3::HttpServer::new(&args, anti_replay, cid_mgr)) + }; + + ServerRunner::new(Box::new(move || args.now()), server, sockets) + .run() + .await }