From 6b921de08f8fec010debbb94a5e75281342fcca9 Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Wed, 18 Dec 2024 12:52:43 -0500 Subject: [PATCH 1/4] feat: add Auth::Digest to config and exclude where not needed --- src/http.rs | 22 ++++++- src/sinks/databend/config.rs | 3 + src/sinks/prometheus/exporter.rs | 4 ++ src/sinks/websocket/sink.rs | 1 + src/sources/util/http_client.rs | 106 +++++++++++++++++++++++++++++-- 5 files changed, 129 insertions(+), 7 deletions(-) diff --git a/src/http.rs b/src/http.rs index f8e1c939c58c8..c02de85eae460 100644 --- a/src/http.rs +++ b/src/http.rs @@ -56,12 +56,14 @@ pub enum HttpError { CallRequest { source: hyper::Error }, #[snafu(display("Failed to build HTTP request: {}", source))] BuildRequest { source: http::Error }, + #[snafu(display("Expected 401 with Digest Auth"))] + DigestAuthExpectation } impl HttpError { pub const fn is_retriable(&self) -> bool { match self { - HttpError::BuildRequest { .. } | HttpError::MakeProxyConnector { .. } => false, + HttpError::BuildRequest { .. } | HttpError::MakeProxyConnector { .. } | HttpError::DigestAuthExpectation => false, HttpError::CallRequest { .. } | HttpError::BuildTlsConnector { .. } | HttpError::MakeHttpsConnector { .. } => true, @@ -295,6 +297,20 @@ pub enum Auth { /// The bearer authentication token. token: SensitiveString, }, + /// Digest authentication. + /// + /// requires a round trip to the server to get the challenge and then send the response + Digest { + /// The digest authentication username. + #[configurable(metadata(docs::examples = "${USERNAME}"))] + #[configurable(metadata(docs::examples = "username"))] + user: String, + + /// The digest authentication password. + #[configurable(metadata(docs::examples = "${PASSWORD}"))] + #[configurable(metadata(docs::examples = "password"))] + password: SensitiveString, + }, } pub trait MaybeAuth: Sized { @@ -333,6 +349,10 @@ impl Auth { Ok(auth) => map.typed_insert(auth), Err(error) => error!(message = "Invalid bearer token.", token = %token, %error), }, + Auth::Digest { user, password } => { + let auth = Authorization::basic(user.as_str(), password.inner()); + map.typed_insert(auth); + }, } } } diff --git a/src/sinks/databend/config.rs b/src/sinks/databend/config.rs index f99c2f52a25f4..970c0154db5ae 100644 --- a/src/sinks/databend/config.rs +++ b/src/sinks/databend/config.rs @@ -130,6 +130,9 @@ impl SinkConfig for DatabendConfig { Some(Auth::Bearer { .. }) => { return Err("Bearer authentication is not supported currently".into()); } + Some(Auth::Digest { .. }) => { + return Err("Digest authentication is not supported currently".into()); + }, None => {} } if let Some(database) = &self.database { diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index b896e59cfee9c..678439c5a71a8 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -322,6 +322,10 @@ fn authorized(req: &Request, auth: &Option) -> bool { ), Auth::Bearer { token } => { HeaderValue::from_str(format!("Bearer {}", token.inner()).as_str()) + }, + Auth::Digest { .. } => { + error!("Digest authentication is not supported."); + return false; } }; diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index d9cef141dfa6d..cbba804064fe6 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -586,6 +586,7 @@ mod tests { user: _user, password: _password, } => { /* Not needed for tests at the moment */ } + Auth::Digest { .. } => { /* Not needed for tests at the moment */ } } } Ok(res) diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index fd5ffb1b03260..a7cae6cfc96c3 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -12,6 +12,8 @@ use bytes::Bytes; use futures_util::{stream, FutureExt, StreamExt, TryFutureExt}; use http::{response::Parts, Uri}; use hyper::{Body, Request}; +use md5::Digest; +use vector_lib::sensitive_string::SensitiveString; use std::time::Duration; use std::{collections::HashMap, future::ready}; use tokio_stream::wrappers::IntervalStream; @@ -136,6 +138,8 @@ pub(crate) async fn call< // proxy and tls settings. let client = HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed"); + let headers = inputs.headers.clone(); + let content_type = inputs.content_type.clone(); let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval)) .take_until(inputs.shutdown) .map(move |_| stream::iter(inputs.urls.clone())) @@ -143,6 +147,9 @@ pub(crate) async fn call< .map(move |url| { let client = client.clone(); let endpoint = url.to_string(); + let uri = url.clone(); + let content_type_inner = content_type.clone(); + let auth_inner = inputs.auth.clone(); let context_builder = context_builder.clone(); let mut context = context_builder.build(&url); @@ -158,25 +165,112 @@ pub(crate) async fn call< }; // add user specified headers - for (header, values) in &inputs.headers { + for (header, values) in &headers { for value in values { builder = builder.header(header, value); } } // set ACCEPT header if not user specified - if !inputs.headers.contains_key(http::header::ACCEPT.as_str()) { - builder = builder.header(http::header::ACCEPT, &inputs.content_type); + if !headers.contains_key(http::header::ACCEPT.as_str()) { + builder = builder.header(http::header::ACCEPT, &content_type_inner); } // building an empty request should be infallible let mut request = builder.body(Body::empty()).expect("error creating request"); - - if let Some(auth) = &inputs.auth { + + let mut is_digest = false; + let mut username = "".to_string(); + let mut user_password = SensitiveString::default(); + if let Some(auth) = auth_inner { auth.apply(&mut request); + is_digest = match auth { + Auth::Digest { user, password } => { + username = user.clone(); + user_password = password.clone(); + true + }, + _ => false + }; } - + tokio::time::timeout(inputs.timeout, client.send(request)) + .then({ + let headers_value = headers.clone(); + let username_inner = username.clone(); + let user_password_inner = user_password.clone(); + move |result| async move { + // make another round trip using digest authentication + if !is_digest { + result + } else { + // deduce we have the correct response type: 401 Unauthorized + let response = match result { + Ok(x) => x, + Err(_) => return result, + }; + let (status, response_headers) = match response { + Ok(x) => { + let code = x.status(); + let x_headers = x.headers().clone(); + (code, x_headers) + }, + Err(x) => return Ok(Err(x.into())) + }; + if status != 401 { + return Ok(Err(crate::http::HttpError::DigestAuthExpectation)) + } + let parts = response_headers + .get("www-authenticate") + .unwrap() + .to_str() + .unwrap(); + let parts: Vec<&str> = parts.split(",").collect(); + let mut realm = ""; + let mut nonce = ""; + for part in parts { + if part.contains("realm") { + realm = part.split("=").collect::>()[1].trim_matches('"'); + } + if part.contains("nonce") { + nonce = part.split("=").collect::>()[1].trim_matches('"'); + } + println!("{}", part); + } + let ha1 = format!("{:x}", md5::Md5::digest(format!("{}:{}:{}", username_inner, realm, user_password_inner.inner()))); + let ha2 = format!("{:x}", md5::Md5::digest(format!("GET:{}", uri.path()))); + let cnonce = "00000001"; // TODO: use rng for client nonce + let nonce_count = "00000001"; + let response_digest = format!("{:x}", md5::Md5::digest(format!("{}:{}:{}:{}:{}:{}", ha1, nonce, nonce_count, cnonce, "auth", ha2))); + let auth_header = format!( + "Digest username=\"{}\", realm=\"{}\", nonce=\"{}\", uri=\"{}\", response=\"{}\", cnonce=\"{}\", nc=\"{}\", qop=\"auth\"", + username_inner, + realm, + nonce, + uri.path(), + response_digest, + cnonce, + nonce_count + ); + // make another trip but this time with auth digest impl'd + + let mut builder = Request::get(uri); + for (header, values) in &headers_value { + for value in values { + builder = builder.header(header, value); + } + } + if !headers_value.contains_key(http::header::ACCEPT.as_str()) { + builder = builder.header(http::header::ACCEPT, &content_type_inner); + } + builder = builder.header(http::header::AUTHORIZATION, auth_header); + + let request = builder.body(Body::empty()).expect("error creating request"); + let auth_response = client.send(request).await; + Ok(auth_response) + } + } + }) .then(move |result| async move { match result { Ok(Ok(response)) => Ok(response), From 59679e066d7982b8afae5538ee51944c2504b898 Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Tue, 31 Dec 2024 13:42:07 -0500 Subject: [PATCH 2/4] fix: cargo fmt --- src/http.rs | 10 ++++++---- src/sinks/databend/config.rs | 2 +- src/sinks/prometheus/exporter.rs | 2 +- src/sources/util/http_client.rs | 5 ++--- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/http.rs b/src/http.rs index c02de85eae460..6d49f858ca2f0 100644 --- a/src/http.rs +++ b/src/http.rs @@ -57,13 +57,15 @@ pub enum HttpError { #[snafu(display("Failed to build HTTP request: {}", source))] BuildRequest { source: http::Error }, #[snafu(display("Expected 401 with Digest Auth"))] - DigestAuthExpectation + DigestAuthExpectation, } impl HttpError { pub const fn is_retriable(&self) -> bool { match self { - HttpError::BuildRequest { .. } | HttpError::MakeProxyConnector { .. } | HttpError::DigestAuthExpectation => false, + HttpError::BuildRequest { .. } + | HttpError::MakeProxyConnector { .. } + | HttpError::DigestAuthExpectation => false, HttpError::CallRequest { .. } | HttpError::BuildTlsConnector { .. } | HttpError::MakeHttpsConnector { .. } => true, @@ -298,7 +300,7 @@ pub enum Auth { token: SensitiveString, }, /// Digest authentication. - /// + /// /// requires a round trip to the server to get the challenge and then send the response Digest { /// The digest authentication username. @@ -352,7 +354,7 @@ impl Auth { Auth::Digest { user, password } => { let auth = Authorization::basic(user.as_str(), password.inner()); map.typed_insert(auth); - }, + } } } } diff --git a/src/sinks/databend/config.rs b/src/sinks/databend/config.rs index 970c0154db5ae..3dfde1ff91d7c 100644 --- a/src/sinks/databend/config.rs +++ b/src/sinks/databend/config.rs @@ -132,7 +132,7 @@ impl SinkConfig for DatabendConfig { } Some(Auth::Digest { .. }) => { return Err("Digest authentication is not supported currently".into()); - }, + } None => {} } if let Some(database) = &self.database { diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 678439c5a71a8..57f103baf8a08 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -322,7 +322,7 @@ fn authorized(req: &Request, auth: &Option) -> bool { ), Auth::Bearer { token } => { HeaderValue::from_str(format!("Bearer {}", token.inner()).as_str()) - }, + } Auth::Digest { .. } => { error!("Digest authentication is not supported."); return false; diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index a7cae6cfc96c3..a223d93e3324e 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -13,11 +13,11 @@ use futures_util::{stream, FutureExt, StreamExt, TryFutureExt}; use http::{response::Parts, Uri}; use hyper::{Body, Request}; use md5::Digest; -use vector_lib::sensitive_string::SensitiveString; use std::time::Duration; use std::{collections::HashMap, future::ready}; use tokio_stream::wrappers::IntervalStream; use vector_lib::json_size::JsonSize; +use vector_lib::sensitive_string::SensitiveString; use crate::{ http::{Auth, HttpClient}, @@ -178,7 +178,6 @@ pub(crate) async fn call< // building an empty request should be infallible let mut request = builder.body(Body::empty()).expect("error creating request"); - let mut is_digest = false; let mut username = "".to_string(); let mut user_password = SensitiveString::default(); @@ -193,7 +192,7 @@ pub(crate) async fn call< _ => false }; } - + tokio::time::timeout(inputs.timeout, client.send(request)) .then({ let headers_value = headers.clone(); From 8631757ee8f8534779d5ae43f7f377699129bc6a Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Tue, 31 Dec 2024 13:53:44 -0500 Subject: [PATCH 3/4] fix: better error handling --- src/sources/util/http_client.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index a223d93e3324e..e8f5dca9f8cd4 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -219,11 +219,13 @@ pub(crate) async fn call< if status != 401 { return Ok(Err(crate::http::HttpError::DigestAuthExpectation)) } - let parts = response_headers - .get("www-authenticate") - .unwrap() - .to_str() - .unwrap(); + let parts = match response_headers.get("www-authenticate") { + Some(header_value) => match header_value.to_str() { + Ok(value) => value, + Err(_) => return Ok(Err(crate::http::HttpError::DigestAuthExpectation)), + }, + None => return Ok(Err(crate::http::HttpError::DigestAuthExpectation)), + }; let parts: Vec<&str> = parts.split(",").collect(); let mut realm = ""; let mut nonce = ""; From a3168581a2dcf23333c9a3e73cba03575f8fe079 Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Tue, 31 Dec 2024 13:56:14 -0500 Subject: [PATCH 4/4] fix: remove extraneous println --- src/sources/util/http_client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index e8f5dca9f8cd4..b4c5bf3f8a7f2 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -236,7 +236,6 @@ pub(crate) async fn call< if part.contains("nonce") { nonce = part.split("=").collect::>()[1].trim_matches('"'); } - println!("{}", part); } let ha1 = format!("{:x}", md5::Md5::digest(format!("{}:{}:{}", username_inner, realm, user_password_inner.inner()))); let ha2 = format!("{:x}", md5::Md5::digest(format!("GET:{}", uri.path())));