diff --git a/Cargo.lock b/Cargo.lock index 49ba1fc99..fa415b31b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -878,6 +878,7 @@ dependencies = [ "arrow", "arrow-array", "bincode 2.0.0-rc.3", + "regex", "serde", ] @@ -6369,14 +6370,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -6390,13 +6391,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -6417,6 +6418,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "regress" version = "0.6.0" diff --git a/arroyo-connectors/src/lib.rs b/arroyo-connectors/src/lib.rs index e1b3dea29..b37c4d0fe 100644 --- a/arroyo-connectors/src/lib.rs +++ b/arroyo-connectors/src/lib.rs @@ -3,7 +3,7 @@ use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType, }; use arroyo_rpc::primitive_to_sql; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, substitute_env_vars}; use axum::response::sse::Event; use blackhole::BlackholeConnector; use fluvio::FluvioConnector; @@ -344,8 +344,10 @@ fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Re Ok(( TryInto::::try_into(&k) .map_err(|_| anyhow!("invalid header name {}", k))?, - TryInto::::try_into(&v) - .map_err(|_| anyhow!("invalid header value {}", v))?, + TryInto::::try_into( + substitute_env_vars(&v).map_err(|e| anyhow!(e))?, + ) + .map_err(|_| anyhow!("invalid header value {}", v))?, )) }) .collect(); diff --git a/arroyo-connectors/src/polling_http.rs b/arroyo-connectors/src/polling_http.rs index 652802b22..be0cf3934 100644 --- a/arroyo-connectors/src/polling_http.rs +++ b/arroyo-connectors/src/polling_http.rs @@ -126,7 +126,7 @@ impl Connector for PollingHTTPConnector { Err(err) => TestSourceMessage { error: true, done: true, - message: format!("{:?}", err), + message: format!("{:?}", err.root_cause()), }, }; diff --git a/arroyo-types/Cargo.toml b/arroyo-types/Cargo.toml index 05a108232..425ca2eef 100644 --- a/arroyo-types/Cargo.toml +++ b/arroyo-types/Cargo.toml @@ -8,3 +8,4 @@ bincode = "2.0.0-rc.3" serde = { version = "1.0", features = ["derive"] } arrow = { workspace = true } arrow-array = { workspace = true } +regex = "1.10.2" diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index e7110b729..dde57ada9 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -1,6 +1,7 @@ use arrow::datatypes::SchemaRef; use arrow_array::RecordBatch; use bincode::{config, Decode, Encode}; +use regex::Regex; use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -807,6 +808,7 @@ pub enum DateTruncPrecision { } use std::convert::TryFrom; +use std::sync::OnceLock; impl TryFrom<&str> for DatePart { type Error = String; @@ -867,6 +869,28 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive { start..=end } +pub fn substitute_env_vars(input: &str) -> Result { + // Regex to match patterns like {{ VAR_NAME }} + static RE: OnceLock = OnceLock::new(); + let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap()); + + let mut result = input.to_string(); + + for caps in re.captures_iter(input) { + let var_name = caps.get(1).unwrap().as_str(); + let full_match = caps.get(0).unwrap().as_str(); + + match env::var(var_name) { + Ok(value) => { + result = result.replace(full_match, &value); + } + Err(_) => return Err(format!("Environment variable {} not found", var_name)), + } + } + + Ok(result) +} + #[cfg(test)] mod tests { use super::*; @@ -903,4 +927,27 @@ mod tests { "u64::MAX is not in the correct range" ); } + + #[test] + fn test_no_placeholders() { + let input = "This is a test string with no placeholders"; + assert_eq!(substitute_env_vars(input).unwrap(), input); + } + + #[test] + fn test_with_placeholders() { + env::set_var("TEST_VAR", "environment variable"); + let input = "This is a {{ TEST_VAR }}"; + let expected = "This is a environment variable"; + assert_eq!(substitute_env_vars(input).unwrap(), expected); + } + + #[test] + fn test_multiple_placeholders() { + env::set_var("VAR1", "first"); + env::set_var("VAR2", "second"); + let input = "Here is the {{ VAR1 }} and here is the {{ VAR2 }}"; + let expected = "Here is the first and here is the second"; + assert_eq!(substitute_env_vars(input).unwrap(), expected); + } } diff --git a/arroyo-worker/src/connectors/polling_http.rs b/arroyo-worker/src/connectors/polling_http.rs index 676fae667..66d5e8aaa 100644 --- a/arroyo-worker/src/connectors/polling_http.rs +++ b/arroyo-worker/src/connectors/polling_http.rs @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration}; use arroyo_macro::source_fn; use arroyo_rpc::ControlMessage; use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig}; -use arroyo_types::{string_to_map, Message, Record, UserError, Watermark}; +use arroyo_types::{string_to_map, substitute_env_vars, Message, Record, UserError, Watermark}; use serde::{Deserialize, Serialize}; use tokio::select; @@ -73,7 +73,8 @@ where ( (&k).try_into() .expect(&format!("invalid header name {}", k)), - (&v).try_into() + (substitute_env_vars(&v).unwrap()) + .try_into() .expect(&format!("invalid header value {}", v)), ) }) diff --git a/connector-schemas/polling_http/table.json b/connector-schemas/polling_http/table.json index f0a6ea03c..ee06bb29e 100644 --- a/connector-schemas/polling_http/table.json +++ b/connector-schemas/polling_http/table.json @@ -12,7 +12,7 @@ "headers": { "title": "Headers", "type": "string", - "description": "Comma separated list of headers to send with the request", + "description": "Comma separated list of headers to send with the request. May contain [variables](https://doc.arroyo.dev/todo).", "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", "examples": ["Authentication: digest 1234,Content-Type: application/json"] },