Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use std::io::BufWriter with a 64KB buffer for dumping data #1752

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions neqo-bin/src/bin/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
fs::File,
io::Write,
io::{BufWriter, Write},
net::SocketAddr,
path::PathBuf,
rc::Rc,
Expand All @@ -26,7 +26,7 @@ use neqo_transport::{
use url::Url;

use super::{get_output_file, Args, KeyUpdateState, Res};
use crate::qlog_new;
use crate::{qlog_new, BUFWRITER_BUFFER_SIZE};

pub struct Handler<'a> {
streams: HashMap<StreamId, Option<File>>,
Expand Down Expand Up @@ -219,7 +219,7 @@ impl<'b> Handler<'b> {
client: &mut Connection,
stream_id: StreamId,
output_read_data: bool,
maybe_out_file: &mut Option<File>,
maybe_out_file: &mut Option<BufWriter<File>>,
) -> Res<bool> {
let mut data = vec![0; 4096];
loop {
Expand All @@ -228,7 +228,7 @@ impl<'b> Handler<'b> {
return Ok(fin);
}

if let Some(out_file) = maybe_out_file {
if let Some(ref mut out_file) = maybe_out_file {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if let Some(ref mut out_file) = maybe_out_file {
if let Some(out_file) = maybe_out_file {

ref mut needed? Compiles for me without.

out_file.write_all(&data[..sz])?;
} else if !output_read_data {
println!("READ[{stream_id}]: {sz} bytes");
Expand All @@ -253,15 +253,21 @@ impl<'b> Handler<'b> {
return Ok(());
}
Some(maybe_out_file) => {
let mut buf_writer = maybe_out_file
.take()
.map(|file| BufWriter::with_capacity(BUFWRITER_BUFFER_SIZE, file));
Comment on lines +256 to +258
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why construct a new BufWriter on each read call? Shouldn't the BufWriter be wrapping the File for the lifetime of the File? That would be consistent with the http3.rs implementation.


let fin_recvd = Self::read_from_stream(
client,
stream_id,
self.args.output_read_data,
maybe_out_file,
&mut buf_writer,
)?;

if fin_recvd {
if maybe_out_file.is_none() {
if buf_writer.is_some() {
buf_writer.take().unwrap().flush()?;
} else {
println!("<FIN[{stream_id}]>");
}
self.streams.remove(&stream_id);
Expand Down
24 changes: 15 additions & 9 deletions neqo-bin/src/bin/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
collections::{HashMap, VecDeque},
fmt::Display,
fs::File,
io::Write,
io::{BufWriter, Write},
net::SocketAddr,
path::PathBuf,
rc::Rc,
Expand All @@ -26,7 +26,7 @@ use neqo_transport::{
};
use url::Url;

use crate::{get_output_file, qlog_new, Args, KeyUpdateState, Res};
use crate::{get_output_file, qlog_new, Args, KeyUpdateState, Res, BUFWRITER_BUFFER_SIZE};

pub(crate) struct Handler<'a> {
#[allow(
Expand Down Expand Up @@ -255,8 +255,10 @@ impl StreamHandlerType {
match handler_type {
Self::Download => {
let out_file = get_output_file(url, &args.output_dir, all_paths);
let buf_writer =
out_file.map(|file| BufWriter::with_capacity(BUFWRITER_BUFFER_SIZE, file));
Comment on lines +258 to +259
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving this into get_output_file. get_output_file is called by both http3 and http09, thus no need for duplication.

client.stream_close_send(client_stream_id).unwrap();
Box::new(DownloadStreamHandler { out_file })
Box::new(DownloadStreamHandler { buf_writer })
}
Self::Upload => Box::new(UploadStreamHandler {
data: vec![42; args.upload_size],
Expand All @@ -269,12 +271,12 @@ impl StreamHandlerType {
}

struct DownloadStreamHandler {
out_file: Option<File>,
buf_writer: Option<BufWriter<File>>,
}

impl StreamHandler for DownloadStreamHandler {
fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec<Header>) {
if self.out_file.is_none() {
if self.buf_writer.is_none() {
println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}");
}
}
Expand All @@ -287,9 +289,9 @@ impl StreamHandler for DownloadStreamHandler {
sz: usize,
output_read_data: bool,
) -> Res<bool> {
if let Some(out_file) = &mut self.out_file {
if let Some(buf_writer) = &mut self.buf_writer {
if sz > 0 {
out_file.write_all(&data[..sz])?;
buf_writer.write_all(&data[..sz])?;
}
return Ok(true);
} else if !output_read_data {
Expand All @@ -300,8 +302,12 @@ impl StreamHandler for DownloadStreamHandler {
println!("READ[{}]: 0x{}", stream_id, hex(&data));
}

if fin && self.out_file.is_none() {
println!("<FIN[{stream_id}]>");
if fin {
if let Some(mut buf_writer) = self.buf_writer.take() {
buf_writer.flush()?;
} else {
println!("<FIN[{stream_id}]>");
}
}

Ok(true)
Expand Down
2 changes: 2 additions & 0 deletions neqo-bin/src/bin/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use url::{Origin, Url};
mod http09;
mod http3;

const BUFWRITER_BUFFER_SIZE: usize = 64 * 1024;

#[derive(Debug)]
pub enum ClientError {
ArgumentError(&'static str),
Expand Down