diff --git a/src/http.rs b/src/http.rs index f8e1c939c58c8..6d49f858ca2f0 100644 --- a/src/http.rs +++ b/src/http.rs @@ -56,12 +56,16 @@ pub enum HttpError { CallRequest { source: hyper::Error }, #[snafu(display("Failed to build HTTP request: {}", source))] BuildRequest { source: http::Error }, + #[snafu(display("Expected 401 with Digest Auth"))] + DigestAuthExpectation, } impl HttpError { pub const fn is_retriable(&self) -> bool { match self { - HttpError::BuildRequest { .. } | HttpError::MakeProxyConnector { .. } => false, + HttpError::BuildRequest { .. } + | HttpError::MakeProxyConnector { .. } + | HttpError::DigestAuthExpectation => false, HttpError::CallRequest { .. } | HttpError::BuildTlsConnector { .. } | HttpError::MakeHttpsConnector { .. } => true, @@ -295,6 +299,20 @@ pub enum Auth { /// The bearer authentication token. token: SensitiveString, }, + /// Digest authentication. + /// + /// requires a round trip to the server to get the challenge and then send the response + Digest { + /// The digest authentication username. + #[configurable(metadata(docs::examples = "${USERNAME}"))] + #[configurable(metadata(docs::examples = "username"))] + user: String, + + /// The digest authentication password. + #[configurable(metadata(docs::examples = "${PASSWORD}"))] + #[configurable(metadata(docs::examples = "password"))] + password: SensitiveString, + }, } pub trait MaybeAuth: Sized { @@ -333,6 +351,10 @@ impl Auth { Ok(auth) => map.typed_insert(auth), Err(error) => error!(message = "Invalid bearer token.", token = %token, %error), }, + Auth::Digest { user, password } => { + let auth = Authorization::basic(user.as_str(), password.inner()); + map.typed_insert(auth); + } } } } diff --git a/src/sinks/databend/config.rs b/src/sinks/databend/config.rs index f99c2f52a25f4..3dfde1ff91d7c 100644 --- a/src/sinks/databend/config.rs +++ b/src/sinks/databend/config.rs @@ -130,6 +130,9 @@ impl SinkConfig for DatabendConfig { Some(Auth::Bearer { .. }) => { return Err("Bearer authentication is not supported currently".into()); } + Some(Auth::Digest { .. }) => { + return Err("Digest authentication is not supported currently".into()); + } None => {} } if let Some(database) = &self.database { diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index b896e59cfee9c..57f103baf8a08 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -323,6 +323,10 @@ fn authorized(req: &Request, auth: &Option) -> bool { Auth::Bearer { token } => { HeaderValue::from_str(format!("Bearer {}", token.inner()).as_str()) } + Auth::Digest { .. } => { + error!("Digest authentication is not supported."); + return false; + } }; if let Ok(encoded_credentials) = encoded_credentials { diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index d9cef141dfa6d..cbba804064fe6 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -586,6 +586,7 @@ mod tests { user: _user, password: _password, } => { /* Not needed for tests at the moment */ } + Auth::Digest { .. } => { /* Not needed for tests at the moment */ } } } Ok(res) diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index fd5ffb1b03260..b4c5bf3f8a7f2 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -12,10 +12,12 @@ use bytes::Bytes; use futures_util::{stream, FutureExt, StreamExt, TryFutureExt}; use http::{response::Parts, Uri}; use hyper::{Body, Request}; +use md5::Digest; use std::time::Duration; use std::{collections::HashMap, future::ready}; use tokio_stream::wrappers::IntervalStream; use vector_lib::json_size::JsonSize; +use vector_lib::sensitive_string::SensitiveString; use crate::{ http::{Auth, HttpClient}, @@ -136,6 +138,8 @@ pub(crate) async fn call< // proxy and tls settings. let client = HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed"); + let headers = inputs.headers.clone(); + let content_type = inputs.content_type.clone(); let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval)) .take_until(inputs.shutdown) .map(move |_| stream::iter(inputs.urls.clone())) @@ -143,6 +147,9 @@ pub(crate) async fn call< .map(move |url| { let client = client.clone(); let endpoint = url.to_string(); + let uri = url.clone(); + let content_type_inner = content_type.clone(); + let auth_inner = inputs.auth.clone(); let context_builder = context_builder.clone(); let mut context = context_builder.build(&url); @@ -158,25 +165,112 @@ pub(crate) async fn call< }; // add user specified headers - for (header, values) in &inputs.headers { + for (header, values) in &headers { for value in values { builder = builder.header(header, value); } } // set ACCEPT header if not user specified - if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) { - builder = builder.header(http::header::ACCEPT, &inputs.content_type); + if !headers.contains_key(http::header::ACCEPT.as_str()) { + builder = builder.header(http::header::ACCEPT, &content_type_inner); } // building an empty request should be infallible let mut request = builder.body(Body::empty()).expect("error creating request"); - - if let Some(auth) = &inputs.auth { + let mut is_digest = false; + let mut username = "".to_string(); + let mut user_password = SensitiveString::default(); + if let Some(auth) = auth_inner { auth.apply(&mut request); + is_digest = match auth { + Auth::Digest { user, password } => { + username = user.clone(); + user_password = password.clone(); + true + }, + _ => false + }; } tokio::time::timeout(inputs.timeout, client.send(request)) + .then({ + let headers_value = headers.clone(); + let username_inner = username.clone(); + let user_password_inner = user_password.clone(); + move |result| async move { + // make another round trip using digest authentication + if !is_digest { + result + } else { + // deduce we have the correct response type: 401 Unauthorized + let response = match result { + Ok(x) => x, + Err(_) => return result, + }; + let (status, response_headers) = match response { + Ok(x) => { + let code = x.status(); + let x_headers = x.headers().clone(); + (code, x_headers) + }, + Err(x) => return Ok(Err(x.into())) + }; + if status != 401 { + return Ok(Err(crate::http::HttpError::DigestAuthExpectation)) + } + let parts = match response_headers.get("www-authenticate") { + Some(header_value) => match header_value.to_str() { + Ok(value) => value, + Err(_) => return Ok(Err(crate::http::HttpError::DigestAuthExpectation)), + }, + None => return Ok(Err(crate::http::HttpError::DigestAuthExpectation)), + }; + let parts: Vec<&str> = parts.split(",").collect(); + let mut realm = ""; + let mut nonce = ""; + for part in parts { + if part.contains("realm") { + realm = part.split("=").collect::>()[1].trim_matches('"'); + } + if part.contains("nonce") { + nonce = part.split("=").collect::>()[1].trim_matches('"'); + } + } + let ha1 = format!("{:x}", md5::Md5::digest(format!("{}:{}:{}", username_inner, realm, user_password_inner.inner()))); + let ha2 = format!("{:x}", md5::Md5::digest(format!("GET:{}", uri.path()))); + let cnonce = "00000001"; // TODO: use rng for client nonce + let nonce_count = "00000001"; + let response_digest = format!("{:x}", md5::Md5::digest(format!("{}:{}:{}:{}:{}:{}", ha1, nonce, nonce_count, cnonce, "auth", ha2))); + let auth_header = format!( + "Digest username=\"{}\", realm=\"{}\", nonce=\"{}\", uri=\"{}\", response=\"{}\", cnonce=\"{}\", nc=\"{}\", qop=\"auth\"", + username_inner, + realm, + nonce, + uri.path(), + response_digest, + cnonce, + nonce_count + ); + // make another trip but this time with auth digest impl'd + + let mut builder = Request::get(uri); + for (header, values) in &headers_value { + for value in values { + builder = builder.header(header, value); + } + } + if !headers_value.contains_key(http::header::ACCEPT.as_str()) { + builder = builder.header(http::header::ACCEPT, &content_type_inner); + } + builder = builder.header(http::header::AUTHORIZATION, auth_header); + + let request = builder.body(Body::empty()).expect("error creating request"); + let auth_response = client.send(request).await; + Ok(auth_response) + } + } + }) .then(move |result| async move { match result { Ok(Ok(response)) => Ok(response),