From d66c09ba015349bc5ed03e8c3a7322c6b0fb40bf Mon Sep 17 00:00:00 2001 From: Jonah Eisen Date: Tue, 5 Dec 2023 17:26:31 -0800 Subject: [PATCH] Support environment variable substitution In the Kafka, Polling HTTP, WebSocket, and SSE sources, support environment variables in the passwords, secrets, and headers fields. This is done by setting the format of the field in the JSON schema to 'var-str', which tells typify to replace the type with the VarStr struct, which has a function for substituting environment variables. The VarStr struct serializes/deserializes from a JSON string. --- Cargo.lock | 22 ++-- arroyo-connectors/src/kafka.rs | 31 +++-- arroyo-connectors/src/lib.rs | 2 +- arroyo-connectors/src/polling_http.rs | 22 ++-- arroyo-connectors/src/sse.rs | 16 +-- arroyo-connectors/src/webhook.rs | 28 +++-- arroyo-connectors/src/websocket.rs | 35 +++--- .../src/routes/connections/JsonForm.tsx | 1 + arroyo-rpc/src/schema_resolver.rs | 13 ++- arroyo-types/Cargo.toml | 2 + arroyo-types/src/lib.rs | 110 +++++++++++++++++- arroyo-worker/src/connectors/kafka/mod.rs | 16 ++- arroyo-worker/src/connectors/polling_http.rs | 38 +++--- arroyo-worker/src/connectors/sse.rs | 11 +- arroyo-worker/src/connectors/webhook.rs | 34 +++--- arroyo-worker/src/connectors/websocket.rs | 28 ++++- connector-schemas/kafka/connection.json | 10 +- connector-schemas/polling_http/table.json | 14 ++- connector-schemas/sse/table.json | 4 +- connector-schemas/webhook/table.json | 5 +- connector-schemas/websocket/table.json | 4 +- 21 files changed, 330 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49ba1fc99..279160296 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -875,9 +875,11 @@ dependencies = [ name = "arroyo-types" version = "0.9.0-dev" dependencies = [ + "anyhow", "arrow", "arrow-array", "bincode 2.0.0-rc.3", + "regex", "serde", ] @@ -6369,14 +6371,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -6390,13 +6392,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -6417,6 +6419,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "regress" version = "0.6.0" diff --git a/arroyo-connectors/src/kafka.rs b/arroyo-connectors/src/kafka.rs index 27d6ebc3d..18eb4b791 100644 --- a/arroyo-connectors/src/kafka.rs +++ b/arroyo-connectors/src/kafka.rs @@ -1,21 +1,21 @@ use anyhow::{anyhow, bail}; -use arroyo_rpc::OperatorConfig; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::convert::Infallible; -use typify::import_types; - use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage}; +use arroyo_rpc::OperatorConfig; +use arroyo_types::VarStr; use axum::response::sse::Event; use rdkafka::{ consumer::{BaseConsumer, Consumer}, message::BorrowedMessage, ClientConfig, Offset, TopicPartitionList, }; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::convert::Infallible; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; use tonic::Status; use tracing::{error, info, warn}; +use typify::import_types; use crate::{pull_opt, Connection, ConnectionType}; @@ -25,7 +25,13 @@ const CONFIG_SCHEMA: &str = include_str!("../../connector-schemas/kafka/connecti const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/kafka/table.json"); const ICON: &str = include_str!("../resources/kafka.svg"); -import_types!(schema = "../connector-schemas/kafka/connection.json",); +import_types!( + schema = "../connector-schemas/kafka/connection.json", + convert = { + {type = "string", format = "var-str"} = VarStr, + {type = "string", format = "var-str", isSensitive = true} = VarStr + } +); import_types!(schema = "../connector-schemas/kafka/table.json"); pub struct KafkaConnector {} @@ -150,14 +156,16 @@ impl Connector for KafkaConnector { mechanism: pull_opt("auth.mechanism", options)?, protocol: pull_opt("auth.protocol", options)?, username: pull_opt("auth.username", options)?, - password: pull_opt("auth.password", options)?, + password: VarStr::new(pull_opt("auth.password", options)?), }, Some(other) => bail!("unknown auth type '{}'", other), }; let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| { let api_key = options.remove("schema_registry.api_key"); - let api_secret = options.remove("schema_registry.api_secret"); + let api_secret = options + .remove("schema_registry.api_secret") + .map(|secret| VarStr::new(secret)); SchemaRegistry::ConfluentSchemaRegistry { endpoint, api_key, @@ -251,7 +259,10 @@ impl KafkaTester { client_config.set("sasl.mechanism", mechanism); client_config.set("security.protocol", protocol); client_config.set("sasl.username", username); - client_config.set("sasl.password", password); + client_config.set( + "sasl.password", + password.sub_env_vars().map_err(|e| e.to_string())?, + ); } }; diff --git a/arroyo-connectors/src/lib.rs b/arroyo-connectors/src/lib.rs index e1b3dea29..8df88e069 100644 --- a/arroyo-connectors/src/lib.rs +++ b/arroyo-connectors/src/lib.rs @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF } } -fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result { +fn construct_http_client(endpoint: &str, headers: Option) -> anyhow::Result { if let Err(e) = reqwest::Url::parse(&endpoint) { bail!("invalid endpoint '{}': {:?}", endpoint, e) }; diff --git a/arroyo-connectors/src/polling_http.rs b/arroyo-connectors/src/polling_http.rs index 652802b22..4f437a21b 100644 --- a/arroyo-connectors/src/polling_http.rs +++ b/arroyo-connectors/src/polling_http.rs @@ -3,7 +3,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use reqwest::{Client, Request}; use tokio::sync::mpsc::Sender; @@ -20,7 +20,10 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json"); -import_types!(schema = "../connector-schemas/polling_http/table.json"); +import_types!( + schema = "../connector-schemas/polling_http/table.json", + convert = { {type = "string", format = "var-str"} = VarStr } +); const ICON: &str = include_str!("../resources/http.svg"); pub struct PollingHTTPConnector {} @@ -55,8 +58,13 @@ impl PollingHTTPConnector { config: &PollingHttpTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let headers = config + .headers + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?; + + let client = construct_http_client(&config.endpoint, headers)?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -126,7 +134,7 @@ impl Connector for PollingHTTPConnector { Err(err) => TestSourceMessage { error: true, done: true, - message: format!("{:?}", err), + message: format!("{:?}", err.root_cause()), }, }; @@ -166,7 +174,7 @@ impl Connector for PollingHTTPConnector { EmptyConfig {}, PollingHttpTable { endpoint, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr::new(s)), method, body, poll_interval_ms: interval, @@ -187,7 +195,7 @@ impl Connector for PollingHTTPConnector { let description = format!("PollingHTTPSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" diff --git a/arroyo-connectors/src/sse.rs b/arroyo-connectors/src/sse.rs index a3ff9b01c..fa6735952 100644 --- a/arroyo-connectors/src/sse.rs +++ b/arroyo-connectors/src/sse.rs @@ -4,7 +4,7 @@ use std::time::Duration; use anyhow::{anyhow, bail}; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use eventsource_client::Client; use futures::StreamExt; @@ -22,7 +22,7 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/sse/table.json"); -import_types!(schema = "../connector-schemas/sse/table.json"); +import_types!(schema = "../connector-schemas/sse/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const ICON: &str = include_str!("../resources/sse.svg"); pub struct SSEConnector {} @@ -79,7 +79,7 @@ impl Connector for SSEConnector { let description = format!("SSESource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" @@ -134,7 +134,7 @@ impl Connector for SSEConnector { SseTable { endpoint, events, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr::new(s)), }, schema, ) @@ -174,11 +174,13 @@ impl SseTester { .map_err(|_| anyhow!("Endpoint URL is invalid"))?; let headers = string_to_map( - self.config + &self + .config .headers .as_ref() - .map(|t| t.0.as_str()) - .unwrap_or(""), + .map(|s| s.sub_env_vars()) + .transpose()? + .unwrap_or("".to_string()), ) .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs"))?; diff --git a/arroyo-connectors/src/webhook.rs b/arroyo-connectors/src/webhook.rs index 3016c2647..53e797b39 100644 --- a/arroyo-connectors/src/webhook.rs +++ b/arroyo-connectors/src/webhook.rs @@ -4,6 +4,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; +use arroyo_types::VarStr; use axum::response::sse::Event; use reqwest::{Client, Request}; use serde_json::json; @@ -21,7 +22,7 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/webhook/table.json"); -import_types!(schema = "../connector-schemas/webhook/table.json"); +import_types!(schema = "../connector-schemas/webhook/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const ICON: &str = include_str!("../resources/webhook.svg"); pub struct WebhookConnector {} @@ -47,8 +48,13 @@ impl WebhookConnector { config: &WebhookTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let headers = config + .headers + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?; + + let client = construct_http_client(&config.endpoint, headers)?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -176,14 +182,18 @@ impl Connector for WebhookConnector { ) -> anyhow::Result { let endpoint = pull_opt("endpoint", options)?; - let headers = options - .remove("headers") - .map(|s| s.try_into()) - .transpose() - .map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?; + let headers = options.remove("headers").map(|s| VarStr::new(s)); let table = WebhookTable { endpoint, headers }; - let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?; + + let client = construct_http_client( + &table.endpoint, + table + .headers + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?, + )?; let _ = Self::construct_test_request(&client, &table)?; self.from_config(None, name, EmptyConfig {}, table, schema) diff --git a/arroyo-connectors/src/websocket.rs b/arroyo-connectors/src/websocket.rs index dc52f8116..e8393b080 100644 --- a/arroyo-connectors/src/websocket.rs +++ b/arroyo-connectors/src/websocket.rs @@ -8,7 +8,7 @@ use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, }; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; @@ -25,7 +25,7 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/websocket/table.json"); -import_types!(schema = "../connector-schemas/websocket/table.json"); +import_types!(schema = "../connector-schemas/websocket/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const ICON: &str = include_str!("../resources/websocket.svg"); pub struct WebsocketConnector {} @@ -79,16 +79,23 @@ impl Connector for WebsocketConnector { } }; - let headers = - match string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs")) - { - Ok(headers) => headers, - Err(e) => { - send(true, true, format!("Failed to parse headers: {:?}", e)).await; - return; - } - }; + let headers_str = match table.headers.as_ref().map(|s| s.sub_env_vars()).transpose() { + Ok(headers) => headers, + Err(e) => { + send(true, true, format!("{}", e)).await; + return; + } + }; + + let headers = match string_to_map(&headers_str.unwrap_or("".to_string())) + .ok_or_else(|| anyhow!("Headers are invalid; should be comma-separated pairs")) + { + Ok(headers) => headers, + Err(e) => { + send(true, true, format!("Failed to parse headers: {:?}", e)).await; + return; + } + }; let uri = match Uri::from_str(&table.endpoint.to_string()) { Ok(uri) => uri, @@ -215,7 +222,7 @@ impl Connector for WebsocketConnector { let description = format!("WebsocketSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars()?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" @@ -289,7 +296,7 @@ impl Connector for WebsocketConnector { EmptyConfig {}, WebsocketTable { endpoint, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr::new(s)), subscription_message: None, subscription_messages, }, diff --git a/arroyo-console/src/routes/connections/JsonForm.tsx b/arroyo-console/src/routes/connections/JsonForm.tsx index 19ee2ecfc..36f8943e0 100644 --- a/arroyo-console/src/routes/connections/JsonForm.tsx +++ b/arroyo-console/src/routes/connections/JsonForm.tsx @@ -635,6 +635,7 @@ export function JsonForm({ }) { let ajv = new Ajv(); ajv.addKeyword('isSensitive'); + ajv.addFormat('var-str', { validate: () => true }); const memoAjv = useMemo(() => addFormats(ajv), [schema]); const formik = useFormik({ diff --git a/arroyo-rpc/src/schema_resolver.rs b/arroyo-rpc/src/schema_resolver.rs index e57c63291..8ee1894bd 100644 --- a/arroyo-rpc/src/schema_resolver.rs +++ b/arroyo-rpc/src/schema_resolver.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, bail}; use apache_avro::Schema; +use arroyo_types::VarStr; use async_trait::async_trait; use reqwest::{Client, StatusCode, Url}; use serde::de::DeserializeOwned; @@ -103,7 +104,7 @@ pub struct ConfluentSchemaRegistry { topic: String, client: Client, api_key: Option, - api_secret: Option, + api_secret: Option, } impl ConfluentSchemaRegistry { @@ -111,7 +112,7 @@ impl ConfluentSchemaRegistry { endpoint: &str, topic: &str, api_key: Option, - api_secret: Option, + api_secret: Option, ) -> anyhow::Result { let client = Client::builder() .timeout(Duration::from_secs(5)) @@ -226,8 +227,14 @@ impl ConfluentSchemaRegistry { async fn get_schema_for_url(&self, url: Url) -> anyhow::Result { let mut get_call = self.client.get(url.clone()); + let secret = self + .api_secret + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose()?; + if let Some(api_key) = self.api_key.as_ref() { - get_call = get_call.basic_auth(api_key, self.api_secret.as_ref()); + get_call = get_call.basic_auth(api_key, secret); } let resp = get_call.send().await.map_err(|e| { diff --git a/arroyo-types/Cargo.toml b/arroyo-types/Cargo.toml index 05a108232..e317bf3a4 100644 --- a/arroyo-types/Cargo.toml +++ b/arroyo-types/Cargo.toml @@ -8,3 +8,5 @@ bincode = "2.0.0-rc.3" serde = { version = "1.0", features = ["derive"] } arrow = { workspace = true } arrow-array = { workspace = true } +regex = "1.10.2" +anyhow = "1.0.75" diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index c5743b889..d3f8fb485 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -1,15 +1,16 @@ use arrow::datatypes::SchemaRef; use arrow_array::RecordBatch; use bincode::{config, Decode, Encode}; +use regex::Regex; use serde::ser::SerializeStruct; -use serde::{Deserialize, Serialize}; +use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::collections::HashMap; -use std::env; use std::fmt::Debug; use std::hash::Hash; use std::ops::{Range, RangeInclusive}; use std::str::FromStr; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{env, fmt}; #[derive(Copy, Hash, Debug, Clone, Eq, PartialEq, Encode, Decode, PartialOrd, Ord, Deserialize)] pub struct Window { @@ -813,7 +814,10 @@ pub enum DateTruncPrecision { Second, } +use anyhow::bail; +use serde::de::Visitor; use std::convert::TryFrom; +use std::sync::OnceLock; impl TryFrom<&str> for DatePart { type Error = String; @@ -874,6 +878,76 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive { start..=end } +#[derive(Debug, Clone)] +pub struct VarStr { + raw_val: String, +} + +impl Serialize for VarStr { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.raw_val) + } +} + +struct VarStrVisitor; + +impl<'de> Visitor<'de> for VarStrVisitor { + type Value = VarStr; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a string") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + Ok(VarStr { + raw_val: value.to_owned(), + }) + } +} + +impl<'de> Deserialize<'de> for VarStr { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_string(VarStrVisitor) + } +} + +impl VarStr { + pub fn new(raw_val: String) -> Self { + VarStr { raw_val } + } + + pub fn sub_env_vars(&self) -> anyhow::Result { + // Regex to match patterns like {{ VAR_NAME }} + static RE: OnceLock = OnceLock::new(); + let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap()); + + let mut result = self.raw_val.to_string(); + + for caps in re.captures_iter(&self.raw_val) { + let var_name = caps.get(1).unwrap().as_str(); + let full_match = caps.get(0).unwrap().as_str(); + + match env::var(var_name) { + Ok(value) => { + result = result.replace(full_match, &value); + } + Err(_) => bail!("Environment variable {} not found", var_name), + } + } + + Ok(result) + } +} + #[cfg(test)] mod tests { use super::*; @@ -910,4 +984,36 @@ mod tests { "u64::MAX is not in the correct range" ); } + + #[test] + fn test_no_placeholders() { + let input = "This is a test string with no placeholders"; + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + input + ); + } + + #[test] + fn test_with_placeholders() { + env::set_var("TEST_VAR", "environment variable"); + let input = "This is a {{ TEST_VAR }}"; + let expected = "This is a environment variable"; + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + expected + ); + } + + #[test] + fn test_multiple_placeholders() { + env::set_var("VAR1", "first"); + env::set_var("VAR2", "second"); + let input = "Here is the {{ VAR1 }} and here is the {{ VAR2 }}"; + let expected = "Here is the first and here is the second"; + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + expected + ); + } } diff --git a/arroyo-worker/src/connectors/kafka/mod.rs b/arroyo-worker/src/connectors/kafka/mod.rs index 2282bbacb..f095294bf 100644 --- a/arroyo-worker/src/connectors/kafka/mod.rs +++ b/arroyo-worker/src/connectors/kafka/mod.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use arroyo_types::VarStr; use rdkafka::Offset; use serde::{Deserialize, Serialize}; use typify::import_types; @@ -7,7 +8,13 @@ use typify::import_types; pub mod sink; pub mod source; -import_types!(schema = "../connector-schemas/kafka/connection.json"); +import_types!( + schema = "../connector-schemas/kafka/connection.json", + convert = { + {type = "string", format = "var-str"} = VarStr, + {type = "string", format = "var-str", isSensitive = true} = VarStr + } +); import_types!(schema = "../connector-schemas/kafka/table.json"); impl SourceOffset { @@ -33,7 +40,12 @@ pub fn client_configs(connection: &KafkaConfig) -> HashMap { client_configs.insert("sasl.mechanism".to_string(), mechanism.to_string()); client_configs.insert("security.protocol".to_string(), protocol.to_string()); client_configs.insert("sasl.username".to_string(), username.to_string()); - client_configs.insert("sasl.password".to_string(), password.to_string()); + client_configs.insert( + "sasl.password".to_string(), + password + .sub_env_vars() + .expect("Failed to substitute env vars"), + ); } }; diff --git a/arroyo-worker/src/connectors/polling_http.rs b/arroyo-worker/src/connectors/polling_http.rs index 676fae667..c49a408b1 100644 --- a/arroyo-worker/src/connectors/polling_http.rs +++ b/arroyo-worker/src/connectors/polling_http.rs @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration}; use arroyo_macro::source_fn; use arroyo_rpc::ControlMessage; use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig}; -use arroyo_types::{string_to_map, Message, Record, UserError, Watermark}; +use arroyo_types::{string_to_map, Message, Record, UserError, VarStr, Watermark}; use serde::{Deserialize, Serialize}; use tokio::select; @@ -26,8 +26,10 @@ use crate::{ SourceFinishType, }; -import_types!(schema = "../connector-schemas/polling_http/table.json"); - +import_types!( + schema = "../connector-schemas/polling_http/table.json", + convert = { {type = "string", format = "var-str"} = VarStr } +); const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(1); const MAX_BODY_SIZE: usize = 5 * 1024 * 1024; // 5M ought to be enough for anybody @@ -66,18 +68,24 @@ where let table: PollingHttpTable = serde_json::from_value(config.table).expect("Invalid table config for WebhookSink"); - let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .map(|(k, v)| { - ( - (&k).try_into() - .expect(&format!("invalid header name {}", k)), - (&v).try_into() - .expect(&format!("invalid header value {}", v)), - ) - }) - .collect(); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); let deserializer = DataDeserializer::new( config diff --git a/arroyo-worker/src/connectors/sse.rs b/arroyo-worker/src/connectors/sse.rs index 52a4eb395..144ee4661 100644 --- a/arroyo-worker/src/connectors/sse.rs +++ b/arroyo-worker/src/connectors/sse.rs @@ -6,7 +6,7 @@ use arroyo_rpc::formats::{Format, Framing}; use arroyo_rpc::grpc::{StopMode, TableDescriptor}; use arroyo_rpc::{ControlMessage, ControlResp, OperatorConfig}; use arroyo_state::tables::global_keyed_map::GlobalKeyedState; -use arroyo_types::{string_to_map, Data, Message, Record, Watermark}; +use arroyo_types::{string_to_map, Data, Message, Record, VarStr, Watermark}; use bincode::{Decode, Encode}; use eventsource_client::{Client, SSE}; use futures::StreamExt; @@ -19,7 +19,7 @@ use tokio::select; use tracing::{debug, info}; use typify::import_types; -import_types!(schema = "../connector-schemas/sse/table.json"); +import_types!(schema = "../connector-schemas/sse/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); #[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd, Default)] pub struct SSESourceState { @@ -72,9 +72,14 @@ where let table: SseTable = serde_json::from_value(config.table).expect("Invalid table config for SSESource"); + let headers = table + .headers + .as_ref() + .map(|s| s.sub_env_vars().expect("Failed to substitute env vars")); + Self { url: table.endpoint, - headers: string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) + headers: string_to_map(&headers.unwrap_or("".to_string())) .expect("Invalid header map") .into_iter() .collect(), diff --git a/arroyo-worker/src/connectors/webhook.rs b/arroyo-worker/src/connectors/webhook.rs index 882b015f5..5cf40fedc 100644 --- a/arroyo-worker/src/connectors/webhook.rs +++ b/arroyo-worker/src/connectors/webhook.rs @@ -7,7 +7,7 @@ use std::{ use arroyo_macro::process_fn; use arroyo_rpc::ControlResp; use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig}; -use arroyo_types::{string_to_map, CheckpointBarrier, Key, Record}; +use arroyo_types::{string_to_map, CheckpointBarrier, Key, Record, VarStr}; use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, Semaphore}; @@ -18,7 +18,7 @@ use typify::import_types; use crate::engine::{Context, StreamNode}; -import_types!(schema = "../connector-schemas/webhook/table.json"); +import_types!(schema = "../connector-schemas/webhook/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); const MAX_INFLIGHT: u32 = 50; @@ -48,18 +48,24 @@ where let table: WebhookTable = serde_json::from_value(config.table).expect("Invalid table config for WebhookSink"); - let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .map(|(k, v)| { - ( - (&k).try_into() - .expect(&format!("invalid header name {}", k)), - (&v).try_into() - .expect(&format!("invalid header value {}", v)), - ) - }) - .collect(); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); Self { url: Arc::new(table.endpoint), diff --git a/arroyo-worker/src/connectors/websocket.rs b/arroyo-worker/src/connectors/websocket.rs index 2c4c6852c..fa9196518 100644 --- a/arroyo-worker/src/connectors/websocket.rs +++ b/arroyo-worker/src/connectors/websocket.rs @@ -11,7 +11,7 @@ use arroyo_rpc::{ ControlMessage, OperatorConfig, }; use arroyo_state::tables::global_keyed_map::GlobalKeyedState; -use arroyo_types::{string_to_map, Data, Message, Record, UserError, Watermark}; +use arroyo_types::{string_to_map, Data, Message, Record, UserError, VarStr, Watermark}; use bincode::{Decode, Encode}; use futures::{SinkExt, StreamExt}; use serde::de::DeserializeOwned; @@ -29,7 +29,7 @@ use crate::{ SourceFinishType, }; -import_types!(schema = "../connector-schemas/websocket/table.json"); +import_types!(schema = "../connector-schemas/websocket/table.json", convert = { {type = "string", format = "var-str"} = VarStr }); #[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd, Default)] pub struct WebsocketSourceState {} @@ -72,12 +72,28 @@ where .map(|m| m.to_string()), ); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().expect("Failed to substitute env vars")) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); + Self { url: table.endpoint, - headers: string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .collect(), + headers, subscription_messages, deserializer: DataDeserializer::new( config.format.expect("WebsocketSource requires a format"), diff --git a/connector-schemas/kafka/connection.json b/connector-schemas/kafka/connection.json index cbe9c05a7..7ae797c56 100644 --- a/connector-schemas/kafka/connection.json +++ b/connector-schemas/kafka/connection.json @@ -45,8 +45,9 @@ }, "password": { "type": "string", - "description": "The password to use for SASL authentication", - "isSensitive": true + "description": "The password to use for SASL authentication. May be a [variable](https://doc.arroyo.dev/connectors/variables).", + "isSensitive": true, + "format": "var-str" } }, "additionalProperties": false @@ -81,11 +82,12 @@ "apiSecret": { "title": "API Secret", "type": "string", - "description": "Secret for your Confluent Schema Registry", + "description": "Secret for your Confluent Schema Registry. May be a [variable](https://doc.arroyo.dev/connectors/variables).", "isSensitive": true, "examples": [ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=" - ] + ], + "format": "var-str" } }, "required": [ diff --git a/connector-schemas/polling_http/table.json b/connector-schemas/polling_http/table.json index f0a6ea03c..c96873d57 100644 --- a/connector-schemas/polling_http/table.json +++ b/connector-schemas/polling_http/table.json @@ -12,9 +12,9 @@ "headers": { "title": "Headers", "type": "string", - "description": "Comma separated list of headers to send with the request", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "description": "Comma separated list of headers to send with the request. May contain [variables](https://doc.arroyo.dev/connectors/variables).", + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "format": "var-str" }, "method": { "title": "Method", @@ -26,7 +26,9 @@ "PUT", "PATCH" ], - "examples": ["GET"] + "examples": [ + "GET" + ] }, "body": { "title": "Body", @@ -37,7 +39,9 @@ "title": "Polling Interval (ms)", "type": "integer", "description": "Number of milliseconds to wait between successful polls of the HTTP endpoint", - "examples": ["1000"] + "examples": [ + "1000" + ] }, "emit_behavior": { "title": "Emit Behavior", diff --git a/connector-schemas/sse/table.json b/connector-schemas/sse/table.json index 044c8e4e0..65b68c3e2 100644 --- a/connector-schemas/sse/table.json +++ b/connector-schemas/sse/table.json @@ -13,8 +13,8 @@ "title": "Headers", "type": "string", "description": "Comma separated list of headers to send with the request", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "format": "var-str" }, "events": { "title": "Events", diff --git a/connector-schemas/webhook/table.json b/connector-schemas/webhook/table.json index 3cfc6e8e3..08cfd37a9 100644 --- a/connector-schemas/webhook/table.json +++ b/connector-schemas/webhook/table.json @@ -14,12 +14,11 @@ "headers": { "title": "Headers", "type": "string", - "maxLength": 2048, "description": "Optional, comma separated list of headers to send with the webhook", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", "examples": [ "Authentication: Basic my-auth-secret,Content-Type: application/json" - ] + ], + "format": "var-str" } }, "required": [ diff --git a/connector-schemas/websocket/table.json b/connector-schemas/websocket/table.json index b8781ad8d..fffd7aa19 100644 --- a/connector-schemas/websocket/table.json +++ b/connector-schemas/websocket/table.json @@ -15,8 +15,8 @@ "title": "Headers", "type": "string", "description": "Comma separated list of headers to send with the request", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "format": "var-str" }, "subscription_message": { "title": "Subscription Message",