From e77a2589178a251fbbe9b82ae4ba898074f942c2 Mon Sep 17 00:00:00 2001 From: Jonah Eisen Date: Mon, 4 Dec 2023 19:11:18 -0800 Subject: [PATCH] Replace VarStr object in schema --- arroyo-connectors/src/kafka.rs | 25 +++++---- arroyo-connectors/src/lib.rs | 6 +-- arroyo-connectors/src/polling_http.rs | 21 +++++--- arroyo-connectors/src/webhook.rs | 10 ++-- .../src/routes/connections/JsonForm.tsx | 23 ++++++++ arroyo-rpc/src/schema_resolver.rs | 14 +++-- arroyo-types/src/lib.rs | 54 +++++++++++++------ arroyo-worker/src/connectors/kafka/mod.rs | 11 +++- arroyo-worker/src/connectors/polling_http.rs | 39 ++++++++------ connector-schemas/filesystem/table.json | 18 +++++++ connector-schemas/kafka/connection.json | 28 ++++++++-- connector-schemas/polling_http/table.json | 32 +++++++++-- 12 files changed, 212 insertions(+), 69 deletions(-) diff --git a/arroyo-connectors/src/kafka.rs b/arroyo-connectors/src/kafka.rs index 27d6ebc3d..85c0967b2 100644 --- a/arroyo-connectors/src/kafka.rs +++ b/arroyo-connectors/src/kafka.rs @@ -1,21 +1,21 @@ use anyhow::{anyhow, bail}; -use arroyo_rpc::OperatorConfig; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::convert::Infallible; -use typify::import_types; - use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage}; +use arroyo_rpc::OperatorConfig; +use arroyo_types::VarStr; use axum::response::sse::Event; use rdkafka::{ consumer::{BaseConsumer, Consumer}, message::BorrowedMessage, ClientConfig, Offset, TopicPartitionList, }; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::convert::Infallible; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; use tonic::Status; use tracing::{error, info, warn}; +use typify::import_types; use crate::{pull_opt, Connection, ConnectionType}; @@ -25,7 +25,10 @@ const CONFIG_SCHEMA: &str = include_str!("../../connector-schemas/kafka/connecti const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/kafka/table.json"); const ICON: &str = include_str!("../resources/kafka.svg"); -import_types!(schema = "../connector-schemas/kafka/connection.json",); +import_types!( + schema = "../connector-schemas/kafka/connection.json", + replace = { VarStr = VarStr } +); import_types!(schema = "../connector-schemas/kafka/table.json"); pub struct KafkaConnector {} @@ -150,14 +153,16 @@ impl Connector for KafkaConnector { mechanism: pull_opt("auth.mechanism", options)?, protocol: pull_opt("auth.protocol", options)?, username: pull_opt("auth.username", options)?, - password: pull_opt("auth.password", options)?, + password: VarStr::new(pull_opt("auth.password", options)?), }, Some(other) => bail!("unknown auth type '{}'", other), }; let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| { let api_key = options.remove("schema_registry.api_key"); - let api_secret = options.remove("schema_registry.api_secret"); + let api_secret = options + .remove("schema_registry.api_secret") + .map(|secret| VarStr::new(secret)); SchemaRegistry::ConfluentSchemaRegistry { endpoint, api_key, @@ -251,7 +256,7 @@ impl KafkaTester { client_config.set("sasl.mechanism", mechanism); client_config.set("security.protocol", protocol); client_config.set("sasl.username", username); - client_config.set("sasl.password", password); + client_config.set("sasl.password", password.sub_env_vars()?); } }; diff --git a/arroyo-connectors/src/lib.rs b/arroyo-connectors/src/lib.rs index 4a19958b0..8df88e069 100644 --- a/arroyo-connectors/src/lib.rs +++ b/arroyo-connectors/src/lib.rs @@ -3,7 +3,7 @@ use arroyo_rpc::api_types::connections::{ ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType, }; use arroyo_rpc::primitive_to_sql; -use arroyo_types::{string_to_map, sub_env_vars}; +use arroyo_types::string_to_map; use axum::response::sse::Event; use blackhole::BlackholeConnector; use fluvio::FluvioConnector; @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF } } -fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result { +fn construct_http_client(endpoint: &str, headers: Option) -> anyhow::Result { if let Err(e) = reqwest::Url::parse(&endpoint) { bail!("invalid endpoint '{}': {:?}", endpoint, e) }; @@ -344,7 +344,7 @@ fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Re Ok(( TryInto::::try_into(&k) .map_err(|_| anyhow!("invalid header name {}", k))?, - TryInto::::try_into(sub_env_vars(&v).map_err(|e| anyhow!(e))?) + TryInto::::try_into(&v) .map_err(|_| anyhow!("invalid header value {}", v))?, )) }) diff --git a/arroyo-connectors/src/polling_http.rs b/arroyo-connectors/src/polling_http.rs index be0cf3934..a023e7a08 100644 --- a/arroyo-connectors/src/polling_http.rs +++ b/arroyo-connectors/src/polling_http.rs @@ -3,7 +3,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; -use arroyo_types::string_to_map; +use arroyo_types::{string_to_map, VarStr}; use axum::response::sse::Event; use reqwest::{Client, Request}; use tokio::sync::mpsc::Sender; @@ -20,7 +20,10 @@ use super::Connector; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json"); -import_types!(schema = "../connector-schemas/polling_http/table.json"); +import_types!( + schema = "../connector-schemas/polling_http/table.json", + replace = { VarStr = VarStr } +); const ICON: &str = include_str!("../resources/http.svg"); pub struct PollingHTTPConnector {} @@ -55,8 +58,14 @@ impl PollingHTTPConnector { config: &PollingHttpTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let headers = config + .headers + .as_ref() + .map(|s| s.sub_env_vars()) + .transpose() + .map_err(|e| anyhow!(e))?; + + let client = construct_http_client(&config.endpoint, headers)?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -166,7 +175,7 @@ impl Connector for PollingHTTPConnector { EmptyConfig {}, PollingHttpTable { endpoint, - headers: headers.map(Headers), + headers: headers.map(|s| VarStr::new(s)), method, body, poll_interval_ms: interval, @@ -187,7 +196,7 @@ impl Connector for PollingHTTPConnector { let description = format!("PollingHTTPSource<{}>", table.endpoint); if let Some(headers) = &table.headers { - string_to_map(headers).ok_or_else(|| { + string_to_map(&headers.sub_env_vars().map_err(|e| anyhow!(e))?).ok_or_else(|| { anyhow!( "Invalid format for headers; should be a \ comma-separated list of colon-separated key value pairs" diff --git a/arroyo-connectors/src/webhook.rs b/arroyo-connectors/src/webhook.rs index 3016c2647..c2434db5a 100644 --- a/arroyo-connectors/src/webhook.rs +++ b/arroyo-connectors/src/webhook.rs @@ -4,6 +4,7 @@ use std::convert::Infallible; use anyhow::anyhow; use arroyo_rpc::OperatorConfig; +use arroyo_types::VarStr; use axum::response::sse::Event; use reqwest::{Client, Request}; use serde_json::json; @@ -47,8 +48,10 @@ impl WebhookConnector { config: &WebhookTable, tx: Sender>, ) -> anyhow::Result<()> { - let client = - construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?; + let client = construct_http_client( + &config.endpoint, + config.headers.as_ref().map(|t| t.0.clone()), + )?; let req = Self::construct_test_request(&client, config)?; tx.send(Ok(Event::default() @@ -183,7 +186,8 @@ impl Connector for WebhookConnector { .map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?; let table = WebhookTable { endpoint, headers }; - let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?; + let client = + construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| t.0.clone()))?; let _ = Self::construct_test_request(&client, &table)?; self.from_config(None, name, EmptyConfig {}, table, schema) diff --git a/arroyo-console/src/routes/connections/JsonForm.tsx b/arroyo-console/src/routes/connections/JsonForm.tsx index 19ee2ecfc..135afa0fe 100644 --- a/arroyo-console/src/routes/connections/JsonForm.tsx +++ b/arroyo-console/src/routes/connections/JsonForm.tsx @@ -603,6 +603,29 @@ export function FormInner({ /> ); + } else if (property.$ref?.endsWith('VarStr')) { + const valPath = nextPath + '.raw_val'; + return ( + + + + ); } else { console.warn('Unsupported property', property); return <>; diff --git a/arroyo-rpc/src/schema_resolver.rs b/arroyo-rpc/src/schema_resolver.rs index e57c63291..9c530cb7e 100644 --- a/arroyo-rpc/src/schema_resolver.rs +++ b/arroyo-rpc/src/schema_resolver.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, bail}; use apache_avro::Schema; +use arroyo_types::VarStr; use async_trait::async_trait; use reqwest::{Client, StatusCode, Url}; use serde::de::DeserializeOwned; @@ -103,7 +104,7 @@ pub struct ConfluentSchemaRegistry { topic: String, client: Client, api_key: Option, - api_secret: Option, + api_secret: Option, } impl ConfluentSchemaRegistry { @@ -111,7 +112,7 @@ impl ConfluentSchemaRegistry { endpoint: &str, topic: &str, api_key: Option, - api_secret: Option, + api_secret: Option, ) -> anyhow::Result { let client = Client::builder() .timeout(Duration::from_secs(5)) @@ -226,8 +227,15 @@ impl ConfluentSchemaRegistry { async fn get_schema_for_url(&self, url: Url) -> anyhow::Result { let mut get_call = self.client.get(url.clone()); + let secret: Option = self + .api_secret // Option + .as_ref() + .map(|s| s.sub_env_vars()) // Option> + .transpose() // Result, String> + .map_err(|e| anyhow!(e))?; + if let Some(api_key) = self.api_key.as_ref() { - get_call = get_call.basic_auth(api_key, self.api_secret.as_ref()); + get_call = get_call.basic_auth(api_key, secret); } let resp = get_call.send().await.map_err(|e| { diff --git a/arroyo-types/src/lib.rs b/arroyo-types/src/lib.rs index c83a644a3..a1839f5f7 100644 --- a/arroyo-types/src/lib.rs +++ b/arroyo-types/src/lib.rs @@ -869,26 +869,37 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive { start..=end } -pub fn sub_env_vars(input: &str) -> Result { - // Regex to match patterns like {{ VAR_NAME }} - static RE: OnceLock = OnceLock::new(); - let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap()); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VarStr { + raw_val: String, +} + +impl VarStr { + pub fn new(raw_val: String) -> Self { + VarStr { raw_val } + } + + pub fn sub_env_vars(&self) -> Result { + // Regex to match patterns like {{ VAR_NAME }} + static RE: OnceLock = OnceLock::new(); + let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap()); - let mut result = input.to_string(); + let mut result = self.raw_val.to_string(); - for caps in re.captures_iter(input) { - let var_name = caps.get(1).unwrap().as_str(); - let full_match = caps.get(0).unwrap().as_str(); + for caps in re.captures_iter(&self.raw_val) { + let var_name = caps.get(1).unwrap().as_str(); + let full_match = caps.get(0).unwrap().as_str(); - match env::var(var_name) { - Ok(value) => { - result = result.replace(full_match, &value); + match env::var(var_name) { + Ok(value) => { + result = result.replace(full_match, &value); + } + Err(_) => return Err(format!("Environment variable {} not found", var_name)), } - Err(_) => return Err(format!("Environment variable {} not found", var_name)), } - } - Ok(result) + Ok(result) + } } #[cfg(test)] @@ -931,7 +942,10 @@ mod tests { #[test] fn test_no_placeholders() { let input = "This is a test string with no placeholders"; - assert_eq!(sub_env_vars(input).unwrap(), input); + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + input + ); } #[test] @@ -939,7 +953,10 @@ mod tests { env::set_var("TEST_VAR", "environment variable"); let input = "This is a {{ TEST_VAR }}"; let expected = "This is a environment variable"; - assert_eq!(sub_env_vars(input).unwrap(), expected); + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + input + ); } #[test] @@ -948,6 +965,9 @@ mod tests { env::set_var("VAR2", "second"); let input = "Here is the {{ VAR1 }} and here is the {{ VAR2 }}"; let expected = "Here is the first and here is the second"; - assert_eq!(sub_env_vars(input).unwrap(), expected); + assert_eq!( + VarStr::new(input.to_string()).sub_env_vars().unwrap(), + input + ); } } diff --git a/arroyo-worker/src/connectors/kafka/mod.rs b/arroyo-worker/src/connectors/kafka/mod.rs index 2282bbacb..49851d5cf 100644 --- a/arroyo-worker/src/connectors/kafka/mod.rs +++ b/arroyo-worker/src/connectors/kafka/mod.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use arroyo_types::VarStr; use rdkafka::Offset; use serde::{Deserialize, Serialize}; use typify::import_types; @@ -7,7 +8,10 @@ use typify::import_types; pub mod sink; pub mod source; -import_types!(schema = "../connector-schemas/kafka/connection.json"); +import_types!( + schema = "../connector-schemas/kafka/connection.json", + replace = { VarStr = VarStr } +); import_types!(schema = "../connector-schemas/kafka/table.json"); impl SourceOffset { @@ -33,7 +37,10 @@ pub fn client_configs(connection: &KafkaConfig) -> HashMap { client_configs.insert("sasl.mechanism".to_string(), mechanism.to_string()); client_configs.insert("security.protocol".to_string(), protocol.to_string()); client_configs.insert("sasl.username".to_string(), username.to_string()); - client_configs.insert("sasl.password".to_string(), password.to_string()); + client_configs.insert( + "sasl.password".to_string(), + password.sub_env_vars().unwrap_or_else(|e| panic!("{}", e)), + ); } }; diff --git a/arroyo-worker/src/connectors/polling_http.rs b/arroyo-worker/src/connectors/polling_http.rs index f6ccdf5c9..666e7a8f1 100644 --- a/arroyo-worker/src/connectors/polling_http.rs +++ b/arroyo-worker/src/connectors/polling_http.rs @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration}; use arroyo_macro::source_fn; use arroyo_rpc::ControlMessage; use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig}; -use arroyo_types::{string_to_map, sub_env_vars, Message, Record, UserError, Watermark}; +use arroyo_types::{string_to_map, Message, Record, UserError, VarStr, Watermark}; use serde::{Deserialize, Serialize}; use tokio::select; @@ -26,8 +26,10 @@ use crate::{ SourceFinishType, }; -import_types!(schema = "../connector-schemas/polling_http/table.json"); - +import_types!( + schema = "../connector-schemas/polling_http/table.json", + replace = { VarStr = VarStr } +); const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(1); const MAX_BODY_SIZE: usize = 5 * 1024 * 1024; // 5M ought to be enough for anybody @@ -66,19 +68,24 @@ where let table: PollingHttpTable = serde_json::from_value(config.table).expect("Invalid table config for WebhookSink"); - let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or("")) - .expect("Invalid header map") - .into_iter() - .map(|(k, v)| { - ( - (&k).try_into() - .expect(&format!("invalid header name {}", k)), - (sub_env_vars(&v).unwrap()) - .try_into() - .expect(&format!("invalid header value {}", v)), - ) - }) - .collect(); + let headers = string_to_map( + &table + .headers + .as_ref() + .map(|t| t.sub_env_vars().unwrap_or_else(|e| panic!("{}", e))) + .unwrap_or("".to_string()), + ) + .expect("Invalid header map") + .into_iter() + .map(|(k, v)| { + ( + (&k).try_into() + .expect(&format!("invalid header name {}", k)), + (&v).try_into() + .expect(&format!("invalid header value {}", v)), + ) + }) + .collect(); let deserializer = DataDeserializer::new( config diff --git a/connector-schemas/filesystem/table.json b/connector-schemas/filesystem/table.json index a3264f1ed..bedb0797a 100644 --- a/connector-schemas/filesystem/table.json +++ b/connector-schemas/filesystem/table.json @@ -1,6 +1,24 @@ { "type": "object", "title": "FileSystemTable", + "$defs": { + "VarStr": { + "type": "object", + "properties": { + "raw_val": { + "type": "string", + "title": "Raw Value", + "description": "The raw value. May contain [variables](https://doc.arroyo.dev/todo).", + "examples": [ + "{{ MY_ENV_VAR }}" + ] + } + }, + "required": [ + "raw_val" + ] + } + }, "properties": { "tableType": { "type": "object", diff --git a/connector-schemas/kafka/connection.json b/connector-schemas/kafka/connection.json index cbe9c05a7..40a93a74c 100644 --- a/connector-schemas/kafka/connection.json +++ b/connector-schemas/kafka/connection.json @@ -1,6 +1,24 @@ { "type": "object", "title": "KafkaConfig", + "$defs": { + "VarStr": { + "type": "object", + "properties": { + "raw_val": { + "type": "string", + "title": "Raw Value", + "description": "The raw value. May contain [variables](https://doc.arroyo.dev/todo).", + "examples": [ + "{{ MY_ENV_VAR }}" + ] + } + }, + "required": [ + "raw_val" + ] + } + }, "properties": { "bootstrapServers": { "type": "string", @@ -44,9 +62,10 @@ "description": "The username to use for SASL authentication" }, "password": { - "type": "string", + "type": "object", "description": "The password to use for SASL authentication", - "isSensitive": true + "isSensitive": true, + "$ref": "#/$defs/VarStr" } }, "additionalProperties": false @@ -80,12 +99,13 @@ }, "apiSecret": { "title": "API Secret", - "type": "string", + "type": "object", "description": "Secret for your Confluent Schema Registry", "isSensitive": true, "examples": [ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=" - ] + ], + "$ref": "#/$defs/VarStr" } }, "required": [ diff --git a/connector-schemas/polling_http/table.json b/connector-schemas/polling_http/table.json index ee06bb29e..dd9edf0d3 100644 --- a/connector-schemas/polling_http/table.json +++ b/connector-schemas/polling_http/table.json @@ -1,6 +1,24 @@ { "type": "object", "title": "PollingHTTPTable", + "$defs": { + "VarStr": { + "type": "object", + "properties": { + "raw_val": { + "type": "string", + "title": "Raw Value", + "description": "The raw value. May contain [variables](https://doc.arroyo.dev/todo).", + "examples": [ + "{{ MY_ENV_VAR }}" + ] + } + }, + "required": [ + "raw_val" + ] + } + }, "properties": { "endpoint": { "title": "Endpoint", @@ -11,10 +29,10 @@ }, "headers": { "title": "Headers", - "type": "string", + "type": "object", "description": "Comma separated list of headers to send with the request. May contain [variables](https://doc.arroyo.dev/todo).", - "pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)", - "examples": ["Authentication: digest 1234,Content-Type: application/json"] + "examples": ["Authentication: digest 1234,Content-Type: application/json"], + "$ref": "#/$defs/VarStr" }, "method": { "title": "Method", @@ -26,7 +44,9 @@ "PUT", "PATCH" ], - "examples": ["GET"] + "examples": [ + "GET" + ] }, "body": { "title": "Body", @@ -37,7 +57,9 @@ "title": "Polling Interval (ms)", "type": "integer", "description": "Number of milliseconds to wait between successful polls of the HTTP endpoint", - "examples": ["1000"] + "examples": [ + "1000" + ] }, "emit_behavior": { "title": "Emit Behavior",