Skip to content

Commit

Permalink
wrap the send in a tokio timeout with a 5s timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
nullren committed Jul 19, 2023
1 parent 3b91662 commit 25c982d
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
},
sources::util::http::HttpMethod,
tls::TlsSettings,
Error, SourceSender,
SourceSender,
};
use vector_common::shutdown::ShutdownSignal;
use vector_core::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};
Expand All @@ -51,6 +51,9 @@ pub(crate) const fn default_interval() -> Duration {
Duration::from_secs(15)
}

/// The default timeout for the HTTP request if none is configured.
const DEFAULT_TARGET_TIMEOUT: Duration = Duration::from_secs(5);

/// Builds the context, allowing the source-specific implementation to leverage data from the
/// config and the current HTTP request.
pub(crate) trait HttpClientBuilder {
Expand Down Expand Up @@ -157,9 +160,19 @@ pub(crate) async fn call<
}

let start = Instant::now();
client
.send(request)
.map_err(Error::from)
let timeout = std::cmp::min(DEFAULT_TARGET_TIMEOUT, inputs.interval);
tokio::time::timeout(timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
timeout.as_secs_f32()
)
.into()),
}
})
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
Expand Down Expand Up @@ -224,8 +237,9 @@ pub(crate) async fn call<
})
})
.flatten()
.boxed()
})
.flatten()
.flatten_unordered(None)
.boxed();

match out.send_event_stream(&mut stream).await {
Expand Down

0 comments on commit 25c982d

Please sign in to comment.