Skip to content

Commit

Permalink
Retry some pooled connections failing when server closes. Close forta…
Browse files Browse the repository at this point in the history
…nix#10

This is not a perfect solution. It works as long as we are not sending
any body bytes. We discover the error first when attempting to read
the response status line. That means we discover the error after
sending body bytes. To be able to re-send the body, we would need to
introduce a buffer to be able to replay the body on the next
request. We don't currently do that.
  • Loading branch information
algesten committed Oct 20, 2019
1 parent 1264213 commit 753d61b
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 8 deletions.
12 changes: 6 additions & 6 deletions src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ pub(crate) fn send_body(
mut body: SizedReader,
do_chunk: bool,
stream: &mut Stream,
) -> IoResult<()> {
if do_chunk {
) -> IoResult<u64> {
let n = if do_chunk {
let mut chunker = chunked_transfer::Encoder::new(stream);
copy(&mut body.reader, &mut chunker)?;
copy(&mut body.reader, &mut chunker)?
} else {
copy(&mut body.reader, stream)?;
}
copy(&mut body.reader, stream)?
};

Ok(())
Ok(n)
}
15 changes: 15 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum Error {
ConnectionFailed(String),
/// Too many redirects. Synthetic error `500`.
TooManyRedirects,
/// We fail to read the status line. This happens for pooled connections when
/// TLS fails and we don't notice until trying to read.
BadStatusRead,
/// A status line we don't understand `HTTP/1.1 200 OK`. Synthetic error `500`.
BadStatus,
/// A header line that couldn't be parsed. Synthetic error `500`.
Expand All @@ -23,6 +26,15 @@ pub enum Error {
}

impl Error {
// If the error is bad status read, which might happen if a TLS connections is
// closed and we only discover it when trying to read the status line from it.
pub(crate) fn is_bad_status_read(&self) -> bool {
match self {
Error::BadStatusRead => true,
_ => false,
}
}

/// For synthetic responses, this is the error code.
pub fn status(&self) -> u16 {
match self {
Expand All @@ -31,6 +43,7 @@ impl Error {
Error::DnsFailed(_) => 400,
Error::ConnectionFailed(_) => 500,
Error::TooManyRedirects => 500,
Error::BadStatusRead => 500,
Error::BadStatus => 500,
Error::BadHeader => 500,
Error::Io(_) => 500,
Expand All @@ -45,6 +58,7 @@ impl Error {
Error::DnsFailed(_) => "Dns Failed",
Error::ConnectionFailed(_) => "Connection Failed",
Error::TooManyRedirects => "Too Many Redirects",
Error::BadStatusRead => "Failed to read status line",
Error::BadStatus => "Bad Status",
Error::BadHeader => "Bad Header",
Error::Io(_) => "Network Error",
Expand All @@ -59,6 +73,7 @@ impl Error {
Error::DnsFailed(err) => format!("Dns Failed: {}", err),
Error::ConnectionFailed(err) => format!("Connection Failed: {}", err),
Error::TooManyRedirects => "Too Many Redirects".to_string(),
Error::BadStatusRead => "Failed to read status line".to_string(),
Error::BadStatus => "Bad Status".to_string(),
Error::BadHeader => "Bad Header".to_string(),
Error::Io(ioe) => format!("Network Error: {}", ioe),
Expand Down
5 changes: 4 additions & 1 deletion src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,10 @@ impl Response {
fn do_from_read(mut reader: impl Read) -> Result<Response, Error> {
//
// HTTP/1.1 200 OK\r\n
let status_line = read_next_line(&mut reader).map_err(|_| Error::BadStatus)?;
let status_line = read_next_line(&mut reader).map_err(|e| match e.kind() {
ErrorKind::ConnectionAborted => Error::BadStatusRead,
_ => Error::BadStatus,
})?;

let (index, status) = parse_status_line(status_line.as_str())?;

Expand Down
26 changes: 26 additions & 0 deletions src/test/agent_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,29 @@ fn agent_cookies() {

agent.get("test://host/agent_cookies").call();
}

#[test]
fn connection_reuse() {
use std::io::Read;
use std::time::Duration;

let agent = Agent::default().build();
let resp = agent.get("https://fau.xxx/").call();

// use up the connection so it gets returned to the pool
assert_eq!(resp.status(), 200);
resp.into_reader().read_to_end(&mut vec![]).unwrap();

// wait for the server to close the connection. fau.xxx has a
// 2 second connection keep-alive. then it closes.
std::thread::sleep(Duration::from_secs(3));

// try and make a new request on the pool. this fails
// when we discover that the TLS connection is dead
// first when attempting to read from it.
let resp = agent.get("https://fau.xxx/").call();
if let Some(err) = resp.synthetic_error() {
panic!("Pooled connection failed! {:?}", err);
}
assert_eq!(resp.status(), 200);
}
14 changes: 13 additions & 1 deletion src/unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,23 @@ pub(crate) fn connect(
}

// send the body (which can be empty now depending on redirects)
body::send_body(body, unit.is_chunked, &mut stream)?;
let body_bytes_sent = body::send_body(body, unit.is_chunked, &mut stream)?;

// start reading the response to process cookies and redirects.
let mut resp = Response::from_read(&mut stream);

if let Some(err) = resp.synthetic_error() {
if err.is_bad_status_read() && body_bytes_sent == 0 && is_recycled {
// We try open a new connection, this happens if the remote server
// hangs a pooled connection and we only discover when trying to
// read from it. It's however only possible if we didn't send any
// body bytes. This is because we currently don't want to buffer
// any body to be able to replay it.
let empty = Payload::Empty.into_read();
return connect(req, unit, false, redirect_count, empty, redir);
}
}

// squirrel away cookies
if cfg!(feature = "cookies") {
save_cookies(&unit, &resp);
Expand Down

0 comments on commit 753d61b

Please sign in to comment.