diff --git a/arroyo-connectors/src/lib.rs b/arroyo-connectors/src/lib.rs index 4a19958b0..8df88e069 100644 --- a/arroyo-connectors/src/lib.rs +++ b/arroyo-connectors/src/lib.rs @@ -3,7 +3,7 @@ use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType, }; use arroyo_rpc::primitive_to_sql; -use arroyo_types::{string_to_map, sub_env_vars}; +use arroyo_types::string_to_map; use axum::response::sse::Event; use blackhole::BlackholeConnector; use fluvio::FluvioConnector; @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF } } -fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result { +fn construct_http_client(endpoint: &str, headers: Option) -> anyhow::Result { if let Err(e) = reqwest::Url::parse(&endpoint) { bail!("invalid endpoint '{}': {:?}", endpoint, e) }; @@ -344,7 +344,7 @@ fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Re Ok(( TryInto::::try_into(&k) .map_err(|_| anyhow!("invalid header name {}", k))?, - TryInto::::try_into(sub_env_vars(&v).map_err(|e| anyhow!(e))?) + TryInto::::try_into(&v) .map_err(|_| anyhow!("invalid header value {}", v))?, )) }) diff --git a/arroyo-connectors/src/polling_http.rs b/arroyo-connectors/src/polling_http.rs index be0cf3934..4ede759cb 100644 --- a/arroyo-connectors/src/polling_http.rs +++ b/arroyo-connectors/src/polling_http.rs @@ -3,7 +3,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use reqwest::{Client, Request}; use tokio::sync::mpsc::Sender; @@ -20,7 +20,10 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json"); -import_types!(schema = "../connector-schemas/polling_http/table.json"); +import_types!( + schema = "../connector-schemas/polling_http/table.json", + replace = { VarStr = VarStr } +); const ICON: &str = include_str!("../resources/http.svg"); pub struct PollingHTTPConnector {} @@ -55,8 +58,18 @@ impl PollingHTTPConnector { config: &PollingHttpTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let headers = match &config.headers { + Some(headers) => { + let s = match headers.sub_env_vars() { + Ok(s) => s, + Err(e) => return Err(anyhow!(e)), + }; + Some(s) + } + None => None, + }; + + let client = construct_http_client(&config.endpoint, headers)?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -166,7 +179,7 @@ impl Connector for PollingHTTPConnector { EmptyConfig {}, PollingHttpTable { endpoint, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr { raw_val: s }), method, body, poll_interval_ms: interval, @@ -187,7 +200,7 @@ impl Connector for PollingHTTPConnector { let description = format!("PollingHTTPSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars().map_err(|e| anyhow!(e))?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" diff --git a/arroyo-connectors/src/webhook.rs b/arroyo-connectors/src/webhook.rs index 3016c2647..c2434db5a 100644 --- a/arroyo-connectors/src/webhook.rs +++ b/arroyo-connectors/src/webhook.rs @@ -4,6 +4,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; +use arroyo_types::VarStr; use axum::response::sse::Event; use reqwest::{Client, Request}; use serde_json::json; @@ -47,8 +48,10 @@ impl WebhookConnector { config: &WebhookTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let client = construct_http_client( + &config.endpoint, + config.headers.as_ref().map(|t| t.0.clone()), + )?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -183,7 +186,8 @@ impl Connector for WebhookConnector { .map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?; let table = WebhookTable { endpoint, headers }; - let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?; + let client = + construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| t.0.clone()))?; let _ = Self::construct_test_request(&client, &table)?; self.from_config(None, name, EmptyConfig {}, table, schema) diff --git a/arroyo-console/src/routes/connections/JsonForm.tsx b/arroyo-console/src/routes/connections/JsonForm.tsx index 19ee2ecfc..de45ac022 100644 --- a/arroyo-console/src/routes/connections/JsonForm.tsx +++ b/arroyo-console/src/routes/connections/JsonForm.tsx @@ -437,6 +437,8 @@ export function FormInner({ return value; } + // console.log(JSON.stringify(schema, null, 2)); + return ( {Object.keys(schema.properties || {}) @@ -603,6 +605,29 @@ export function FormInner({ /> ); + } else if (property.$ref?.endsWith('VarStr')) { + const valPath = nextPath + '.raw_val'; + return ( + + {JSON.stringify(property)} + + + ); } else { console.warn('Unsupported property', property); return <>; diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index c83a644a3..3d19504c5 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -869,26 +869,33 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive { start..=end } -pub fn sub_env_vars(input: &str) -> Result { - // Regex to match patterns like {{ VAR_NAME }} - static RE: OnceLock = OnceLock::new(); - let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap()); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VarStr { + pub raw_val: String, +} + +impl VarStr { + pub fn sub_env_vars(&self) -> Result { + // Regex to match patterns like {{ VAR_NAME }} + static RE: OnceLock = OnceLock::new(); + let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap()); - let mut result = input.to_string(); + let mut result = self.raw_val.to_string(); - for caps in re.captures_iter(input) { - let var_name = caps.get(1).unwrap().as_str(); - let full_match = caps.get(0).unwrap().as_str(); + for caps in re.captures_iter(&self.raw_val) { + let var_name = caps.get(1).unwrap().as_str(); + let full_match = caps.get(0).unwrap().as_str(); - match env::var(var_name) { - Ok(value) => { - result = result.replace(full_match, &value); + match env::var(var_name) { + Ok(value) => { + result = result.replace(full_match, &value); + } + Err(_) => return Err(format!("Environment variable {} not found", var_name)), } - Err(_) => return Err(format!("Environment variable {} not found", var_name)), } - } - Ok(result) + Ok(result) + } } #[cfg(test)] diff --git a/arroyo-worker/src/connectors/polling_http.rs b/arroyo-worker/src/connectors/polling_http.rs index f6ccdf5c9..666e7a8f1 100644 --- a/arroyo-worker/src/connectors/polling_http.rs +++ b/arroyo-worker/src/connectors/polling_http.rs @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration}; use arroyo_macro::source_fn; use arroyo_rpc::ControlMessage; use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig}; -use arroyo_types::{string_to_map, sub_env_vars, Message, Record, UserError, Watermark}; +use arroyo_types::{string_to_map, Message, Record, UserError, VarStr, Watermark}; use serde::{Deserialize, Serialize}; use tokio::select; @@ -26,8 +26,10 @@ use crate::{ SourceFinishType, }; -import_types!(schema = "../connector-schemas/polling_http/table.json"); - +import_types!( + schema = "../connector-schemas/polling_http/table.json", + replace = { VarStr = VarStr } +); const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(1); const MAX_BODY_SIZE: usize = 5 * 1024 * 1024; // 5M ought to be enough for anybody @@ -66,19 +68,24 @@ where let table: PollingHttpTable = serde_json::from_value(config.table).expect("Invalid table config for WebhookSink"); - let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .map(|(k, v)| { - ( - (&k).try_into() - .expect(&format!("invalid header name {}", k)), - (sub_env_vars(&v).unwrap()) - .try_into() - .expect(&format!("invalid header value {}", v)), - ) - }) - .collect(); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().unwrap_or_else(|e| panic!("{}", e))) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); let deserializer = DataDeserializer::new( config diff --git a/connector-schemas/polling_http/table.json b/connector-schemas/polling_http/table.json index ee06bb29e..dd9edf0d3 100644 --- a/connector-schemas/polling_http/table.json +++ b/connector-schemas/polling_http/table.json @@ -1,6 +1,24 @@ { "type": "object", "title": "PollingHTTPTable", + "$defs": { + "VarStr": { + "type": "object", + "properties": { + "raw_val": { + "type": "string", + "title": "Raw Value", + "description": "The raw value. May contain [variables](https://doc.arroyo.dev/todo).", + "examples": [ + "{{ MY_ENV_VAR }}" + ] + } + }, + "required": [ + "raw_val" + ] + } + }, "properties": { "endpoint": { "title": "Endpoint", @@ -11,10 +29,10 @@ }, "headers": { "title": "Headers", - "type": "string", + "type": "object", "description": "Comma separated list of headers to send with the request. May contain [variables](https://doc.arroyo.dev/todo).", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "$ref": "#/$defs/VarStr" }, "method": { "title": "Method", @@ -26,7 +44,9 @@ "PUT", "PATCH" ], - "examples": ["GET"] + "examples": [ + "GET" + ] }, "body": { "title": "Body", @@ -37,7 +57,9 @@ "title": "Polling Interval (ms)", "type": "integer", "description": "Number of milliseconds to wait between successful polls of the HTTP endpoint", - "examples": ["1000"] + "examples": [ + "1000" + ] }, "emit_behavior": { "title": "Emit Behavior",