diff --git a/Cargo.lock b/Cargo.lock index 49ba1fc99..279160296 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -875,9 +875,11 @@ dependencies = [ name = "arroyo-types" version = "0.9.0-dev" dependencies = [ + "anyhow", "arrow", "arrow-array", "bincode 2.0.0-rc.3", + "regex", "serde", ] @@ -6369,14 +6371,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -6390,13 +6392,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -6417,6 +6419,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "regress" version = "0.6.0" diff --git a/arroyo-connectors/src/kafka.rs b/arroyo-connectors/src/kafka.rs index 27d6ebc3d..18eb4b791 100644 --- a/arroyo-connectors/src/kafka.rs +++ b/arroyo-connectors/src/kafka.rs @@ -1,21 +1,21 @@ use anyhow::{anyhow, bail}; -use arroyo_rpc::OperatorConfig; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::convert::Infallible; -use typify::import_types; - use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage}; +use arroyo_rpc::OperatorConfig; +use arroyo_types::VarStr; use axum::response::sse::Event; use rdkafka::{ consumer::{BaseConsumer, Consumer}, message::BorrowedMessage, ClientConfig, Offset, TopicPartitionList, }; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::convert::Infallible; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; use tonic::Status; use tracing::{error, info, warn}; +use typify::import_types; use crate::{pull_opt, Connection, ConnectionType}; @@ -25,7 +25,13 @@ const CONFIG_SCHEMA: &str = include_str!("../../connector-schemas/kafka/connecti const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/kafka/table.json"); const ICON: &str = include_str!("../resources/kafka.svg"); -import_types!(schema = "../connector-schemas/kafka/connection.json",); +import_types!( + schema = "../connector-schemas/kafka/connection.json", + convert = { + {type = "string", format = "var-str"} = VarStr, + {type = "string", format = "var-str", isSensitive = true} = VarStr + } +); import_types!(schema = "../connector-schemas/kafka/table.json"); pub struct KafkaConnector {} @@ -150,14 +156,16 @@ impl Connector for KafkaConnector { mechanism: pull_opt("auth.mechanism", options)?, protocol: pull_opt("auth.protocol", options)?, username: pull_opt("auth.username", options)?, - password: pull_opt("auth.password", options)?, + password: VarStr::new(pull_opt("auth.password", options)?), }, Some(other) => bail!("unknown auth type '{}'", other), }; let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| { let api_key = options.remove("schema_registry.api_key"); - let api_secret = options.remove("schema_registry.api_secret"); + let api_secret = options + .remove("schema_registry.api_secret") + .map(|secret| VarStr::new(secret)); SchemaRegistry::ConfluentSchemaRegistry { endpoint, api_key, @@ -251,7 +259,10 @@ impl KafkaTester { client_config.set("sasl.mechanism", mechanism); client_config.set("security.protocol", protocol); client_config.set("sasl.username", username); - client_config.set("sasl.password", password); + client_config.set( + "sasl.password", + password.sub_env_vars().map_err(|e| e.to_string())?, + ); } }; diff --git a/arroyo-connectors/src/lib.rs b/arroyo-connectors/src/lib.rs index e1b3dea29..8df88e069 100644 --- a/arroyo-connectors/src/lib.rs +++ b/arroyo-connectors/src/lib.rs @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF } } -fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result { +fn construct_http_client(endpoint: &str, headers: Option) -> anyhow::Result { if let Err(e) = reqwest::Url::parse(&endpoint) { bail!("invalid endpoint '{}': {:?}", endpoint, e) }; diff --git a/arroyo-connectors/src/polling_http.rs b/arroyo-connectors/src/polling_http.rs index 652802b22..4f437a21b 100644 --- a/arroyo-connectors/src/polling_http.rs +++ b/arroyo-connectors/src/polling_http.rs @@ -3,7 +3,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use reqwest::{Client, Request}; use tokio::sync::mpsc::Sender; @@ -20,7 +20,10 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json"); -import_types!(schema = "../connector-schemas/polling_http/table.json"); +import_types!( + schema = "../connector-schemas/polling_http/table.json", + convert = { {type = "string", format = "var-str"} = VarStr } +); const ICON: &str = include_str!("../resources/http.svg"); pub struct PollingHTTPConnector {} @@ -55,8 +58,13 @@ impl PollingHTTPConnector { config: &PollingHttpTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let headers = config + .headers + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?; + + let client = construct_http_client(&config.endpoint, headers)?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -126,7 +134,7 @@ impl Connector for PollingHTTPConnector { Err(err) => TestSourceMessage { error: true, done: true, - message: format!("{:?}", err), + message: format!("{:?}", err.root_cause()), }, }; @@ -166,7 +174,7 @@ impl Connector for PollingHTTPConnector { EmptyConfig {}, PollingHttpTable { endpoint, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr::new(s)), method, body, poll_interval_ms: interval, @@ -187,7 +195,7 @@ impl Connector for PollingHTTPConnector { let description = format!("PollingHTTPSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" diff --git a/arroyo-connectors/src/sse.rs b/arroyo-connectors/src/sse.rs index a3ff9b01c..fa6735952 100644 --- a/arroyo-connectors/src/sse.rs +++ b/arroyo-connectors/src/sse.rs @@ -4,7 +4,7 @@ use std::time::Duration; use anyhow::{anyhow, bail}; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use eventsource_client::Client; use futures::StreamExt; @@ -22,7 +22,7 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/sse/table.json"); -import_types!(schema = "../connector-schemas/sse/table.json"); +import_types!(schema = "../connector-schemas/sse/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const ICON: &str = include_str!("../resources/sse.svg"); pub struct SSEConnector {} @@ -79,7 +79,7 @@ impl Connector for SSEConnector { let description = format!("SSESource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" @@ -134,7 +134,7 @@ impl Connector for SSEConnector { SseTable { endpoint, events, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr::new(s)), }, schema, ) @@ -174,11 +174,13 @@ impl SseTester { .map_err(|_| anyhow!("Endpoint URL is invalid"))?; let headers = string_to_map( - self.config + &self + .config .headers .as_ref() - .map(|t| t.0.as_str()) - .unwrap_or(""), + .map(|s| s.sub_env_vars()) + .transpose()? + .unwrap_or("".to_string()), ) .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))?; diff --git a/arroyo-connectors/src/webhook.rs b/arroyo-connectors/src/webhook.rs index 3016c2647..53e797b39 100644 --- a/arroyo-connectors/src/webhook.rs +++ b/arroyo-connectors/src/webhook.rs @@ -4,6 +4,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; +use arroyo_types::VarStr; use axum::response::sse::Event; use reqwest::{Client, Request}; use serde_json::json; @@ -21,7 +22,7 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/webhook/table.json"); -import_types!(schema = "../connector-schemas/webhook/table.json"); +import_types!(schema = "../connector-schemas/webhook/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const ICON: &str = include_str!("../resources/webhook.svg"); pub struct WebhookConnector {} @@ -47,8 +48,13 @@ impl WebhookConnector { config: &WebhookTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let headers = config + .headers + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?; + + let client = construct_http_client(&config.endpoint, headers)?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -176,14 +182,18 @@ impl Connector for WebhookConnector { ) -> anyhow::Result { let endpoint = pull_opt("endpoint", options)?; - let headers = options - .remove("headers") - .map(|s| s.try_into()) - .transpose() - .map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?; + let headers = options.remove("headers").map(|s| VarStr::new(s)); let table = WebhookTable { endpoint, headers }; - let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?; + + let client = construct_http_client( + &table.endpoint, + table + .headers + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?, + )?; let _ = Self::construct_test_request(&client, &table)?; self.from_config(None, name, EmptyConfig {}, table, schema) diff --git a/arroyo-connectors/src/websocket.rs b/arroyo-connectors/src/websocket.rs index dc52f8116..e8393b080 100644 --- a/arroyo-connectors/src/websocket.rs +++ b/arroyo-connectors/src/websocket.rs @@ -8,7 +8,7 @@ use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, }; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; @@ -25,7 +25,7 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/websocket/table.json"); -import_types!(schema = "../connector-schemas/websocket/table.json"); +import_types!(schema = "../connector-schemas/websocket/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const ICON: &str = include_str!("../resources/websocket.svg"); pub struct WebsocketConnector {} @@ -79,16 +79,23 @@ impl Connector for WebsocketConnector { } }; - let headers = - match string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs")) - { - Ok(headers) => headers, - Err(e) => { - send(true, true, format!("Failed to parse headers: {:?}", e)).await; - return; - } - }; + let headers_str = match table.headers.as_ref().map(|s| s.sub_env_vars()).transpose() { + Ok(headers) => headers, + Err(e) => { + send(true, true, format!("{}", e)).await; + return; + } + }; + + let headers = match string_to_map(&headers_str.unwrap_or("".to_string())) + .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs")) + { + Ok(headers) => headers, + Err(e) => { + send(true, true, format!("Failed to parse headers: {:?}", e)).await; + return; + } + }; let uri = match Uri::from_str(&table.endpoint.to_string()) { Ok(uri) => uri, @@ -215,7 +222,7 @@ impl Connector for WebsocketConnector { let description = format!("WebsocketSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" @@ -289,7 +296,7 @@ impl Connector for WebsocketConnector { EmptyConfig {}, WebsocketTable { endpoint, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr::new(s)), subscription_message: None, subscription_messages, }, diff --git a/arroyo-console/src/routes/connections/JsonForm.tsx b/arroyo-console/src/routes/connections/JsonForm.tsx index 19ee2ecfc..36f8943e0 100644 --- a/arroyo-console/src/routes/connections/JsonForm.tsx +++ b/arroyo-console/src/routes/connections/JsonForm.tsx @@ -635,6 +635,7 @@ export function JsonForm({ }) { let ajv = new Ajv(); ajv.addKeyword('isSensitive'); + ajv.addFormat('var-str', { validate: () => true }); const memoAjv = useMemo(() => addFormats(ajv), [schema]); const formik = useFormik({ diff --git a/arroyo-rpc/src/schema_resolver.rs b/arroyo-rpc/src/schema_resolver.rs index e57c63291..8ee1894bd 100644 --- a/arroyo-rpc/src/schema_resolver.rs +++ b/arroyo-rpc/src/schema_resolver.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, bail}; use apache_avro::Schema; +use arroyo_types::VarStr; use async_trait::async_trait; use reqwest::{Client, StatusCode, Url}; use serde::de::DeserializeOwned; @@ -103,7 +104,7 @@ pub struct ConfluentSchemaRegistry { topic: String, client: Client, api_key: Option, - api_secret: Option, + api_secret: Option, } impl ConfluentSchemaRegistry { @@ -111,7 +112,7 @@ impl ConfluentSchemaRegistry { endpoint: &str, topic: &str, api_key: Option, - api_secret: Option, + api_secret: Option, ) -> anyhow::Result { let client = Client::builder() .timeout(Duration::from_secs(5)) @@ -226,8 +227,14 @@ impl ConfluentSchemaRegistry { async fn get_schema_for_url(&self, url: Url) -> anyhow::Result { let mut get_call = self.client.get(url.clone()); + let secret = self + .api_secret + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?; + if let Some(api_key) = self.api_key.as_ref() { - get_call = get_call.basic_auth(api_key, self.api_secret.as_ref()); + get_call = get_call.basic_auth(api_key, secret); } let resp = get_call.send().await.map_err(|e| { diff --git a/arroyo-types/Cargo.toml b/arroyo-types/Cargo.toml index 05a108232..e317bf3a4 100644 --- a/arroyo-types/Cargo.toml +++ b/arroyo-types/Cargo.toml @@ -8,3 +8,5 @@ bincode = "2.0.0-rc.3" serde = { version = "1.0", features = ["derive"] } arrow = { workspace = true } arrow-array = { workspace = true } +regex = "1.10.2" +anyhow = "1.0.75" diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index c5743b889..d3f8fb485 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -1,15 +1,16 @@ use arrow::datatypes::SchemaRef; use arrow_array::RecordBatch; use bincode::{config, Decode, Encode}; +use regex::Regex; use serde::ser::SerializeStruct; -use serde::{Deserialize, Serialize}; +use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::collections::HashMap; -use std::env; use std::fmt::Debug; use std::hash::Hash; use std::ops::{Range, RangeInclusive}; use std::str::FromStr; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{env, fmt}; #[derive(Copy, Hash, Debug, Clone, Eq, PartialEq, Encode, Decode, PartialOrd, Ord, Deserialize)] pub struct Window { @@ -813,7 +814,10 @@ pub enum DateTruncPrecision { Second, } +use anyhow::bail; +use serde::de::Visitor; use std::convert::TryFrom; +use std::sync::OnceLock; impl TryFrom<&str> for DatePart { type Error = String; @@ -874,6 +878,76 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive { start..=end } +#[derive(Debug, Clone)] +pub struct VarStr { + raw_val: String, +} + +impl Serialize for VarStr { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.raw_val) + } +} + +struct VarStrVisitor; + +impl<'de> Visitor<'de> for VarStrVisitor { + type Value = VarStr; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a string") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + Ok(VarStr { + raw_val: value.to_owned(), + }) + } +} + +impl<'de> Deserialize<'de> for VarStr { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_string(VarStrVisitor) + } +} + +impl VarStr { + pub fn new(raw_val: String) -> Self { + VarStr { raw_val } + } + + pub fn sub_env_vars(&self) -> anyhow::Result { + // Regex to match patterns like {{ VAR_NAME }} + static RE: OnceLock = OnceLock::new(); + let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap()); + + let mut result = self.raw_val.to_string(); + + for caps in re.captures_iter(&self.raw_val) { + let var_name = caps.get(1).unwrap().as_str(); + let full_match = caps.get(0).unwrap().as_str(); + + match env::var(var_name) { + Ok(value) => { + result = result.replace(full_match, &value); + } + Err(_) => bail!("Environment variable {} not found", var_name), + } + } + + Ok(result) + } +} + #[cfg(test)] mod tests { use super::*; @@ -910,4 +984,36 @@ mod tests { "u64::MAX is not in the correct range" ); } + + #[test] + fn test_no_placeholders() { + let input = "This is a test string with no placeholders"; + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + input + ); + } + + #[test] + fn test_with_placeholders() { + env::set_var("TEST_VAR", "environment variable"); + let input = "This is a {{ TEST_VAR }}"; + let expected = "This is a environment variable"; + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + expected + ); + } + + #[test] + fn test_multiple_placeholders() { + env::set_var("VAR1", "first"); + env::set_var("VAR2", "second"); + let input = "Here is the {{ VAR1 }} and here is the {{ VAR2 }}"; + let expected = "Here is the first and here is the second"; + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + expected + ); + } } diff --git a/arroyo-worker/src/connectors/kafka/mod.rs b/arroyo-worker/src/connectors/kafka/mod.rs index 2282bbacb..f095294bf 100644 --- a/arroyo-worker/src/connectors/kafka/mod.rs +++ b/arroyo-worker/src/connectors/kafka/mod.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use arroyo_types::VarStr; use rdkafka::Offset; use serde::{Deserialize, Serialize}; use typify::import_types; @@ -7,7 +8,13 @@ use typify::import_types; pub mod sink; pub mod source; -import_types!(schema = "../connector-schemas/kafka/connection.json"); +import_types!( + schema = "../connector-schemas/kafka/connection.json", + convert = { + {type = "string", format = "var-str"} = VarStr, + {type = "string", format = "var-str", isSensitive = true} = VarStr + } +); import_types!(schema = "../connector-schemas/kafka/table.json"); impl SourceOffset { @@ -33,7 +40,12 @@ pub fn client_configs(connection: &KafkaConfig) -> HashMap { client_configs.insert("sasl.mechanism".to_string(), mechanism.to_string()); client_configs.insert("security.protocol".to_string(), protocol.to_string()); client_configs.insert("sasl.username".to_string(), username.to_string()); - client_configs.insert("sasl.password".to_string(), password.to_string()); + client_configs.insert( + "sasl.password".to_string(), + password + .sub_env_vars() + .expect("Failed to substitute env vars"), + ); } }; diff --git a/arroyo-worker/src/connectors/polling_http.rs b/arroyo-worker/src/connectors/polling_http.rs index 676fae667..c49a408b1 100644 --- a/arroyo-worker/src/connectors/polling_http.rs +++ b/arroyo-worker/src/connectors/polling_http.rs @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration}; use arroyo_macro::source_fn; use arroyo_rpc::ControlMessage; use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig}; -use arroyo_types::{string_to_map, Message, Record, UserError, Watermark}; +use arroyo_types::{string_to_map, Message, Record, UserError, VarStr, Watermark}; use serde::{Deserialize, Serialize}; use tokio::select; @@ -26,8 +26,10 @@ use crate::{ SourceFinishType, }; -import_types!(schema = "../connector-schemas/polling_http/table.json"); - +import_types!( + schema = "../connector-schemas/polling_http/table.json", + convert = { {type = "string", format = "var-str"} = VarStr } +); const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(1); const MAX_BODY_SIZE: usize = 5 * 1024 * 1024; // 5M ought to be enough for anybody @@ -66,18 +68,24 @@ where let table: PollingHttpTable = serde_json::from_value(config.table).expect("Invalid table config for WebhookSink"); - let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .map(|(k, v)| { - ( - (&k).try_into() - .expect(&format!("invalid header name {}", k)), - (&v).try_into() - .expect(&format!("invalid header value {}", v)), - ) - }) - .collect(); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); let deserializer = DataDeserializer::new( config diff --git a/arroyo-worker/src/connectors/sse.rs b/arroyo-worker/src/connectors/sse.rs index 52a4eb395..144ee4661 100644 --- a/arroyo-worker/src/connectors/sse.rs +++ b/arroyo-worker/src/connectors/sse.rs @@ -6,7 +6,7 @@ use arroyo_rpc::formats::{Format, Framing}; use arroyo_rpc::grpc::{StopMode, TableDescriptor}; use arroyo_rpc::{ControlMessage, ControlResp, OperatorConfig}; use arroyo_state::tables::global_keyed_map::GlobalKeyedState; -use arroyo_types::{string_to_map, Data, Message, Record, Watermark}; +use arroyo_types::{string_to_map, Data, Message, Record, VarStr, Watermark}; use bincode::{Decode, Encode}; use eventsource_client::{Client, SSE}; use futures::StreamExt; @@ -19,7 +19,7 @@ use tokio::select; use tracing::{debug, info}; use typify::import_types; -import_types!(schema = "../connector-schemas/sse/table.json"); +import_types!(schema = "../connector-schemas/sse/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); #[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd, Default)] pub struct SSESourceState { @@ -72,9 +72,14 @@ where let table: SseTable = serde_json::from_value(config.table).expect("Invalid table config for SSESource"); + let headers = table + .headers + .as_ref() + .map(|s| s.sub_env_vars().expect("Failed to substitute env vars")); + Self { url: table.endpoint, - headers: string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) + headers: string_to_map(&headers.unwrap_or("".to_string())) .expect("Invalid header map") .into_iter() .collect(), diff --git a/arroyo-worker/src/connectors/webhook.rs b/arroyo-worker/src/connectors/webhook.rs index 882b015f5..5cf40fedc 100644 --- a/arroyo-worker/src/connectors/webhook.rs +++ b/arroyo-worker/src/connectors/webhook.rs @@ -7,7 +7,7 @@ use std::{ use arroyo_macro::process_fn; use arroyo_rpc::ControlResp; use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig}; -use arroyo_types::{string_to_map, CheckpointBarrier, Key, Record}; +use arroyo_types::{string_to_map, CheckpointBarrier, Key, Record, VarStr}; use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, Semaphore}; @@ -18,7 +18,7 @@ use typify::import_types; use crate::engine::{Context, StreamNode}; -import_types!(schema = "../connector-schemas/webhook/table.json"); +import_types!(schema = "../connector-schemas/webhook/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const MAX_INFLIGHT: u32 = 50; @@ -48,18 +48,24 @@ where let table: WebhookTable = serde_json::from_value(config.table).expect("Invalid table config for WebhookSink"); - let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .map(|(k, v)| { - ( - (&k).try_into() - .expect(&format!("invalid header name {}", k)), - (&v).try_into() - .expect(&format!("invalid header value {}", v)), - ) - }) - .collect(); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); Self { url: Arc::new(table.endpoint), diff --git a/arroyo-worker/src/connectors/websocket.rs b/arroyo-worker/src/connectors/websocket.rs index 2c4c6852c..fa9196518 100644 --- a/arroyo-worker/src/connectors/websocket.rs +++ b/arroyo-worker/src/connectors/websocket.rs @@ -11,7 +11,7 @@ use arroyo_rpc::{ ControlMessage, OperatorConfig, }; use arroyo_state::tables::global_keyed_map::GlobalKeyedState; -use arroyo_types::{string_to_map, Data, Message, Record, UserError, Watermark}; +use arroyo_types::{string_to_map, Data, Message, Record, UserError, VarStr, Watermark}; use bincode::{Decode, Encode}; use futures::{SinkExt, StreamExt}; use serde::de::DeserializeOwned; @@ -29,7 +29,7 @@ use crate::{ SourceFinishType, }; -import_types!(schema = "../connector-schemas/websocket/table.json"); +import_types!(schema = "../connector-schemas/websocket/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); #[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd, Default)] pub struct WebsocketSourceState {} @@ -72,12 +72,28 @@ where .map(|m| m.to_string()), ); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); + Self { url: table.endpoint, - headers: string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .collect(), + headers, subscription_messages, deserializer: DataDeserializer::new( config.format.expect("WebsocketSource requires a format"), diff --git a/connector-schemas/kafka/connection.json b/connector-schemas/kafka/connection.json index cbe9c05a7..7ae797c56 100644 --- a/connector-schemas/kafka/connection.json +++ b/connector-schemas/kafka/connection.json @@ -45,8 +45,9 @@ }, "password": { "type": "string", - "description": "The password to use for SASL authentication", - "isSensitive": true + "description": "The password to use for SASL authentication. May be a [variable](https://doc.arroyo.dev/connectors/variables).", + "isSensitive": true, + "format": "var-str" } }, "additionalProperties": false @@ -81,11 +82,12 @@ "apiSecret": { "title": "API Secret", "type": "string", - "description": "Secret for your Confluent Schema Registry", + "description": "Secret for your Confluent Schema Registry. May be a [variable](https://doc.arroyo.dev/connectors/variables).", "isSensitive": true, "examples": [ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=" - ] + ], + "format": "var-str" } }, "required": [ diff --git a/connector-schemas/polling_http/table.json b/connector-schemas/polling_http/table.json index f0a6ea03c..c96873d57 100644 --- a/connector-schemas/polling_http/table.json +++ b/connector-schemas/polling_http/table.json @@ -12,9 +12,9 @@ "headers": { "title": "Headers", "type": "string", - "description": "Comma separated list of headers to send with the request", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "description": "Comma separated list of headers to send with the request. May contain [variables](https://doc.arroyo.dev/connectors/variables).", + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "format": "var-str" }, "method": { "title": "Method", @@ -26,7 +26,9 @@ "PUT", "PATCH" ], - "examples": ["GET"] + "examples": [ + "GET" + ] }, "body": { "title": "Body", @@ -37,7 +39,9 @@ "title": "Polling Interval (ms)", "type": "integer", "description": "Number of milliseconds to wait between successful polls of the HTTP endpoint", - "examples": ["1000"] + "examples": [ + "1000" + ] }, "emit_behavior": { "title": "Emit Behavior", diff --git a/connector-schemas/sse/table.json b/connector-schemas/sse/table.json index 044c8e4e0..65b68c3e2 100644 --- a/connector-schemas/sse/table.json +++ b/connector-schemas/sse/table.json @@ -13,8 +13,8 @@ "title": "Headers", "type": "string", "description": "Comma separated list of headers to send with the request", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "format": "var-str" }, "events": { "title": "Events", diff --git a/connector-schemas/webhook/table.json b/connector-schemas/webhook/table.json index 3cfc6e8e3..08cfd37a9 100644 --- a/connector-schemas/webhook/table.json +++ b/connector-schemas/webhook/table.json @@ -14,12 +14,11 @@ "headers": { "title": "Headers", "type": "string", - "maxLength": 2048, "description": "Optional, comma separated list of headers to send with the webhook", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", "examples": [ "Authentication: Basic my-auth-secret,Content-Type: application/json" - ] + ], + "format": "var-str" } }, "required": [ diff --git a/connector-schemas/websocket/table.json b/connector-schemas/websocket/table.json index b8781ad8d..fffd7aa19 100644 --- a/connector-schemas/websocket/table.json +++ b/connector-schemas/websocket/table.json @@ -15,8 +15,8 @@ "title": "Headers", "type": "string", "description": "Comma separated list of headers to send with the request", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "format": "var-str" }, "subscription_message": { "title": "Subscription Message",