Skip to content

Commit

Permalink
Replace VarStr object in schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeisen committed Dec 5, 2023
1 parent d59c9fd commit e77a258
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 69 deletions.
25 changes: 15 additions & 10 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use anyhow::{anyhow, bail};
use arroyo_rpc::OperatorConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use typify::import_types;

use arroyo_rpc::api_types::connections::{ConnectionProfile, ConnectionSchema, TestSourceMessage};
use arroyo_rpc::OperatorConfig;
use arroyo_types::VarStr;
use axum::response::sse::Event;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
ClientConfig, Offset, TopicPartitionList,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tonic::Status;
use tracing::{error, info, warn};
use typify::import_types;

use crate::{pull_opt, Connection, ConnectionType};

Expand All @@ -25,7 +25,10 @@ const CONFIG_SCHEMA: &str = include_str!("../../connector-schemas/kafka/connecti
const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/kafka/table.json");
const ICON: &str = include_str!("../resources/kafka.svg");

import_types!(schema = "../connector-schemas/kafka/connection.json",);
import_types!(
schema = "../connector-schemas/kafka/connection.json",
replace = { VarStr = VarStr }
);
import_types!(schema = "../connector-schemas/kafka/table.json");

pub struct KafkaConnector {}
Expand Down Expand Up @@ -150,14 +153,16 @@ impl Connector for KafkaConnector {
mechanism: pull_opt("auth.mechanism", options)?,
protocol: pull_opt("auth.protocol", options)?,
username: pull_opt("auth.username", options)?,
password: pull_opt("auth.password", options)?,
password: VarStr::new(pull_opt("auth.password", options)?),
},
Some(other) => bail!("unknown auth type '{}'", other),
};

let schema_registry = options.remove("schema_registry.endpoint").map(|endpoint| {
let api_key = options.remove("schema_registry.api_key");
let api_secret = options.remove("schema_registry.api_secret");
let api_secret = options
.remove("schema_registry.api_secret")
.map(|secret| VarStr::new(secret));
SchemaRegistry::ConfluentSchemaRegistry {
endpoint,
api_key,
Expand Down Expand Up @@ -251,7 +256,7 @@ impl KafkaTester {
client_config.set("sasl.mechanism", mechanism);
client_config.set("security.protocol", protocol);
client_config.set("sasl.username", username);
client_config.set("sasl.password", password);
client_config.set("sasl.password", password.sub_env_vars()?);
}
};

Expand Down
6 changes: 3 additions & 3 deletions arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType,
};
use arroyo_rpc::primitive_to_sql;
use arroyo_types::{string_to_map, sub_env_vars};
use arroyo_types::string_to_map;
use axum::response::sse::Event;
use blackhole::BlackholeConnector;
use fluvio::FluvioConnector;
Expand Down Expand Up @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF
}
}

fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result<Client> {
fn construct_http_client(endpoint: &str, headers: Option<String>) -> anyhow::Result<Client> {
if let Err(e) = reqwest::Url::parse(&endpoint) {
bail!("invalid endpoint '{}': {:?}", endpoint, e)
};
Expand All @@ -344,7 +344,7 @@ fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Re
Ok((
TryInto::<HeaderName>::try_into(&k)
.map_err(|_| anyhow!("invalid header name {}", k))?,
TryInto::<HeaderValue>::try_into(sub_env_vars(&v).map_err(|e| anyhow!(e))?)
TryInto::<HeaderValue>::try_into(&v)
.map_err(|_| anyhow!("invalid header value {}", v))?,
))
})
Expand Down
21 changes: 15 additions & 6 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::Infallible;

use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;
use arroyo_types::string_to_map;
use arroyo_types::{string_to_map, VarStr};
use axum::response::sse::Event;
use reqwest::{Client, Request};
use tokio::sync::mpsc::Sender;
Expand All @@ -20,7 +20,10 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json");

import_types!(schema = "../connector-schemas/polling_http/table.json");
import_types!(
schema = "../connector-schemas/polling_http/table.json",
replace = { VarStr = VarStr }
);
const ICON: &str = include_str!("../resources/http.svg");

pub struct PollingHTTPConnector {}
Expand Down Expand Up @@ -55,8 +58,14 @@ impl PollingHTTPConnector {
config: &PollingHttpTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let headers = config
.headers
.as_ref()
.map(|s| s.sub_env_vars())
.transpose()
.map_err(|e| anyhow!(e))?;

let client = construct_http_client(&config.endpoint, headers)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -166,7 +175,7 @@ impl Connector for PollingHTTPConnector {
EmptyConfig {},
PollingHttpTable {
endpoint,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr::new(s)),
method,
body,
poll_interval_ms: interval,
Expand All @@ -187,7 +196,7 @@ impl Connector for PollingHTTPConnector {
let description = format!("PollingHTTPSource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars().map_err(|e| anyhow!(e))?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down
10 changes: 7 additions & 3 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::convert::Infallible;
use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;

use arroyo_types::VarStr;
use axum::response::sse::Event;
use reqwest::{Client, Request};
use serde_json::json;
Expand Down Expand Up @@ -47,8 +48,10 @@ impl WebhookConnector {
config: &WebhookTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let client = construct_http_client(
&config.endpoint,
config.headers.as_ref().map(|t| t.0.clone()),
)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -183,7 +186,8 @@ impl Connector for WebhookConnector {
.map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?;

let table = WebhookTable { endpoint, headers };
let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?;
let client =
construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| t.0.clone()))?;
let _ = Self::construct_test_request(&client, &table)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
Expand Down
23 changes: 23 additions & 0 deletions arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,29 @@ export function FormInner({
/>
</Box>
);
} else if (property.$ref?.endsWith('VarStr')) {
const valPath = nextPath + '.raw_val';
return (
<Box>
<StringWidget
path={valPath}
key={key}
title={property.title || key}
description={property.description}
required={schema.required?.includes(key)}
maxLength={property.maxLength}
placeholder={
// @ts-ignore
property.examples ? (property.examples[0] as string) : undefined
}
value={traversePath(values, valPath)}
errors={errors}
onChange={onChange}
// @ts-ignore
password={property.isSensitive || false}
/>
</Box>
);
} else {
console.warn('Unsupported property', property);
return <></>;
Expand Down
14 changes: 11 additions & 3 deletions arroyo-rpc/src/schema_resolver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{anyhow, bail};
use apache_avro::Schema;
use arroyo_types::VarStr;
use async_trait::async_trait;
use reqwest::{Client, StatusCode, Url};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -103,15 +104,15 @@ pub struct ConfluentSchemaRegistry {
topic: String,
client: Client,
api_key: Option<String>,
api_secret: Option<String>,
api_secret: Option<VarStr>,
}

impl ConfluentSchemaRegistry {
pub fn new(
endpoint: &str,
topic: &str,
api_key: Option<String>,
api_secret: Option<String>,
api_secret: Option<VarStr>,
) -> anyhow::Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(5))
Expand Down Expand Up @@ -226,8 +227,15 @@ impl ConfluentSchemaRegistry {
async fn get_schema_for_url<T: DeserializeOwned>(&self, url: Url) -> anyhow::Result<T> {
let mut get_call = self.client.get(url.clone());

let secret: Option<String> = self
.api_secret // Option<VarStr>
.as_ref()
.map(|s| s.sub_env_vars()) // Option<Result<String, String>>
.transpose() // Result<Option<String>, String>
.map_err(|e| anyhow!(e))?;

if let Some(api_key) = self.api_key.as_ref() {
get_call = get_call.basic_auth(api_key, self.api_secret.as_ref());
get_call = get_call.basic_auth(api_key, secret);
}

let resp = get_call.send().await.map_err(|e| {
Expand Down
54 changes: 37 additions & 17 deletions arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,26 +869,37 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive<u64> {
start..=end
}

pub fn sub_env_vars(input: &str) -> Result<String, String> {
// Regex to match patterns like {{ VAR_NAME }}
static RE: OnceLock<Regex> = OnceLock::new();
let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap());
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VarStr {
raw_val: String,
}

impl VarStr {
pub fn new(raw_val: String) -> Self {
VarStr { raw_val }
}

pub fn sub_env_vars(&self) -> Result<String, String> {
// Regex to match patterns like {{ VAR_NAME }}
static RE: OnceLock<Regex> = OnceLock::new();
let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap());

let mut result = input.to_string();
let mut result = self.raw_val.to_string();

for caps in re.captures_iter(input) {
let var_name = caps.get(1).unwrap().as_str();
let full_match = caps.get(0).unwrap().as_str();
for caps in re.captures_iter(&self.raw_val) {
let var_name = caps.get(1).unwrap().as_str();
let full_match = caps.get(0).unwrap().as_str();

match env::var(var_name) {
Ok(value) => {
result = result.replace(full_match, &value);
match env::var(var_name) {
Ok(value) => {
result = result.replace(full_match, &value);
}
Err(_) => return Err(format!("Environment variable {} not found", var_name)),
}
Err(_) => return Err(format!("Environment variable {} not found", var_name)),
}
}

Ok(result)
Ok(result)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -931,15 +942,21 @@ mod tests {
#[test]
fn test_no_placeholders() {
let input = "This is a test string with no placeholders";
assert_eq!(sub_env_vars(input).unwrap(), input);
assert_eq!(
VarStr::new(input.to_string()).sub_env_vars().unwrap(),
input
);
}

#[test]
fn test_with_placeholders() {
env::set_var("TEST_VAR", "environment variable");
let input = "This is a {{ TEST_VAR }}";
let expected = "This is a environment variable";
assert_eq!(sub_env_vars(input).unwrap(), expected);
assert_eq!(
VarStr::new(input.to_string()).sub_env_vars().unwrap(),
input
);
}

#[test]
Expand All @@ -948,6 +965,9 @@ mod tests {
env::set_var("VAR2", "second");
let input = "Here is the {{ VAR1 }} and here is the {{ VAR2 }}";
let expected = "Here is the first and here is the second";
assert_eq!(sub_env_vars(input).unwrap(), expected);
assert_eq!(
VarStr::new(input.to_string()).sub_env_vars().unwrap(),
input
);
}
}
11 changes: 9 additions & 2 deletions arroyo-worker/src/connectors/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::collections::HashMap;

use arroyo_types::VarStr;
use rdkafka::Offset;
use serde::{Deserialize, Serialize};
use typify::import_types;

pub mod sink;
pub mod source;

import_types!(schema = "../connector-schemas/kafka/connection.json");
import_types!(
schema = "../connector-schemas/kafka/connection.json",
replace = { VarStr = VarStr }
);
import_types!(schema = "../connector-schemas/kafka/table.json");

impl SourceOffset {
Expand All @@ -33,7 +37,10 @@ pub fn client_configs(connection: &KafkaConfig) -> HashMap<String, String> {
client_configs.insert("sasl.mechanism".to_string(), mechanism.to_string());
client_configs.insert("security.protocol".to_string(), protocol.to_string());
client_configs.insert("sasl.username".to_string(), username.to_string());
client_configs.insert("sasl.password".to_string(), password.to_string());
client_configs.insert(
"sasl.password".to_string(),
password.sub_env_vars().unwrap_or_else(|e| panic!("{}", e)),
);
}
};

Expand Down
Loading

0 comments on commit e77a258

Please sign in to comment.